types

package
v1.4.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 27, 2023 License: MIT Imports: 16 Imported by: 1

Documentation

Index

Constants

View Source
// Package-wide untyped constants: request status values, a change flag, the
// attempt limit and a debug option.
const (
	// Request status values 0, 1 and 2.
	// NOTE(review): presumably the successive states of a Request
	// (invoked -> returned -> responded) — confirm against the implementation.
	REQUEST_INVOKED   = 0
	REQUEST_RETURNED  = 1
	REQUEST_RESPONDED = 2

	// CHANGE_PLACEMENT is the bit value 0x0001.
	// NOTE(review): presumably a flag stored in Request.Changes — confirm.
	CHANGE_PLACEMENT = 0x0001
	// MAX_ATTEMPTS is 3.
	// NOTE(review): presumably the attempt budget counted down by
	// Command.MarkError — confirm.
	MAX_ATTEMPTS     = 3

	// DEBUG_INITPROMISE_WAIT is the bit value 0x0001.
	// NOTE(review): presumably one of the opts accepted by
	// Request.InitPromise — confirm.
	DEBUG_INITPROMISE_WAIT = 0x0001
)

Variables

View Source
// Sentinel errors related to chunk caching, plus the request context key.
var (
	// ErrInvalidChunkSize carries the message "invalid chunk size".
	ErrInvalidChunkSize = errors.New("invalid chunk size")
	// ErrUnimplemented carries the message "unimplemented".
	ErrUnimplemented    = errors.New("unimplemented")
	// ErrRequestFailure carries the message "request failed".
	ErrRequestFailure   = errors.New("request failed")
	// ErrStoredTwice carries the message "chunk stored twice".
	ErrStoredTwice      = errors.New("chunk stored twice")
	// ErrChunkClosed carries the message "chunk closed".
	ErrChunkClosed      = errors.New("chunk closed")
	// ErrChunkStoreFailed carries the message "failed to cache chunk".
	ErrChunkStoreFailed = errors.New("failed to cache chunk")
	// ErrUnexpectedClose carries the message "chunk unexpected closed".
	ErrUnexpectedClose  = errors.New("chunk unexpected closed")

	// CtxKeyRequest is a key of the unexported type cacheCtxKey with the
	// value "request".
	// NOTE(review): presumably used to attach a request to a context — confirm.
	CtxKeyRequest = cacheCtxKey("request")
)
View Source
// Sentinel errors related to request handling, plus a placeholder response.
var (
	// ErrStreamingReq carries the message "can not retry a streaming request".
	ErrStreamingReq       = errors.New("can not retry a streaming request")
	// ErrMaxAttemptsReached carries the message "max attempts reached".
	ErrMaxAttemptsReached = errors.New("max attempts reached")
	// ErrResponded carries the message "responded".
	ErrResponded          = errors.New("responded")
	// ErrNoClient carries the message "no client set".
	ErrNoClient           = errors.New("no client set")
	// ErrNotSuppport carries the message "not support".
	// NOTE(review): the identifier is misspelled (three p's). It is exported,
	// so renaming it would break importers; consider adding a correctly
	// spelled alias and deprecating this name.
	ErrNotSuppport        = errors.New("not support")

	// PlaceholderResponse is a pointer to a zero-value unexported response.
	// NOTE(review): its intended use is not visible here — confirm.
	PlaceholderResponse = &response{}
)
View Source
var (
	// ErrControlCmd carries the message "will not retry control command".
	ErrControlCmd = errors.New("will not retry control command")
)
View Source
// ErrNoSpareDeployment carries the message "no spare deployment".
var ErrNoSpareDeployment = errors.New("no spare deployment")

Functions

This section is empty.

Types

type ClusterStats

// ClusterStats gives read access to the statistics of a cluster: its
// instances, either one at a time or through an Iterator, and its meta store.
// NOTE(review): the int arguments and results are presumably instance
// indices — confirm against an implementation.
type ClusterStats interface {
	// InstanceLen returns an instance count as an int.
	InstanceLen() int
	// InstanceStats returns the InstanceStats selected by the given int.
	InstanceStats(int) InstanceStats
	// AllInstancesStats returns an Iterator.
	AllInstancesStats() Iterator
	// InstanceStatsFromIterator extracts an int and an InstanceStats from
	// the given Iterator.
	InstanceStatsFromIterator(Iterator) (int, InstanceStats)
	// MetaStats returns the MetaStoreStats.
	MetaStats() MetaStoreStats
}

type Command

// Command is the common interface of commands that wrap a Request and track
// failed send attempts.
type Command interface {
	// Name returns a string name for the command.
	Name() string
	// String returns a string representation of the command.
	String() string
	// GetInfo returns the info value attached to the command.
	GetInfo() interface{}
	// GetRequest returns the *Request associated with the command.
	GetRequest() *Request
	// MarkError marks a failed attempt to send the request and returns the
	// number of attempts left.
	MarkError(error) int
	// LastError returns the number of attempts left and the error of the
	// last failed attempt.
	LastError() (int, error)
	// FailureError returns the error for a failure (no attempts left).
	FailureError() error
	// Flush returns an error.
	// NOTE(review): presumably writes the command to its connection — confirm.
	Flush() error
}

type Conn

// Conn is a net.Conn that also exposes a *resp.RequestWriter.
type Conn interface {
	net.Conn
	// Writer returns the *resp.RequestWriter of the connection.
	Writer() *resp.RequestWriter
}

type Control

// Control describes a control command. It embeds *Request and carries a
// ControlCallback.
type Control struct {
	Cmd        string
	Addr       string
	Deployment string
	Id         uint64
	Info       interface{}
	Payload    []byte
	// Request is embedded as a pointer, so it may be nil.
	*Request

	// Callback is a ControlCallback.
	// NOTE(review): when it is invoked is not visible here — confirm.
	Callback ControlCallback
	// contains filtered or unexported fields
}

func (*Control) FailureError

func (req *Control) FailureError() error

func (*Control) Flush

func (ctrl *Control) Flush() (err error)

func (*Control) GetInfo

func (req *Control) GetInfo() interface{}

func (*Control) GetRequest

func (req *Control) GetRequest() *Request

func (*Control) LastError

func (req *Control) LastError() (int, error)

func (*Control) MarkError

func (req *Control) MarkError(err error) int

func (*Control) Name

func (req *Control) Name() string

func (*Control) PrepareForData

func (ctrl *Control) PrepareForData(conn Conn)

func (*Control) PrepareForDel

func (ctrl *Control) PrepareForDel(conn Conn)

func (*Control) PrepareForMigrate

func (ctrl *Control) PrepareForMigrate(conn Conn)

func (*Control) PrepareForRecover

func (ctrl *Control) PrepareForRecover(conn Conn)

func (*Control) String

func (req *Control) String() string

type ControlCallback

// ControlCallback is a function that receives a *Control and an arbitrary
// value.
// NOTE(review): the meaning of the second argument is not visible here — confirm.
type ControlCallback func(*Control, interface{})

type GroupedClusterStats

// GroupedClusterStats gives read access to the statistics of a group of
// clusters, either one at a time or through an Iterator, and to the meta store
// statistics.
// NOTE(review): the int arguments and results are presumably cluster
// indices — confirm against an implementation.
type GroupedClusterStats interface {
	// ClusterLen returns a cluster count as an int.
	ClusterLen() int
	// ClusterStats returns the ClusterStats selected by the given int.
	ClusterStats(int) ClusterStats
	// AllClustersStats returns an Iterator.
	AllClustersStats() Iterator
	// ClusterStatsFromIterator extracts an int and a ClusterStats from the
	// given Iterator.
	ClusterStatsFromIterator(Iterator) (int, ClusterStats)
	// MetaStats returns the MetaStoreStats.
	MetaStats() MetaStoreStats
}

type Id

// Id identifies a chunk request by a request ID and a chunk ID, both strings.
type Id struct {
	ReqId   string
	// ChunkId is kept as a string.
	// NOTE(review): Chunk() presumably returns it as an int — confirm.
	ChunkId string
	// contains filtered or unexported fields
}

func (*Id) Chunk

func (id *Id) Chunk() int

func (*Id) String

func (id *Id) String() string

type InstanceOccupancyMode

// InstanceOccupancyMode is an int-based enumeration passed to
// InstanceStats.Occupancy.
type InstanceOccupancyMode int

// InstanceOccupancyMode values, numbered from 0 by iota.
// NOTE(review): the semantics of each mode are not visible here — confirm
// against the Occupancy implementations.
const (
	InstanceOccupancyMain InstanceOccupancyMode = iota // 0
	InstanceOccupancyModified                          // 1
	InstanceOccupancyMax                               // 2
	InstanceOccupancyDisabled                          // 3
	InstanceOccupancyMod                               // 4
)

func (InstanceOccupancyMode) String

func (iom InstanceOccupancyMode) String() string

type InstanceStats

// InstanceStats gives read access to the statistics of a single instance.
type InstanceStats interface {
	// Status returns a uint64 status value.
	// NOTE(review): the encoding of the status is not visible here — confirm.
	Status() uint64
	// Occupancy returns a float64 occupancy for the given mode.
	Occupancy(InstanceOccupancyMode) float64
}

type Iterator

// Iterator is a generic iterator over statistics values.
type Iterator interface {
	// Len returns an int length.
	Len() int
	// Next returns a bool.
	// NOTE(review): presumably advances the iterator and reports whether a
	// value is available — confirm.
	Next() bool
	// Value returns an int and an arbitrary value.
	// NOTE(review): presumably the current index and element — confirm.
	Value() (int, interface{})
}

type LambdaDeployment

// LambdaDeployment exposes the name and numeric ID of a deployment.
type LambdaDeployment interface {
	// Name returns the deployment name.
	Name() string
	// Id returns the deployment ID.
	Id() uint64
}

type MetaStoreStats

// MetaStoreStats gives read access to the statistics of the meta store.
type MetaStoreStats interface {
	// Len returns an int length.
	// NOTE(review): presumably the number of entries in the meta store — confirm.
	Len() int
}

type MigrationScheduler

// MigrationScheduler starts migrators and resolves migration destinations.
// NOTE(review): the uint64 arguments are presumably deployment/instance IDs,
// and the string returned by StartMigrator is unspecified here — confirm.
type MigrationScheduler interface {
	// StartMigrator takes a uint64 and returns a string or an error.
	StartMigrator(uint64) (string, error)
	// GetDestination takes a uint64 and returns a LambdaDeployment or an error.
	GetDestination(uint64) (LambdaDeployment, error)
}

type PersistCache

// PersistCache is a cache of PersistChunk instances addressed by key.
type PersistCache interface {
	// Len returns the number of chunks in the cache.
	Len() int

	// GetOrCreate returns a PersistChunk instance for the specified key.
	// This call is read-after-write safe because the key (chunk key) only
	// becomes available to get requests after an earlier write request has
	// finished.
	// NOTE(review): first presumably reports whether this call created the
	// chunk — confirm.
	GetOrCreate(key string, size int64) (chunk PersistChunk, first bool)

	// Get returns an existing PersistChunk, or nil if not found.
	Get(key string) PersistChunk

	// Restore restores the cache from local storage.
	Restore() error

	// Report outputs the cache status.
	Report()
}

PersistCache offers an API to cache data temporarily for persistence support and request batching. For persistence support, a chunk to be written: 1. Is stored in the PersistCache and persisted to local storage before being written to SMS. 2. Once the chunk is written to SMS, the request returns without waiting for the chunk to be persisted to COS. 3. If persisting to COS fails, the chunk is loaded from PersistCache and persisting is retried. 4. If the chunk is requested before it has been persisted to COS (for scaling purposes), the chunk is served from PersistCache. 5. After being persisted to COS, the chunk is removed from PersistCache. 6. On proxy failure, all chunks stored in PersistCache are restored from local storage. For request batching, concurrent chunk requests are merged into one request as follows: 1. The first request loads the chunk from SMS and stores it in PersistCache. 2. The following requests load the chunk from PersistCache.

type PersistChunk

// PersistChunk is a cached chunk that can be stored from a stream, loaded
// back, and kept open while it is being persisted. It embeds
// redeo.Contextable and PersistChunkForResponse.
type PersistChunk interface {
	redeo.Contextable
	PersistChunkForResponse

	// Key returns the key of the chunk.
	Key() string

	// Size returns the size of the chunk.
	Size() int64

	// Store stores the chunk by intercepting a stream.
	Store(resp.AllReadCloser) (resp.AllReadCloser, error)

	// GetInterceptor returns the interceptor that was returned by calling Store().
	GetInterceptor() resp.AllReadCloser

	// WaitStored waits for the chunk to be stored or for an error to occur.
	WaitStored() error

	// Load loads the data by returning a stream.
	Load(context.Context) (resp.AllReadCloser, error)

	// LoadAll loads the data by returning the fully loaded data, waiting if it
	// is not fully loaded yet.
	LoadAll(context.Context) ([]byte, error)

	// StartPersist instructs the chunk to avoid being closed before it is
	// persisted to COS.
	StartPersist(req interface{}, timeout time.Duration, retry PersistRetrier)

	// DonePersist signals that persisting the chunk has concluded, with
	// either success or failure.
	DonePersist()

	// Error returns the error that occurred while storing the chunk.
	Error() error

	// Close closes the chunk so that it can be removed from the cache.
	Close()

	// CloseWithError closes the chunk with the specified error.
	CloseWithError(err error)

	// CanClose returns true if there is no Load() pending.
	CanClose() bool

	// IsClosed returns true if the chunk is closed.
	IsClosed() bool
}

PersistChunk offers an API for an abstract chunk to support persisting.

type PersistChunkForResponse

// PersistChunkForResponse is the read-only subset of PersistChunk that
// Request.ToCachedResponse accepts.
type PersistChunkForResponse interface {
	// IsStored returns whether the chunk is fully stored.
	IsStored() bool

	// BytesStored returns how many bytes are stored.
	BytesStored() int64
}

type PersistRetrier

// PersistRetrier is a function that receives a PersistChunk. It is the type
// of the retry argument of PersistChunk.StartPersist.
type PersistRetrier func(PersistChunk)

type ProxyResponse

// ProxyResponse is a redeo.Contextable that exposes the originating request
// and an untyped response value.
type ProxyResponse interface {
	redeo.Contextable
	// Request returns the associated *Request.
	Request() *Request
	// Response returns the response as an untyped value.
	// NOTE(review): the concrete type is not visible here — confirm.
	Response() interface{}
}

type Request

// Request holds the state of a single chunk request: its identity, target
// instance, command, key, body (in memory or as a stream) and bookkeeping
// fields.
type Request struct {
	Seq            int64
	Id             Id
	InsId          uint64 // Instance the request targeted.
	Cmd            string
	Key            string
	RetCommand     string
	BodySize       int64
	// Body and BodyStream both carry the request body.
	// NOTE(review): presumably only one of them is set at a time — confirm.
	Body           []byte
	BodyStream     resp.AllReadCloser
	Info           interface{}
	// NOTE(review): Changes presumably holds bit flags such as
	// CHANGE_PLACEMENT — confirm.
	Changes        int
	CollectorEntry interface{}
	Option         int64
	RequestGroup   RequestGroup
	PersistChunk   PersistChunk
	PredictedDue   time.Duration

	// Added by Tianium 20221102
	AllDone      bool // Is sharded request done after chunk request
	AllSucceeded bool // Is sharded request succeeded after chunk request
	// contains filtered or unexported fields
}

func GetRequest

func GetRequest(client *redeo.Client) *Request

func (*Request) Abandon

func (req *Request) Abandon() error

Only applicable to GET so far.

func (*Request) FailureError

func (req *Request) FailureError() error

func (*Request) Flush

func (req *Request) Flush() error

func (*Request) GetInfo

func (req *Request) GetInfo() interface{}

func (*Request) GetRequest

func (req *Request) GetRequest() *Request

func (*Request) InitPromise

func (req *Request) InitPromise(opts ...int) (ret promise.Promise)

InitPromise initializes a promise for later use. NOTE: This function is not thread-safe.

func (*Request) IsResponse

func (req *Request) IsResponse(rsp *Response) bool

func (*Request) IsReturnd

func (req *Request) IsReturnd() bool

IsReturnd indicates whether a response has been received, especially as one of a request set. The content of the response may not be available yet and can change depending on the status of the request set (e.g. first-d optimization).

func (*Request) LastError

func (req *Request) LastError() (int, error)

func (*Request) MarkError

func (req *Request) MarkError(err error) int

MarkError updates the error of the request and returns the remaining attempts. Error specified can be nil to reset the error.

func (*Request) MarkReturned

func (req *Request) MarkReturned() bool

MarkReturned marks the request as returned/response received and returns if this is the first time to mark.

func (*Request) MustRequest

func (req *Request) MustRequest() bool

MustRequest indicates that the request must be sent to the Lambda.

func (*Request) Name

func (req *Request) Name() string

func (*Request) PrepareForDel

func (req *Request) PrepareForDel(conn Conn)

func (*Request) PrepareForGet

func (req *Request) PrepareForGet(conn Conn)

func (*Request) PrepareForRecover

func (req *Request) PrepareForRecover(conn Conn)

func (*Request) PrepareForSet

func (req *Request) PrepareForSet(conn Conn)

func (*Request) ResponseTimeout

func (req *Request) ResponseTimeout() time.Duration

func (*Request) SetErrorResponse

func (req *Request) SetErrorResponse(err error) error

func (*Request) SetResponse

func (req *Request) SetResponse(rsp *Response) error

func (*Request) Size

func (req *Request) Size() int64

func (*Request) String

func (req *Request) String() string

func (*Request) Timeout

func (req *Request) Timeout(opts ...int) (responded promise.Promise, err error)

func (*Request) ToAbandonGetResponse

func (req *Request) ToAbandonGetResponse() *Response

func (*Request) ToCachedResponse

func (req *Request) ToCachedResponse(cached PersistChunkForResponse) *Response

func (*Request) ToConcurrentSetResponse

func (req *Request) ToConcurrentSetResponse(placement uint64) *Response

func (*Request) ToRecover

func (req *Request) ToRecover() *Request

func (*Request) ToSetRetrial

func (req *Request) ToSetRetrial(stream resp.AllReadCloser) *Request

func (*Request) Validate

func (req *Request) Validate(final bool) bool

Validate confirms if the request should be sent to the Lambda. Pass true for the final confirmation.

type RequestGroup

// RequestGroup tracks a set of related requests.
// NOTE(review): the meaning of the uint64 status values is not visible
// here — confirm against an implementation.
type RequestGroup interface {
	// MarkReturnd takes the *Id of a request and returns a uint64 and a bool.
	// The method name is spelled "Returnd"; implementations must match it.
	MarkReturnd(*Id) (uint64, bool)
	// IsFulfilled returns a bool, optionally given status values.
	IsFulfilled(status ...uint64) bool
	// IsAllReturned returns a bool, optionally given status values.
	IsAllReturned(status ...uint64) bool
	// Close closes the group.
	Close()
}

type Response

// Response holds a response to a chunk request. It embeds util.Closer and
// protocol.Contextable.
type Response struct {
	util.Closer
	protocol.Contextable

	Id   Id
	Cmd  string
	// Size is a string here, unlike Request.BodySize which is an int64.
	Size string
	Body []byte

	Status int64 // Customized status. For GET: 1 - recovered
	// contains filtered or unexported fields
}

func NewResponse

func NewResponse(cmd string) *Response

func (*Response) CancelFlush

func (rsp *Response) CancelFlush()

func (*Response) Close

func (rsp *Response) Close()

func (*Response) Flush

func (rsp *Response) Flush() error

func (*Response) IsAbandon

func (rsp *Response) IsAbandon() bool

func (*Response) IsCached

func (rsp *Response) IsCached() (stored int64, full bool, cached bool)

func (*Response) OnFinalize

func (rsp *Response) OnFinalize(finalizer ResponseFinalizer)

func (*Response) PrepareForGet

func (rsp *Response) PrepareForGet(w resp.ResponseWriter, seq int64)

func (*Response) PrepareForSet

func (rsp *Response) PrepareForSet(w resp.ResponseWriter, seq int64)

func (*Response) Request

func (rsp *Response) Request() *Request

func (*Response) Response

func (rsp *Response) Response() interface{}

func (*Response) SetBodyStream

func (rsp *Response) SetBodyStream(stream resp.AllReadCloser)

func (*Response) String

func (rsp *Response) String() string

func (*Response) Wait

func (rsp *Response) Wait()

Close will block and wait for the stream to be flushed. Don't clean any fields if it can't be blocked until flushed.

func (*Response) WaitFlush

func (rsp *Response) WaitFlush(ctxCancelable bool) error

WaitFlush waits for the response stream to be flushed. Non-stream transmission returns immediately. It returns: 1. context.Canceled if the flush is canceled. 2. The transmission error if the flush failed. Read errors after cancellation are ignored.

type ResponseFinalizer

// ResponseFinalizer is a function that receives a *Response. It is the type
// of the argument of Response.OnFinalize.
type ResponseFinalizer func(*Response)

type ScaleEvent

// ScaleEvent describes a scaling event: the instance that triggered it, how
// many instances to scale, and a promise for its completion.
type ScaleEvent struct {
	// BaseInstance is the instance that triggers the scaling event.
	BaseInstance interface{}

	// ScaleTarget is the number of instances to scale.
	ScaleTarget int

	// Scaled is a promise object that can be used to wait for the completion
	// of the scaling event.
	Scaled promise.Promise

	// Retire should be set to true to retire BaseInstance if there is
	// insufficient space in it.
	Retire bool

	// Reason for logging.
	Reason string
}

func (*ScaleEvent) SetError

func (evt *ScaleEvent) SetError(err error)

func (*ScaleEvent) SetScaled

func (evt *ScaleEvent) SetScaled()

type ServerStats

// ServerStats gives read access to server-level statistics.
type ServerStats interface {
	// PersistCacheLen returns an int.
	// NOTE(review): presumably the value of PersistCache.Len() — confirm.
	PersistCacheLen() int
}

type StatsIterator

// StatsIterator is a concrete iterator created by NewStatsIterator. Its
// methods Len, Next and Value have the same signatures as those of the
// Iterator interface.
type StatsIterator struct {
	// contains filtered or unexported fields
}

func NewStatsIterator

func NewStatsIterator(arr interface{}, len int) *StatsIterator

func (*StatsIterator) Len

func (iter *StatsIterator) Len() int

func (*StatsIterator) Next

func (iter *StatsIterator) Next() bool

func (*StatsIterator) Value

func (iter *StatsIterator) Value() (int, interface{})

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL