agent

package
v0.0.0-...-298395c Latest
Warning

This package is not in the latest version of its module.

Published: Oct 25, 2019 License: MIT Imports: 11 Imported by: 0

README

UDF Agents and Servers

A UDF is a User Defined Function: you write your own functions or algorithms and plug them into Kapacitor. Your custom function runs in its own process, and Kapacitor communicates with it via a defined protocol (see udf.proto). To make the protocol easier to work with, agents have been written in several languages; each one hides the protocol communication behind an interface native to that language. The agent implementations live in this directory, in subdirectories named after each language.

Example uses of the agents can be found in the examples directory. The examples are working code and are executed as part of the test suite; see server_test.go.

Child process vs Socket

There are two approaches for writing UDFs.

  • A child process based approach where Kapacitor spawns a child process and communicates over STDIN/STDOUT.
  • A socket based approach where you start the UDF process externally and Kapacitor connects to it over a socket.

With the socket based approach there is only ever one instance of your UDF process running, and each use of the UDF in a TICKscript is a new connection to the socket. With the process based approach, each use of the UDF spawns a new child process.

Design

The protocol for communicating with Kapacitor consists of Request and Response messages. The agents wrap the communication and serialization and expose an interface that you implement to handle each request/response. In addition to the request/response paradigm, agents provide a way to stream data back to Kapacitor. Your UDF is in control of when new points or batches are sent back to Kapacitor.

Agents and Servers

There are two main objects provided in the current implementations, an Agent and a Server. The Agent is responsible for managing the communication over input and output streams. The Server is responsible for accepting new connections and creating new Agents to handle those new connections.

Both process based and socket based UDFs will need to use an Agent to handle the communication/serialization aspects of the protocol. Only socket based UDFs need to use the Server.
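
The division of labor can be sketched with the standard library alone. In the sketch below, session is a hypothetical stand-in for an Agent and socketMode plays the part of a Server handling one connection; neither is this package's API, and an in-memory net.Pipe replaces a real socket so that the example runs anywhere.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"net"
	"strings"
)

// session is a hypothetical stand-in for an Agent: it owns exactly one
// input stream and one output stream. Here it upper-cases each line.
func session(in io.Reader, out io.Writer) error {
	sc := bufio.NewScanner(in)
	for sc.Scan() {
		if _, err := fmt.Fprintln(out, strings.ToUpper(sc.Text())); err != nil {
			return err
		}
	}
	return sc.Err()
}

// processMode mimics a child-process UDF: a single session bound to the
// process's own streams (os.Stdin and os.Stdout in a real program).
func processMode(input string) (string, error) {
	var out strings.Builder
	err := session(strings.NewReader(input), &out)
	return out.String(), err
}

// socketMode mimics a socket UDF: the long-lived process starts a new
// session per connection. net.Pipe stands in for an accepted connection.
func socketMode(line string) (string, error) {
	client, server := net.Pipe()
	go func() {
		defer server.Close()
		_ = session(server, server) // the connection is both in and out
	}()
	defer client.Close()
	if _, err := fmt.Fprintln(client, line); err != nil {
		return "", err
	}
	reply, err := bufio.NewReader(client).ReadString('\n')
	return strings.TrimSpace(reply), err
}

func main() {
	out, _ := processMode("a\nb\n")
	fmt.Printf("%q\n", out)
	reply, _ := socketMode("hello")
	fmt.Println(reply)
}
```

The same session function serves both modes; only the code that decides where the streams come from differs.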

Writing an Agent for a new Language

The UDF protocol is designed to be simple and consists of reading and writing protocol buffer messages.

In order to write a UDF in the language of your choice your language must have protocol buffer support and be able to read and write to a socket.

The basic steps are:

  1. Add the language to the udf/io.go generate comment so the udf.proto code exists for your language.
  2. Implement a Varint encoder/decoder. This is trivial; see the Python implementation.
  3. Implement a method for reading and writing streamed protobuf messages. See udf.proto for more details.
  4. Create an interface for handling each of the request/responses.
  5. Write a loop for reading from an input stream and calling the handler interface, and write responses to an output stream.
  6. Provide a thread-safe mechanism for writing points and batches to the output stream independently of the handler interface. This is easily accomplished with a synchronized write method; see the Python implementation.
  7. Implement the examples using your new agent.
  8. Add your example to the test suite in cmd/kapacitord/run/server_test.go.
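
Steps 2 and 6 can be sketched as follows. This is a minimal illustration written in Go rather than a port of the Python agent: the names putUvarint, readUvarint and syncWriter are hypothetical, and the payload is an opaque byte slice in place of a serialized protobuf message.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
	"io"
	"sync"
)

var errOverflow = errors.New("varint overflows 64 bits")

// putUvarint appends v in base-128 varint form: seven bits per byte,
// least-significant group first, high bit set on all but the last byte.
func putUvarint(dst []byte, v uint64) []byte {
	for v >= 0x80 {
		dst = append(dst, byte(v)|0x80)
		v >>= 7
	}
	return append(dst, byte(v))
}

// readUvarint decodes one varint from r.
func readUvarint(r io.ByteReader) (uint64, error) {
	var v uint64
	for shift := uint(0); shift < 64; shift += 7 {
		b, err := r.ReadByte()
		if err != nil {
			return 0, err
		}
		if shift == 63 && b > 1 {
			return 0, errOverflow // tenth byte may only carry one bit
		}
		v |= uint64(b&0x7f) << shift
		if b < 0x80 {
			return v, nil
		}
	}
	return 0, errOverflow
}

// syncWriter serializes whole frames so that the handler loop and a
// streaming goroutine can share one output stream without interleaving.
type syncWriter struct {
	mu sync.Mutex
	w  io.Writer
}

// writeFrame writes a varint length header followed by the payload.
func (s *syncWriter) writeFrame(payload []byte) error {
	frame := putUvarint(nil, uint64(len(payload)))
	frame = append(frame, payload...)
	s.mu.Lock()
	defer s.mu.Unlock()
	_, err := s.w.Write(frame)
	return err
}

// roundTrip encodes v, decodes it again, and reports the encoded length.
func roundTrip(v uint64) (uint64, int) {
	enc := putUvarint(nil, v)
	got, _ := readUvarint(bytes.NewReader(enc))
	return got, len(enc)
}

// concurrentFrames has n goroutines write one 5-byte payload each and
// returns the total number of bytes that reached the shared buffer.
func concurrentFrames(n int) int {
	var buf bytes.Buffer
	sw := &syncWriter{w: &buf}
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = sw.writeFrame([]byte("point"))
		}()
	}
	wg.Wait()
	return buf.Len()
}

func main() {
	fmt.Println(roundTrip(300))
	fmt.Println(concurrentFrames(4))
}
```

Building the whole frame before taking the lock keeps the critical section to a single Write call, so a slow encoder never blocks other writers.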

A process based UDF is expected to terminate after STDIN is closed and the remaining requests have been processed. After STDIN is closed, the agent process can continue to send Responses to Kapacitor as long as a keepalive timeout does not occur. Once a keepalive timeout is reached, and after a further grace period of 2*keepalive_time, the process is forcefully terminated if it has not already exited.

Docker

The examples are expected to run inside the test suite. Because generating protocol buffer code for each language requires different plugins and libraries, we use Docker to provide the necessary environment. This makes testing easier, as the developer does not have to install each supported language locally.

Documentation

Overview

Package agent is a generated protocol buffer package.

It is generated from these files:

udf.proto

It has these top-level messages:

InfoRequest
InfoResponse
OptionInfo
InitRequest
Option
OptionValue
InitResponse
SnapshotRequest
SnapshotResponse
RestoreRequest
RestoreResponse
KeepaliveRequest
KeepaliveResponse
ErrorResponse
BeginBatch
Point
EndBatch
Request
Response

Index

Constants

This section is empty.

Variables

var EdgeType_name = map[int32]string{
	0: "STREAM",
	1: "BATCH",
}

var EdgeType_value = map[string]int32{
	"STREAM": 0,
	"BATCH":  1,
}

var ValueType_name = map[int32]string{
	0: "BOOL",
	1: "INT",
	2: "DOUBLE",
	3: "STRING",
	4: "DURATION",
}

var ValueType_value = map[string]int32{
	"BOOL":     0,
	"INT":      1,
	"DOUBLE":   2,
	"STRING":   3,
	"DURATION": 4,
}
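
These tables map enum numbers to names and back. The sketch below shows one way to use them for parsing and printing; it declares local copies of the ValueType tables so that it compiles on its own, and parseValueType and valueTypeString are hypothetical helpers, not part of this package.

```go
package main

import (
	"fmt"
	"strconv"
)

// Local copies of the generated tables, so the sketch is self-contained.
var ValueType_name = map[int32]string{
	0: "BOOL",
	1: "INT",
	2: "DOUBLE",
	3: "STRING",
	4: "DURATION",
}

var ValueType_value = map[string]int32{
	"BOOL":     0,
	"INT":      1,
	"DOUBLE":   2,
	"STRING":   3,
	"DURATION": 4,
}

// parseValueType maps a name to its number and reports unknown names.
func parseValueType(name string) (int32, error) {
	v, ok := ValueType_value[name]
	if !ok {
		return 0, fmt.Errorf("unknown value type %q", name)
	}
	return v, nil
}

// valueTypeString maps a number to its name. Falling back to the decimal
// number for unknown values is a choice made for this sketch.
func valueTypeString(v int32) string {
	if s, ok := ValueType_name[v]; ok {
		return s
	}
	return strconv.Itoa(int(v))
}

func main() {
	v, err := parseValueType("DURATION")
	fmt.Println(v, err)
	fmt.Println(valueTypeString(2))
}
```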

Functions

func ReadMessage

func ReadMessage(buf *[]byte, r ByteReadReader, msg proto.Message) error

Read a message from io.ByteReader by first reading a varint size, and then reading and decoding the message object. If buf is not big enough a new buffer will be allocated to replace buf.

func WriteMessage

func WriteMessage(msg proto.Message, w io.Writer) error

Write the message to the io.Writer with a varint size header.
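
The framing that ReadMessage and WriteMessage describe can be sketched with encoding/binary. The functions below are illustrative stand-ins, not the package's code: they carry raw byte slices instead of proto.Message values, and maxFrame is a safety limit assumed for this sketch only.

```go
package main

import (
	"bufio"
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

// maxFrame is an assumed safety limit; it is not part of the protocol.
const maxFrame = 16 << 20

// byteReadReader mirrors the ByteReadReader interface of this package.
type byteReadReader interface {
	io.Reader
	io.ByteReader
}

// writeFrame writes payload to w, preceded by its length as a varint.
func writeFrame(w io.Writer, payload []byte) error {
	var hdr [binary.MaxVarintLen64]byte
	n := binary.PutUvarint(hdr[:], uint64(len(payload)))
	if _, err := w.Write(hdr[:n]); err != nil {
		return err
	}
	_, err := w.Write(payload)
	return err
}

// readFrame reads one frame from r. *buf is reused when it is large
// enough and replaced by a bigger slice when it is not.
func readFrame(buf *[]byte, r byteReadReader) ([]byte, error) {
	size, err := binary.ReadUvarint(r)
	if err != nil {
		return nil, err
	}
	if size > maxFrame {
		return nil, fmt.Errorf("frame of %d bytes exceeds limit", size)
	}
	if uint64(cap(*buf)) < size {
		*buf = make([]byte, size)
	}
	b := (*buf)[:size]
	if _, err := io.ReadFull(r, b); err != nil {
		return nil, err
	}
	return b, nil
}

// roundTrip writes every message to an in-memory stream and reads the
// frames back in order.
func roundTrip(msgs ...string) ([]string, error) {
	var stream bytes.Buffer
	for _, m := range msgs {
		if err := writeFrame(&stream, []byte(m)); err != nil {
			return nil, err
		}
	}
	r := bufio.NewReader(&stream)
	var buf []byte
	var out []string
	for range msgs {
		b, err := readFrame(&buf, r)
		if err != nil {
			return nil, err
		}
		out = append(out, string(b)) // copy: b aliases the shared buffer
	}
	return out, nil
}

func main() {
	got, err := roundTrip("info", "", "a longer message")
	fmt.Printf("%q %v\n", got, err)
}
```

Because the returned slice aliases the shared buffer, a caller has to decode or copy it before reading the next frame.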

Types

type Accepter

type Accepter interface {
	// Accept new connections from the listener and handle them accordingly.
	// The typical action is to create a new Agent with the connection as both its in and out objects.
	Accept(net.Conn)
}

type Agent

type Agent struct {

	// A channel for writing Responses, specifically Batch and Point responses.
	Responses chan<- *Response

	// The handler for requests.
	Handler Handler
	// contains filtered or unexported fields
}

Go implementation of a Kapacitor UDF agent. This agent is responsible for reading and writing messages over a socket.

The Agent requires a Handler object in order to fulfill requests.

func New

func New(in io.ReadCloser, out io.WriteCloser) *Agent

Create a new Agent with the provided in/out objects. To create an Agent that reads from STDIN and writes to STDOUT of the process, use New(os.Stdin, os.Stdout).

func (*Agent) Start

func (a *Agent) Start() error

Start the Agent. You must set a Handler on the agent before starting.

func (*Agent) Wait

func (a *Agent) Wait() error

Wait for the Agent to terminate. The Agent will not terminate until the Responses channel is closed. You will need to close this channel externally, typically in the Stop method of the Handler. The Agent will terminate if the In reader is closed or an error occurs.
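
The relationship between the input stream, the Responses channel and Wait can be illustrated with a small stand-in. The miniAgent type below is hypothetical and far simpler than the real Agent: it exchanges plain strings, and it exists only to show why the channel has to be closed before waiting can return.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"strings"
	"sync"
)

// miniAgent is a hypothetical stand-in for Agent that exchanges plain
// strings instead of *Response values.
type miniAgent struct {
	in        io.Reader
	out       io.Writer
	responses chan string
	wg        sync.WaitGroup
}

// start launches the read loop and the write loop.
func (a *miniAgent) start(onLine func(string), stop func()) {
	a.wg.Add(2)
	// Read loop: ends when the input is exhausted, then stops the handler.
	go func() {
		defer a.wg.Done()
		sc := bufio.NewScanner(a.in)
		for sc.Scan() {
			onLine(sc.Text())
		}
		stop()
	}()
	// Write loop: ends only when the responses channel is closed.
	go func() {
		defer a.wg.Done()
		for r := range a.responses {
			fmt.Fprintln(a.out, r)
		}
	}()
}

// wait blocks until both loops have finished.
func (a *miniAgent) wait() { a.wg.Wait() }

// run feeds input to a miniAgent whose handler echoes every line.
func run(input string) string {
	var out strings.Builder
	a := &miniAgent{
		in:        strings.NewReader(input),
		out:       &out,
		responses: make(chan string),
	}
	a.start(
		func(line string) { a.responses <- "echo " + line },
		func() { close(a.responses) }, // without this, wait never returns
	)
	a.wait()
	return out.String()
}

func main() {
	fmt.Print(run("p1\np2\n"))
}
```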

type BeginBatch

type BeginBatch struct {
	Name   string            `protobuf:"bytes,1,opt,name=name" json:"name,omitempty"`
	Group  string            `protobuf:"bytes,2,opt,name=group" json:"group,omitempty"`
	Tags   map[string]string `` /* 128-byte string literal not displayed */
	Size   int64             `protobuf:"varint,4,opt,name=size" json:"size,omitempty"`
	ByName bool              `protobuf:"varint,5,opt,name=byName" json:"byName,omitempty"`
}

Indicates the beginning of a batch. All subsequent points should be considered part of the batch until EndBatch arrives. This includes grouping. Batches of differing groups may not be interleaved.

All the meta data but tmax is provided, since tmax may not be known at the beginning of a batch.

Size is the number of points in the batch. If size is 0 then the batch has an undetermined size.
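
The ordering rules above can be expressed as a small check over a message sequence on a batch edge. The msg type and checkBatches function are hypothetical and exist only for this sketch; treating a non-zero Size as an exact point count follows the description above.

```go
package main

import (
	"errors"
	"fmt"
)

type msgKind int

const (
	begin msgKind = iota
	point
	end
)

// msg is a hypothetical, stripped-down message for this sketch.
type msg struct {
	kind msgKind
	size int64 // used by begin only; 0 means undetermined
}

// checkBatches verifies that every point lies between a begin and an
// end, that batches are never interleaved, and that a non-zero size
// matches the number of points received.
func checkBatches(msgs []msg) error {
	inBatch := false
	var want, got int64
	for i, m := range msgs {
		switch m.kind {
		case begin:
			if inBatch {
				return fmt.Errorf("message %d: begin inside an open batch", i)
			}
			inBatch, want, got = true, m.size, 0
		case point:
			if !inBatch {
				return fmt.Errorf("message %d: point outside a batch", i)
			}
			got++
		case end:
			if !inBatch {
				return fmt.Errorf("message %d: end without begin", i)
			}
			if want != 0 && got != want {
				return fmt.Errorf("message %d: got %d points, want %d", i, got, want)
			}
			inBatch = false
		}
	}
	if inBatch {
		return errors.New("stream ended inside an open batch")
	}
	return nil
}

func main() {
	stream := []msg{{begin, 2}, {point, 0}, {point, 0}, {end, 0}}
	fmt.Println(checkBatches(stream))
}
```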

func (*BeginBatch) Descriptor

func (*BeginBatch) Descriptor() ([]byte, []int)

func (*BeginBatch) GetByName

func (m *BeginBatch) GetByName() bool

func (*BeginBatch) GetGroup

func (m *BeginBatch) GetGroup() string

func (*BeginBatch) GetName

func (m *BeginBatch) GetName() string

func (*BeginBatch) GetSize

func (m *BeginBatch) GetSize() int64

func (*BeginBatch) GetTags

func (m *BeginBatch) GetTags() map[string]string

func (*BeginBatch) ProtoMessage

func (*BeginBatch) ProtoMessage()

func (*BeginBatch) Reset

func (m *BeginBatch) Reset()

func (*BeginBatch) String

func (m *BeginBatch) String() string

type ByteReadReader

type ByteReadReader interface {
	io.Reader
	io.ByteReader
}

Interface for reading messages. If you have an io.Reader, wrap your reader in a bufio.Reader to satisfy this interface.

Example: brr := bufio.NewReader(reader)
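
A slightly longer sketch of the same idea follows. It redeclares the interface locally so that it compiles on its own; onlyReader and firstByteAndRest are hypothetical names used to show a reader that lacks ReadByte being wrapped.

```go
package main

import (
	"bufio"
	"bytes"
	"fmt"
	"io"
	"strings"
)

// ByteReadReader is redeclared here so the sketch is self-contained.
type ByteReadReader interface {
	io.Reader
	io.ByteReader
}

// Compile-time checks: these standard types already satisfy the interface.
var (
	_ ByteReadReader = (*bufio.Reader)(nil)
	_ ByteReadReader = (*bytes.Buffer)(nil)
	_ ByteReadReader = (*strings.Reader)(nil)
)

// onlyReader exposes nothing but Read, as many plain io.Reader values do.
type onlyReader struct{ io.Reader }

// firstByteAndRest wraps r so that both ReadByte and Read are available.
func firstByteAndRest(r io.Reader) (byte, string, error) {
	var brr ByteReadReader = bufio.NewReader(r)
	b, err := brr.ReadByte()
	if err != nil {
		return 0, "", err
	}
	rest, err := io.ReadAll(brr)
	return b, string(rest), err
}

// split runs firstByteAndRest over a reader that has no ReadByte method.
func split(s string) (byte, string, error) {
	return firstByteAndRest(onlyReader{strings.NewReader(s)})
}

func main() {
	b, rest, err := split("\x05hello")
	fmt.Println(b, rest, err)
}
```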

type EdgeType

type EdgeType int32
const (
	EdgeType_STREAM EdgeType = 0
	EdgeType_BATCH  EdgeType = 1
)

func (EdgeType) EnumDescriptor

func (EdgeType) EnumDescriptor() ([]byte, []int)

func (EdgeType) String

func (x EdgeType) String() string

type EndBatch

type EndBatch struct {
	Name   string            `protobuf:"bytes,1,opt,name=name" json:"name,omitempty"`
	Group  string            `protobuf:"bytes,2,opt,name=group" json:"group,omitempty"`
	Tmax   int64             `protobuf:"varint,3,opt,name=tmax" json:"tmax,omitempty"`
	Tags   map[string]string `` /* 128-byte string literal not displayed */
	ByName bool              `protobuf:"varint,5,opt,name=byName" json:"byName,omitempty"`
}

Indicates the end of a batch and contains all meta data associated with the batch. The same meta information is provided for ease of use, with the addition of tmax, since it may not be known at BeginBatch.

func (*EndBatch) Descriptor

func (*EndBatch) Descriptor() ([]byte, []int)

func (*EndBatch) GetByName

func (m *EndBatch) GetByName() bool

func (*EndBatch) GetGroup

func (m *EndBatch) GetGroup() string

func (*EndBatch) GetName

func (m *EndBatch) GetName() string

func (*EndBatch) GetTags

func (m *EndBatch) GetTags() map[string]string

func (*EndBatch) GetTmax

func (m *EndBatch) GetTmax() int64

func (*EndBatch) ProtoMessage

func (*EndBatch) ProtoMessage()

func (*EndBatch) Reset

func (m *EndBatch) Reset()

func (*EndBatch) String

func (m *EndBatch) String() string

type ErrorResponse

type ErrorResponse struct {
	Error string `protobuf:"bytes,1,opt,name=error" json:"error,omitempty"`
}

Sent from the process to Kapacitor indicating an error has occurred. If an ErrorResponse is received, Kapacitor will terminate the process.

func (*ErrorResponse) Descriptor

func (*ErrorResponse) Descriptor() ([]byte, []int)

func (*ErrorResponse) GetError

func (m *ErrorResponse) GetError() string

func (*ErrorResponse) ProtoMessage

func (*ErrorResponse) ProtoMessage()

func (*ErrorResponse) Reset

func (m *ErrorResponse) Reset()

func (*ErrorResponse) String

func (m *ErrorResponse) String() string

type Handler

type Handler interface {
	// Return the InfoResponse. Describing the properties of this Handler
	Info() (*InfoResponse, error)
	// Initialize the Handler with the provided options.
	Init(*InitRequest) (*InitResponse, error)
	// Create a snapshot of the running state of the handler.
	Snapshot() (*SnapshotResponse, error)
	// Restore a previous snapshot.
	Restore(*RestoreRequest) (*RestoreResponse, error)

	// A batch has begun.
	BeginBatch(*BeginBatch) error
	// A point has arrived.
	Point(*Point) error
	// The batch is complete.
	EndBatch(*EndBatch) error

	// Gracefully stop the Handler.
	// No other methods will be called.
	Stop()
}

The Agent calls the appropriate methods on the Handler as it receives requests over a socket.

Returning an error from any method will cause the Agent to stop and an ErrorResponse to be sent. Some *Response objects (like InitResponse and RestoreResponse, which carry Success and Error fields) allow for returning their own error within the object itself. These types of errors will not stop the Agent, and Kapacitor will deal with them appropriately.

The Handler is called from a single goroutine, meaning methods will not be called concurrently.

To write Points/Batches back to the Agent/Kapacitor use the Agent.Responses channel.
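
The two error paths can be contrasted in a short sketch. The initResponse type mirrors only the Success and Error fields of InitResponse, and initHandler, its "field" option and the ready flag are hypothetical; the strings returned by dispatch merely describe what the text above says happens in each case.

```go
package main

import (
	"errors"
	"fmt"
)

// initResponse mirrors the Success and Error fields of InitResponse.
type initResponse struct {
	Success bool
	Error   string
}

// initHandler is a hypothetical Init implementation for a UDF that
// requires one option named "field".
func initHandler(options map[string]string, ready bool) (*initResponse, error) {
	if !ready {
		// Fatal path: a returned error stops the Agent.
		return nil, errors.New("handler is not ready")
	}
	if options["field"] == "" {
		// Recoverable path: the problem travels inside the response.
		return &initResponse{Success: false, Error: "must supply a field name"}, nil
	}
	return &initResponse{Success: true}, nil
}

// dispatch shows how a caller would treat the two outcomes.
func dispatch(options map[string]string, ready bool) string {
	resp, err := initHandler(options, ready)
	switch {
	case err != nil:
		return "stop agent, send ErrorResponse: " + err.Error()
	case !resp.Success:
		return "keep running, report: " + resp.Error
	default:
		return "initialized"
	}
}

func main() {
	fmt.Println(dispatch(map[string]string{"field": "value"}, true))
	fmt.Println(dispatch(map[string]string{}, true))
	fmt.Println(dispatch(nil, false))
}
```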

type InfoRequest

type InfoRequest struct {
}

Request that the process return information about available Options.

func (*InfoRequest) Descriptor

func (*InfoRequest) Descriptor() ([]byte, []int)

func (*InfoRequest) ProtoMessage

func (*InfoRequest) ProtoMessage()

func (*InfoRequest) Reset

func (m *InfoRequest) Reset()

func (*InfoRequest) String

func (m *InfoRequest) String() string

type InfoResponse

type InfoResponse struct {
	Wants    EdgeType               `protobuf:"varint,1,opt,name=wants,enum=agent.EdgeType" json:"wants,omitempty"`
	Provides EdgeType               `protobuf:"varint,2,opt,name=provides,enum=agent.EdgeType" json:"provides,omitempty"`
	Options  map[string]*OptionInfo `` /* 134-byte string literal not displayed */
}

func (*InfoResponse) Descriptor

func (*InfoResponse) Descriptor() ([]byte, []int)

func (*InfoResponse) GetOptions

func (m *InfoResponse) GetOptions() map[string]*OptionInfo

func (*InfoResponse) GetProvides

func (m *InfoResponse) GetProvides() EdgeType

func (*InfoResponse) GetWants

func (m *InfoResponse) GetWants() EdgeType

func (*InfoResponse) ProtoMessage

func (*InfoResponse) ProtoMessage()

func (*InfoResponse) Reset

func (m *InfoResponse) Reset()

func (*InfoResponse) String

func (m *InfoResponse) String() string

type InitRequest

type InitRequest struct {
	Options []*Option `protobuf:"bytes,1,rep,name=options" json:"options,omitempty"`
	TaskID  string    `protobuf:"bytes,2,opt,name=taskID" json:"taskID,omitempty"`
	NodeID  string    `protobuf:"bytes,3,opt,name=nodeID" json:"nodeID,omitempty"`
}

Request that the process initialize itself with the provided options.

func (*InitRequest) Descriptor

func (*InitRequest) Descriptor() ([]byte, []int)

func (*InitRequest) GetNodeID

func (m *InitRequest) GetNodeID() string

func (*InitRequest) GetOptions

func (m *InitRequest) GetOptions() []*Option

func (*InitRequest) GetTaskID

func (m *InitRequest) GetTaskID() string

func (*InitRequest) ProtoMessage

func (*InitRequest) ProtoMessage()

func (*InitRequest) Reset

func (m *InitRequest) Reset()

func (*InitRequest) String

func (m *InitRequest) String() string

type InitResponse

type InitResponse struct {
	Success bool   `protobuf:"varint,1,opt,name=success" json:"success,omitempty"`
	Error   string `protobuf:"bytes,2,opt,name=error" json:"error,omitempty"`
}

Respond to Kapacitor whether initialization was successful.

func (*InitResponse) Descriptor

func (*InitResponse) Descriptor() ([]byte, []int)

func (*InitResponse) GetError

func (m *InitResponse) GetError() string

func (*InitResponse) GetSuccess

func (m *InitResponse) GetSuccess() bool

func (*InitResponse) ProtoMessage

func (*InitResponse) ProtoMessage()

func (*InitResponse) Reset

func (m *InitResponse) Reset()

func (*InitResponse) String

func (m *InitResponse) String() string

type KeepaliveRequest

type KeepaliveRequest struct {
	// The number of nanoseconds since the epoch.
	// Used only for debugging keepalive requests.
	Time int64 `protobuf:"varint,1,opt,name=time" json:"time,omitempty"`
}

Request that the process respond with a Keepalive to verify it is responding.

func (*KeepaliveRequest) Descriptor

func (*KeepaliveRequest) Descriptor() ([]byte, []int)

func (*KeepaliveRequest) GetTime

func (m *KeepaliveRequest) GetTime() int64

func (*KeepaliveRequest) ProtoMessage

func (*KeepaliveRequest) ProtoMessage()

func (*KeepaliveRequest) Reset

func (m *KeepaliveRequest) Reset()

func (*KeepaliveRequest) String

func (m *KeepaliveRequest) String() string

type KeepaliveResponse

type KeepaliveResponse struct {
	// The number of nanoseconds since the epoch.
	// Used only for debugging keepalive requests.
	Time int64 `protobuf:"varint,1,opt,name=time" json:"time,omitempty"`
}

Respond to KeepaliveRequest

func (*KeepaliveResponse) Descriptor

func (*KeepaliveResponse) Descriptor() ([]byte, []int)

func (*KeepaliveResponse) GetTime

func (m *KeepaliveResponse) GetTime() int64

func (*KeepaliveResponse) ProtoMessage

func (*KeepaliveResponse) ProtoMessage()

func (*KeepaliveResponse) Reset

func (m *KeepaliveResponse) Reset()

func (*KeepaliveResponse) String

func (m *KeepaliveResponse) String() string

type Option

type Option struct {
	Name   string         `protobuf:"bytes,1,opt,name=name" json:"name,omitempty"`
	Values []*OptionValue `protobuf:"bytes,2,rep,name=values" json:"values,omitempty"`
}

func (*Option) Descriptor

func (*Option) Descriptor() ([]byte, []int)

func (*Option) GetName

func (m *Option) GetName() string

func (*Option) GetValues

func (m *Option) GetValues() []*OptionValue

func (*Option) ProtoMessage

func (*Option) ProtoMessage()

func (*Option) Reset

func (m *Option) Reset()

func (*Option) String

func (m *Option) String() string

type OptionInfo

type OptionInfo struct {
	ValueTypes []ValueType `protobuf:"varint,1,rep,packed,name=valueTypes,enum=agent.ValueType" json:"valueTypes,omitempty"`
}

func (*OptionInfo) Descriptor

func (*OptionInfo) Descriptor() ([]byte, []int)

func (*OptionInfo) GetValueTypes

func (m *OptionInfo) GetValueTypes() []ValueType

func (*OptionInfo) ProtoMessage

func (*OptionInfo) ProtoMessage()

func (*OptionInfo) Reset

func (m *OptionInfo) Reset()

func (*OptionInfo) String

func (m *OptionInfo) String() string

type OptionValue

type OptionValue struct {
	Type ValueType `protobuf:"varint,1,opt,name=type,enum=agent.ValueType" json:"type,omitempty"`
	// Types that are valid to be assigned to Value:
	//	*OptionValue_BoolValue
	//	*OptionValue_IntValue
	//	*OptionValue_DoubleValue
	//	*OptionValue_StringValue
	//	*OptionValue_DurationValue
	Value isOptionValue_Value `protobuf_oneof:"value"`
}

func (*OptionValue) Descriptor

func (*OptionValue) Descriptor() ([]byte, []int)

func (*OptionValue) GetBoolValue

func (m *OptionValue) GetBoolValue() bool

func (*OptionValue) GetDoubleValue

func (m *OptionValue) GetDoubleValue() float64

func (*OptionValue) GetDurationValue

func (m *OptionValue) GetDurationValue() int64

func (*OptionValue) GetIntValue

func (m *OptionValue) GetIntValue() int64

func (*OptionValue) GetStringValue

func (m *OptionValue) GetStringValue() string

func (*OptionValue) GetType

func (m *OptionValue) GetType() ValueType

func (*OptionValue) GetValue

func (m *OptionValue) GetValue() isOptionValue_Value

func (*OptionValue) ProtoMessage

func (*OptionValue) ProtoMessage()

func (*OptionValue) Reset

func (m *OptionValue) Reset()

func (*OptionValue) String

func (m *OptionValue) String() string

func (*OptionValue) XXX_OneofFuncs

func (*OptionValue) XXX_OneofFuncs() (func(msg proto.Message, b *proto.Buffer) error, func(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error), func(msg proto.Message) (n int), []interface{})

XXX_OneofFuncs is for the internal use of the proto package.

type OptionValue_BoolValue

type OptionValue_BoolValue struct {
	BoolValue bool `protobuf:"varint,2,opt,name=boolValue,oneof"`
}

type OptionValue_DoubleValue

type OptionValue_DoubleValue struct {
	DoubleValue float64 `protobuf:"fixed64,4,opt,name=doubleValue,oneof"`
}

type OptionValue_DurationValue

type OptionValue_DurationValue struct {
	DurationValue int64 `protobuf:"varint,6,opt,name=durationValue,oneof"`
}

type OptionValue_IntValue

type OptionValue_IntValue struct {
	IntValue int64 `protobuf:"varint,3,opt,name=intValue,oneof"`
}

type OptionValue_StringValue

type OptionValue_StringValue struct {
	StringValue string `protobuf:"bytes,5,opt,name=stringValue,oneof"`
}
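
The wrapper types above are the possible contents of OptionValue.Value, and a type switch is the usual way to tell them apart. The sketch below declares simplified stand-ins with the same names (without protobuf tags) so that it compiles on its own; describe is a hypothetical helper.

```go
package main

import "fmt"

// Simplified stand-ins that mirror the oneof wrapper types listed above.
type isOptionValue_Value interface{ isOptionValue_Value() }

type OptionValue_BoolValue struct{ BoolValue bool }
type OptionValue_IntValue struct{ IntValue int64 }
type OptionValue_DoubleValue struct{ DoubleValue float64 }
type OptionValue_StringValue struct{ StringValue string }
type OptionValue_DurationValue struct{ DurationValue int64 }

func (*OptionValue_BoolValue) isOptionValue_Value()     {}
func (*OptionValue_IntValue) isOptionValue_Value()      {}
func (*OptionValue_DoubleValue) isOptionValue_Value()   {}
func (*OptionValue_StringValue) isOptionValue_Value()   {}
func (*OptionValue_DurationValue) isOptionValue_Value() {}

type OptionValue struct {
	Value isOptionValue_Value
}

// describe reports which member of the oneof is set and what it holds.
func describe(v *OptionValue) string {
	switch x := v.Value.(type) {
	case *OptionValue_BoolValue:
		return fmt.Sprintf("bool %t", x.BoolValue)
	case *OptionValue_IntValue:
		return fmt.Sprintf("int %d", x.IntValue)
	case *OptionValue_DoubleValue:
		return fmt.Sprintf("double %g", x.DoubleValue)
	case *OptionValue_StringValue:
		return fmt.Sprintf("string %q", x.StringValue)
	case *OptionValue_DurationValue:
		return fmt.Sprintf("duration %d", x.DurationValue)
	case nil:
		return "unset"
	default:
		return fmt.Sprintf("unexpected %T", x)
	}
}

func main() {
	fmt.Println(describe(&OptionValue{Value: &OptionValue_IntValue{IntValue: 42}}))
	fmt.Println(describe(&OptionValue{}))
}
```

With the real generated types, the Get* accessors listed above offer an alternative to the type switch when only one kind of value is expected.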

type Point

type Point struct {
	Time            int64              `protobuf:"varint,1,opt,name=time" json:"time,omitempty"`
	Name            string             `protobuf:"bytes,2,opt,name=name" json:"name,omitempty"`
	Database        string             `protobuf:"bytes,3,opt,name=database" json:"database,omitempty"`
	RetentionPolicy string             `protobuf:"bytes,4,opt,name=retentionPolicy" json:"retentionPolicy,omitempty"`
	Group           string             `protobuf:"bytes,5,opt,name=group" json:"group,omitempty"`
	Dimensions      []string           `protobuf:"bytes,6,rep,name=dimensions" json:"dimensions,omitempty"`
	Tags            map[string]string  `` /* 128-byte string literal not displayed */
	FieldsDouble    map[string]float64 `` /* 146-byte string literal not displayed */
	FieldsInt       map[string]int64   `` /* 139-byte string literal not displayed */
	FieldsString    map[string]string  `` /* 145-byte string literal not displayed */
	FieldsBool      map[string]bool    `` /* 142-byte string literal not displayed */
	ByName          bool               `protobuf:"varint,11,opt,name=byName" json:"byName,omitempty"`
}

Message containing information about a single data point. Can be sent on its own or bookended by BeginBatch and EndBatch messages.
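
Because field values are split across one map per type, building a Point from loosely typed data means routing each value to the matching map. The sketch below uses a hypothetical point stand-in that mirrors only the four field maps; setField and newPoint are illustrative helpers, not part of this package.

```go
package main

import "fmt"

// point mirrors only the four typed field maps of Point.
type point struct {
	FieldsDouble map[string]float64
	FieldsInt    map[string]int64
	FieldsString map[string]string
	FieldsBool   map[string]bool
}

// setField stores v in the map that matches its dynamic type.
func setField(p *point, name string, v interface{}) error {
	switch x := v.(type) {
	case float64:
		if p.FieldsDouble == nil {
			p.FieldsDouble = make(map[string]float64)
		}
		p.FieldsDouble[name] = x
	case int64:
		if p.FieldsInt == nil {
			p.FieldsInt = make(map[string]int64)
		}
		p.FieldsInt[name] = x
	case string:
		if p.FieldsString == nil {
			p.FieldsString = make(map[string]string)
		}
		p.FieldsString[name] = x
	case bool:
		if p.FieldsBool == nil {
			p.FieldsBool = make(map[string]bool)
		}
		p.FieldsBool[name] = x
	default:
		return fmt.Errorf("field %q: unsupported type %T", name, v)
	}
	return nil
}

// newPoint builds a point from a loosely typed field set.
func newPoint(fields map[string]interface{}) (*point, error) {
	p := &point{}
	for name, v := range fields {
		if err := setField(p, name, v); err != nil {
			return nil, err
		}
	}
	return p, nil
}

func main() {
	p, err := newPoint(map[string]interface{}{"usage": 0.5, "count": int64(3)})
	fmt.Println(p.FieldsDouble, p.FieldsInt, err)
}
```

Only the exact Go types float64, int64, string and bool are accepted here; a plain int is rejected rather than silently converted.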

func (*Point) Descriptor

func (*Point) Descriptor() ([]byte, []int)

func (*Point) GetByName

func (m *Point) GetByName() bool

func (*Point) GetDatabase

func (m *Point) GetDatabase() string

func (*Point) GetDimensions

func (m *Point) GetDimensions() []string

func (*Point) GetFieldsBool

func (m *Point) GetFieldsBool() map[string]bool

func (*Point) GetFieldsDouble

func (m *Point) GetFieldsDouble() map[string]float64

func (*Point) GetFieldsInt

func (m *Point) GetFieldsInt() map[string]int64

func (*Point) GetFieldsString

func (m *Point) GetFieldsString() map[string]string

func (*Point) GetGroup

func (m *Point) GetGroup() string

func (*Point) GetName

func (m *Point) GetName() string

func (*Point) GetRetentionPolicy

func (m *Point) GetRetentionPolicy() string

func (*Point) GetTags

func (m *Point) GetTags() map[string]string

func (*Point) GetTime

func (m *Point) GetTime() int64

func (*Point) ProtoMessage

func (*Point) ProtoMessage()

func (*Point) Reset

func (m *Point) Reset()

func (*Point) String

func (m *Point) String() string

type Request

type Request struct {
	// Types that are valid to be assigned to Message:
	//	*Request_Info
	//	*Request_Init
	//	*Request_Keepalive
	//	*Request_Snapshot
	//	*Request_Restore
	//	*Request_Begin
	//	*Request_Point
	//	*Request_End
	Message isRequest_Message `protobuf_oneof:"message"`
}

Request message wrapper -- sent from Kapacitor to process

func (*Request) Descriptor

func (*Request) Descriptor() ([]byte, []int)

func (*Request) GetBegin

func (m *Request) GetBegin() *BeginBatch

func (*Request) GetEnd

func (m *Request) GetEnd() *EndBatch

func (*Request) GetInfo

func (m *Request) GetInfo() *InfoRequest

func (*Request) GetInit

func (m *Request) GetInit() *InitRequest

func (*Request) GetKeepalive

func (m *Request) GetKeepalive() *KeepaliveRequest

func (*Request) GetMessage

func (m *Request) GetMessage() isRequest_Message

func (*Request) GetPoint

func (m *Request) GetPoint() *Point

func (*Request) GetRestore

func (m *Request) GetRestore() *RestoreRequest

func (*Request) GetSnapshot

func (m *Request) GetSnapshot() *SnapshotRequest

func (*Request) ProtoMessage

func (*Request) ProtoMessage()

func (*Request) Reset

func (m *Request) Reset()

func (*Request) String

func (m *Request) String() string

func (*Request) XXX_OneofFuncs

func (*Request) XXX_OneofFuncs() (func(msg proto.Message, b *proto.Buffer) error, func(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error), func(msg proto.Message) (n int), []interface{})

XXX_OneofFuncs is for the internal use of the proto package.

type Request_Begin

type Request_Begin struct {
	Begin *BeginBatch `protobuf:"bytes,16,opt,name=begin,oneof"`
}

type Request_End

type Request_End struct {
	End *EndBatch `protobuf:"bytes,18,opt,name=end,oneof"`
}

type Request_Info

type Request_Info struct {
	Info *InfoRequest `protobuf:"bytes,1,opt,name=info,oneof"`
}

type Request_Init

type Request_Init struct {
	Init *InitRequest `protobuf:"bytes,2,opt,name=init,oneof"`
}

type Request_Keepalive

type Request_Keepalive struct {
	Keepalive *KeepaliveRequest `protobuf:"bytes,3,opt,name=keepalive,oneof"`
}

type Request_Point

type Request_Point struct {
	Point *Point `protobuf:"bytes,17,opt,name=point,oneof"`
}

type Request_Restore

type Request_Restore struct {
	Restore *RestoreRequest `protobuf:"bytes,5,opt,name=restore,oneof"`
}

type Request_Snapshot

type Request_Snapshot struct {
	Snapshot *SnapshotRequest `protobuf:"bytes,4,opt,name=snapshot,oneof"`
}

type Response

type Response struct {
	// Types that are valid to be assigned to Message:
	//	*Response_Info
	//	*Response_Init
	//	*Response_Keepalive
	//	*Response_Snapshot
	//	*Response_Restore
	//	*Response_Error
	//	*Response_Begin
	//	*Response_Point
	//	*Response_End
	Message isResponse_Message `protobuf_oneof:"message"`
}

Response message wrapper -- sent from process to Kapacitor

func (*Response) Descriptor

func (*Response) Descriptor() ([]byte, []int)

func (*Response) GetBegin

func (m *Response) GetBegin() *BeginBatch

func (*Response) GetEnd

func (m *Response) GetEnd() *EndBatch

func (*Response) GetError

func (m *Response) GetError() *ErrorResponse

func (*Response) GetInfo

func (m *Response) GetInfo() *InfoResponse

func (*Response) GetInit

func (m *Response) GetInit() *InitResponse

func (*Response) GetKeepalive

func (m *Response) GetKeepalive() *KeepaliveResponse

func (*Response) GetMessage

func (m *Response) GetMessage() isResponse_Message

func (*Response) GetPoint

func (m *Response) GetPoint() *Point

func (*Response) GetRestore

func (m *Response) GetRestore() *RestoreResponse

func (*Response) GetSnapshot

func (m *Response) GetSnapshot() *SnapshotResponse

func (*Response) ProtoMessage

func (*Response) ProtoMessage()

func (*Response) Reset

func (m *Response) Reset()

func (*Response) String

func (m *Response) String() string

func (*Response) XXX_OneofFuncs

func (*Response) XXX_OneofFuncs() (func(msg proto.Message, b *proto.Buffer) error, func(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error), func(msg proto.Message) (n int), []interface{})

XXX_OneofFuncs is for the internal use of the proto package.

type Response_Begin

type Response_Begin struct {
	Begin *BeginBatch `protobuf:"bytes,16,opt,name=begin,oneof"`
}

type Response_End

type Response_End struct {
	End *EndBatch `protobuf:"bytes,18,opt,name=end,oneof"`
}

type Response_Error

type Response_Error struct {
	Error *ErrorResponse `protobuf:"bytes,6,opt,name=error,oneof"`
}

type Response_Info

type Response_Info struct {
	Info *InfoResponse `protobuf:"bytes,1,opt,name=info,oneof"`
}

type Response_Init

type Response_Init struct {
	Init *InitResponse `protobuf:"bytes,2,opt,name=init,oneof"`
}

type Response_Keepalive

type Response_Keepalive struct {
	Keepalive *KeepaliveResponse `protobuf:"bytes,3,opt,name=keepalive,oneof"`
}

type Response_Point

type Response_Point struct {
	Point *Point `protobuf:"bytes,17,opt,name=point,oneof"`
}

type Response_Restore

type Response_Restore struct {
	Restore *RestoreResponse `protobuf:"bytes,5,opt,name=restore,oneof"`
}

type Response_Snapshot

type Response_Snapshot struct {
	Snapshot *SnapshotResponse `protobuf:"bytes,4,opt,name=snapshot,oneof"`
}

type RestoreRequest

type RestoreRequest struct {
	Snapshot []byte `protobuf:"bytes,1,opt,name=snapshot,proto3" json:"snapshot,omitempty"`
}

Request that the process restore its state from a snapshot.

func (*RestoreRequest) Descriptor

func (*RestoreRequest) Descriptor() ([]byte, []int)

func (*RestoreRequest) GetSnapshot

func (m *RestoreRequest) GetSnapshot() []byte

func (*RestoreRequest) ProtoMessage

func (*RestoreRequest) ProtoMessage()

func (*RestoreRequest) Reset

func (m *RestoreRequest) Reset()

func (*RestoreRequest) String

func (m *RestoreRequest) String() string

type RestoreResponse

type RestoreResponse struct {
	Success bool   `protobuf:"varint,1,opt,name=success" json:"success,omitempty"`
	Error   string `protobuf:"bytes,2,opt,name=error" json:"error,omitempty"`
}

Respond with success or failure to a RestoreRequest

func (*RestoreResponse) Descriptor

func (*RestoreResponse) Descriptor() ([]byte, []int)

func (*RestoreResponse) GetError

func (m *RestoreResponse) GetError() string

func (*RestoreResponse) GetSuccess

func (m *RestoreResponse) GetSuccess() bool

func (*RestoreResponse) ProtoMessage

func (*RestoreResponse) ProtoMessage()

func (*RestoreResponse) Reset

func (m *RestoreResponse) Reset()

func (*RestoreResponse) String

func (m *RestoreResponse) String() string

type Server

type Server struct {
	// contains filtered or unexported fields
}

A server accepts connections on a listener and spawns new Agents for each connection.

func NewServer

func NewServer(l net.Listener, a Accepter) *Server

Create a new server.

func (*Server) Serve

func (s *Server) Serve() error

Serve starts the server and blocks.

func (*Server) Stop

func (s *Server) Stop()

Stop closes the listener and stops all server activity.

func (*Server) StopOnSignals

func (s *Server) StopOnSignals(signals ...os.Signal)

StopOnSignals registers a signal handler to stop the Server for the given signals.

type SnapshotRequest

type SnapshotRequest struct {
}

Request that the process provide a snapshot of its state.

func (*SnapshotRequest) Descriptor

func (*SnapshotRequest) Descriptor() ([]byte, []int)

func (*SnapshotRequest) ProtoMessage

func (*SnapshotRequest) ProtoMessage()

func (*SnapshotRequest) Reset

func (m *SnapshotRequest) Reset()

func (*SnapshotRequest) String

func (m *SnapshotRequest) String() string

type SnapshotResponse

type SnapshotResponse struct {
	Snapshot []byte `protobuf:"bytes,1,opt,name=snapshot,proto3" json:"snapshot,omitempty"`
}

Respond to Kapacitor with a serialized snapshot of the running state.

func (*SnapshotResponse) Descriptor

func (*SnapshotResponse) Descriptor() ([]byte, []int)

func (*SnapshotResponse) GetSnapshot

func (m *SnapshotResponse) GetSnapshot() []byte

func (*SnapshotResponse) ProtoMessage

func (*SnapshotResponse) ProtoMessage()

func (*SnapshotResponse) Reset

func (m *SnapshotResponse) Reset()

func (*SnapshotResponse) String

func (m *SnapshotResponse) String() string

type ValueType

type ValueType int32
const (
	ValueType_BOOL     ValueType = 0
	ValueType_INT      ValueType = 1
	ValueType_DOUBLE   ValueType = 2
	ValueType_STRING   ValueType = 3
	ValueType_DURATION ValueType = 4
)

func (ValueType) EnumDescriptor

func (ValueType) EnumDescriptor() ([]byte, []int)

func (ValueType) String

func (x ValueType) String() string

Directories

Path Synopsis
examples
