elasticqueue

package module
v0.0.0-...-5a76644 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 18, 2018 License: MIT Imports: 11 Imported by: 1

README

elasticqueue GoDoc Build Status

A utility library around olivere/elastic which lets you insert records to be written into a queue, which then gets bulked up when it meets a certain condition and sent to ElasticSearch. ElasticSearch operations are cheaper in bulk.

See the Godoc reference for full usage details.

// Create your ElasticSearch client as you normally would.
client, err := elastic.NewClient()
if err != nil {
    panic(err)
}
defer client.Stop()

// Define a new queue. Here, our queue will send when either we don't
// write any new documents for 10 seconds, or we have 100 documents waiting
// to be submitted. We also define a backoff policy to automatically rewrite writes.
queue := elasticqueue.NewQueue(client,
    elasticqueue.WithCondition(elasticqueue.WriteAfterIdle(10*time.Second)),
    elasticqueue.WithCondition(elasticqueue.WriteAfterLength(100)),
    elasticqueue.WithBackoff(elastic.NewExponentialBackoff(time.Second, time.Second*10)))

// Make sure to gracefully close the queue before your application exits
// so that any pending items get written out!
defer queue.Close()

// your logic...
for i := 0; i < 10; i++ {
    queue.Store("my-index", "my-type", MyElasticSearchRecord{Cool: true})
}

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ClientRequester

type ClientRequester struct {
	Client  *elastic.Client
	Timeout time.Duration
}

ClientRequester is the default Requester that wraps an ElasticSearch client.

func (*ClientRequester) Send

func (c *ClientRequester) Send(data []elastic.BulkableRequest) error

Send implements Requester.Send.

type Condition

type Condition interface {
	// Inserted is called synchronously whenever a document is inserted into
	// the ElasticSearch queue. It should return true if the queue should be
	// immediately flushed to ElasticSearch.
	Inserted(document []byte, length int) (writeImmediately bool)

	// Write is called by the Queue once when first created. It returns a
	// channel which should emit whenever. This may return nil if the
	// condition does not care to do asynchronous writes.
	Write() (doWrite <-chan struct{}, cancel func())

	// Flushed is called whenever the queue is written out.
	Flushed()
}

Condition is passed to the Queue to determine when it should be written out.

func WriteAfterIdle

func WriteAfterIdle(interval time.Duration) Condition

WriteAfterIdle returns a write condition which'll cause the queue to be written out to ElasticSearch after no documents are written for a period of time.

func WriteAfterInterval

func WriteAfterInterval(interval time.Duration) Condition

WriteAfterInterval returns a write condition which'll cause the queue to be written out after a constant amount of time after the first write.

func WriteAfterLength

func WriteAfterLength(length int) Condition

WriteAfterLength returns a write condition which'll cause the queue to be written out after it reaches a predefined length.

func WriterAfterByteSize

func WriterAfterByteSize(maxBytes int) Condition

WriterAfterByteSize returns a write condition which'll cause the queue to be written out after it's more than "maxBytes" bytes long.

type Option

type Option func(q *Queue)

Option is passed to NewQueue to configure it.

func WithBackoff

func WithBackoff(backoff elastic.Backoff) Option

WithBackoff sets the backoff for ElasticSearch write retries. The backoff is permitted to be `nil` (the default) in which case no retries will be made against ElasticSearch.

func WithCondition

func WithCondition(conditions ...Condition) Option

WithCondition sets the write conditions for the queue.

func WithErrorHandler

func WithErrorHandler(errorHandler func(err error)) Option

WithErrorHandler sets the function that's called whenever an error occurs in a background ElasticSearch write.

func WithRequester

func WithRequester(requester Requester) Option

WithRequester sets the requester for the queue. This is primarily for testing purposes and can usually omitted unless you'd like low-level control over client behavior.

func WithTimeout

func WithTimeout(timeout time.Duration) Option

WithTimeout sets the timeout for ElasticSearch write operations.

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

Queue is the implementation of the ElasticSearch queue.

func NewQueue

func NewQueue(client *elastic.Client, options ...Option) *Queue

NewQueue creates a new ElasticSearch queue around the provided client. Note that you are required to provide at least on Condition using WithCondition.

func (*Queue) Close

func (q *Queue) Close()

Close tears down resources and writes any pending operations.

func (*Queue) Store

func (q *Queue) Store(index, kind string, doc interface{}) (err error)

Store writes the document, or queues it for writing later. This is thread-safe.

func (*Queue) StoreWithId

func (q *Queue) StoreWithId(index, kind, id string, doc interface{}) (err error)

Store writes the document, or queues it for writing later. This is thread-safe.

type Requester

type Requester interface {
	// Send submits the bulk request to the server.
	Send(data []elastic.BulkableRequest) error
}

Requester is a simple interface around the client primarily for easy mocking in tests.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL