co

package module
v0.15.7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 4, 2022 License: GPL-3.0 Imports: 10 Imported by: 0

README

Go

Co

Co is a concurrency project with GENERIC SUPPORTED, dedicate to three things:

  1. Providing mechanism to dealing data in ReactiveX fashion and related transform algorithm.
  • Async sequence with transform functions such as map, filter, multicast, buffer_time and others.
  1. Providing a round trip pipe on top of Async sequence.

  2. Helping developer to ease the pain of dealing goroutine and channel with less than 2 lines code with:

  • mimicked promising functions: AwaitAll, AwaitAny, AwaitRace
  • high performance none blocking queue: Queue, MultiReceiverQueue
  • high performance worker pool: WorkerPool, DispatchePool

Motivation on Go with Reactive programming

I consider ReactiveX programming pattern as a data stream friendly way to dealing with never ending data. However, most common scenario for such a pattern is in client side programming, for example, in AngularX, and I actually never saw any backend project using the pattern. Actually, it's quite easy to understand. The common pattern in backend simply is to push something to controller (and it will send some data to database) and listen to the callback.

But, considering a case where server should continue to receive time series data, such as user log. The normal pattern would be having an API server to listen to the data, process it, and then send it to some message queue. Usually it's one to one pattern, namely each incoming user log mapped to one request to message queue. With the size of log data, the size of request message and combing with load to send data in TCP pipe, we could just save the user log to some size and send them all together. In this case we have an optimized user log processing pipeline.

I believe that ReactiveX is a good way to deal with such a case. Of course, increasing people are using or considering Go for client side programming, which I believe definitely should apply Co.

However, even though I have mentioned a lot of ReactiveX patterns above. I do not want to create something with exact API. It's due to 1. I found it's originally API to be hard to understand; 2. The server side programming usually don't require that much of time based algorithm.

APIs

https://godoc.org/go.tempura.ink/co

Promising functions:
  • AwaitAll: wait for all promises to be resolved or failed.
  • AwaitRace: wait for any promises to be resolved.
  • AwaitAny: wait for any promises to be resolved or failed.
Queue:
  • Queue: a none blocking queue with unlimited size.
  • MultiReceiverQueue: multiple receiver version of Queue.
Async Sequence:
Combination
  • AsyncCombineLatest: combine latest of multiple async sequence with different types.
  • AsyncMerged: merge multiple async sequence with same type.
  • AsyncMultiCast: broadcasting async sequence to multiple successor sequences.
  • AsyncPartition: horizontally partition elements of multiple async sequence.
  • AsyncZip: get the latest result of all multiple async sequence with different type.
  • AsyncAny: wait for any async sequence to be resolved or failed.
Transform
  • AsyncAdjacentFilter: filter adjacent elements.
  • AsyncBufferTime: buffer elements for a certain time.
  • AsyncCompacted: remove empty value form elements.
  • AsyncFlatten: flatten nested async sequence.
  • AsyncMap: map elements to other type / value.
Time based transform
  • AsyncDebounce: discard elements inside or outside a given sliding windows.
Creating asynchronous sequence
  • OfList: create an asynchronous sequence from a list.
  • FromChan: create an asynchronous sequence from a channel; also can be created with a buffered channel.
  • FromChanBuffered: create an asynchronous sequence from a channel with unlimited buffer size.
  • FromFn: create an asynchronous sequence from closure function.
  • AsyncSubject: create an asynchronous sequence with a Next/Error/Complete method.
Round Trip
  • AsyncRoundTripper: create an asynchronous manager with a round trip, which mean sender can receive callback from handler, it can be used to create an HTTP server.

Getting started

Navigate to your project base and go get go.tempura.ink/co

Examples

Parallel
p := co.NewParallel[bool](10)// worker size
for i := 0; i < 10000; i++ {
    func(idx int) {
        p.Process(func() bool {
            actual[idx] = true
           return true
        })
    }(i)
}

// Wait doesn't indicate a Run, the job will run once added
// convey.So, you could ignore Wait() in some cases
vals := p.Wait()
Awaits
handlers := make([]func() (int, error), 0)
for i := 0; i < 1000; i++ {
    i := i
    handlers = append(handlers, func() (int, error) {
        return i + 1, nil
    })
}

responses := co.AwaitAll[int](handlers...)
Async Sequence
numbers := []int{1, 4, 5, 6, 7}
aList := co.OfListWith(numbers...)

numbers2 := []int{2, 4, 7, 0, 21}
aList2 := co.OfListWith(numbers2...)
mList := co.NewAsyncMapSequence[int](aList, func(v int) int {
    return v + 1
})

pList := co.NewAsyncMergedSequence[int](mList, aList2)

result := []int{}
for data := range pList.Iter() {
    result = append(result, data)
}

with time based transformation

queued := []int{1, 4, 5, 6, 7, 2, 2, 3, 4, 5, 12, 4, 2, 3, 43, 127, 37598, 34, 34, 123, 123}
sourceCh := make(chan int)

oChannel := co.FromChan(sourceCh)
bList := co.NewAsyncBufferTimeSequence[int](oChannel, time.Second)

// simulate handling on other go routine
go func() {
    time.Sleep(time.Second)
    for i, val := range queued {
        sourceCh <- val
        time.Sleep(time.Millisecond * (100 + time.Duration(i)*10))
    }
    oChannel.Complete()
}()

result := [][]int{}
for data := range bList.Iter() {
    result = append(acturesultal, data)
}

Benchmark

Pool benchmark

goos: darwin
goarch: amd64
pkg: github.com/tempura-shrimp/co/pool
cpu: Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz
BenchmarkWorkPoolWithFib-12                10000            178232 ns/op
BenchmarkDispatchPoolWithFib-12            10000            176316 ns/op
BenchmarkFibWithAwaitAll-12                10000            194145 ns/op
BenchmarkFibWithTunny-12                   10000            907005 ns/op
BenchmarkFibSequence-12                    10000            959778 ns/op
PASS

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func AnyET

func AnyET[T comparable](ele []T, target T) bool

func AwaitAll

func AwaitAll[R any](fns ...func() (R, error)) []*data[R]

func AwaitAny

func AwaitAny[R any](fns ...func() (R, error)) *data[R]

func AwaitRace

func AwaitRace[R any](fns ...func() (R, error)) R

func CastOrNil

func CastOrNil[T any](el any) T

func Copy

func Copy[T any](v *T) *T

func EvertET

func EvertET[T comparable](ele []T, target T) bool

func EvertGET

func EvertGET[T constraints.Ordered](ele []T, target T) bool

func NewAsyncRoundTrip

func NewAsyncRoundTrip[R any, E any, T seqItem[R]]() *asyncRoundTrip[R, E, T]

func NewAsyncSequence

func NewAsyncSequence[R any](it AsyncSequenceable[R]) *asyncSequence[R]

func NewAsyncSequenceIterator

func NewAsyncSequenceIterator[T any](it Iterator[T]) *asyncSequenceIterator[T]

func NewData

func NewData[R any]() *data[R]

func NewDataWith

func NewDataWith[R any](val R, err error) *data[R]

func NewExecutablesList

func NewExecutablesList[R any]() *executablesList[R]

func NewExecutor

func NewExecutor[R any]() *executable[R]

func NewIterativeList

func NewIterativeList[R any]() *iterativeList[R]

func NewParallel

func NewParallel[R any](maxWorkers int) *parallel[R]

Types

type Action

type Action[E any] struct {
	// contains filtered or unexported fields
}

func All

func All[R any](list *executablesList[R]) *Action[*data[R]]

func Any

func Any[R any](list *executablesList[R]) *Action[*data[R]]

func MapAction

func MapAction[T1, T2 any](a1 *Action[T1], fn func(T1) T2) *Action[T2]

func NewAction

func NewAction[E any]() *Action[E]

func Race

func Race[R any](list *executablesList[R]) *Action[R]

func (*Action[E]) DiscardData

func (a *Action[E]) DiscardData() *Action[E]

func (*Action[E]) GetData

func (a *Action[E]) GetData() []E

func (*Action[E]) Iter

func (a *Action[E]) Iter() chan E

func (*Action[E]) PeakData

func (a *Action[E]) PeakData() E

type AsyncAdjacentFilterSequence

type AsyncAdjacentFilterSequence[R any] struct {
	// contains filtered or unexported fields
}

func NewAsyncAdjacentFilterSequence

func NewAsyncAdjacentFilterSequence[R any](it AsyncSequenceable[R], fn func(R, R) bool) *AsyncAdjacentFilterSequence[R]

func (AsyncAdjacentFilterSequence) AdjacentFilter

func (a AsyncAdjacentFilterSequence) AdjacentFilter(fn func(R, R) bool) *AsyncAdjacentFilterSequence[R]

func (AsyncAdjacentFilterSequence) EIter

func (a AsyncAdjacentFilterSequence) EIter() <-chan *data[R]

func (AsyncAdjacentFilterSequence) Iter

func (a AsyncAdjacentFilterSequence) Iter() <-chan R

func (AsyncAdjacentFilterSequence) Merge

func (a AsyncAdjacentFilterSequence) Merge(its ...AsyncSequenceable[R]) *AsyncMergedSequence[R]

func (*AsyncAdjacentFilterSequence[R]) SetPredicator

func (c *AsyncAdjacentFilterSequence[R]) SetPredicator(fn func(R, R) bool) *AsyncAdjacentFilterSequence[R]

type AsyncAnySequence

type AsyncAnySequence[R any] struct {
	// contains filtered or unexported fields
}

func NewAsyncAnySequence

func NewAsyncAnySequence[R any](its ...AsyncSequenceable[R]) *AsyncAnySequence[R]

func (AsyncAnySequence) AdjacentFilter

func (a AsyncAnySequence) AdjacentFilter(fn func(R, R) bool) *AsyncAdjacentFilterSequence[R]

func (AsyncAnySequence) EIter

func (a AsyncAnySequence) EIter() <-chan *data[R]

func (AsyncAnySequence) Iter

func (a AsyncAnySequence) Iter() <-chan R

func (AsyncAnySequence) Merge

func (a AsyncAnySequence) Merge(its ...AsyncSequenceable[R]) *AsyncMergedSequence[R]

type AsyncBufferTimeSequence

type AsyncBufferTimeSequence[R any, T []R] struct {
	// contains filtered or unexported fields
}

func NewAsyncBufferTimeSequence

func NewAsyncBufferTimeSequence[R any, T []R](it AsyncSequenceable[R], interval time.Duration) *AsyncBufferTimeSequence[R, T]

func (AsyncBufferTimeSequence) AdjacentFilter

func (a AsyncBufferTimeSequence) AdjacentFilter(fn func(R, R) bool) *AsyncAdjacentFilterSequence[R]

func (AsyncBufferTimeSequence) EIter

func (a AsyncBufferTimeSequence) EIter() <-chan *data[R]

func (AsyncBufferTimeSequence) Iter

func (a AsyncBufferTimeSequence) Iter() <-chan R

func (AsyncBufferTimeSequence) Merge

func (a AsyncBufferTimeSequence) Merge(its ...AsyncSequenceable[R]) *AsyncMergedSequence[R]

func (*AsyncBufferTimeSequence[R, T]) SetInterval

func (a *AsyncBufferTimeSequence[R, T]) SetInterval(interval time.Duration) *AsyncBufferTimeSequence[R, T]

type AsyncBufferedChan

type AsyncBufferedChan[R any] struct {
	// contains filtered or unexported fields
}

func FromChanBuffered

func FromChanBuffered[R any](ch chan R) *AsyncBufferedChan[R]

func (AsyncBufferedChan) AdjacentFilter

func (a AsyncBufferedChan) AdjacentFilter(fn func(R, R) bool) *AsyncAdjacentFilterSequence[R]

func (*AsyncBufferedChan[R]) Complete

func (a *AsyncBufferedChan[R]) Complete() *AsyncBufferedChan[R]

func (AsyncBufferedChan) EIter

func (a AsyncBufferedChan) EIter() <-chan *data[R]

func (AsyncBufferedChan) Iter

func (a AsyncBufferedChan) Iter() <-chan R

func (AsyncBufferedChan) Merge

func (a AsyncBufferedChan) Merge(its ...AsyncSequenceable[R]) *AsyncMergedSequence[R]

type AsyncChannel

type AsyncChannel[R any] struct {
	// contains filtered or unexported fields
}

func FromChan

func FromChan[R any](ch chan R) *AsyncChannel[R]

func (AsyncChannel) AdjacentFilter

func (a AsyncChannel) AdjacentFilter(fn func(R, R) bool) *AsyncAdjacentFilterSequence[R]

func (*AsyncChannel[R]) Complete

func (a *AsyncChannel[R]) Complete() *AsyncChannel[R]

func (AsyncChannel) EIter

func (a AsyncChannel) EIter() <-chan *data[R]

func (AsyncChannel) Iter

func (a AsyncChannel) Iter() <-chan R

func (AsyncChannel) Merge

func (a AsyncChannel) Merge(its ...AsyncSequenceable[R]) *AsyncMergedSequence[R]

type AsyncCombineLatestSequence

type AsyncCombineLatestSequence[R any] struct {
	// contains filtered or unexported fields
}

func CombineLatest

func CombineLatest[T1, T2 any](seq1 AsyncSequenceable[T1], seq2 AsyncSequenceable[T2]) *AsyncCombineLatestSequence[Type2[T1, T2]]

func CombineLatest3

func CombineLatest3[T1, T2, T3 any](seq1 AsyncSequenceable[T1], seq2 AsyncSequenceable[T2], seq3 AsyncSequenceable[T3]) *AsyncCombineLatestSequence[Type3[T1, T2, T3]]

func NewAsyncCombineLatestSequence

func NewAsyncCombineLatestSequence[R any](its []iteratorAny) *AsyncCombineLatestSequence[R]

func (AsyncCombineLatestSequence) AdjacentFilter

func (a AsyncCombineLatestSequence) AdjacentFilter(fn func(R, R) bool) *AsyncAdjacentFilterSequence[R]

func (AsyncCombineLatestSequence) EIter

func (a AsyncCombineLatestSequence) EIter() <-chan *data[R]

func (AsyncCombineLatestSequence) Iter

func (a AsyncCombineLatestSequence) Iter() <-chan R

func (AsyncCombineLatestSequence) Merge

func (a AsyncCombineLatestSequence) Merge(its ...AsyncSequenceable[R]) *AsyncMergedSequence[R]

type AsyncCompactedSequence

type AsyncCompactedSequence[R comparable] struct {
	// contains filtered or unexported fields
}

func NewAsyncCompactedSequence

func NewAsyncCompactedSequence[R comparable](it AsyncSequenceable[R]) *AsyncCompactedSequence[R]

func (AsyncCompactedSequence) AdjacentFilter

func (a AsyncCompactedSequence) AdjacentFilter(fn func(R, R) bool) *AsyncAdjacentFilterSequence[R]

func (AsyncCompactedSequence) EIter

func (a AsyncCompactedSequence) EIter() <-chan *data[R]

func (AsyncCompactedSequence) Iter

func (a AsyncCompactedSequence) Iter() <-chan R

func (AsyncCompactedSequence) Merge

func (a AsyncCompactedSequence) Merge(its ...AsyncSequenceable[R]) *AsyncMergedSequence[R]

func (*AsyncCompactedSequence[R]) SetPredicator

func (c *AsyncCompactedSequence[R]) SetPredicator(fn func(R) bool) *AsyncCompactedSequence[R]

type AsyncData

type AsyncData[R any] struct {
	// contains filtered or unexported fields
}

func NewAsyncData

func NewAsyncData[R any](val R) *AsyncData[R]

func (*AsyncData[R]) IsComplete

func (as *AsyncData[R]) IsComplete() bool

func (*AsyncData[R]) IsIdel

func (as *AsyncData[R]) IsIdel() bool

func (*AsyncData[R]) IsPending

func (as *AsyncData[R]) IsPending() bool

func (*AsyncData[R]) TransiteTo

func (as *AsyncData[R]) TransiteTo(status asyncStatus) *AsyncData[R]

type AsyncDebounceSequence

type AsyncDebounceSequence[R any] struct {
	// contains filtered or unexported fields
}

func NewAsyncDebounceSequence

func NewAsyncDebounceSequence[R any](it AsyncSequenceable[R], interval time.Duration) *AsyncDebounceSequence[R]

func (AsyncDebounceSequence) AdjacentFilter

func (a AsyncDebounceSequence) AdjacentFilter(fn func(R, R) bool) *AsyncAdjacentFilterSequence[R]

func (AsyncDebounceSequence) EIter

func (a AsyncDebounceSequence) EIter() <-chan *data[R]

func (AsyncDebounceSequence) Iter

func (a AsyncDebounceSequence) Iter() <-chan R

func (AsyncDebounceSequence) Merge

func (a AsyncDebounceSequence) Merge(its ...AsyncSequenceable[R]) *AsyncMergedSequence[R]

func (*AsyncDebounceSequence[R]) SetInterval

func (a *AsyncDebounceSequence[R]) SetInterval(interval time.Duration) *AsyncDebounceSequence[R]

func (*AsyncDebounceSequence[R]) SetTolerance

func (a *AsyncDebounceSequence[R]) SetTolerance(tolerance time.Duration) *AsyncDebounceSequence[R]

type AsyncFlattenSequence

type AsyncFlattenSequence[R any, T []R] struct {
	// contains filtered or unexported fields
}

func NewAsyncFlattenSequence

func NewAsyncFlattenSequence[R any, T []R](it AsyncSequenceable[T]) *AsyncFlattenSequence[R, T]

func (AsyncFlattenSequence) AdjacentFilter

func (a AsyncFlattenSequence) AdjacentFilter(fn func(R, R) bool) *AsyncAdjacentFilterSequence[R]

func (AsyncFlattenSequence) EIter

func (a AsyncFlattenSequence) EIter() <-chan *data[R]

func (AsyncFlattenSequence) Iter

func (a AsyncFlattenSequence) Iter() <-chan R

func (AsyncFlattenSequence) Merge

func (a AsyncFlattenSequence) Merge(its ...AsyncSequenceable[R]) *AsyncMergedSequence[R]

type AsyncFns

type AsyncFns[R any] struct {
	// contains filtered or unexported fields
}

func FromFn

func FromFn[R any]() *AsyncFns[R]

func (*AsyncFns[R]) AddFns

func (c *AsyncFns[R]) AddFns(fns ...func() (R, error)) *AsyncFns[R]

func (AsyncFns) AdjacentFilter

func (a AsyncFns) AdjacentFilter(fn func(R, R) bool) *AsyncAdjacentFilterSequence[R]

func (AsyncFns) EIter

func (a AsyncFns) EIter() <-chan *data[R]

func (AsyncFns) Iter

func (a AsyncFns) Iter() <-chan R

func (AsyncFns) Merge

func (a AsyncFns) Merge(its ...AsyncSequenceable[R]) *AsyncMergedSequence[R]

type AsyncInterval

type AsyncInterval[R any] struct {
	// contains filtered or unexported fields
}

func Interval

func Interval(period int) *AsyncInterval[int]

func (AsyncInterval) AdjacentFilter

func (a AsyncInterval) AdjacentFilter(fn func(R, R) bool) *AsyncAdjacentFilterSequence[R]

func (*AsyncInterval[R]) Complete

func (a *AsyncInterval[R]) Complete() *AsyncInterval[R]

func (AsyncInterval) EIter

func (a AsyncInterval) EIter() <-chan *data[R]

func (AsyncInterval) Iter

func (a AsyncInterval) Iter() <-chan R

func (AsyncInterval) Merge

func (a AsyncInterval) Merge(its ...AsyncSequenceable[R]) *AsyncMergedSequence[R]

type AsyncList

type AsyncList[R any] struct {
	// contains filtered or unexported fields
}

func OfList

func OfList[R any]() *AsyncList[R]

func OfListWith

func OfListWith[R any](val ...R) *AsyncList[R]

func (*AsyncList[R]) Add

func (it *AsyncList[R]) Add(e ...R) *AsyncList[R]

func (AsyncList) AdjacentFilter

func (a AsyncList) AdjacentFilter(fn func(R, R) bool) *AsyncAdjacentFilterSequence[R]

func (AsyncList) EIter

func (a AsyncList) EIter() <-chan *data[R]

func (AsyncList) Iter

func (a AsyncList) Iter() <-chan R

func (AsyncList) Merge

func (a AsyncList) Merge(its ...AsyncSequenceable[R]) *AsyncMergedSequence[R]

type AsyncMapSequence

type AsyncMapSequence[R, T any] struct {
	// contains filtered or unexported fields
}

func NewAsyncMapSequence

func NewAsyncMapSequence[R, T any](p AsyncSequenceable[R], fn func(R) T) *AsyncMapSequence[R, T]

func (AsyncMapSequence) AdjacentFilter

func (a AsyncMapSequence) AdjacentFilter(fn func(R, R) bool) *AsyncAdjacentFilterSequence[R]

func (AsyncMapSequence) EIter

func (a AsyncMapSequence) EIter() <-chan *data[R]

func (AsyncMapSequence) Iter

func (a AsyncMapSequence) Iter() <-chan R

func (AsyncMapSequence) Merge

func (a AsyncMapSequence) Merge(its ...AsyncSequenceable[R]) *AsyncMergedSequence[R]

func (*AsyncMapSequence[R, T]) SetPredicator

func (c *AsyncMapSequence[R, T]) SetPredicator(fn func(R) T) *AsyncMapSequence[R, T]

type AsyncMergedSequence

type AsyncMergedSequence[R any] struct {
	// contains filtered or unexported fields
}

func NewAsyncMergedSequence

func NewAsyncMergedSequence[R any](as ...AsyncSequenceable[R]) *AsyncMergedSequence[R]

func (AsyncMergedSequence) AdjacentFilter

func (a AsyncMergedSequence) AdjacentFilter(fn func(R, R) bool) *AsyncAdjacentFilterSequence[R]

func (AsyncMergedSequence) EIter

func (a AsyncMergedSequence) EIter() <-chan *data[R]

func (AsyncMergedSequence) Iter

func (a AsyncMergedSequence) Iter() <-chan R

func (AsyncMergedSequence) Merge

func (a AsyncMergedSequence) Merge(its ...AsyncSequenceable[R]) *AsyncMergedSequence[R]

type AsyncMulticastConnector

type AsyncMulticastConnector[R any] struct {
	*AsyncMulticastSequence[R]
	// contains filtered or unexported fields
}

func (AsyncMulticastConnector) AdjacentFilter

func (a AsyncMulticastConnector) AdjacentFilter(fn func(R, R) bool) *AsyncAdjacentFilterSequence[R]

func (AsyncMulticastConnector) EIter

func (a AsyncMulticastConnector) EIter() <-chan *data[R]

func (AsyncMulticastConnector) Iter

func (a AsyncMulticastConnector) Iter() <-chan R

func (AsyncMulticastConnector) Merge

func (a AsyncMulticastConnector) Merge(its ...AsyncSequenceable[R]) *AsyncMergedSequence[R]

type AsyncMulticastSequence

type AsyncMulticastSequence[R any] struct {
	// contains filtered or unexported fields
}

func NewAsyncMulticastSequence

func NewAsyncMulticastSequence[R any](it AsyncSequenceable[R]) *AsyncMulticastSequence[R]

func (*AsyncMulticastSequence[R]) Connect

func (a *AsyncMulticastSequence[R]) Connect() *AsyncMulticastConnector[R]

type AsyncPairwiseSequence

type AsyncPairwiseSequence[R any, T []R] struct {
	// contains filtered or unexported fields
}

func NewAsyncPairwiseSequence

func NewAsyncPairwiseSequence[R any, T []R](it AsyncSequenceable[R]) *AsyncPairwiseSequence[R, T]

func (AsyncPairwiseSequence) AdjacentFilter

func (a AsyncPairwiseSequence) AdjacentFilter(fn func(R, R) bool) *AsyncAdjacentFilterSequence[R]

func (AsyncPairwiseSequence) EIter

func (a AsyncPairwiseSequence) EIter() <-chan *data[R]

func (AsyncPairwiseSequence) Iter

func (a AsyncPairwiseSequence) Iter() <-chan R

func (AsyncPairwiseSequence) Merge

func (a AsyncPairwiseSequence) Merge(its ...AsyncSequenceable[R]) *AsyncMergedSequence[R]

type AsyncPartitionSequence

type AsyncPartitionSequence[R any] struct {
	// contains filtered or unexported fields
}

func NewAsyncPartitionSequence

func NewAsyncPartitionSequence[R any](size int, its ...AsyncSequenceable[R]) *AsyncPartitionSequence[R]

func (AsyncPartitionSequence) AdjacentFilter

func (a AsyncPartitionSequence) AdjacentFilter(fn func(R, R) bool) *AsyncAdjacentFilterSequence[R]

func (AsyncPartitionSequence) EIter

func (a AsyncPartitionSequence) EIter() <-chan *data[R]

func (AsyncPartitionSequence) Iter

func (a AsyncPartitionSequence) Iter() <-chan R

func (AsyncPartitionSequence) Merge

func (a AsyncPartitionSequence) Merge(its ...AsyncSequenceable[R]) *AsyncMergedSequence[R]

func (*AsyncPartitionSequence[R]) SetSize

func (a *AsyncPartitionSequence[R]) SetSize(size int) *AsyncPartitionSequence[R]

type AsyncSequenceable

type AsyncSequenceable[R any] interface {
	// contains filtered or unexported methods
}

type AsyncSubject

type AsyncSubject[R any] struct {
	// contains filtered or unexported fields
}

func NewAsyncSubject

func NewAsyncSubject[R any]() *AsyncSubject[R]

func (AsyncSubject) AdjacentFilter

func (a AsyncSubject) AdjacentFilter(fn func(R, R) bool) *AsyncAdjacentFilterSequence[R]

func (*AsyncSubject[R]) Complete

func (c *AsyncSubject[R]) Complete() *AsyncSubject[R]

func (AsyncSubject) EIter

func (a AsyncSubject) EIter() <-chan *data[R]

func (*AsyncSubject[R]) Error

func (c *AsyncSubject[R]) Error(err error) *AsyncSubject[R]

func (AsyncSubject) Iter

func (a AsyncSubject) Iter() <-chan R

func (AsyncSubject) Merge

func (a AsyncSubject) Merge(its ...AsyncSequenceable[R]) *AsyncMergedSequence[R]

func (*AsyncSubject[R]) Next

func (c *AsyncSubject[R]) Next(val R) *AsyncSubject[R]

type AsyncZipSequence

type AsyncZipSequence[R any] struct {
	// contains filtered or unexported fields
}

func NewAsyncZipSequence

func NewAsyncZipSequence[R any](its []iteratorAny) *AsyncZipSequence[R]

func Zip

func Zip[T1, T2 any](seq1 AsyncSequenceable[T1], seq2 AsyncSequenceable[T2]) *AsyncZipSequence[Type2[T1, T2]]

func Zip3

func Zip3[T1, T2, T3 any](seq1 AsyncSequenceable[T1], seq2 AsyncSequenceable[T2], seq3 AsyncSequenceable[T3]) *AsyncZipSequence[Type3[T1, T2, T3]]

func (AsyncZipSequence) AdjacentFilter

func (a AsyncZipSequence) AdjacentFilter(fn func(R, R) bool) *AsyncAdjacentFilterSequence[R]

func (AsyncZipSequence) EIter

func (a AsyncZipSequence) EIter() <-chan *data[R]

func (AsyncZipSequence) Iter

func (a AsyncZipSequence) Iter() <-chan R

func (AsyncZipSequence) Merge

func (a AsyncZipSequence) Merge(its ...AsyncSequenceable[R]) *AsyncMergedSequence[R]

type CondCh added in v0.15.7

type CondCh = syncx.CondCh

type ErrorMode

type ErrorMode int
const (
	ErrorModeSkip ErrorMode = iota
	ErrorModeStop
)

type Iterator

type Iterator[T any] interface {
	Iter() <-chan T
	EIter() <-chan *data[T]
	// contains filtered or unexported methods
}

type List

type List[R any] struct {
	// contains filtered or unexported fields
}

func NewList

func NewList[R any]() *List[R]

type Optional

type Optional[R any] struct {
	// contains filtered or unexported fields
}

func NewOptionalEmpty

func NewOptionalEmpty[R any]() *Optional[R]

func OptionalIf

func OptionalIf[R any](val R) *Optional[R]

func OptionalOf

func OptionalOf[R any](val R) *Optional[R]

func (Optional[R]) AsOptional

func (op Optional[R]) AsOptional() *Optional[any]

func (Optional[R]) Equals

func (op Optional[R]) Equals(value R) bool

func (Optional[R]) Get

func (op Optional[R]) Get() R

func (Optional[R]) IsPresent

func (op Optional[R]) IsPresent() bool

func (Optional[R]) String

func (op Optional[R]) String() string

type Type2

type Type2[T1, T2 any] struct {
	V1 T1
	V2 T2
}

type Type3

type Type3[T1, T2, T3 any] struct {
	V1 T1
	V2 T2
	V3 T3
}

Directories

Path Synopsis
ds
internal

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL