kafka

package
v1.0.402 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 10, 2023 License: MIT Imports: 26 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewKafka

func NewKafka(tag string, tracing tracing.Tracing,
	lo logger.Logger, config database.KafkaProviderConfig) database.Kafka

Types

type Consumer

type Consumer struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func NewConsumer

func NewConsumer(tracing tracing.Tracing, lg logger.Logger, config database.KafkaProviderConfig,
	options database.KafkaOptions,
	callback database.ConsumerCallback,
	store *Stores) *Consumer

func (*Consumer) Run

func (c *Consumer) Run()

type Kafka

type Kafka struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func (*Kafka) Consumer

func (c *Kafka) Consumer(options database.KafkaOptions, callback database.ConsumerCallback)

func (*Kafka) Producer

func (c *Kafka) Producer(isReady database.ProducerIsReady)

func (*Kafka) Push

func (c *Kafka) Push(ctx context.Context, id string, options database.KafkaOptions, body interface{}, cb database.ConsumerCallback) error

type MsgSend

type MsgSend struct {
	ID      string
	Data    []byte
	Name    string
	Headers map[string]interface{}
}

type Producer

type Producer struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func NewProducer

func NewProducer(lg logger.Logger,
	config database.KafkaProviderConfig, store *Stores) *Producer

func (*Producer) Run

func (c *Producer) Run(isReady database.ProducerIsReady) (err error)

func (*Producer) SendingData

func (c *Producer) SendingData(id string, options database.KafkaOptions, body interface{}, headers map[string]interface{}, cb database.ConsumerCallback)

type Stores

type Stores struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func NewStore

func NewStore() *Stores

func (*Stores) Delete

func (c *Stores) Delete(id string)

func (*Stores) Get

func (c *Stores) Get(id string) (cb database.ConsumerCallback, ok bool)

func (*Stores) LoadClient

func (c *Stores) LoadClient(id string) (client *Kafka, ok bool)

func (*Stores) LoadClientByTag

func (c *Stores) LoadClientByTag(ta string) (client *Kafka, ok bool)

func (*Stores) Put

func (c *Stores) Put(id string, cb database.ConsumerCallback)

func (*Stores) StoreClient

func (c *Stores) StoreClient(client *Kafka)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL