runtime

package
v1.14.0-dev.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 25, 2024 License: Apache-2.0 Imports: 18 Imported by: 0

Documentation

Index

Constants

View Source
// String identifiers for the three plugin kinds named here: source, sink
// and func.
// NOTE(review): presumably the values carried in Control.PluginType — confirm.
const (
	TYPE_SOURCE = "source"
	TYPE_SINK   = "sink"
	TYPE_FUNC   = "func"
)
View Source
// Command names for starting and stopping.
// NOTE(review): presumably the values placed in Command.Cmd — confirm
// against callers.
const (
	CMD_START = "start"
	CMD_STOP  = "stop"
)
View Source
// REPLY_OK is the literal string "ok".
// NOTE(review): presumably the reply that signals success on a channel —
// confirm where it is compared.
const (
	REPLY_OK = "ok"
)

Variables

View Source
// PortbleConf is the package-level PortableConfig, initialised with a
// SendTimeout of 1000.
// NOTE(review): the identifier is spelled "Portble" (missing an "a"); it is
// exported, so renaming it would break callers. The unit of SendTimeout is
// not visible here — presumably milliseconds; confirm.
var PortbleConf = &PortableConfig{
	SendTimeout: 1000,
}

TODO: allow this configuration to be set.

Functions

func GetPluginInsManager

func GetPluginInsManager() *pluginInsManager

Types

type Closable

// Closable is implemented by any type that exposes a Close method returning
// an error.
type Closable interface {
	// Close returns a non-nil error on failure.
	Close() error
}

type Command

// Command is a JSON-serialisable command consisting of a name and a string
// argument.
// NOTE(review): Cmd presumably holds CMD_START or CMD_STOP — confirm.
type Command struct {
	// Cmd is encoded under the JSON key "cmd".
	Cmd string `json:"cmd"`
	// Arg is encoded under the JSON key "arg".
	Arg string `json:"arg"`
}

type Control

// Control is a JSON-serialisable description of a symbol: its name, a Meta
// value, a plugin type, and an optional data source and config map.
type Control struct {
	SymbolName string `json:"symbolName"`
	Meta       Meta   `json:"meta"`
	// PluginType is presumably one of TYPE_SOURCE, TYPE_SINK or TYPE_FUNC —
	// TODO confirm.
	PluginType string `json:"pluginType"`
	// DataSource is omitted from the JSON output when empty.
	DataSource string `json:"dataSource,omitempty"`
	// Config is omitted from the JSON output when empty.
	Config map[string]interface{} `json:"config,omitempty"`
}

type ControlChannel

// ControlChannel is a Closable channel that supports a handshake and sending
// commands as raw bytes.
type ControlChannel interface {
	// Handshake returns a non-nil error on failure.
	Handshake() error
	// SendCmd sends arg and returns a non-nil error on failure.
	// NOTE(review): arg is presumably an encoded Command — confirm.
	SendCmd(arg []byte) error
	Closable
}

func CreateControlChannel

func CreateControlChannel(pluginName string) (ControlChannel, error)

type DataInChannel

// DataInChannel is a Closable channel from which raw bytes are received.
type DataInChannel interface {
	// Recv returns the received bytes, or a non-nil error.
	Recv() ([]byte, error)
	Closable
}

func CreateSourceChannel

func CreateSourceChannel(ctx api.StreamContext) (DataInChannel, error)

type DataOutChannel

// DataOutChannel is a Closable channel to which raw bytes are sent.
type DataOutChannel interface {
	// Send sends the given bytes and returns a non-nil error on failure.
	Send([]byte) error
	Closable
}

func CreateSinkChannel

func CreateSinkChannel(ctx api.StreamContext) (DataOutChannel, error)

type DataReqChannel

// DataReqChannel is a Closable request/reply channel: bytes go in, bytes
// come back.
type DataReqChannel interface {
	// Req sends the given bytes and returns the reply bytes, or a non-nil
	// error.
	Req([]byte) ([]byte, error)
	Closable
}

func CreateFunctionChannel

func CreateFunctionChannel(symbolName string) (DataReqChannel, error)

type FuncData

// FuncData is a JSON-serialisable pair of a function name and an argument of
// arbitrary type.
type FuncData struct {
	// Func is encoded under the JSON key "func".
	Func string `json:"func"`
	// Arg is encoded under the JSON key "arg"; its concrete type is not
	// constrained here.
	Arg interface{} `json:"arg"`
}

type FuncMeta

// FuncMeta embeds Meta and adds an integer function id.
type FuncMeta struct {
	Meta
	// FuncId is encoded under the JSON key "funcId".
	FuncId int `json:"funcId"`
}

type FuncReply

// FuncReply is a JSON-serialisable reply carrying a boolean state and a
// result of arbitrary type.
// NOTE(review): State presumably reports whether the call succeeded —
// confirm.
type FuncReply struct {
	State  bool        `json:"state"`
	Result interface{} `json:"result"`
}

type Meta

// Meta is a JSON-serialisable triple of rule id, operator id and instance id.
type Meta struct {
	// RuleId is encoded under the JSON key "ruleId".
	RuleId string `json:"ruleId"`
	// OpId is encoded under the JSON key "opId".
	OpId string `json:"opId"`
	// InstanceId is encoded under the JSON key "instanceId".
	InstanceId int `json:"instanceId"`
}

type NanomsgReqChannel

type NanomsgReqChannel struct {
	sync.Mutex
	// contains filtered or unexported fields
}

NanomsgReqChannel is shared by symbols.

func (*NanomsgReqChannel) Close

func (r *NanomsgReqChannel) Close() error

func (*NanomsgReqChannel) Handshake

func (r *NanomsgReqChannel) Handshake() error

Handshake should only be called once.

func (*NanomsgReqChannel) SendCmd

func (r *NanomsgReqChannel) SendCmd(arg []byte) error

type NanomsgReqRepChannel

type NanomsgReqRepChannel struct {
	sync.Mutex
	// contains filtered or unexported fields
}

func (*NanomsgReqRepChannel) Close

func (r *NanomsgReqRepChannel) Close() error

func (*NanomsgReqRepChannel) Req

func (r *NanomsgReqRepChannel) Req(arg []byte) ([]byte, error)

type PluginIns

type PluginIns struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

PluginIns is created in two scenarios: 1. At runtime, when a plugin is created or updated, so that rules already using the previous instance can be reloaded. 2. At system start/restart, when the plugin is used by a rule. Once created, it is never deleted until a delete-plugin command or system shutdown.

func NewPluginIns

func NewPluginIns(name string, ctrlChan ControlChannel, process *os.Process) *PluginIns

func NewPluginInsForTest

func NewPluginInsForTest(name string, ctrlChan ControlChannel) *PluginIns

func (*PluginIns) StartSymbol

func (i *PluginIns) StartSymbol(ctx api.StreamContext, ctrl *Control) error

func (*PluginIns) Stop

func (i *PluginIns) Stop() error

Stop stops the plugin instance intentionally.

func (*PluginIns) StopSymbol

func (i *PluginIns) StopSymbol(ctx api.StreamContext, ctrl *Control) error

type PluginMeta

// PluginMeta is the JSON-serialisable metadata of a plugin: name, version,
// language, executable, and two optional string settings.
type PluginMeta struct {
	Name       string `json:"name"`
	Version    string `json:"version"`
	Language   string `json:"language"`
	Executable string `json:"executable"`
	// VirtualType is optional; a nil pointer is omitted from the JSON output.
	// Note the JSON key is "virtualEnvType", not the field name.
	VirtualType *string `json:"virtualEnvType,omitempty"`
	// Env is optional; a nil pointer is omitted from the JSON output.
	Env *string `json:"env,omitempty"`
}

type PortableConfig

// PortableConfig holds the JSON-serialisable configuration of this package.
type PortableConfig struct {
	// SendTimeout is encoded under the JSON key "sendTimeout".
	// NOTE(review): the unit is not visible here — presumably milliseconds;
	// confirm.
	SendTimeout int64 `json:"sendTimeout"`
}

type PortableFunc

type PortableFunc struct {
	// contains filtered or unexported fields
}

PortableFunc: each function symbol has only one singleton instance. Each singleton is a long-running goroutine. Currently, it is cached and never ended once created. It is actually a wrapper of the data channel and can fit any plugin instance. Thus, hot reload is possible: simply attach a new nng client to the same channel without changing the server (plugin runtime) side. TODO: think about ending a portable func when needed.

func NewPortableFunc

func NewPortableFunc(symbolName string, reg *PluginMeta) (_ *PortableFunc, e error)

func (*PortableFunc) Close

func (f *PortableFunc) Close() error

func (*PortableFunc) Exec

func (f *PortableFunc) Exec(args []interface{}, ctx api.FunctionContext) (interface{}, bool)

func (*PortableFunc) IsAggregate

func (f *PortableFunc) IsAggregate() bool

func (*PortableFunc) Validate

func (f *PortableFunc) Validate(args []interface{}) error

type PortableSink

type PortableSink struct {
	// contains filtered or unexported fields
}

func NewPortableSink

func NewPortableSink(symbolName string, reg *PluginMeta) *PortableSink

func (*PortableSink) Close

func (ps *PortableSink) Close(ctx api.StreamContext) error

func (*PortableSink) Collect

func (ps *PortableSink) Collect(ctx api.StreamContext, item interface{}) error

func (*PortableSink) Configure

func (ps *PortableSink) Configure(props map[string]interface{}) error

func (*PortableSink) Open

func (ps *PortableSink) Open(ctx api.StreamContext) error

type PortableSource

type PortableSource struct {
	// contains filtered or unexported fields
}

func NewPortableSource

func NewPortableSource(symbolName string, reg *PluginMeta) *PortableSource

func (*PortableSource) Close

func (ps *PortableSource) Close(ctx api.StreamContext) error

func (*PortableSource) Configure

func (ps *PortableSource) Configure(topic string, props map[string]interface{}) error

func (*PortableSource) Open

func (ps *PortableSource) Open(ctx api.StreamContext, consumer chan<- api.SourceTuple, errCh chan<- error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL