stream

package module
v0.0.0-...-7e090e3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 3, 2023 License: MIT Imports: 3 Imported by: 6

README

Stream

Build codecov

English / 中文

Stream is a stream processing library based on Go 1.18+ Generics. It supports parallel processing of data in the stream.

Features

  • Parallel: Parallel processing of data in the stream, keeping the original order of the elements in the stream
  • Pipeline: combine multiple operations to reduce element loops, short-circuiting earlier
  • Lazy Invocation: intermediate operations are lazy

Installation

Requires Go 1.18+ version installed

import "github.com/xyctruth/stream"

Getting Started

s := stream.NewSliceByOrdered([]string{"d", "a", "b", "c", "a"}).
    Filter(func(s string) bool { return s != "b" }).
    Map(func(s string) string { return "class_" + s }).
    Sort().
    Distinct().
    ToSlice()

Type Constraints

any accepts elements of any type, so you cannot use == != > < to compare elements, which will prevent you from using Sort(), Find()... functions, but you can use SortFunc(fn), FindFunc(fn)... instead

stream.NewSlice([]int{1, 2, 3, 7, 1})

comparable accepts type can use == != to compare elements, but still can't use > < to compare elements, so you can't use Sort(), Min()... functions, but you can use SortFunc(fn), MinFunc()... instead

stream.NewSliceByComparable([]int{1, 2, 3, 7, 1})

constraints.Ordered accepts types that can use == != > < to compare elements, so can use all functions

stream.NewSliceByOrdered([]int{1, 2, 3, 7, 1})

Type Conversion

Sometimes we need to use Map , Reduce to convert the type of slice elements, but unfortunately Golang currently does not support structure methods with additional type parameters, all type parameters must be declared in the structure. We work around this with a temporary workaround until Golang supports it.

// SliceMappingStream  Need to convert the type of slice elements.
// - E elements type
// - MapE map elements type
// - ReduceE reduce elements type
type SliceMappingStream[E any, MapE any, ReduceE any] struct {
    SliceStream[E]
}

s := stream.NewSliceByMapping[int, string, string]([]int{1, 2, 3, 4, 5}).
    Filter(func(v int) bool { return v >3 }).
    Map(func(v int) string { return "mapping_" + strconv.Itoa(v) }).
    Reduce(func(r string, v string) string { return r + v })

Parallel

The Parallel function accept a goroutines int parameter. If goroutines>1, open Parallel , otherwise close Parallel, the stream Parallel is off by default.

Parallel will divide the elements in the stream into multiple partitions equally, and create the same number of goroutine to execute, and it will ensure that the elements in the stream remain in the original order after processing is complete.

s := stream.NewSliceByOrdered([]string{"d", "a", "b", "c", "a"}).
    Parallel(10).
    Filter(func(s string) bool {
    // some time-consuming operations
    return s != "b"
    }).
    Map(func(s string) string {
    // some time-consuming operations
    return "class_" + s
    }).
    ForEach(
    func(index int, s string) {
    // some time-consuming operations
    },
    ).ToSlice()
Parallel Type
  • First: parallel processing ends as soon as the first return value is obtained. For: AllMatch, AnyMatch, FindFunc
  • ALL: All elements need to be processed in parallel, all return values are obtained, and then the parallel is ended. For: Map, Filter
  • Action: All elements need to be processed in parallel, no return value required. For: ForEach, Action
Parallel Goroutines Number

The number of parallel goroutines has different choices for CPU operations and IO operations. Generally, the number of goroutines does not need to be set larger than the number of CPU cores for CPU operations, while the number of goroutines for IO operations can be set to be much larger than the number of CPU cores.

CPU Operations

BenchmarkParallelByCPU

NewSlice(s).Parallel(tt.goroutines).ForEach(func(i int, v int) {
    sort.Ints(newArray(1000)) // Simulate time-consuming CPU operations
})

Benchmark with 6 cpu cores

go test -run=^$ -benchtime=5s -cpu=6  -bench=^BenchmarkParallelByCPU

goos: darwin
goarch: amd64
pkg: github.com/xyctruth/stream
cpu: Intel(R) Core(TM) i7-8750H CPU @ 2.20GHz
BenchmarkParallelByCPU/no_Parallel(0)-6         	     710	   8265106 ns/op
BenchmarkParallelByCPU/goroutines(2)-6          	    1387	   4333929 ns/op
BenchmarkParallelByCPU/goroutines(4)-6          	    2540	   2361783 ns/op
BenchmarkParallelByCPU/goroutines(6)-6          	    3024	   2100158 ns/op
BenchmarkParallelByCPU/goroutines(8)-6          	    2347	   2531435 ns/op
BenchmarkParallelByCPU/goroutines(10)-6         	    2622	   2306752 ns/op
IO Operations

BenchmarkParallelByIO

NewSlice(s).Parallel(tt.goroutines).ForEach(func(i int, v int) {
    time.Sleep(time.Millisecond) // Simulate time-consuming IO operations
})

Benchmark with 6 cpu cores

go test -run=^$ -benchtime=5s -cpu=6  -bench=^BenchmarkParallelByIO

goos: darwin
goarch: amd64
pkg: github.com/xyctruth/stream
cpu: Intel(R) Core(TM) i7-8750H CPU @ 2.20GHz
BenchmarkParallelByIO/no_parallel(0)-6          	      52	 102023558 ns/op
BenchmarkParallelByIO/goroutines(2)-6           	     100	  55807303 ns/op
BenchmarkParallelByIO/goroutines(4)-6           	     214	  27868725 ns/op
BenchmarkParallelByIO/goroutines(6)-6           	     315	  18925789 ns/op
BenchmarkParallelByIO/goroutines(8)-6           	     411	  14439700 ns/op
BenchmarkParallelByIO/goroutines(10)-6          	     537	  11164758 ns/op
BenchmarkParallelByIO/goroutines(50)-6          	    2629	   2310602 ns/op
BenchmarkParallelByIO/goroutines(100)-6         	    5094	   1221887 ns/op

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type MapperFunc

type MapperFunc[E any] func(E) E

type Parallel

type Parallel[E any, R any] struct {
	// contains filtered or unexported fields
}

func (Parallel[E, R]) Run

func (p Parallel[E, R]) Run() []R

type Pipeline

type Pipeline[E any] struct {
	// contains filtered or unexported fields
}

func (*Pipeline[E]) AddStage

func (pipe *Pipeline[E]) AddStage(s2 Stage[E, E])

type SliceComparableStream

type SliceComparableStream[E comparable] struct {
	SliceStream[E]
}

SliceComparableStream Generics constraints based on comparable

func NewSliceByComparable

func NewSliceByComparable[E comparable](source []E) SliceComparableStream[E]

NewSliceByComparable new stream instance, generics constraints based on comparable

func (SliceComparableStream[E]) Distinct

func (stream SliceComparableStream[E]) Distinct() SliceComparableStream[E]

Distinct Returns a stream consisting of the distinct elements of this stream. Remove duplicate according to map comparable.

func (SliceComparableStream[E]) Equal

func (stream SliceComparableStream[E]) Equal(dest []E) bool

Equal Returns whether the source in the stream is equal to the destination source. Equal according to the slices.Equal.

func (SliceComparableStream[E]) Filter

func (stream SliceComparableStream[E]) Filter(predicate func(E) bool) SliceComparableStream[E]

Filter See: SliceStream.Filter

func (SliceComparableStream[E]) Find

func (stream SliceComparableStream[E]) Find(dest E) int

Find Returns the index of the first element in the stream that matches the target element. If not found then -1 is returned.

func (SliceComparableStream[E]) ForEach

func (stream SliceComparableStream[E]) ForEach(action func(int, E)) SliceComparableStream[E]

ForEach See: SliceStream.ForEach

func (SliceComparableStream[E]) Limit

func (stream SliceComparableStream[E]) Limit(maxSize int) SliceComparableStream[E]

Limit See: SliceStream.Limit

func (SliceComparableStream[E]) Map

func (stream SliceComparableStream[E]) Map(mapper func(E) E) SliceComparableStream[E]

Map See: SliceStream.Map

func (SliceComparableStream[E]) Parallel

func (stream SliceComparableStream[E]) Parallel(goroutines int) SliceComparableStream[E]

Parallel See: SliceStream.Parallel

func (SliceComparableStream[E]) SortFunc

func (stream SliceComparableStream[E]) SortFunc(less func(a, b E) bool) SliceComparableStream[E]

SortFunc See: SliceStream.SortFunc

type SliceMappingStream

type SliceMappingStream[E any, MapE any, ReduceE any] struct {
	SliceStream[E]
}

SliceMappingStream Need to convert the type of source elements. - E elements type - MapE map elements type - ReduceE reduce elements type

func NewSliceByMapping

func NewSliceByMapping[E any, MapE any, ReduceE any](source []E) SliceMappingStream[E, MapE, ReduceE]

NewSliceByMapping new stream instance, Need to convert the type of source elements.

- E elements type - MapE map elements type - ReduceE reduce elements type

func (SliceMappingStream[E, MapE, ReduceE]) Filter

func (stream SliceMappingStream[E, MapE, ReduceE]) Filter(predicate func(E) bool) SliceMappingStream[E, MapE, ReduceE]

Filter See: SliceStream.Filter

func (SliceMappingStream[E, MapE, ReduceE]) ForEach

func (stream SliceMappingStream[E, MapE, ReduceE]) ForEach(action func(int, E)) SliceMappingStream[E, MapE, ReduceE]

ForEach See: SliceStream.ForEach

func (SliceMappingStream[E, MapE, ReduceE]) Limit

func (stream SliceMappingStream[E, MapE, ReduceE]) Limit(maxSize int) SliceMappingStream[E, MapE, ReduceE]

Limit See: SliceStream.Limit

func (SliceMappingStream[E, MapE, ReduceE]) Map

func (stream SliceMappingStream[E, MapE, ReduceE]) Map(mapper func(E) MapE) SliceMappingStream[MapE, MapE, ReduceE]

Map Returns a stream consisting of the results of applying the given function to the elements of this stream.

Support Parallel.

func (SliceMappingStream[E, MapE, ReduceE]) Parallel

func (stream SliceMappingStream[E, MapE, ReduceE]) Parallel(goroutines int) SliceMappingStream[E, MapE, ReduceE]

Parallel See: SliceStream.Parallel

func (SliceMappingStream[E, MapE, ReduceE]) Reduce

func (stream SliceMappingStream[E, MapE, ReduceE]) Reduce(result ReduceE, accumulator func(result ReduceE, elem E) ReduceE) ReduceE

Reduce Returns a source consisting of the elements of this stream.

func (SliceMappingStream[E, MapE, ReduceE]) SortFunc

func (stream SliceMappingStream[E, MapE, ReduceE]) SortFunc(less func(a, b E) bool) SliceMappingStream[E, MapE, ReduceE]

SortFunc See: SliceStream.SortFunc

type SliceOrderedStream

type SliceOrderedStream[E constraints.Ordered] struct {
	SliceComparableStream[E]
}

SliceOrderedStream Generics constraints based on constraints.Ordered

func NewSliceByOrdered

func NewSliceByOrdered[E constraints.Ordered](source []E) SliceOrderedStream[E]

NewSliceByOrdered new stream instance, generics constraints based on constraints.Ordered

func (SliceOrderedStream[E]) Distinct

func (stream SliceOrderedStream[E]) Distinct() SliceOrderedStream[E]

Distinct See SliceComparableStream.Distinct

func (SliceOrderedStream[E]) Filter

func (stream SliceOrderedStream[E]) Filter(predicate func(E) bool) SliceOrderedStream[E]

Filter See: SliceStream.Filter

func (SliceOrderedStream[E]) ForEach

func (stream SliceOrderedStream[E]) ForEach(action func(int, E)) SliceOrderedStream[E]

ForEach See: SliceStream.ForEach

func (SliceOrderedStream[E]) IsSorted

func (stream SliceOrderedStream[E]) IsSorted() bool

IsSorted reports whether x is sorted in ascending order. Compare according to the constraints.Ordered. If the source is empty or nil then true is returned.

func (SliceOrderedStream[E]) Limit

func (stream SliceOrderedStream[E]) Limit(maxSize int) SliceOrderedStream[E]

Limit See: SliceStream.Limit

func (SliceOrderedStream[E]) Map

func (stream SliceOrderedStream[E]) Map(mapper func(E) E) SliceOrderedStream[E]

Map See: SliceStream.Map

func (SliceOrderedStream[E]) Max

func (stream SliceOrderedStream[E]) Max() (max E, ok bool)

Max Returns the maximum element of this stream. Compare according to the constraints.Ordered. If the source is empty or nil then E Type default value is returned. ok return false

func (SliceOrderedStream[E]) Min

func (stream SliceOrderedStream[E]) Min() (min E, ok bool)

Min Returns the minimum element of this stream. Compare according to the constraints.Ordered. If the source is empty or nil then E Type default value is returned. ok return false

func (SliceOrderedStream[E]) MinFunc

func (stream SliceOrderedStream[E]) MinFunc(less func(a, b E) bool) (min E, ok bool)

MinFunc Returns the minimum element of this stream. - less: return a < b If the source is empty or nil then E Type default value is returned. ok return false

func (SliceOrderedStream[E]) Parallel

func (stream SliceOrderedStream[E]) Parallel(goroutines int) SliceOrderedStream[E]

Parallel See: SliceStream.Parallel

func (SliceOrderedStream[E]) Sort

func (stream SliceOrderedStream[E]) Sort() SliceOrderedStream[E]

Sort Returns a sorted stream consisting of the elements of this stream. Sorted according to slices.Sort.

func (SliceOrderedStream[E]) SortFunc

func (stream SliceOrderedStream[E]) SortFunc(less func(a, b E) bool) SliceOrderedStream[E]

SortFunc See: SliceStream.SortFunc

type SliceStream

type SliceStream[E any] struct {
	*Pipeline[E]
}

SliceStream Generics constraints based on any

func NewSlice

func NewSlice[E any](source []E) SliceStream[E]

NewSlice new stream instance, generics constraints based on any.

func (SliceStream[E]) AllMatch

func (stream SliceStream[E]) AllMatch(predicate func(E) bool) bool

AllMatch Returns whether all elements in the stream match the provided predicate. If the source is empty or nil then true is returned.

Support Parallel.

func (SliceStream[E]) AnyMatch

func (stream SliceStream[E]) AnyMatch(predicate func(E) bool) bool

AnyMatch Returns whether any elements in the stream match the provided predicate. If the source is empty or nil then false is returned.

Support Parallel.

func (SliceStream[E]) Append

func (stream SliceStream[E]) Append(elements ...E) SliceStream[E]

Append appends elements to the end of this stream

func (SliceStream[E]) At

func (stream SliceStream[E]) At(index int) (elem E, ok bool)

At Returns the element at the given index. Accepts negative integers, which count back from the last item. Out of index range ok return false

func (SliceStream[E]) Count

func (stream SliceStream[E]) Count() int

Count Returns the count of elements in this stream.

func (SliceStream[E]) Delete

func (stream SliceStream[E]) Delete(i, j int) SliceStream[E]

Delete Removes the elements s[i:j] from this stream, returning the modified stream. If j > len(slice) then j = len(slice) If i > j then swap i, j = j, i If the source is empty or nil then do nothing

func (SliceStream[E]) EqualFunc

func (stream SliceStream[E]) EqualFunc(dest []E, equal func(E, E) bool) bool

EqualFunc Returns whether the source in the stream is equal to the destination source. Equal according to the slices.EqualFunc

func (SliceStream[E]) Filter

func (stream SliceStream[E]) Filter(predicate func(E) bool) SliceStream[E]

Filter Returns a stream consisting of the elements of this stream that match the given predicate.

Support Parallel.

func (SliceStream[E]) FindFunc

func (stream SliceStream[E]) FindFunc(predicate func(E) bool) int

FindFunc Returns the index of the first element in the stream that matches the provided predicate. If not found then -1 is returned.

Support Parallel. Parallel side effect is that the element found may not be the first to appear

func (SliceStream[E]) First

func (stream SliceStream[E]) First() (elem E, ok bool)

First Returns the first element in the stream. If the source is empty or nil then E Type default value is returned. ok return false

func (SliceStream[E]) ForEach

func (stream SliceStream[E]) ForEach(action func(int, E)) SliceStream[E]

ForEach Performs an action for each element of this stream.

Support Parallel. Parallel side effects are not executed in the original order of stream elements.

func (SliceStream[E]) Insert

func (stream SliceStream[E]) Insert(index int, elements ...E) SliceStream[E]

Insert inserts the values source... into s at index If index is out of range then use Append to the end

func (SliceStream[E]) IsSortedFunc

func (stream SliceStream[E]) IsSortedFunc(less func(a, b E) bool) bool

IsSortedFunc Returns whether stream is sorted in ascending order. Compare according to the less function - less: return a > b If the source is empty or nil then true is returned.

func (SliceStream[E]) Limit

func (stream SliceStream[E]) Limit(maxSize int) SliceStream[E]

Limit Returns a stream consisting of the elements of this stream, truncated to be no longer than maxSize in length.

func (SliceStream[E]) Map

func (stream SliceStream[E]) Map(mapper MapperFunc[E]) SliceStream[E]

Map Returns a stream consisting of the results of applying the given function to the elements of this stream.

Support Parallel.

func (SliceStream[E]) MaxFunc

func (stream SliceStream[E]) MaxFunc(less func(a, b E) bool) (max E, ok bool)

MaxFunc Returns the maximum element of this stream. - less: return a > b If the source is empty or nil then E Type default value is returned. ok return false

func (SliceStream[E]) Parallel

func (stream SliceStream[E]) Parallel(goroutines int) SliceStream[E]

Parallel Goroutines > 1 enable parallel, Goroutines <= 1 disable parallel

func (SliceStream[E]) Reduce

func (stream SliceStream[E]) Reduce(result E, accumulator func(result E, elem E) E) E

Reduce Returns a source consisting of the elements of this stream.

func (SliceStream[E]) SortFunc

func (stream SliceStream[E]) SortFunc(less func(a, b E) bool) SliceStream[E]

SortFunc Returns a sorted stream consisting of the elements of this stream. Sorted according to slices.SortFunc.

func (SliceStream[E]) ToSlice

func (stream SliceStream[E]) ToSlice() []E

ToSlice Returns a source in the stream

type Stage

type Stage[E any, R any] func(index int, e E) (isReturn bool, isComplete bool, ret R)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL