udf

package
v0.0.0-...-298395c Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 25, 2019 License: MIT Imports: 9 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrServerStopped = errors.New("server already stopped")

Functions

This section is empty.

Types

type Diagnostic

type Diagnostic interface {
	Error(msg string, err error, ctx ...keyvalue.T)

	UDFLog(msg string)
}

type Info

type Info struct {
	Wants    agent.EdgeType
	Provides agent.EdgeType
	Options  map[string]*agent.OptionInfo
}

type Interface

type Interface interface {
	Open() error
	Info() (Info, error)
	Init(options []*agent.Option) error
	Abort(err error)
	Close() error

	Snapshot() ([]byte, error)
	Restore(snapshot []byte) error

	In() chan<- edge.Message
	Out() <-chan edge.Message
}

Interface for communicating with a UDF

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server provides an implementation for the core communication with UDFs. The Server provides only a partial implementation of udf.Interface as it is expected that setup and teardown will be necessary to create a Server. As such the Open and Close methods are not implemented.

Once a Server is created and started the owner can send points or batches to the UDF by writing them to the PointIn or BatchIn channels respectively, and according to the type of UDF created.

The Server may be Aborted at anytime for various reasons. It is the owner's responsibility via the abortCallback to stop writing to the *In channels since no more selects on the channels will be performed.

Calling Stop on the Server should only be done once the owner has stopped writing to the *In channel, at which point the remaining data will be processed and the UDF will be allowed to clean up.

Callling Info returns information about available options the UDF has.

Calling Init is required to process data. The behavior is undefined if you send points/batches to the Server without calling Init.

func NewServer

func NewServer(
	taskID, nodeID string,
	in agent.ByteReadReader,
	out io.WriteCloser,
	d Diagnostic,
	timeout time.Duration,
	abortCallback func(),
	killCallback func(),
) *Server

func (*Server) Abort

func (s *Server) Abort(err error)

Abort the server. Data in-flight will not be processed. Give a reason for aborting via the err parameter.

func (*Server) In

func (s *Server) In() chan<- edge.Message

func (*Server) Info

func (s *Server) Info() (Info, error)

Get information about the process, available options etc. Info need not be called every time a process is started.

func (*Server) Init

func (s *Server) Init(options []*agent.Option) error

Initialize the process with a set of Options. Calling Init is required even if you do not have any specific Options, just pass nil

func (*Server) Out

func (s *Server) Out() <-chan edge.Message

func (*Server) Restore

func (s *Server) Restore(snapshot []byte) error

Request to restore a snapshot.

func (*Server) Snapshot

func (s *Server) Snapshot() ([]byte, error)

Request a snapshot from the process.

func (*Server) Start

func (s *Server) Start() error

Start the Server

func (*Server) Stop

func (s *Server) Stop() error

Stop the Server cleanly.

Calling Stop should only be done once the owner has stopped writing to the *In channel, at which point the remaining data will be processed and the subprocess will be allowed to exit cleanly.

func (*Server) WaitIO

func (s *Server) WaitIO()

Wait for all IO to stop on the in/out objects.

Directories

Path Synopsis
Package agent is a generated protocol buffer package.
Package agent is a generated protocol buffer package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL