pipe

package module
v0.2.3
Published: Mar 22, 2023 License: MIT Imports: 9 Imported by: 0

README

pipe

pipe provides versatile channel transformers for Go. Currently it can:

  • Converge values from multiple upstream channels into a single downstream channel;
  • Broadcast values from a single upstream channel to multiple downstream channels.

Channel Broadcasting

Simple Broadcaster

Use Broadcast to create a simple broadcaster

upstream := make(chan int)
b := pipe.Broadcast(upstream)

Broadcaster provides .Listen and .Bind methods for registering downstream listeners

l := make(chan int)
canceler := b.Bind(l)
// equivalent to
l2, canceler2 := b.Listen()
upstream <- 42
l3, canceler3 := b.Listen()
close(upstream)

// value 42 is multiplexed to all listeners
<-l  // 42
<-l2 // 42

// no more values after upstream was closed, broadcaster will pipe
// pending values out to all listeners and close them.
_, ok := <-l   // ok == false
_, ok2 := <-l2 // ok2 == false

// since l3 was registered after value 42 was sent, it is closed without
// receiving any value
_, ok3 := <-l3 // ok3 == false

The returned canceler can be used for canceling subscription

l, canceler := b.Listen()
upstream <- 42
<-l          // 42
canceler()
_, ok := <-l // ok == false, since l was closed after cancellation
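
The Listen-and-cancel behaviour above can be sketched with the standard library alone. The type miniBroadcaster and the function demo below are hypothetical names for illustration, not pipe's implementation. The sketch assumes buffered listener channels, so a listener that never drains its buffer would stall the fan-out loop, which the real Broadcaster promises to avoid.

```go
package main

import (
	"fmt"
	"sync"
)

// miniBroadcaster is a hypothetical, simplified stand-in for a broadcaster.
// It is NOT the pipe implementation.
type miniBroadcaster[T any] struct {
	mu        sync.Mutex
	listeners map[int]chan T
	nextID    int
	closed    bool
}

// newMiniBroadcaster starts a goroutine that copies every upstream value to
// all registered listeners and closes them once upstream is closed.
func newMiniBroadcaster[T any](upstream <-chan T) *miniBroadcaster[T] {
	b := &miniBroadcaster[T]{listeners: make(map[int]chan T)}
	go func() {
		for v := range upstream {
			b.mu.Lock()
			for _, l := range b.listeners {
				l <- v // blocks if this listener's buffer is full
			}
			b.mu.Unlock()
		}
		b.mu.Lock()
		b.closed = true
		for id, l := range b.listeners {
			close(l)
			delete(b.listeners, id)
		}
		b.mu.Unlock()
	}()
	return b
}

// Listen registers a new listener and returns it with a canceller that
// unregisters and closes it.
func (b *miniBroadcaster[T]) Listen() (<-chan T, func()) {
	b.mu.Lock()
	defer b.mu.Unlock()
	ch := make(chan T, 8)
	if b.closed {
		close(ch)
		return ch, func() {}
	}
	id := b.nextID
	b.nextID++
	b.listeners[id] = ch
	return ch, func() {
		b.mu.Lock()
		defer b.mu.Unlock()
		if l, ok := b.listeners[id]; ok {
			delete(b.listeners, id)
			close(l)
		}
	}
}

// demo mirrors the README snippet: receive one value, cancel, observe close.
func demo() (first int, openAfterCancel bool) {
	upstream := make(chan int)
	b := newMiniBroadcaster(upstream)
	l, cancel := b.Listen()
	upstream <- 42
	first = <-l
	cancel()
	_, openAfterCancel = <-l
	return first, openAfterCancel
}

func main() {
	first, open := demo()
	fmt.Println(first, open) // 42 false
}
```

The canceller checks the map before closing, so calling it twice, or after upstream has closed, does not close a channel twice.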

Broadcaster guarantees that sending to the upstream channel will not block, even if there are no listeners

upstream := make(chan int)
b := pipe.Broadcast(upstream)
upstream <- 42 // won't block

Use .Detach to stop the broadcaster before the upstream is closed. After detaching, sending to upstream blocks again

upstream := make(chan int)
b := pipe.Broadcast(upstream)
b.Detach()
upstream <- 42 // will block

Memorizable Broadcaster

Sometimes a newly registered listener should immediately receive the latest value from upstream. For this scenario, use the BroadcastM constructor

upstream := make(chan int)
b := pipe.BroadcastM(upstream, 0) // 0 is an initial value
l, _ := b.Listen()
<-l  // 0
upstream <- 42
l2, _ := b.Listen()
<-l2 // 42
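
One way to picture the replay behaviour is a small value holder that remembers the latest value and hands it to each new listener first. The memo type below is a hypothetical illustration built only on the standard library. It is not how BroadcastM is implemented, and it assumes buffered listener channels that the caller keeps draining.

```go
package main

import (
	"fmt"
	"sync"
)

// memo is a hypothetical helper (not part of pipe): it stores the latest
// value by value and replays it to every new listener.
type memo[T any] struct {
	mu        sync.Mutex
	current   T
	listeners []chan T
}

func newMemo[T any](initial T) *memo[T] {
	return &memo[T]{current: initial}
}

// Set records v as the latest value and forwards it to all listeners.
func (m *memo[T]) Set(v T) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.current = v
	for _, l := range m.listeners {
		l <- v // assumes listeners drain their buffers
	}
}

// Current returns the latest remembered value.
func (m *memo[T]) Current() T {
	m.mu.Lock()
	defer m.mu.Unlock()
	return m.current
}

// Listen returns a channel that first yields the remembered value and then
// every value passed to Set afterwards.
func (m *memo[T]) Listen() <-chan T {
	m.mu.Lock()
	defer m.mu.Unlock()
	ch := make(chan T, 8) // buffered so the replayed value does not block
	ch <- m.current
	m.listeners = append(m.listeners, ch)
	return ch
}

// demo follows the same sequence as the README snippet.
func demo() (int, int, int) {
	m := newMemo(0)
	l := m.Listen()
	a := <-l // initial value
	m.Set(42)
	l2 := m.Listen()
	b := <-l2 // latest value replayed to the late listener
	return a, b, m.Current()
}

func main() {
	fmt.Println(demo()) // 0 42 42
}
```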

A memorizable broadcaster provides a .Current method to retrieve the latest value

upstream := make(chan int)
b := pipe.BroadcastM(upstream, 0)
b.Current() // 0
upstream <- 42
b.Current() // 42

Broadcaster with comparable element type

For an upstream with a comparable element type (int, string, etc.), we can use BroadcastC to create a broadcaster that provides additional useful methods.

Currently, such a broadcaster provides the .Until(targets...) method, which blocks until one of the values in targets shows up from the upstream.

upstream := make(chan int)
b := pipe.BroadcastC(upstream)
go func() {
    for i := 0; i < 5; i++ {
        time.Sleep(1 * time.Second)
        upstream <- i
    }
}()
// block until value 3 or 4 shows up from the upstream, which should take
// approximately 4 seconds
b.Until(3, 4)

.Until has variants like .UntilCh and .UntilContext.
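
The idea behind .Until can be sketched on a plain channel. The helper waitFor below is a hypothetical name and works on a raw channel rather than a broadcaster. It only illustrates why the element type must be comparable: each received value is compared with == against the targets.

```go
package main

import "fmt"

// waitFor is a hypothetical helper (not part of pipe). It receives from ch
// until a value equal to one of targets arrives, and reports that value.
// ok is false if ch is closed before any target shows up.
func waitFor[T comparable](ch <-chan T, targets ...T) (hit T, ok bool) {
	for v := range ch {
		for _, t := range targets {
			if v == t {
				return v, true
			}
		}
	}
	return hit, false
}

// demo feeds 0..4 into a buffered channel and waits for 3 or 4.
func demo() (int, bool) {
	ch := make(chan int, 5) // buffered so the producer never blocks
	go func() {
		defer close(ch)
		for i := 0; i < 5; i++ {
			ch <- i
		}
	}()
	return waitFor(ch, 3, 4)
}

func main() {
	fmt.Println(demo()) // 3 true
}
```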

There is also BroadcastCM, which combines the functionality of BroadcastC and BroadcastM. For more details, please refer to the godoc.

Controller and Listener

A Controller bundles an upstream and a broadcaster, which is handy in some cases

con := pipe.NewController[int]()
l, _ := con.Listen()

// con.Sink() exposes the upstream channel
con.Sink() <- 42
close(con.Sink())

<-l // 42

It's recommended to store controllers as private fields and expose them through the Listenable interface, to which users can bind listeners. Consider an imaginary scenario

// library-side
type State int

type Service struct {
    state *pipe.Controller[State]
}

func (ser *Service) State() pipe.Listenable[State] { return ser.state }
func (ser *Service) businessLogic() {
    // ...
    ser.state.Sink() <- SomeState
}

// user-side
listener, _ := service.State().Listen()

Similarly, there are variants like Controller(C|M|CM) and Listenable(C|M|CM).

Channel Converging

The functions Converge2, Converge3 and ConvergeN implement the channel converging logic

// Converge 2 channels
a := make(chan int)
b := make(chan string)
r := pipe.Converge2(a, b)
go func() {
    a <- 42
    b <- "foo"
    close(a)
    close(b)
}()
for x := range r {
    fmt.Println(x) // the built-in println would print the interface's internal pointers
}
// Output:
// 42
// foo

// Converge an arbitrary number of channels
a := make(chan int)
b := make(chan string)
c := make(chan float64)
d := make(chan byte)
r := pipe.ConvergeN(a, b, c, d)
go func() {
    a <- 42
    b <- "foo"
    c <- 3.14
    d <- byte(127)
    close(a)
    close(b)
    close(c)
    close(d)
}()
for x := range r {
    fmt.Println(x) // the built-in println would print the interface's internal pointers
}
// Output:
// 42
// foo
// 3.14
// 127

License

The library is licensed under the MIT License.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Converge2

func Converge2[A, B any](ch1 <-chan A, ch2 <-chan B) <-chan any

Converge2 converges values from ch1 and ch2 into returned channel.
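
A common way to write a two-way converge in Go is one forwarding goroutine per input plus a sync.WaitGroup that closes the output once both inputs are closed. The function merge2 below is a hypothetical sketch of that pattern and is not taken from the pipe source, so its ordering and buffering behaviour may differ from Converge2.

```go
package main

import (
	"fmt"
	"sync"
)

// merge2 is a hypothetical stand-in for a two-way converge. It forwards
// values from both inputs to one output and closes the output after both
// inputs have been closed.
func merge2[A, B any](ch1 <-chan A, ch2 <-chan B) <-chan any {
	out := make(chan any)
	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		defer wg.Done()
		for v := range ch1 {
			out <- v
		}
	}()
	go func() {
		defer wg.Done()
		for v := range ch2 {
			out <- v
		}
	}()
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// demo counts how many ints and strings come out of the merged channel.
func demo() (ints, strs int) {
	a := make(chan int)
	b := make(chan string)
	go func() {
		a <- 42
		close(a)
	}()
	go func() {
		b <- "foo"
		close(b)
	}()
	for x := range merge2(a, b) {
		switch x.(type) {
		case int:
			ints++
		case string:
			strs++
		}
	}
	return ints, strs
}

func main() {
	fmt.Println(demo()) // 1 1
}
```

Because the output element type is any, consumers need a type switch or type assertion to get the concrete values back.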

func Converge3

func Converge3[A, B, C any](ch1 <-chan A, ch2 <-chan B, ch3 <-chan C) <-chan any

Converge3 converges values from ch1, ch2 and ch3 into returned channel.

func ConvergeN

func ConvergeN(chans ...any) <-chan any

ConvergeN converges values from an arbitrary number of channels. Each of chans should be of type <-chan T for some T.
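
Since the arguments are typed as any, a function with this signature has to inspect the channels at run time. One possible approach, shown here as an assumption rather than as pipe's actual code, is reflect.Select from the standard library. The name mergeN is hypothetical.

```go
package main

import (
	"fmt"
	"reflect"
)

// mergeN is a hypothetical sketch (not the pipe implementation). It accepts
// channels of any element type and forwards their values to one output.
func mergeN(chans ...any) <-chan any {
	cases := make([]reflect.SelectCase, 0, len(chans))
	for _, c := range chans {
		v := reflect.ValueOf(c)
		if v.Kind() != reflect.Chan || v.Type().ChanDir()&reflect.RecvDir == 0 {
			panic("mergeN: argument is not a receivable channel")
		}
		cases = append(cases, reflect.SelectCase{Dir: reflect.SelectRecv, Chan: v})
	}
	out := make(chan any)
	go func() {
		defer close(out)
		for len(cases) > 0 {
			i, v, ok := reflect.Select(cases)
			if !ok {
				// Channel i is closed: drop it from the select set.
				cases = append(cases[:i], cases[i+1:]...)
				continue
			}
			out <- v.Interface()
		}
	}()
	return out
}

// demo merges three pre-filled, already closed channels and counts values.
func demo() int {
	a := make(chan int, 1)
	b := make(chan string, 1)
	c := make(chan float64, 1)
	a <- 42
	b <- "foo"
	c <- 3.14
	close(a)
	close(b)
	close(c)
	n := 0
	for range mergeN(a, b, c) {
		n++
	}
	return n
}

func main() {
	fmt.Println(demo()) // 3
}
```

The run-time check replaces the compile-time guarantee that Converge2 and Converge3 get from their type parameters, which is the trade-off of accepting ...any.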

func Until

func Until[T comparable, P Listenable[T]](b P, targets ...T)

Until blocks until one of the following conditions is satisfied: 1) a value from b matches one of targets; 2) b no longer accepts new listeners (either b is detached or the upstream channel is closed).

func UntilCh

func UntilCh[T comparable, P Listenable[T]](b P, targets ...T) (signalCh <-chan struct{}, canceller func())

UntilCh is the asynchronous version of Until. The returned signalCh will be closed when one of the following conditions is satisfied: 1) a value from b matches one of targets; 2) b no longer accepts new listeners (either b is detached or the upstream channel is closed); 3) canceller is called.
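
The three closing conditions can be illustrated on a plain channel. The function signalOn below is a hypothetical name and reads from a raw channel instead of a Listenable. It is a sketch of the signalling pattern, not the library's code.

```go
package main

import (
	"fmt"
	"sync"
)

// signalOn is a hypothetical sketch (not part of pipe). The returned channel
// is closed when a target value arrives on src, when src is closed, or when
// the returned canceller is called.
func signalOn[T comparable](src <-chan T, targets ...T) (<-chan struct{}, func()) {
	done := make(chan struct{})
	stop := make(chan struct{})
	var once sync.Once
	cancel := func() { once.Do(func() { close(stop) }) }
	go func() {
		defer close(done)
		for {
			select {
			case <-stop:
				return
			case v, ok := <-src:
				if !ok {
					return
				}
				for _, t := range targets {
					if v == t {
						return
					}
				}
			}
		}
	}()
	return done, cancel
}

// demo exercises the "target seen" and "cancelled" paths.
func demo() (byTarget, byCancel bool) {
	src := make(chan int, 3)
	src <- 1
	src <- 2
	src <- 3
	done, cancel := signalOn(src, 3)
	<-done
	cancel() // calling the canceller after completion is harmless
	byTarget = len(src) == 0

	idle := make(chan int) // never produces a value
	done2, cancel2 := signalOn(idle, 3)
	cancel2()
	<-done2
	byCancel = true
	return byTarget, byCancel
}

func main() {
	fmt.Println(demo()) // true true
}
```

Wrapping the close in sync.Once keeps the canceller safe to call more than once.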

func UntilContext

func UntilContext[T comparable, P Listenable[T]](ctx context.Context, b P, targets ...T)

UntilContext blocks until one of the following conditions is satisfied: 1) a value from b matches one of targets; 2) b no longer accepts new listeners (either b is detached or the upstream channel is closed); 3) ctx is canceled.
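
A context-aware wait of this kind is usually a select over ctx.Done() and the value source. The helper waitCtx below is a hypothetical sketch on a plain channel. Unlike UntilContext, which has no return value, it returns a bool so that the caller can tell which condition ended the wait.

```go
package main

import (
	"context"
	"fmt"
)

// waitCtx is a hypothetical helper (not part of pipe). It returns true when
// a target value arrives on src, and false when src is closed or ctx is done.
func waitCtx[T comparable](ctx context.Context, src <-chan T, targets ...T) bool {
	for {
		select {
		case <-ctx.Done():
			return false
		case v, ok := <-src:
			if !ok {
				return false
			}
			for _, t := range targets {
				if v == t {
					return true
				}
			}
		}
	}
}

// demo exercises the "target seen" and "context canceled" paths.
func demo() (found, foundAfterCancel bool) {
	src := make(chan int, 3)
	src <- 1
	src <- 2
	src <- 3
	found = waitCtx(context.Background(), src, 3)

	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	idle := make(chan int) // never produces a value
	foundAfterCancel = waitCtx(ctx, idle, 3)
	return found, foundAfterCancel
}

func main() {
	fmt.Println(demo()) // true false
}
```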

Types

type Broadcaster

type Broadcaster[T any] struct {
	// contains filtered or unexported fields
}

func Broadcast

func Broadcast[T any](upstream <-chan T) *Broadcaster[T]

Broadcast returns a Broadcaster that pipes values from the upstream channel into listeners. Broadcaster guarantees that upstream <- val from outside will NOT block, but if it's detached prematurely, upstream <- val will block again.

func (*Broadcaster) Detach

func (b *Broadcaster) Detach()

Detach prematurely detaches the broadcaster from the upstream channel. No more values from the upstream channel will be broadcast, and no new listeners should be registered.

type BroadcasterC

type BroadcasterC[T comparable] struct {
	// contains filtered or unexported fields
}

func BroadcastC

func BroadcastC[T comparable](in <-chan T) *BroadcasterC[T]

BroadcastC returns a broadcaster with a comparable type T as element type. This allows methods like b.Until(targets...) to be called instead of Until(b, targets...), which helps type inference and sometimes saves spelling out type arguments.

func (*BroadcasterC) Until

func (b *BroadcasterC) Until(targets ...T)

Shorthand for Until(b, targets...)

func (*BroadcasterC) UntilCh

func (b *BroadcasterC) UntilCh(targets ...T) (<-chan struct{}, func())

Shorthand for UntilCh(b, targets...)

func (*BroadcasterC) UntilContext

func (b *BroadcasterC) UntilContext(ctx context.Context, targets ...T)

Shorthand for UntilContext(ctx, b, targets...)

type BroadcasterCM

type BroadcasterCM[T comparable] struct {
	// contains filtered or unexported fields
}

func BroadcastCM

func BroadcastCM[T comparable](in <-chan T, initial T) *BroadcasterCM[T]

BroadcastCM returns a broadcaster with a comparable element type that also memorizes the latest value.

func (*BroadcasterCM[T]) Current

func (b *BroadcasterCM[T]) Current() T

Current returns the latest value that the broadcaster memorizes.

func (*BroadcasterCM) Detach added in v0.2.0

func (b *BroadcasterCM) Detach()

Detach prematurely detaches the broadcaster from the upstream channel. No more values from the upstream channel will be broadcast, and no new listeners should be registered.

type BroadcasterM

type BroadcasterM[T any] struct {
	// contains filtered or unexported fields
}

func BroadcastM

func BroadcastM[T any](upstream <-chan T, initial T) *BroadcasterM[T]

BroadcastM returns a broadcaster that memorizes the latest value from upstream. A newly registered listener is first fed the memorized latest value, then subsequent values from upstream. If no value has come out of upstream yet, initial is fed. The latest value is stored by value (instead of by reference).

func (*BroadcasterM[T]) Current

func (b *BroadcasterM[T]) Current() T

Current returns the latest value that the broadcaster memorizes.

func (*BroadcasterM) Detach

func (b *BroadcasterM) Detach()

Detach prematurely detaches the broadcaster from the upstream channel. No more values from the upstream channel will be broadcast, and no new listeners should be registered.

type Controller

type Controller[T any] struct {
	// contains filtered or unexported fields
}

A Controller bundles a sink channel and a broadcaster.

func NewController

func NewController[T any]() *Controller[T]

func (*Controller) Bind

func (b *Controller) Bind(out chan<- T) (cancel func())

Bind registers out as a new listener, which receives subsequent values from the upstream channel. If the input channel is closed or the broadcaster is detached, out will be closed immediately. A canceller is returned for canceling the subscription. When the canceller is called, out will be unregistered and closed.

func (*Controller) BindContext added in v0.2.3

func (b *Controller) BindContext(ctx context.Context, out chan<- T)

BindContext is similar to Bind, but will cancel when ctx is done.
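
Tying a canceller to a context can be sketched as a goroutine that waits on ctx.Done() and then calls the canceller. The helper cancelOnDone and the stand-in unsubscribe function below are hypothetical. How BindContext does this internally is not stated in this documentation.

```go
package main

import (
	"context"
	"fmt"
)

// cancelOnDone is a hypothetical helper (not part of pipe). It calls cancel
// once ctx is done. The goroutine lives until then, so ctx should
// eventually be canceled or expire.
func cancelOnDone(ctx context.Context, cancel func()) {
	go func() {
		<-ctx.Done()
		cancel()
	}()
}

// demo uses a closure that closes out as a stand-in for the canceller that
// Bind would return.
func demo() (stillOpen bool) {
	out := make(chan int)
	unsubscribe := func() { close(out) } // stand-in for a real canceller
	ctx, cancel := context.WithCancel(context.Background())
	cancelOnDone(ctx, unsubscribe)
	cancel()
	_, stillOpen = <-out // unblocks once unsubscribe has closed out
	return stillOpen
}

func main() {
	fmt.Println(demo()) // false
}
```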

func (*Controller) Listen

func (b *Controller) Listen() (<-chan T, func())

Listen creates a new output channel and registers it as a new listener. The output channel and the corresponding canceller are returned.

func (*Controller) ListenContext added in v0.2.3

func (b *Controller) ListenContext(ctx context.Context) <-chan T

ListenContext is similar to Listen, but will cancel when ctx is done.

func (*Controller[T]) Send added in v0.2.0

func (c *Controller[T]) Send(value T) (ok bool)

Send sends value to the sink channel.

func (*Controller[T]) Sink

func (c *Controller[T]) Sink() chan<- T

Sink returns the sink channel of the controller.

type ControllerC

type ControllerC[T comparable] struct {
	// contains filtered or unexported fields
}

A Controller with a comparable element type.

func NewControllerC

func NewControllerC[T comparable]() *ControllerC[T]

func (*ControllerC[T]) Send added in v0.2.0

func (c *ControllerC[T]) Send(value T) (ok bool)

Send sends value to the sink channel.

func (*ControllerC[T]) Sink

func (c *ControllerC[T]) Sink() chan<- T

Sink returns the sink channel of the controller.

func (*ControllerC) Until

func (b *ControllerC) Until(targets ...T)

Shorthand for Until(b, targets...)

func (*ControllerC) UntilCh

func (b *ControllerC) UntilCh(targets ...T) (<-chan struct{}, func())

Shorthand for UntilCh(b, targets...)

func (*ControllerC) UntilContext

func (b *ControllerC) UntilContext(ctx context.Context, targets ...T)

Shorthand for UntilContext(ctx, b, targets...)

type ControllerCM

type ControllerCM[T comparable] struct {
	// contains filtered or unexported fields
}

func NewControllerCM

func NewControllerCM[T comparable](initial T, dedup bool) *ControllerCM[T]

A Controller with a comparable element type and memorizable broadcaster.

func (*ControllerCM[T]) Current

func (c *ControllerCM[T]) Current() T

Current returns the latest value that the broadcaster memorizes.

func (*ControllerCM[T]) Send added in v0.2.0

func (c *ControllerCM[T]) Send(value T) (ok bool)

Send sends value to the sink channel.

func (*ControllerCM[T]) Sink

func (c *ControllerCM[T]) Sink() chan<- T

Sink returns the sink channel of the controller.

func (*ControllerCM) Until

func (b *ControllerCM) Until(targets ...T)

Shorthand for Until(b, targets...)

func (*ControllerCM) UntilCh

func (b *ControllerCM) UntilCh(targets ...T) (<-chan struct{}, func())

Shorthand for UntilCh(b, targets...)

func (*ControllerCM) UntilContext

func (b *ControllerCM) UntilContext(ctx context.Context, targets ...T)

Shorthand for UntilContext(ctx, b, targets...)

type ControllerM

type ControllerM[T any] struct {
	// contains filtered or unexported fields
}

A Controller with a memorizable broadcaster.

func NewControllerM

func NewControllerM[T any](initial T) *ControllerM[T]

func (*ControllerM) Bind

func (b *ControllerM) Bind(out chan<- T) (cancel func())

Bind registers out as a new listener, which receives subsequent values from the upstream channel. If the input channel is closed or the broadcaster is detached, out will be closed immediately. A canceller is returned for canceling the subscription. When the canceller is called, out will be unregistered and closed.

func (*ControllerM) BindContext added in v0.2.3

func (b *ControllerM) BindContext(ctx context.Context, out chan<- T)

BindContext is similar to Bind, but will cancel when ctx is done.

func (*ControllerM[T]) Current

func (c *ControllerM[T]) Current() T

Current returns the latest value that the broadcaster memorizes.

func (*ControllerM) Listen

func (b *ControllerM) Listen() (<-chan T, func())

Listen creates a new output channel and registers it as a new listener. The output channel and the corresponding canceller are returned.

func (*ControllerM) ListenContext added in v0.2.3

func (b *ControllerM) ListenContext(ctx context.Context) <-chan T

ListenContext is similar to Listen, but will cancel when ctx is done.

func (*ControllerM[T]) Send added in v0.2.0

func (c *ControllerM[T]) Send(value T) (ok bool)

Send sends value to the sink channel.

func (*ControllerM[T]) Sink

func (c *ControllerM[T]) Sink() chan<- T

Sink returns the sink channel of the controller.

type Listenable

type Listenable[T any] interface {
	Bind(out chan<- T) func()
	Listen() (out <-chan T, cancel func())
}

A listenable object that one can bind listeners to.

type ListenableC

type ListenableC[T comparable] interface {
	Listenable[T]
	Until(...T)
	UntilCh(...T) (<-chan struct{}, func())
	UntilContext(context.Context, ...T)
}

A listenable object with comparable element type. This allows additional methods Until, UntilCh and UntilContext to be called.

type ListenableCM

type ListenableCM[T comparable] interface {
	ListenableC[T]
	Current() T
}

A listenable object with a comparable element type that also memorizes the latest value.

type ListenableM

type ListenableM[T any] interface {
	Listenable[T]
	Current() T
}

A listenable object that also memorizes the latest value.
