service

package
v0.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 14, 2024 License: MIT Imports: 24 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrNotAvailable = fmt.Errorf("not available")

Functions

func WithCodec

func WithCodec(c options.Codec) options.Option

WithCodec will configure the codec.

func WithContext

func WithContext(ctx context.Context) options.Option

WithContext sets the context for the publisher.

func WithKnownIdentities

func WithKnownIdentities(ids ...string) options.Option

WithKnownIdentities will add known identities as base58 encoded ed25519 public keys. Only Messages from publishers identified by these keys will be accepted.

func WithKnownPublicKey

func WithKnownPublicKey(keys ...ed25519.PublicKey) options.Option

WithKnownPublicKey will add known public keys. Only Messages from publishers identified by these keys will be accepted.

func WithLogger added in v0.2.0

func WithLogger(logger *slog.Logger) options.Option

WithLogger sets the logger for the publisher.

func WithNKeySeed

func WithNKeySeed(contents string) options.Option

WithNKeySeed will decode decorated NATS NKey and use it for identity.

func WithName

func WithName(name string) options.Option

WithName sets name of the publisher. The subject is in the form of {prefix}.{name}. Subscriptions and publishing will use the subject constructed from prefix and name.

func WithNats

func WithNats(nc options.NatsConn) options.Option

WithNats sets up preconfigured NATS connector for publishing and subscribing.

func WithParam

func WithParam(key string, val any) options.Option

WithParam adds a key-value pair to the configuration of the publisher.

func WithPemPrivateKey

func WithPemPrivateKey(keyFile string) options.Option

WithPemPrivateKey will load ED25519 private key from a PEM file and use it for identity.

func WithPrefix

func WithPrefix(prefix string) options.Option

WithPrefix sets the prefix for the subject. The subject is in the form of {prefix}.{name}. Subscriptions and publishing will use the subject constructed from prefix and name.

func WithPrivateKey

func WithPrivateKey(pkey crypto.PrivateKey) options.Option

WithPrivateKey will configure identity using ED25519 crypto.PrivateKey.

func WithPubNats

func WithPubNats(nc options.NatsConn) options.Option

WithPubNats sets up preconfigured NATS connector specifically for publishing.

func WithPublishQueueSize

func WithPublishQueueSize(n int) options.Option

WithPublishQueueSize will configure the size of the publish queue.

func WithSubNats

func WithSubNats(nc options.NatsConn) options.Option

WithSubNats sets up preconfigured NATS connector speficivally for subscribing.

func WithTelemetryPeriod

func WithTelemetryPeriod(p time.Duration) options.Option

WithTelemetryPeriod will configure verbosity level.

func WithUserCreds

func WithUserCreds(path string) options.Option

WithUserCreds will load NKey from NATS User Credentials file and use it for identity.

func WithVerbose

func WithVerbose(v bool) options.Option

WithVerbose will configure verbosity level.

Types

type JetStreamer added in v0.2.0

type JetStreamer interface {
	JetStream(opts ...nats.JSOpt) (nats.JetStreamContext, error)
}

type Message

type Message interface {
	Ack(opts ...nats.AckOpt) error
	AckSync(opts ...nats.AckOpt) error
	Equal(msg Message) bool
	InProgress(opts ...nats.AckOpt) error
	Metadata() (*nats.MsgMetadata, error)
	Nak(opts ...nats.AckOpt) error
	NakWithDelay(delay time.Duration, opts ...nats.AckOpt) error
	Respond(any) error
	Term(opts ...nats.AckOpt) error

	Message() *nats.Msg
	Subject() string
	Reply() string
	Data() []byte
	Header() nats.Header
	QueueName() string
}

type MessageHandler

type MessageHandler func(msg Message)

type Service

type Service struct {
	options.Options
	// contains filtered or unexported fields
}

Service is the base publisher structure. It must be embedded in the publisher to benefit from the common implementation. Config method must be called on this embedded struct in order to properly set it up.

func (*Service) AddStatusCallback

func (b *Service) AddStatusCallback(callback StatusFunc)

AddStatusCallback will register a status callback.

func (*Service) AddStream added in v0.2.0

func (b *Service) AddStream(maxMsgs, maxBytes uint64, age time.Duration, subjects ...string) error

AddStream is an experimental feature that creates a durable stream. It is possible to subscribe to this durable stream using regular Subscribe or SubscribeTo methods given that the subject is included in the created stream.

The interface for this feature is experimental and it should be expected to change.

NOTE: Messages are automatically acknowledged after handler returns.

func (*Service) Close

func (b *Service) Close() error

Close should be closed to clean-up the publisher.

func (*Service) Configure

func (b *Service) Configure(opts ...options.Option) error

Configure must be called by the publisher implementation.

func (*Service) Fail

func (b *Service) Fail(err error)

Fail is a convenience function that allows to asynchronously propagate errors.

func (*Service) GetStreamId

func (b *Service) GetStreamId(nmsg *nats.Msg, suffixes ...string) ([]string, string)

GetStreamId will return the stream parts as well as joined stream id. For example it will return (`[query, part1, part2, part3]`, `part1.part2.part3`) if the message was received on subject `prefix.name.query.part1.part2.part3`.

func (*Service) GetStreamIdParts

func (b *Service) GetStreamIdParts(nmsg *nats.Msg, suffixes ...string) []string

GetStreamIdParts returns subject parts that are considered as a stream identifier. For example, if you receive messages on `prefix.name.query.>`, then every subtopic in place of `>` will be considered as a stream id. Suffixes argument in this case will be `query.>`. In this example, GetStreamIdParts will return `query.part1.part2.part3` if the message was received on subject `prefix.name.query.part1.part2.part3`.

func (*Service) Publish

func (b *Service) Publish(msg any, suffixes ...string) error

Publish will sign the message and publish it. It will sign the message in place. Also it will update the timestamp and identity fields.

func (*Service) PublishBuf

func (b *Service) PublishBuf(buf []byte, suffixes ...string) error

PublishBuf will publish the raw bytes.

func (*Service) PublishBufTo

func (b *Service) PublishBufTo(buf []byte, suffixes ...string) error

PublishBufTo will publish the raw bytes to a specific subject.

func (*Service) PublishTo

func (b *Service) PublishTo(msg any, suffixes ...string) error

PublishTo will sign the message and publish it to a specific subject. It will sign the message in place. Also it will update the timestamp and identity fields.

func (*Service) RemoveStatusCallback

func (b *Service) RemoveStatusCallback(callback StatusFunc)

RemoveStatusCallback will remove a status callback.

func (*Service) RemoveStream added in v0.2.0

func (b *Service) RemoveStream(subjects ...string) error

RemoveStream will attempt to remove consumers and streams based on a list of subjects. List of subjects must be exactly the same as was used in AddStream since js Stream and js Consumer names are based on the subjects.

func (*Service) RequestBufFrom

func (b *Service) RequestBufFrom(ctx context.Context, buf []byte, suffixes ...string) (*nats.Msg, error)

RequestBufFrom requests a reply or a stream from a subject using subscribing NATS connection. This a synchronous operation that does not involve publisher queue.

func (*Service) RequestFrom

func (b *Service) RequestFrom(ctx context.Context, msg any, suffixes ...string) (*nats.Msg, error)

RequestFrom requests a reply or a stream from a subject using subscribing NATS connection. This a synchronous operation that does not involve publisher queue.

func (*Service) Respond

func (b *Service) Respond(msg *nats.Msg, buf []byte, suffixes ...string) error

func (*Service) RespondBuf

func (b *Service) RespondBuf(msg *nats.Msg, buf []byte, suffixes ...string) error

func (*Service) Sign

func (b *Service) Sign(msg []byte) (signature []byte, publicKey []byte, err error)

Sign will sign the bytes.

func (*Service) Start

func (b *Service) Start() context.Context

Start will start the publisher base implementation that includes telemetry and other internal goroutines.

func (*Service) Subscribe

func (b *Service) Subscribe(handler MessageHandler, suffixes ...string) (*nats.Subscription, error)

Subscribe will subscribe to a subject constructed from {prefix}.{name}.{...suffixes}, where suffixes are joined using ".".

func (*Service) SubscribeTo

func (b *Service) SubscribeTo(handler MessageHandler, suffixes ...string) (*nats.Subscription, error)

SubscribeTo will subscribe to a subject constructed {...suffixes}, where suffixes are joined using ".".

Experimental: When a stream was registered with AddStream SubscribeTo will use durable stream instead of realtime.

func (*Service) Unmarshal

func (b *Service) Unmarshal(nmsg Message, msg any) (nats.Header, error)

Unmarshal is a convenience function that first verifies any signatures in the message and unmarshals bytes into a message.

func (*Service) Verify

func (b *Service) Verify(nmsg Message) error

Verify will marshal the unmarshalled payload back to bytes and verifies the signature with that.

type StatusFunc

type StatusFunc func() map[string]any

StatusFunc is a type for callback func. This will be called periodically to construct telemetry status. "uptime", "goroutines", "period" keys will be overriden internally.

type Subject added in v0.2.0

type Subject string

Subject represents a NATS subject which can include wildcards.

func (Subject) Match added in v0.2.0

func (s Subject) Match(subject Subject) bool

Match tries to pattern-match the subject against another subject. It considers NATS wildcard rules where '*' matches any token at a level, and '>' matches all subsequent tokens.

func (Subject) String added in v0.2.0

func (s Subject) String() string

String converts the Subject back to its string representation.

func (Subject) SymmetricMatch added in v0.2.0

func (s Subject) SymmetricMatch(subject Subject) bool

SymmetricMatch tries to pattern-match a subject against another subject in both directions. It considers a match if either subject matches the other according to NATS wildcard rules.

func (Subject) Tokens added in v0.2.0

func (s Subject) Tokens() []string

Tokens splits the Subject into its constituent parts, divided by '.'.

func (Subject) Validate added in v0.2.0

func (s Subject) Validate() error

Validate checks if the Subject contains any characters that are not allowed.

type SubjectMap added in v0.2.0

type SubjectMap map[Subject]int

SubjectMap maps subjects (as strings) to indices to an array that holds stream and consumer configs.

func (SubjectMap) Add added in v0.2.0

func (m SubjectMap) Add(subject Subject, idx int) error

Add inserts a subject and its associated index into the map. It returns an error if the subject is invalid, but in the current implementation, it always succeeds.

func (SubjectMap) Get added in v0.2.0

func (m SubjectMap) Get(subject Subject) (int, bool)

Get returns the index stored

func (SubjectMap) Search added in v0.2.0

func (m SubjectMap) Search(subject Subject) (Subject, int, bool)

Search looks for a subject in the map that matches the given subject according to NATS pattern matching rules. Returns the matching subject, its associated index, and true if a match is found. If no match is found, it returns empty string, 0, and false.

func (SubjectMap) SymmetricSearch added in v0.2.0

func (m SubjectMap) SymmetricSearch(subject Subject) (Subject, int, bool)

SymmetricSearch looks for a subject in the map that symmetrically matches the given subject. Symmetric matching means either the map's subject matches the given subject or vice versa. Returns the matching subject, its associated index, and true if a match is found. If no match is found, it returns empty string, 0, and false.

type Telemetry

type Telemetry struct {
	Nonce  string         `json:"nonce"`
	Status map[string]any `json:"status"`
}

type TelemetryPing

type TelemetryPing struct {
	Nonce     string `json:"nonce"`
	Timestamp string `json:"timestamp"`
}

type TelemetryPong

type TelemetryPong struct {
	Nonce     string `json:"nonce"`
	Timestamp string `json:"timestamp"`
	Owl       string `json:"owl"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL