Documentation ¶
Overview ¶
Package channel provides helpers for easier work with channels.
Example ¶
package main import ( "context" "fmt" "dexm.lol/channel" ) func main() { ctx, cancel := context.WithCancel(context.TODO()) defer cancel() type processInput struct { message string shouldFailProcess bool shouldFailConsume bool } type processOutput struct { message string shouldFailConsume bool } chIn := make(chan processInput, 3) chIn <- processInput{message: "message 1", shouldFailProcess: false, shouldFailConsume: false} chIn <- processInput{message: "message 2", shouldFailProcess: false, shouldFailConsume: true} chIn <- processInput{message: "message 3", shouldFailProcess: true, shouldFailConsume: true} close(chIn) chProcessRes, chProcessErr := channel.Process(ctx, 2, chIn, func(ctx context.Context, in processInput) (processOutput, error) { if in.shouldFailProcess { return processOutput{}, fmt.Errorf("error processing message: %s", in.message) } res := processOutput{ message: fmt.Sprintf("processed message: %s", in.message), shouldFailConsume: in.shouldFailConsume, } return res, nil }) chConsumeErr := channel.Consume(ctx, 2, chProcessRes, func(ctx context.Context, in processOutput) error { if in.shouldFailConsume { return fmt.Errorf("error consuming message: %s", in.message) } fmt.Println("Consumed message:", in.message) return nil }) chErr := channel.Merge(ctx, chProcessErr, chConsumeErr) for err := range chErr { fmt.Println("Error received:", err.Error()) } }
Output: Consumed message: processed message: message 1 Error received: error consuming message: processed message: message 2 Error received: error processing message: message 3
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func Consume ¶
func Consume[T any]( ctx context.Context, concurrency int, channel <-chan T, f func(context.Context, T) error, ) <-chan error
Consume channel concurrently. Concurrency must be greater than 0, but it makes no sense to have it less than 2. You must close input channel for error channel to be closed.
Example ¶
package main import ( "context" "fmt" "dexm.lol/channel" ) func main() { ctx, cancel := context.WithCancel(context.TODO()) defer cancel() type input struct { message string shouldFail bool } chIn := make(chan input, 2) chIn <- input{message: "message 1", shouldFail: false} chIn <- input{message: "message 2", shouldFail: true} close(chIn) chErr := channel.Consume(ctx, 2, chIn, func(ctx context.Context, in input) error { if in.shouldFail { return fmt.Errorf("error consuming message: %s", in.message) } fmt.Println("Consumed message:", in.message) return nil }) for err := range chErr { fmt.Println("Error received:", err.Error()) } }
Output: Consumed message: message 1 Error received: error consuming message: message 2
func Merge ¶
Merge multiple channels into a single one.
Example ¶
package main import ( "context" "fmt" "dexm.lol/channel" ) func main() { ctx, cancel := context.WithCancel(context.TODO()) defer cancel() ch1 := make(chan string, 2) ch1 <- "message 1" ch1 <- "message 2" close(ch1) ch2 := make(chan string, 2) ch2 <- "message 3" ch2 <- "message 4" close(ch2) ch3 := channel.Merge(ctx, ch1, ch2) for message := range ch3 { fmt.Println("Received message:", message) } }
Output: Received message: message 1 Received message: message 2 Received message: message 3 Received message: message 4
func Process ¶
func Process[T, R any]( ctx context.Context, concurrency int, channel <-chan T, f func(context.Context, T) (R, error), ) (<-chan R, <-chan error)
Process channel concurrently. Concurrency must be greater than 0, but it makes no sense to have it less than 2. You must close input channel for output and error channels to be closed.
Example ¶
package main import ( "context" "fmt" "dexm.lol/channel" ) func main() { ctx, cancel := context.WithCancel(context.TODO()) defer cancel() type input struct { message string shouldFail bool } chIn := make(chan input, 2) chIn <- input{message: "message 1", shouldFail: false} chIn <- input{message: "message 2", shouldFail: true} close(chIn) chRes, chErr := channel.Process(ctx, 2, chIn, func(ctx context.Context, in input) (string, error) { if in.shouldFail { return "", fmt.Errorf("error processing message: %s", in.message) } return fmt.Sprintf("processed message: %s", in.message), nil }) res := <-chRes fmt.Println("Result received:", res) err := <-chErr fmt.Println("Error received:", err.Error()) }
Output: Result received: processed message: message 1 Error received: error processing message: message 2
Types ¶
This section is empty.
Click to show internal directories.
Click to hide internal directories.