esworker

package module
v1.1.2
Published: Oct 28, 2020 License: MIT Imports: 16 Imported by: 0

README

go-esworker

go-esworker is an async worker, written in Go, that bulk inserts, updates, and deletes documents in Elasticsearch. It supports clusters hosted on AWS, GCP, Elastic Cloud, and so on.

Installation

go get -u github.com/gjbae1212/go-esworker

Usage

package main

import (
	"context"
	"log"

	"github.com/gjbae1212/go-esworker"
)

func main() {
	// Create dispatcher
	dispatcher, err := esworker.NewDispatcher(
		esworker.WithESVersionOption(esworker.V6),
		esworker.WithAddressesOption([]string{"http://localhost:9200"}),
		esworker.WithUsernameOption("user"),
		esworker.WithPasswordOption("password"),
		esworker.WithErrorHandler(func(err error) {
			log.Println(err)
		}),
	)
	if err != nil {
		log.Panic(err)
	}

	// Start dispatcher
	if err := dispatcher.Start(); err != nil {
		log.Panic(err)
	}

	// Process operations in bulk.
	ctx := context.Background()

	// create doc (StandardAction fields are exported: Op, Index, DocType, Id, Doc)
	dispatcher.AddAction(ctx, &esworker.StandardAction{
		Op:    esworker.ES_CREATE,
		Index: "allan",
		Id:    "1",
		Doc:   map[string]interface{}{"field1": 10},
	})

	// update doc
	dispatcher.AddAction(ctx, &esworker.StandardAction{
		Op:    esworker.ES_UPDATE,
		Index: "allan",
		Id:    "1",
		Doc:   map[string]interface{}{"field1": 20},
	})

	// delete doc
	dispatcher.AddAction(ctx, &esworker.StandardAction{
		Op:    esworker.ES_DELETE,
		Index: "allan",
		Id:    "1",
	})
}

Dispatcher Parameters

Pass options to NewDispatcher to configure a go-esworker dispatcher.
The supported options are listed below.

| method name | description | value | state |
| --- | --- | --- | --- |
| WithESVersionOption | ElasticSearch Version | esworker.V5, esworker.V6, esworker.V7 | default V6 |
| WithAddressesOption | ElasticSearch Address | | default http://localhost:9200 |
| WithUsernameOption | ElasticSearch Username for HTTP basic authentication | | optional |
| WithPasswordOption | ElasticSearch Password for HTTP basic authentication | | optional |
| WithCloudIdOption | ID for Elastic Cloud | | optional |
| WithApiKeyOption | Base64-Encoded value for authorization (api-key) | | optional (if set, overrides username and password) |
| WithTransportOption | Http transport | | default http default transport |
| WithLoggerOption | Logger | | optional |
| WithGlobalQueueSizeOption | Global queue max size | | default 5000 |
| WithWorkerSizeOption | Worker size | | default 5 |
| WithWorkerQueueSizeOption | Worker max queue size | | default 5 |
| WithWorkerWaitInterval | Deal with data in worker queue after every interval time | | default 2 * time.Second |
| WithErrorHandler | A function that deals with an error when an error is raised | | optional |

Action Interface

To insert, update, or delete documents, use the StandardAction struct or any struct that implements the esworker.Action interface.

// Create and start the dispatcher.
dispatcher, _ := esworker.NewDispatcher()
dispatcher.Start()

// Ex) Standard Action Example
act := &esworker.StandardAction{
	Op:      esworker.ES_CREATE,
	Index:   "sample",
	DocType: "_doc",
	Id:      "test-id",
	Doc:     map[string]interface{}{"field": 1},
}
dispatcher.AddAction(context.Background(), act)


// Ex) Custom Action Example
type sampleAction struct{}

func (act *sampleAction) GetOperation() esworker.ESOperation {
	// One of esworker.ES_CREATE, esworker.ES_INDEX,
	// esworker.ES_UPDATE, esworker.ES_DELETE.
	return esworker.ES_INDEX
}

func (act *sampleAction) GetIndex() string {
	return "your index name"
}

func (act *sampleAction) GetDocType() string {
	// Return "" to use the default doc type, or return "doc type".
	return ""
}

func (act *sampleAction) GetID() string {
	// Return "" (allowed for ES_INDEX), or return "doc id".
	return ""
}

func (act *sampleAction) GetDoc() map[string]interface{} {
	return map[string]interface{}{}
}

dispatcher.AddAction(context.Background(), &sampleAction{})

A custom struct that implements the esworker.Action interface must implement five methods.

| name | description |
| --- | --- |
| GetOperation | ES_CREATE, ES_INDEX, ES_UPDATE, ES_DELETE |
| GetIndex | index name |
| GetDocType | doc type (if an empty string is returned, the default _doc or doc is used) |
| GetID | doc id (may be an empty string if the operation is ES_INDEX) |
| GetDoc | doc data |
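
As an illustration of the five methods above, here is a minimal, self-contained sketch of a custom action. The ESOperation and Action declarations are copied from the package documentation so the example compiles without the library; pageView and its fields are hypothetical names invented for this example.

```go
package main

import "fmt"

// ESOperation and Action mirror the declarations documented for esworker.
// They are redeclared here only so the sketch compiles on its own.
type ESOperation int

const (
	ES_INDEX ESOperation = iota
	ES_CREATE
	ES_UPDATE
	ES_DELETE
)

type Action interface {
	GetOperation() ESOperation
	GetIndex() string
	GetDocType() string
	GetID() string
	GetDoc() map[string]interface{}
}

// pageView is a hypothetical domain type, not part of esworker.
type pageView struct {
	ID   string
	Path string
	Hits int
}

// Compile-time check that *pageView satisfies Action.
var _ Action = (*pageView)(nil)

func (p *pageView) GetOperation() ESOperation { return ES_INDEX }
func (p *pageView) GetIndex() string          { return "pageviews" }
func (p *pageView) GetDocType() string        { return "" } // empty: use the default doc type
func (p *pageView) GetID() string             { return p.ID }
func (p *pageView) GetDoc() map[string]interface{} {
	return map[string]interface{}{"path": p.Path, "hits": p.Hits}
}

func main() {
	var act Action = &pageView{ID: "pv-1", Path: "/home", Hits: 3}
	fmt.Println(act.GetIndex(), act.GetID(), act.GetDoc()["hits"])
}
```

With the real package imported, the same struct could be handed to dispatcher.AddAction in place of a StandardAction.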

Elastic Cloud

If your cluster runs on Elastic Cloud, you can connect to Elasticsearch with a cloud ID and an API key instead of an endpoint and basic authentication. (How to use API-KEY)

dispatcher, err := esworker.NewDispatcher(
	esworker.WithESVersionOption(esworker.V7),
	esworker.WithCloudIdOption("your-cloud-id"),
	esworker.WithApiKeyOption("api-key"),
)
if err != nil {
	log.Panic(err)
}
dispatcher.Start()

LICENSE

This project is licensed under the MIT License.

Documentation

Types

type Action

type Action interface {
	GetOperation() ESOperation
	GetIndex() string
	GetDocType() string
	GetID() string
	GetDoc() map[string]interface{}
}

Action is an operation that creates, updates, or deletes a document.

type Dispatcher

type Dispatcher interface {
	AddAction(ctx context.Context, action Action) error
	Start() error
	Stop() error
}

Dispatcher is an interface for worker orchestration: it manages all Actions and controls the overall process flow.

func NewDispatcher

func NewDispatcher(opts ...Option) (Dispatcher, error)

NewDispatcher creates a Dispatcher.

type ESOperation

type ESOperation int

ESOperation is the type of an Elasticsearch operation.

const (
	ES_INDEX ESOperation = iota
	ES_CREATE
	ES_UPDATE
	ES_DELETE
)

ESOperation supports index, create, update, and delete.

func (ESOperation) GetString

func (ao ESOperation) GetString() string

GetString converts the ESOperation to its string value.

type ESProxy

type ESProxy interface {
	Bulk(ctx context.Context, acts []Action) (bulk *ESResponseBulk, err error)
}

ESProxy is an interface that actually sends requests to Elasticsearch.
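
For context on what a Bulk call has to send, the Elasticsearch Bulk API expects newline-delimited JSON: one metadata line per action, followed by a source line for index and create, a {"doc": ...} line for update, and nothing for delete. The sketch below builds such a body. It is an assumption about how a proxy might serialize actions, not the library's code; action, bulkMeta, and buildBulkBody are hypothetical names, and the doc type field is omitted for brevity.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

// bulkMeta is the per-action metadata line of the Bulk API.
type bulkMeta struct {
	Index string `json:"_index"`
	ID    string `json:"_id,omitempty"`
}

// action is a simplified, hypothetical stand-in for esworker.Action.
type action struct {
	Op    string // "index", "create", "update", or "delete"
	Index string
	ID    string
	Doc   map[string]interface{}
}

// buildBulkBody serializes actions into newline-delimited JSON.
func buildBulkBody(acts []action) ([]byte, error) {
	var buf bytes.Buffer
	enc := json.NewEncoder(&buf) // Encode writes a trailing '\n' after every value
	for _, a := range acts {
		meta := map[string]bulkMeta{a.Op: {Index: a.Index, ID: a.ID}}
		if err := enc.Encode(meta); err != nil {
			return nil, err
		}
		switch a.Op {
		case "delete":
			// delete carries no source line
		case "update":
			// partial update wraps the fields in "doc"
			if err := enc.Encode(map[string]interface{}{"doc": a.Doc}); err != nil {
				return nil, err
			}
		default:
			if err := enc.Encode(a.Doc); err != nil {
				return nil, err
			}
		}
	}
	return buf.Bytes(), nil
}

func main() {
	body, err := buildBulkBody([]action{
		{Op: "create", Index: "allan", ID: "1", Doc: map[string]interface{}{"field1": 10}},
		{Op: "update", Index: "allan", ID: "1", Doc: map[string]interface{}{"field1": 20}},
		{Op: "delete", Index: "allan", ID: "1"},
	})
	if err != nil {
		panic(err)
	}
	fmt.Print(string(body))
}
```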

type ESResponseBulk

type ESResponseBulk struct {
	Errors bool             `json:"errors"`
	Items  []ESResponseItem `json:"items"`
}

ESResponseBulk is a response struct of Elasticsearch.

func (*ESResponseBulk) Count

func (bulk *ESResponseBulk) Count() (success int, fail int)

Count returns the success and failure counts.

func (*ESResponseBulk) ResultError added in v1.0.6

func (bulk *ESResponseBulk) ResultError() error

ResultError returns the bulk result error.

type ESResponseCause

type ESResponseCause struct {
	Type   string `json:"type"`
	Reason string `json:"reason"`
}

ESResponseCause is a response struct of Elasticsearch.

type ESResponseError

type ESResponseError struct {
	Type   string          `json:"type"`
	Reason string          `json:"reason"`
	Cause  ESResponseCause `json:"caused_by"`
}

ESResponseError is a response struct of Elasticsearch.

type ESResponseItem

type ESResponseItem struct {
	Index  ESResponseStatus `json:"index"`
	Create ESResponseStatus `json:"create"`
	Update ESResponseStatus `json:"update"`
	Delete ESResponseStatus `json:"delete"`
}

ESResponseItem is a response struct of Elasticsearch.

type ESResponseStatus

type ESResponseStatus struct {
	Id     string          `json:"_id"`
	Result string          `json:"result"`
	Status int             `json:"status"`
	Error  ESResponseError `json:"error"`
}

ESResponseStatus is a response struct of Elasticsearch.

type ESVersion

type ESVersion int

ESVersion identifies the Elasticsearch version.

const (
	V5 ESVersion = iota
	V6
	V7
)

func (ESVersion) GetString

func (v ESVersion) GetString() string

type ErrorHandler

type ErrorHandler func(error)

ErrorHandler is called when an error is raised.

type Logger

type Logger struct {
	Type               LoggerType
	Output             io.Writer
	EnableRequestBody  bool
	EnableResponseBody bool
}

Logger is an intermediate struct that is converted into an Elasticsearch logger.

func (*Logger) GetESLogger

func (logger *Logger) GetESLogger(v ESVersion) (interface{}, error)

GetESLogger returns the Elasticsearch logger.

type LoggerType

type LoggerType int

const (
	LOGGER_TYPE_TEXT LoggerType = iota
	LOGGER_TYPE_COLOR
	LOGGER_TYPE_CURL
	LOGGER_TYPE_JSON
)

type Option

type Option interface {
	// contains filtered or unexported methods
}

Option is a configuration value passed to NewDispatcher for dependency injection.

type OptionFunc

type OptionFunc func(cfg *config)

OptionFunc is a function type that serves as the practical implementation of Option.

func WithAddressesOption

func WithAddressesOption(addrs []string) OptionFunc

WithAddressesOption sets the list of Elasticsearch nodes.

func WithApiKeyOption

func WithApiKeyOption(key string) OptionFunc

WithApiKeyOption sets the API key for authorization.

func WithCloudIdOption

func WithCloudIdOption(id string) OptionFunc

WithCloudIdOption sets the cloud ID that identifies the endpoint on Elastic Cloud.

func WithESVersionOption

func WithESVersionOption(v ESVersion) OptionFunc

WithESVersionOption sets the version that the Elasticsearch nodes are running.

func WithErrorHandler

func WithErrorHandler(h ErrorHandler) OptionFunc

WithErrorHandler sets a handler that is called when an error is raised.

func WithGlobalQueueSizeOption

func WithGlobalQueueSizeOption(size int) OptionFunc

WithGlobalQueueSizeOption sets the size of the global queue.

func WithLoggerOption

func WithLoggerOption(logger *Logger) OptionFunc

WithLoggerOption sets the logger object.

func WithPasswordOption

func WithPasswordOption(password string) OptionFunc

WithPasswordOption sets the password for HTTP basic authentication.

func WithTransportOption

func WithTransportOption(tp http.RoundTripper) OptionFunc

WithTransportOption sets the HTTP transport object.

func WithUsernameOption

func WithUsernameOption(username string) OptionFunc

WithUsernameOption sets the username for HTTP basic authentication.

func WithWorkerQueueSizeOption

func WithWorkerQueueSizeOption(size int) OptionFunc

WithWorkerQueueSizeOption sets the queue size of each worker.

func WithWorkerSizeOption

func WithWorkerSizeOption(size int) OptionFunc

WithWorkerSizeOption sets the number of running workers.

func WithWorkerWaitInterval

func WithWorkerWaitInterval(d time.Duration) OptionFunc

WithWorkerWaitInterval sets the wait time of each worker.

type StandardAction

type StandardAction struct {
	Op      ESOperation
	Index   string
	DocType string
	Id      string
	Doc     map[string]interface{}
}

StandardAction is a struct that implements the Action interface.

func (*StandardAction) GetDoc

func (da *StandardAction) GetDoc() map[string]interface{}

GetDoc returns the value to write to a document on Elasticsearch.

func (*StandardAction) GetDocType

func (da *StandardAction) GetDocType() string

GetDocType returns the doc type of the target document on Elasticsearch.

func (*StandardAction) GetID

func (da *StandardAction) GetID() string

GetID returns the ID of the document on Elasticsearch.

func (*StandardAction) GetIndex

func (da *StandardAction) GetIndex() string

GetIndex returns the target index on Elasticsearch.

func (*StandardAction) GetOperation

func (da *StandardAction) GetOperation() ESOperation

GetOperation returns an operation to process a document.
