Versions in this module Expand all Collapse all v1 v1.9.1 Aug 14, 2020 Changes in this version + const MaxReplicationFactor + var ErrAckTimeout = errors.New("publish ack timeout") + var ErrEndOfReadonlyPartition = errors.New("end of readonly partition reached") + var ErrNoSuchPartition = errors.New("stream partition does not exist") + var ErrNoSuchStream = errors.New("stream does not exist") + var ErrPartitionPaused = errors.New("stream partition has been paused") + var ErrStreamDeleted = errors.New("stream has been deleted") + var ErrStreamExists = errors.New("stream already exists") + func NewMessage(value []byte, options ...MessageOption) []byte + type Ack struct + func UnmarshalAck(data []byte) (*Ack, error) + func (a *Ack) AckInbox() string + func (a *Ack) AckPolicy() AckPolicy + func (a *Ack) CorrelationID() string + func (a *Ack) MessageSubject() string + func (a *Ack) Offset() int64 + func (a *Ack) PartitionSubject() string + func (a *Ack) Stream() string + type AckHandler func(ack *Ack, err error) + type AckPolicy int32 + type BrokerInfo struct + func (b *BrokerInfo) Addr() string + func (b *BrokerInfo) Host() string + func (b *BrokerInfo) ID() string + func (b *BrokerInfo) Port() int32 + type Client interface + Close func() error + CreateStream func(ctx context.Context, subject, name string, opts ...StreamOption) error + DeleteStream func(ctx context.Context, name string) error + FetchMetadata func(ctx context.Context) (*Metadata, error) + PauseStream func(ctx context.Context, name string, opts ...PauseOption) error + Publish func(ctx context.Context, stream string, value []byte, opts ...MessageOption) (*Ack, error) + PublishAsync func(ctx context.Context, stream string, value []byte, ackHandler AckHandler, ...) error + PublishToSubject func(ctx context.Context, subject string, value []byte, opts ...MessageOption) (*Ack, error) + SetStreamReadonly func(ctx context.Context, name string, opts ...ReadonlyOption) error + Subscribe func(ctx context.Context, stream string, handler Handler, ...) error + func Connect(addrs []string, options ...ClientOption) (Client, error) + type ClientOption func(*ClientOptions) error + func AckWaitTime(wait time.Duration) ClientOption + func KeepAliveTime(keepAlive time.Duration) ClientOption + func MaxConnsPerBroker(max int) ClientOption + func ResubscribeWaitTime(wait time.Duration) ClientOption + func TLSCert(cert string) ClientOption + func TLSConfig(config *tls.Config) ClientOption + type ClientOptions struct + AckWaitTime time.Duration + Brokers []string + KeepAliveTime time.Duration + MaxConnsPerBroker int + ResubscribeWaitTime time.Duration + TLSCert string + TLSConfig *tls.Config + func DefaultClientOptions() ClientOptions + func (o ClientOptions) Connect() (Client, error) + type Handler func(msg *Message, err error) + type Message struct + func UnmarshalMessage(data []byte) (*Message, error) + func (m *Message) Headers() map[string][]byte + func (m *Message) Key() []byte + func (m *Message) Offset() int64 + func (m *Message) Partition() int32 + func (m *Message) ReplySubject() string + func (m *Message) Stream() string + func (m *Message) Subject() string + func (m *Message) Timestamp() time.Time + func (m *Message) Value() []byte + type MessageOption func(*MessageOptions) + func AckInbox(ackInbox string) MessageOption + func AckPolicyAll() MessageOption + func AckPolicyLeader() MessageOption + func AckPolicyNone() MessageOption + func CorrelationID(correlationID string) MessageOption + func Header(name string, value []byte) MessageOption + func Headers(headers map[string][]byte) MessageOption + func Key(key []byte) MessageOption + func PartitionBy(partitioner Partitioner) MessageOption + func PartitionByKey() MessageOption + func PartitionByRoundRobin() MessageOption + func ToPartition(partition int32) MessageOption + type MessageOptions struct + AckInbox string + AckPolicy AckPolicy + CorrelationID string + Headers map[string][]byte + Key []byte + Partition *int32 + Partitioner Partitioner + type Metadata struct + func (m *Metadata) Addrs() []string + func (m *Metadata) Brokers() []*BrokerInfo + func (m *Metadata) GetStream(name string) *StreamInfo + func (m *Metadata) GetStreams() map[string]*StreamInfo + func (m *Metadata) LastUpdated() time.Time + func (m *Metadata) PartitionCountForStream(stream string) int32 + type PartitionInfo struct + func (p *PartitionInfo) ID() int32 + func (p *PartitionInfo) ISR() []*BrokerInfo + func (p *PartitionInfo) Leader() *BrokerInfo + func (p *PartitionInfo) Replicas() []*BrokerInfo + type Partitioner interface + Partition func(stream string, key, value []byte, metadata *Metadata) int32 + type PauseOption func(*PauseOptions) error + func PausePartitions(partitions ...int32) PauseOption + func ResumeAll() PauseOption + type PauseOptions struct + Partitions []int32 + ResumeAll bool + type ReadonlyOption func(*ReadonlyOptions) error + func Readonly(readonly bool) ReadonlyOption + func ReadonlyPartitions(partitions ...int32) ReadonlyOption + type ReadonlyOptions struct + Partitions []int32 + Readwrite bool + type StartPosition int32 + type StreamInfo struct + func (s *StreamInfo) GetPartition(id int32) *PartitionInfo + func (s *StreamInfo) Partitions() map[int32]*PartitionInfo + type StreamOption func(*StreamOptions) error + func CleanerInterval(val time.Duration) StreamOption + func CompactEnabled(val bool) StreamOption + func CompactMaxGoroutines(val int32) StreamOption + func Group(group string) StreamOption + func MaxReplication() StreamOption + func Partitions(partitions int32) StreamOption + func ReplicationFactor(replicationFactor int32) StreamOption + func RetentionMaxAge(val time.Duration) StreamOption + func RetentionMaxBytes(val int64) StreamOption + func RetentionMaxMessages(val int64) StreamOption + func SegmentMaxAge(val time.Duration) StreamOption + func SegmentMaxBytes(val int64) StreamOption + type StreamOptions struct + CleanerInterval *time.Duration + CompactEnabled *bool + CompactMaxGoroutines *int32 + Group string + Partitions int32 + ReplicationFactor int32 + RetentionMaxAge *time.Duration + RetentionMaxBytes *int64 + RetentionMaxMessages *int64 + SegmentMaxAge *time.Duration + SegmentMaxBytes *int64 + type SubscriptionOption func(*SubscriptionOptions) error + func Partition(partition int32) SubscriptionOption + func ReadISRReplica() SubscriptionOption + func StartAtEarliestReceived() SubscriptionOption + func StartAtLatestReceived() SubscriptionOption + func StartAtOffset(offset int64) SubscriptionOption + func StartAtTime(start time.Time) SubscriptionOption + func StartAtTimeDelta(ago time.Duration) SubscriptionOption + type SubscriptionOptions struct + Partition int32 + ReadISRReplica bool + StartOffset int64 + StartPosition StartPosition + StartTimestamp time.Time