Versions in this module Expand all Collapse all v1 v1.4.1 Feb 27, 2023 Changes in this version + const ActiveLinks + const BACKING_DISABLED + const BACKING_ENABLED + const BACKING_FORBID + const BACKING_RESERVED + const ConnectionClosed + const ConnectionClosing + const ConnectionOpen + const DESCRIPTION_ACTIVATING + const DESCRIPTION_ACTIVE + const DESCRIPTION_CLOSED + const DESCRIPTION_MAYBE + const DESCRIPTION_SLEEPING + const DESCRIPTION_UNDEFINED + const DESCRIPTION_UNSTARTED + const DISPATCH_OPT_BUSY_CHECK + const DISPATCH_OPT_RELOCATED + const FAILURE_MAX_QUEUE_REACHED + const INSTANCE_ACTIVATING + const INSTANCE_ACTIVE + const INSTANCE_BACKING + const INSTANCE_CLOSED + const INSTANCE_MASK_STATUS_BACKING + const INSTANCE_MASK_STATUS_CONNECTION + const INSTANCE_MASK_STATUS_FAILURE + const INSTANCE_MASK_STATUS_LIFECYCLE + const INSTANCE_MASK_STATUS_START + const INSTANCE_MAYBE + const INSTANCE_RECOVERING + const INSTANCE_RUNNING + const INSTANCE_SHADOW + const INSTANCE_SLEEPING + const INSTANCE_UNSTARTED + const IN_CONCURRENCY + const LinkBucketSize + const MAX_CONCURRENCY + const NUM_REQUEST_MASK + const NUM_REQUEST_UNIT + const NUM_WRITE_REQUEST_BITS + const NUM_WRITE_REQUEST_MASK + const NUM_WRITE_REQUEST_UNIT + const OUT_CONCURRENCY + const PHASE_ACTIVE + const PHASE_BACKING_ONLY + const PHASE_EXPIRED + const PHASE_RECLAIMED + const TEMP_MAP_SIZE + const UnlimitedActiveLinks + var AwsSession = awsSession.Must(awsSession.NewSessionWithOptions(awsSession.Options{ ... })) + var BackoffFactor = 2 + var CM ClusterManager + var DefaultConnectTimeout = 1000 * time.Millisecond + var DefaultPingPayload = []byte + var ErrCapacityExceeded = errors.New("capacity exceeded") + var ErrConnectionClosed = errors.New("connection closed") + var ErrDuplicatedSession = errors.New("session has started") + var ErrInstanceBusy = errors.New("instance busy") + var ErrInstanceClosed = errors.New("instance closed") + var ErrInstanceReclaimed = errors.New("instance reclaimed") + var ErrInstanceRecovering = errors.New("instance is recovering") + var ErrInstanceSleeping = errors.New("instance is sleeping") + var ErrInstanceValidated = errors.New("instance has been validated by another connection") + var ErrLinkManagerReset = &LambdaError + var ErrLinkRequestClosed = &LambdaError + var ErrLinkRequestTimeout = &LambdaError + var ErrMissingRequest = errors.New("missing request") + var ErrMissingResponse = errors.New("missing response") + var ErrNilLink = &LambdaError + var ErrNotCtrlLink = errors.New("not control link") + var ErrQueueTimeout = &LambdaError + var ErrRelocationFailed = errors.New("relocation failed") + var ErrReservationFailed = errors.New("reservation failed") + var ErrUnexpectedCommand = errors.New("unexpected command") + var ErrUnexpectedSendRequest = errors.New("unexpected SendRequest call") + var ErrUnexpectedType = errors.New("unexpected type") + var ErrUnknown = errors.New("unknown error") + var ErrValidationTimeout = &LambdaError + var ErrWarmupReturn = errors.New("return from warmup") + var MaxConnectTimeout = 1 * time.Second + var MaxControlRequestSize = int64(200000) + var MaxValidationFailure = 3 + var MinValidationInterval = RTT + var PromisedGoodDue = 1 * time.Second + var RTT = 20 * time.Millisecond + var TriggerTimeout = 1 * time.Second + var UnitTestMTC1 = false + var WarmTimeout = config.InstanceWarmTimeout + func IsLambdaTimeout(err error) bool + type AvailableLink struct + func (l *AvailableLink) Close() + func (l *AvailableLink) Closed() <-chan struct{} + func (l *AvailableLink) Error() error + func (l *AvailableLink) Request() chan<- *types.Request + func (l *AvailableLink) SetTimeout(d time.Duration) + type AvailableLinks struct + func (l *AvailableLinks) AddAvailable(link manageableLink, nolimit bool) bool + func (l *AvailableLinks) GetRequestPipe() *AvailableLink + func (l *AvailableLinks) Len() int + func (l *AvailableLinks) OffsetLimit(offset int, max int) int + func (l *AvailableLinks) Reset() + func (l *AvailableLinks) SetLimit(limit int) int + type Backer interface + ReserveBacking func() error + StartBacking func(*Instance, int, int) bool + StopBacking func(*Instance) + type BackerGetter interface + type BackupIterator struct + func (iter *BackupIterator) Len() int + func (iter *BackupIterator) Next() bool + func (iter *BackupIterator) Value() (int, interface{}) + type Backups struct + func NewBackups(ins *Instance, backups []Backer) *Backups + func NewBackupsFromInstances(ins *Instance, backups []*Instance, adapter BackerGetter) *Backups + func (b *Backups) Availables() int + func (b *Backups) GetByHash(hash uint64) (*Instance, bool) + func (b *Backups) GetByKey(key string) (*Instance, bool) + func (b *Backups) GetByLocation(loc int, required int) (*Instance, bool) + func (b *Backups) Invalidate() + func (b *Backups) Iter() *BackupIterator + func (b *Backups) Len() int + func (b *Backups) Locator() *protocol.BackupLocator + func (b *Backups) Reserve(fallback mapreduce.Iterator) int + func (b *Backups) ResetCandidates(required int, candidates []*Instance) + func (b *Backups) Start(target *Instance) int + func (b *Backups) StartByIndex(i int, target *Instance) (*Instance, bool) + func (b *Backups) Stop(target *Instance) + type CandidateProvider func() *Instance + type CandidateQueue struct + func NewCandidateQueue(bufsize int, provider CandidatesProvider) *CandidateQueue + func (q *CandidateQueue) Candidates() <-chan *Instance + func (q *CandidateQueue) Close() + func (q *CandidateQueue) Start() bool + type CandidatesProvider interface + LoadCandidates func(*CandidateQueue, []*Instance) int + type ClusterManager interface + type Connection struct + Id uint32 + func NewConnection(cn net.Conn) *Connection + func (conn *Connection) BindInstance(ins *Instance) *Connection + func (conn *Connection) ClearResponses() + func (conn *Connection) Close() error + func (conn *Connection) CloseAndWait() error + func (conn *Connection) CloseWithReason(reason string, block bool) error + func (conn *Connection) IsClosed() bool + func (conn *Connection) IsSameWorker(another *Connection) bool + func (conn *Connection) SendControl(ctrl *types.Control) error + func (conn *Connection) SendPing(payload []byte) error + func (conn *Connection) SendRequest(req *types.Request, args ...interface{}) error + func (conn *Connection) ServeLambda() + func (conn *Connection) String() string + func (conn *Connection) Writer() *resp.RequestWriter + type DefaultInstanceEnumerator struct + func NewInstanceEnumerator(instances []*Instance) *DefaultInstanceEnumerator + func (enum *DefaultInstanceEnumerator) Instance(i int) *Instance + type Delegate struct + func (ins *Delegate) StartBacking(deleIns *Instance, bakId int, total int) bool + type DelegateBackerAdapter struct + type Deployment struct + Block int + func NewDeployment(name string, id uint64) *Deployment + func (d *Deployment) Id() uint64 + func (d *Deployment) Name() string + type Instance struct + func NewInstance(name string, id uint64) *Instance + func NewInstanceFromDeployment(dp *Deployment, id uint64) *Instance + func (ins *Instance) AbandonLambda() + func (ins *Instance) AssignBackups(numBak int, candidates []*Instance) + func (ins *Instance) Close() + func (ins *Instance) CollectData() + func (ins *Instance) Degrade() + func (ins *Instance) Description() string + func (ins *Instance) Dispatch(cmd types.Command) error + func (ins *Instance) DispatchWithOptions(cmd types.Command, opts int) error + func (ins *Instance) Expire() + func (ins *Instance) FlagDataCollected(ok string) + func (ins *Instance) ForbidBacking() bool + func (ins *Instance) GetShadowInstance() *Instance + func (ins *Instance) HandleRequests() + func (ins *Instance) IsActive() bool + func (ins *Instance) IsBacking(includingPrepare bool) bool + func (ins *Instance) IsBusy(cmd types.Command) (uint64, bool) + func (ins *Instance) IsClosed() bool + func (ins *Instance) IsReclaimed() bool + func (ins *Instance) IsRecovering() bool + func (ins *Instance) Migrate() error + func (ins *Instance) Occupancy(mode types.InstanceOccupancyMode) float64 + func (ins *Instance) Phase() uint32 + func (ins *Instance) ReserveBacking() error + func (ins *Instance) ResetDue(delay bool, reason string) + func (ins *Instance) ResumeServing() + func (ins *Instance) SetDue(due int64, delay bool, reason string, args ...interface{}) + func (ins *Instance) StartBacking(bakIns *Instance, bakId int, total int) bool + func (ins *Instance) StartDelegation() int + func (ins *Instance) StartRecovery() int + func (ins *Instance) Status() uint64 + func (ins *Instance) StatusDescription() string + func (ins *Instance) StopBacking(bakIns *Instance) + func (ins *Instance) String() string + func (ins *Instance) TryFlagValidated(conn *Connection, sid string, flags int64) (*Connection, time.Duration, error) + func (ins *Instance) Validate(opts ...*ValidateOption) (*Connection, time.Duration, error) + func (ins *Instance) WarmUp() + type InstanceEnumerator interface + Instance func(i int) *Instance + type InstanceManager interface + GetBackupCandidates func() mapreduce.Iterator + GetDelegates func() []*Instance + GetPersistCache func() types.PersistCache + GetServePort func(uint64) int + Instance func(uint64) *Instance + Recycle func(types.LambdaDeployment) error + type LambdaError struct + func (e *LambdaError) IsTimeout() bool + type LambdaErrorType int + const LambdaErrorTimeout + const LambdaErrorUncategoried + type LinkBucket struct + func (b *LinkBucket) Reset() + type LinkManager struct + func NewLinkManager(ins *Instance) *LinkManager + func (m *LinkManager) AddDataLink(link *Connection) bool + func (m *LinkManager) Close() + func (m *LinkManager) DataLinks() *AvailableLinks + func (m *LinkManager) FlagAvailableForRequest(link *Connection) bool + func (m *LinkManager) GetAvailableForRequest() *AvailableLink + func (m *LinkManager) GetControl() *Connection + func (m *LinkManager) GetLastControl() *Connection + func (m *LinkManager) InvalidateControl(link manageableLink) + func (m *LinkManager) RemoveDataLink(link *Connection) + func (m *LinkManager) Reset() + func (m *LinkManager) SetControl(link *Connection) + func (m *LinkManager) SetMaxActiveDataLinks(num int) + type Meta struct + Capacity uint64 + DiffRank float64 + Hash string + SnapshotSize uint64 + SnapshotTerm uint64 + SnapshotUpdates uint64 + Stale bool + Term uint64 + Updates uint64 + func (m *Meta) AddChunk(key string, sz int64) (num int, size uint64) + func (m *Meta) DecreaseSize(dec int64) uint64 + func (m *Meta) EffectiveCapacity() uint64 + func (m *Meta) FromProtocolMeta(meta *protocol.Meta) (bool, error) + func (m *Meta) IncreaseSize(inc int64) uint64 + func (m *Meta) ModifiedOccupancy(adjustment uint64) float64 + func (m *Meta) NumChunks() int + func (m *Meta) Reconcile(meta *protocol.ShortMeta) + func (m *Meta) RemoveChunk(key string, sz int64) (num int, size uint64) + func (m *Meta) ReservedCapacity() uint64 + func (m *Meta) ResetCapacity(capacity uint64, effective uint64) + func (m *Meta) Size() uint64 + func (m *Meta) ToBackupPayload(id uint64, key int, total int, maxChunkSize uint64) ([]byte, error) + func (m *Meta) ToDelegatePayload(id uint64, key int, total int, maxChunkSize uint64) (*protocol.Meta, []byte, error) + func (m *Meta) ToPayload(id uint64) ([]byte, error) + func (m *Meta) ToProtocolMeta(id uint64) *protocol.Meta + type Relocator interface + Relocate func(interface{}, int, types.Command) (*Instance, error) + TryRelocate func(interface{}, int, types.Command) (*Instance, bool, error) + type ValidateOption struct + Command types.Command + Error error + Notifier chan struct{} + Validated *Connection + WarmUp bool