Documentation ¶
Index ¶
- Variables
- func WithCodec(c options.Codec) options.Option
- func WithContext(ctx context.Context) options.Option
- func WithKnownIdentities(ids ...string) options.Option
- func WithKnownPublicKey(keys ...ed25519.PublicKey) options.Option
- func WithLogger(logger *slog.Logger) options.Option
- func WithNKeySeed(contents string) options.Option
- func WithName(name string) options.Option
- func WithNats(nc options.NatsConn) options.Option
- func WithParam(key string, val any) options.Option
- func WithPemPrivateKey(keyFile string) options.Option
- func WithPrefix(prefix string) options.Option
- func WithPrivateKey(pkey crypto.PrivateKey) options.Option
- func WithPubNats(nc options.NatsConn) options.Option
- func WithPublishQueueSize(n int) options.Option
- func WithSubNats(nc options.NatsConn) options.Option
- func WithTelemetryPeriod(p time.Duration) options.Option
- func WithUserCreds(path string) options.Option
- func WithVerbose(v bool) options.Option
- type JetStreamer
- type Message
- type MessageHandler
- type Service
- func (b *Service) AddStatusCallback(callback StatusFunc)
- func (b *Service) AddStream(maxMsgs, maxBytes uint64, age time.Duration, subjects ...string) error
- func (b *Service) Close() error
- func (b *Service) Configure(opts ...options.Option) error
- func (b *Service) Fail(err error)
- func (b *Service) GetStreamId(nmsg *nats.Msg, suffixes ...string) ([]string, string)
- func (b *Service) GetStreamIdParts(nmsg *nats.Msg, suffixes ...string) []string
- func (b *Service) Publish(msg any, suffixes ...string) error
- func (b *Service) PublishBuf(buf []byte, suffixes ...string) error
- func (b *Service) PublishBufTo(buf []byte, suffixes ...string) error
- func (b *Service) PublishTo(msg any, suffixes ...string) error
- func (b *Service) RemoveStatusCallback(callback StatusFunc)
- func (b *Service) RemoveStream(subjects ...string) error
- func (b *Service) RequestBufFrom(ctx context.Context, buf []byte, suffixes ...string) (*nats.Msg, error)
- func (b *Service) RequestFrom(ctx context.Context, msg any, suffixes ...string) (*nats.Msg, error)
- func (b *Service) Respond(msg *nats.Msg, buf []byte, suffixes ...string) error
- func (b *Service) RespondBuf(msg *nats.Msg, buf []byte, suffixes ...string) error
- func (b *Service) Sign(msg []byte) (signature []byte, publicKey []byte, err error)
- func (b *Service) Start() context.Context
- func (b *Service) Subscribe(handler MessageHandler, suffixes ...string) (*nats.Subscription, error)
- func (b *Service) SubscribeTo(handler MessageHandler, suffixes ...string) (*nats.Subscription, error)
- func (b *Service) Unmarshal(nmsg Message, msg any) (nats.Header, error)
- func (b *Service) Verify(nmsg Message) error
- type StatusFunc
- type Subject
- type SubjectMap
- type Telemetry
- type TelemetryPing
- type TelemetryPong
Constants ¶
This section is empty.
Variables ¶
var ErrNotAvailable = fmt.Errorf("not available")
Functions ¶
func WithContext ¶
WithContext sets the context for the publisher.
func WithKnownIdentities ¶
WithKnownIdentities will add known identities as base58 encoded ed25519 public keys. Only Messages from publishers identified by these keys will be accepted.
func WithKnownPublicKey ¶
WithKnownPublicKey will add known public keys. Only Messages from publishers identified by these keys will be accepted.
func WithLogger ¶ added in v0.2.0
WithLogger sets the logger for the publisher.
func WithNKeySeed ¶
WithNKeySeed will decode decorated NATS NKey and use it for identity.
func WithName ¶
WithName sets name of the publisher. The subject is in the form of {prefix}.{name}. Subscriptions and publishing will use the subject constructed from prefix and name.
func WithPemPrivateKey ¶
WithPemPrivateKey will load ED25519 private key from a PEM file and use it for identity.
func WithPrefix ¶
WithPrefix sets the prefix for the subject. The subject is in the form of {prefix}.{name}. Subscriptions and publishing will use the subject constructed from prefix and name.
func WithPrivateKey ¶
func WithPrivateKey(pkey crypto.PrivateKey) options.Option
WithPrivateKey will configure identity using ED25519 crypto.PrivateKey.
func WithPubNats ¶
WithPubNats sets up preconfigured NATS connector specifically for publishing.
func WithPublishQueueSize ¶
WithPublishQueueSize will configure the size of the publish queue.
func WithSubNats ¶
WithSubNats sets up preconfigured NATS connector speficivally for subscribing.
func WithTelemetryPeriod ¶
WithTelemetryPeriod will configure verbosity level.
func WithUserCreds ¶
WithUserCreds will load NKey from NATS User Credentials file and use it for identity.
func WithVerbose ¶
WithVerbose will configure verbosity level.
Types ¶
type JetStreamer ¶ added in v0.2.0
type JetStreamer interface {
JetStream(opts ...nats.JSOpt) (nats.JetStreamContext, error)
}
type Message ¶
type Message interface { Ack(opts ...nats.AckOpt) error AckSync(opts ...nats.AckOpt) error Equal(msg Message) bool InProgress(opts ...nats.AckOpt) error Metadata() (*nats.MsgMetadata, error) Nak(opts ...nats.AckOpt) error NakWithDelay(delay time.Duration, opts ...nats.AckOpt) error Respond(any) error Term(opts ...nats.AckOpt) error Message() *nats.Msg Subject() string Reply() string Data() []byte Header() nats.Header QueueName() string }
type MessageHandler ¶
type MessageHandler func(msg Message)
type Service ¶
Service is the base publisher structure. It must be embedded in the publisher to benefit from the common implementation. Config method must be called on this embedded struct in order to properly set it up.
func (*Service) AddStatusCallback ¶
func (b *Service) AddStatusCallback(callback StatusFunc)
AddStatusCallback will register a status callback.
func (*Service) AddStream ¶ added in v0.2.0
AddStream is an experimental feature that creates a durable stream. It is possible to subscribe to this durable stream using regular Subscribe or SubscribeTo methods given that the subject is included in the created stream.
The interface for this feature is experimental and it should be expected to change.
NOTE: Messages are automatically acknowledged after handler returns.
func (*Service) Fail ¶
Fail is a convenience function that allows to asynchronously propagate errors.
func (*Service) GetStreamId ¶
GetStreamId will return the stream parts as well as joined stream id. For example it will return (`[query, part1, part2, part3]`, `part1.part2.part3`) if the message was received on subject `prefix.name.query.part1.part2.part3`.
func (*Service) GetStreamIdParts ¶
GetStreamIdParts returns subject parts that are considered as a stream identifier. For example, if you receive messages on `prefix.name.query.>`, then every subtopic in place of `>` will be considered as a stream id. Suffixes argument in this case will be `query.>`. In this example, GetStreamIdParts will return `query.part1.part2.part3` if the message was received on subject `prefix.name.query.part1.part2.part3`.
func (*Service) Publish ¶
Publish will sign the message and publish it. It will sign the message in place. Also it will update the timestamp and identity fields.
func (*Service) PublishBuf ¶
PublishBuf will publish the raw bytes.
func (*Service) PublishBufTo ¶
PublishBufTo will publish the raw bytes to a specific subject.
func (*Service) PublishTo ¶
PublishTo will sign the message and publish it to a specific subject. It will sign the message in place. Also it will update the timestamp and identity fields.
func (*Service) RemoveStatusCallback ¶
func (b *Service) RemoveStatusCallback(callback StatusFunc)
RemoveStatusCallback will remove a status callback.
func (*Service) RemoveStream ¶ added in v0.2.0
RemoveStream will attempt to remove consumers and streams based on a list of subjects. List of subjects must be exactly the same as was used in AddStream since js Stream and js Consumer names are based on the subjects.
func (*Service) RequestBufFrom ¶
func (b *Service) RequestBufFrom(ctx context.Context, buf []byte, suffixes ...string) (*nats.Msg, error)
RequestBufFrom requests a reply or a stream from a subject using subscribing NATS connection. This a synchronous operation that does not involve publisher queue.
func (*Service) RequestFrom ¶
RequestFrom requests a reply or a stream from a subject using subscribing NATS connection. This a synchronous operation that does not involve publisher queue.
func (*Service) RespondBuf ¶
func (*Service) Start ¶
Start will start the publisher base implementation that includes telemetry and other internal goroutines.
func (*Service) Subscribe ¶
func (b *Service) Subscribe(handler MessageHandler, suffixes ...string) (*nats.Subscription, error)
Subscribe will subscribe to a subject constructed from {prefix}.{name}.{...suffixes}, where suffixes are joined using ".".
func (*Service) SubscribeTo ¶
func (b *Service) SubscribeTo(handler MessageHandler, suffixes ...string) (*nats.Subscription, error)
SubscribeTo will subscribe to a subject constructed {...suffixes}, where suffixes are joined using ".".
Experimental: When a stream was registered with AddStream SubscribeTo will use durable stream instead of realtime.
type StatusFunc ¶
StatusFunc is a type for callback func. This will be called periodically to construct telemetry status. "uptime", "goroutines", "period" keys will be overriden internally.
type Subject ¶ added in v0.2.0
type Subject string
Subject represents a NATS subject which can include wildcards.
func (Subject) Match ¶ added in v0.2.0
Match tries to pattern-match the subject against another subject. It considers NATS wildcard rules where '*' matches any token at a level, and '>' matches all subsequent tokens.
func (Subject) String ¶ added in v0.2.0
String converts the Subject back to its string representation.
func (Subject) SymmetricMatch ¶ added in v0.2.0
SymmetricMatch tries to pattern-match a subject against another subject in both directions. It considers a match if either subject matches the other according to NATS wildcard rules.
type SubjectMap ¶ added in v0.2.0
SubjectMap maps subjects (as strings) to indices to an array that holds stream and consumer configs.
func (SubjectMap) Add ¶ added in v0.2.0
func (m SubjectMap) Add(subject Subject, idx int) error
Add inserts a subject and its associated index into the map. It returns an error if the subject is invalid, but in the current implementation, it always succeeds.
func (SubjectMap) Get ¶ added in v0.2.0
func (m SubjectMap) Get(subject Subject) (int, bool)
Get returns the index stored
func (SubjectMap) Search ¶ added in v0.2.0
func (m SubjectMap) Search(subject Subject) (Subject, int, bool)
Search looks for a subject in the map that matches the given subject according to NATS pattern matching rules. Returns the matching subject, its associated index, and true if a match is found. If no match is found, it returns empty string, 0, and false.
func (SubjectMap) SymmetricSearch ¶ added in v0.2.0
func (m SubjectMap) SymmetricSearch(subject Subject) (Subject, int, bool)
SymmetricSearch looks for a subject in the map that symmetrically matches the given subject. Symmetric matching means either the map's subject matches the given subject or vice versa. Returns the matching subject, its associated index, and true if a match is found. If no match is found, it returns empty string, 0, and false.