Documentation ¶
Index ¶
- Constants
- Variables
- type ClusterStats
- type Command
- type Conn
- type Control
- func (req *Control) FailureError() error
- func (ctrl *Control) Flush() (err error)
- func (req *Control) GetInfo() interface{}
- func (req *Control) GetRequest() *Request
- func (req *Control) LastError() (int, error)
- func (req *Control) MarkError(err error) int
- func (req *Control) Name() string
- func (ctrl *Control) PrepareForData(conn Conn)
- func (ctrl *Control) PrepareForDel(conn Conn)
- func (ctrl *Control) PrepareForMigrate(conn Conn)
- func (ctrl *Control) PrepareForRecover(conn Conn)
- func (req *Control) String() string
- type ControlCallback
- type GroupedClusterStats
- type Id
- type InstanceOccupancyMode
- type InstanceStats
- type Iterator
- type LambdaDeployment
- type MetaStoreStats
- type MigrationScheduler
- type PersistCache
- type PersistChunk
- type PersistChunkForResponse
- type PersistRetrier
- type ProxyResponse
- type Request
- func (req *Request) Abandon() error
- func (req *Request) FailureError() error
- func (req *Request) Flush() error
- func (req *Request) GetInfo() interface{}
- func (req *Request) GetRequest() *Request
- func (req *Request) InitPromise(opts ...int) (ret promise.Promise)
- func (req *Request) IsResponse(rsp *Response) bool
- func (req *Request) IsReturnd() bool
- func (req *Request) LastError() (int, error)
- func (req *Request) MarkError(err error) int
- func (req *Request) MarkReturned() bool
- func (req *Request) MustRequest() bool
- func (req *Request) Name() string
- func (req *Request) PrepareForDel(conn Conn)
- func (req *Request) PrepareForGet(conn Conn)
- func (req *Request) PrepareForRecover(conn Conn)
- func (req *Request) PrepareForSet(conn Conn)
- func (req *Request) ResponseTimeout() time.Duration
- func (req *Request) SetErrorResponse(err error) error
- func (req *Request) SetResponse(rsp *Response) error
- func (req *Request) Size() int64
- func (req *Request) String() string
- func (req *Request) Timeout(opts ...int) (responded promise.Promise, err error)
- func (req *Request) ToAbandonGetResponse() *Response
- func (req *Request) ToCachedResponse(cached PersistChunkForResponse) *Response
- func (req *Request) ToConcurrentSetResponse(placement uint64) *Response
- func (req *Request) ToRecover() *Request
- func (req *Request) ToSetRetrial(stream resp.AllReadCloser) *Request
- func (req *Request) Validate(final bool) bool
- type RequestGroup
- type Response
- func (rsp *Response) CancelFlush()
- func (rsp *Response) Close()
- func (rsp *Response) Flush() error
- func (rsp *Response) IsAbandon() bool
- func (rsp *Response) IsCached() (stored int64, full bool, cached bool)
- func (rsp *Response) OnFinalize(finalizer ResponseFinalizer)
- func (rsp *Response) PrepareForGet(w resp.ResponseWriter, seq int64)
- func (rsp *Response) PrepareForSet(w resp.ResponseWriter, seq int64)
- func (rsp *Response) Request() *Request
- func (rsp *Response) Response() interface{}
- func (rsp *Response) SetBodyStream(stream resp.AllReadCloser)
- func (rsp *Response) String() string
- func (rsp *Response) Wait()
- func (rsp *Response) WaitFlush(ctxCancelable bool) error
- type ResponseFinalizer
- type ScaleEvent
- type ServerStats
- type StatsIterator
Constants ¶
const ( REQUEST_INVOKED = 0 REQUEST_RETURNED = 1 REQUEST_RESPONDED = 2 CHANGE_PLACEMENT = 0x0001 MAX_ATTEMPTS = 3 DEBUG_INITPROMISE_WAIT = 0x0001 )
Variables ¶
var ( ErrInvalidChunkSize = errors.New("invalid chunk size") ErrUnimplemented = errors.New("unimplemented") ErrRequestFailure = errors.New("request failed") ErrStoredTwice = errors.New("chunk stored twice") ErrChunkClosed = errors.New("chunk closed") ErrChunkStoreFailed = errors.New("failed to cache chunk") ErrUnexpectedClose = errors.New("chunk unexpected closed") CtxKeyRequest = cacheCtxKey("request") )
var ( ErrStreamingReq = errors.New("can not retry a streaming request") ErrMaxAttemptsReached = errors.New("max attempts reached") ErrResponded = errors.New("responded") ErrNoClient = errors.New("no client set") ErrNotSuppport = errors.New("not support") PlaceholderResponse = &response{} )
var (
ErrControlCmd = errors.New("will not retry control command")
)
var ErrNoSpareDeployment = errors.New("no spare deployment")
Functions ¶
This section is empty.
Types ¶
type ClusterStats ¶
type ClusterStats interface { InstanceLen() int InstanceStats(int) InstanceStats AllInstancesStats() Iterator InstanceStatsFromIterator(Iterator) (int, InstanceStats) MetaStats() MetaStoreStats }
type Command ¶
type Command interface {
	Name() string
	String() string
	GetInfo() interface{}
	GetRequest() *Request
	// MarkError marks a failed attempt to send the request and returns the number of attempts left.
	MarkError(error) int
	// LastError returns the number of attempts left and the error of the last failed attempt.
	LastError() (int, error)
	// FailureError returns the error for a failure (no attempts left).
	FailureError() error
	Flush() error
}
type Control ¶
type Control struct { Cmd string Addr string Deployment string Id uint64 Info interface{} Payload []byte *Request Callback ControlCallback // contains filtered or unexported fields }
func (*Control) FailureError ¶
func (*Control) GetRequest ¶
func (*Control) PrepareForData ¶
func (*Control) PrepareForDel ¶
func (*Control) PrepareForMigrate ¶
func (*Control) PrepareForRecover ¶
type ControlCallback ¶
type ControlCallback func(*Control, interface{})
type GroupedClusterStats ¶
type GroupedClusterStats interface { ClusterLen() int ClusterStats(int) ClusterStats AllClustersStats() Iterator ClusterStatsFromIterator(Iterator) (int, ClusterStats) MetaStats() MetaStoreStats }
type InstanceOccupancyMode ¶
type InstanceOccupancyMode int
const ( InstanceOccupancyMain InstanceOccupancyMode = iota InstanceOccupancyModified InstanceOccupancyMax InstanceOccupancyDisabled InstanceOccupancyMod )
func (InstanceOccupancyMode) String ¶
func (iom InstanceOccupancyMode) String() string
type InstanceStats ¶
type InstanceStats interface { Status() uint64 Occupancy(InstanceOccupancyMode) float64 }
type LambdaDeployment ¶
type MetaStoreStats ¶
type MetaStoreStats interface {
Len() int
}
type MigrationScheduler ¶
type PersistCache ¶
type PersistCache interface {
	// Len returns the number of chunks in the cache.
	Len() int
	// GetOrCreate returns a PersistChunk instance by the specified key.
	// This call is read-after-write safe because the key (chunk key) will only
	// be available to get requests after an earlier write request has finished.
	GetOrCreate(key string, size int64) (chunk PersistChunk, first bool)
	// Get returns an existing PersistChunk, or nil if not found.
	Get(key string) PersistChunk
	// Restore restores the cache from local storage.
	Restore() error
	// Report outputs the cache status.
	Report()
}
PersistCache offers an API to cache data temporarily for persistence support and request batching. For persistence support, a chunk to be written: 1. Is stored in the PersistCache and persisted to local storage before being written to SMS. 2. Once the chunk is written to SMS, the request returns without waiting for the chunk to be persisted to COS. 3. If persisting to COS fails, the chunk is loaded from the PersistCache and persisting is retried. 4. If the chunk is requested before being persisted to COS for scaling purposes, the chunk is served from the PersistCache. 5. After being persisted to COS, the chunk is removed from the PersistCache. 6. On proxy failure, all chunks stored in the PersistCache are restored from local storage. For request batching, concurrent chunk requests are merged into one request as follows: 1. The first request loads the chunk from SMS and stores it in the PersistCache. 2. The following requests load the chunk from the PersistCache.
type PersistChunk ¶
type PersistChunk interface {
	redeo.Contextable
	PersistChunkForResponse
	// Key returns the key of the chunk.
	Key() string
	// Size returns the size of the chunk.
	Size() int64
	// Store stores the chunk by intercepting a stream.
	Store(resp.AllReadCloser) (resp.AllReadCloser, error)
	// GetInterceptor returns the interceptor that was returned on calling Store().
	GetInterceptor() resp.AllReadCloser
	// WaitStored waits until the chunk is stored or an error occurs.
	WaitStored() error
	// Load loads the data by returning a stream.
	Load(context.Context) (resp.AllReadCloser, error)
	// LoadAll returns the fully loaded data, waiting if the data is not yet fully loaded.
	LoadAll(context.Context) ([]byte, error)
	// StartPersist instructs the chunk to avoid being closed before it is persisted to COS.
	StartPersist(req interface{}, timeout time.Duration, retry PersistRetrier)
	// DonePersist signals that persisting the chunk has concluded, either in success or failure.
	DonePersist()
	// Error returns the error that occurred while storing the chunk.
	Error() error
	// Close closes the chunk to be removed from the cache.
	Close()
	// CloseWithError closes the chunk with the specified error.
	CloseWithError(err error)
	// CanClose returns true if there is no Load() pending.
	CanClose() bool
	// IsClosed returns true if the chunk is closed.
	IsClosed() bool
}
PersistChunk offers an API for an abstract chunk that supports persisting.
type PersistChunkForResponse ¶
type PersistRetrier ¶
type PersistRetrier func(PersistChunk)
type ProxyResponse ¶
type ProxyResponse interface { redeo.Contextable Request() *Request Response() interface{} }
type Request ¶
type Request struct { Seq int64 Id Id InsId uint64 // Instance the request targeted. Cmd string Key string RetCommand string BodySize int64 Body []byte BodyStream resp.AllReadCloser Info interface{} Changes int CollectorEntry interface{} Option int64 RequestGroup RequestGroup PersistChunk PersistChunk PredictedDue time.Duration // Added by Tianium 20221102 AllDone bool // Is sharded request done after chunk request AllSucceeded bool // Is sharded request succeeded after chunk request // contains filtered or unexported fields }
func GetRequest ¶
func (*Request) FailureError ¶
func (*Request) GetRequest ¶
func (*Request) InitPromise ¶
InitPromise initializes a promise for later use. NOTE: This function is not thread-safe.
func (*Request) IsResponse ¶
func (*Request) IsReturnd ¶
IsReturnd indicates whether a response has been received, especially when the request is one of a request set. The content of the response may not be available yet and can change depending on the status of the request set (e.g. first-d optimization).
func (*Request) MarkError ¶
MarkError updates the error of the request and returns the remaining attempts. The error specified can be nil to reset the error.
func (*Request) MarkReturned ¶
MarkReturned marks the request as returned (response received) and reports whether this is the first time it has been marked.
func (*Request) MustRequest ¶
MustRequest indicates that the request must be sent to the Lambda.
func (*Request) PrepareForDel ¶
func (*Request) PrepareForGet ¶
func (*Request) PrepareForRecover ¶
func (*Request) PrepareForSet ¶
func (*Request) ResponseTimeout ¶
func (*Request) SetErrorResponse ¶
func (*Request) SetResponse ¶
func (*Request) ToAbandonGetResponse ¶
func (*Request) ToCachedResponse ¶
func (req *Request) ToCachedResponse(cached PersistChunkForResponse) *Response
func (*Request) ToConcurrentSetResponse ¶
func (*Request) ToSetRetrial ¶
func (req *Request) ToSetRetrial(stream resp.AllReadCloser) *Request
type RequestGroup ¶
type Response ¶
type Response struct { util.Closer protocol.Contextable Id Id Cmd string Size string Body []byte Status int64 // Customized status. For GET: 1 - recovered // contains filtered or unexported fields }
func NewResponse ¶
func (*Response) CancelFlush ¶
func (rsp *Response) CancelFlush()
func (*Response) OnFinalize ¶
func (rsp *Response) OnFinalize(finalizer ResponseFinalizer)
func (*Response) PrepareForGet ¶
func (rsp *Response) PrepareForGet(w resp.ResponseWriter, seq int64)
func (*Response) PrepareForSet ¶
func (rsp *Response) PrepareForSet(w resp.ResponseWriter, seq int64)
func (*Response) SetBodyStream ¶
func (rsp *Response) SetBodyStream(stream resp.AllReadCloser)
func (*Response) Wait ¶
func (rsp *Response) Wait()
Close will block and wait for the stream to be flushed. Do not clean up any fields if the call cannot block until the stream has been flushed.
type ResponseFinalizer ¶
type ResponseFinalizer func(*Response)
type ScaleEvent ¶
type ScaleEvent struct { // BaseInstance Instance that triggers the scaling event. BaseInstance interface{} // ScaleTarget The number of instances to scale. ScaleTarget int // Scaled A promise object that can be used to wait for the completion of the scaling event. Scaled promise.Promise // Retire If there is insufficient space in BaseInstance, set to true to retire it. Retire bool // Reason for logging. Reason string }
func (*ScaleEvent) SetError ¶
func (evt *ScaleEvent) SetError(err error)
func (*ScaleEvent) SetScaled ¶
func (evt *ScaleEvent) SetScaled()
type ServerStats ¶
type ServerStats interface {
PersistCacheLen() int
}
type StatsIterator ¶
type StatsIterator struct {
// contains filtered or unexported fields
}
func NewStatsIterator ¶
func NewStatsIterator(arr interface{}, len int) *StatsIterator
func (*StatsIterator) Len ¶
func (iter *StatsIterator) Len() int
func (*StatsIterator) Next ¶
func (iter *StatsIterator) Next() bool
func (*StatsIterator) Value ¶
func (iter *StatsIterator) Value() (int, interface{})