Documentation ¶
Overview ¶
engine contain implementation of canopsis engine.
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func IsConnectionError ¶
IsConnectionError uses to check if stop engine or continue work.
Types ¶
type Consumer ¶
Consumer interface is used to implement AMQP consumer of engine. If Consume returns error engine will be stopped.
func NewDefaultConsumer ¶
func NewDefaultConsumer( name, queue string, consumePrefetchCount, consumePrefetchSize int, purgeQueue bool, nextExchange, nextQueue, fifoExchange, fifoQueue string, connection libamqp.Connection, processor MessageProcessor, logger zerolog.Logger, ) Consumer
NewDefaultConsumer creates consumer.
func NewRPCServer ¶
func NewRPCServer( name, queue string, consumePrefetchCount, consumePrefetchSize int, connection libamqp.Connection, processor MessageProcessor, logger zerolog.Logger, ) Consumer
NewRPCServer creates consumer.
type Engine ¶
type Engine interface { // AddConsumer adds AMQP consumer to engine. AddConsumer(Consumer) // AddPeriodicalWorker adds periodical worker to engine. AddPeriodicalWorker(PeriodicalWorker) // Run starts goroutines for all consumers and periodical workers. // Engine stops if one of consumer or periodical worker return error. Run(context.Context) error }
Engine interface is used to implement canopsis engine.
type MessageProcessor ¶
type MessageProcessor interface {
Process(ctx context.Context, d amqp.Delivery) (newMessage []byte, err error)
}
MessageProcessor interface is used to implement AMQP message processor of consumer. If Process returns error engine will be stopped.
type PeriodicalWorker ¶
PeriodicalWork interface is used to implement engine periodical worker. If Work returns error engine will be stopped.
func NewRunInfoPeriodicalWorker ¶
func NewRunInfoPeriodicalWorker( periodicalInterval time.Duration, manager RunInfoManager, info RunInfo, logger zerolog.Logger, ) PeriodicalWorker
type RPCClient ¶
type RPCClient interface { // Consumer receives RPC responses from AMQP queue. Consumer // Call receives RPC request and publishes it to AMQP queue. Call(m RPCMessage) error }
RPCClient interface is used to implement AMQP RPC client.
func NewRPCClient ¶
func NewRPCClient( name, serverQueueName, clientQueueName string, consumePrefetchCount, consumePrefetchSize int, processor RPCMessageProcessor, amqpChannel libamqp.Channel, logger zerolog.Logger, ) RPCClient
NewRPCClient creates new AMQP RPC client.
type RPCMessage ¶
RPCMessage is AMQP RPC request or response.
type RPCMessageProcessor ¶
type RPCMessageProcessor interface {
Process(RPCMessage) error
}
RPCMessageProcessor interface is used to implement AMQP RPC response processor of consumer. If Process returns error engine will be stopped.
type RunInfo ¶
type RunInfo struct { Name string `json:"name"` ConsumeQueue string `json:"input_queue"` PublishQueue string `json:"output_queue"` PublishExchange string `json:"output_exchange,omitempty"` }
RunInfo is engine run information to detect engines order.
type RunInfoGraph ¶
type RunInfoManager ¶
type RunInfoManager interface { Save(ctx context.Context, info RunInfo, expiration time.Duration) error Get(ctx context.Context, engineName string) (*RunInfo, error) GetAll(ctx context.Context) ([]RunInfo, error) GetGraph(ctx context.Context) (*RunInfoGraph, error) ClearAll(ctx context.Context) error }
RunInfoManager interface is used to implement engine run info storage.
func NewRunInfoManager ¶
func NewRunInfoManager(client redis.Cmdable, key ...string) RunInfoManager
NewRunInfoManager creates new run info manager.