types

package
v0.1.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 25, 2022 License: MIT Imports: 7 Imported by: 6

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrDuplicateSubs = errors.New("duplicate subscriber")
	ErrNoSubs        = errors.New("subscriber does not exist")
	ErrNoData        = errors.New("no data in message")
	ErrBadParser     = errors.New("bad parser")

	DefaultSubsChSize = 10
)
View Source
var (
	// ErrReplicaStoreFull is returned when more than the intended number of replicas register with the scheduler tool
	ErrReplicaStoreFull = errors.New("replica store is full")
)

Functions

This section is empty.

Types

type AnnotatedEventDAG

type AnnotatedEventDAG struct {
	// contains filtered or unexported fields
}

func NewAnnotatedEventDag

func NewAnnotatedEventDag(d *EventDAG) *AnnotatedEventDAG

func (*AnnotatedEventDAG) AddEvent

func (a *AnnotatedEventDAG) AddEvent(e *Event, parents []*Event)

func (*AnnotatedEventDAG) Check

func (a *AnnotatedEventDAG) Check() bool

func (*AnnotatedEventDAG) SetFrameOfReference

func (a *AnnotatedEventDAG) SetFrameOfReference(e *Event) bool

type BaseService

type BaseService struct {
	Logger *log.Logger
	// contains filtered or unexported fields
}

BaseService provides the basic nuts an bolts needed to implement a service

func NewBaseService

func NewBaseService(name string, parentLogger *log.Logger) *BaseService

NewBaseService instantiates BaseService

func (*BaseService) Name

func (b *BaseService) Name() string

Name returns the name of the service

func (*BaseService) QuitCh

func (b *BaseService) QuitCh() <-chan struct{}

QuitCh returns the quit channel which will be closed when the service stops running

func (*BaseService) Running

func (b *BaseService) Running() bool

Running returns the flag

func (*BaseService) StartRunning

func (b *BaseService) StartRunning()

StartRunning is called to set the running flag

func (*BaseService) StopRunning

func (b *BaseService) StopRunning()

StopRunning is called to unset the running flag

type ClockValue

type ClockValue []float64

func (ClockValue) Eq

func (c ClockValue) Eq(other ClockValue) bool

func (ClockValue) Lt

func (c ClockValue) Lt(other ClockValue) bool

type Clonable

type Clonable interface {
	Clone() Clonable
}

Clonable is any type which returns a copy of itself on Clone()

type Event

type Event struct {
	// Replica at which the event occurs
	Replica ReplicaID `json:"replica"`
	// Type of the event
	Type EventType `json:"-"`
	// TypeS is the string representation of the event
	TypeS string `json:"type"`
	// ID unique identifier assigned for every new event
	ID uint64 `json:"id"`
	// Timestamp of the event
	Timestamp int64 `json:"timestamp"`
	// Vector clock value of the event
	ClockValue ClockValue
}

Event is a generic event that occurs at a replica

func NewEvent

func NewEvent(replica ReplicaID, t EventType, ts string, id uint64, time int64) *Event

func (*Event) Clone

func (e *Event) Clone() Clonable

Clone implements Clonable

func (*Event) IsMessageReceive

func (e *Event) IsMessageReceive() bool

func (*Event) IsMessageSend

func (e *Event) IsMessageSend() bool

func (*Event) IsTimeoutEnd

func (e *Event) IsTimeoutEnd() bool

func (*Event) IsTimeoutStart

func (e *Event) IsTimeoutStart() bool

func (*Event) Lt

func (e *Event) Lt(other *Event) bool

Returns true if the current event is less than `other`

func (*Event) MessageID

func (e *Event) MessageID() (string, bool)

func (*Event) Timeout

func (e *Event) Timeout() (*ReplicaTimeout, bool)

type EventDAG

type EventDAG struct {
	// contains filtered or unexported fields
}

func NewEventDag

func NewEventDag() *EventDAG

func (*EventDAG) AddDirty

func (d *EventDAG) AddDirty(e *Event, parents []*Event)

func (*EventDAG) AddNode

func (d *EventDAG) AddNode(e *Event, parents []*Event)

func (*EventDAG) Clean

func (d *EventDAG) Clean()

func (*EventDAG) Clone

func (d *EventDAG) Clone() *EventDAG

func (*EventDAG) GetLatestNode

func (d *EventDAG) GetLatestNode(e *Event) (*Event, bool)

func (*EventDAG) GetNode

func (d *EventDAG) GetNode(eid uint64) (*EventNode, bool)

func (*EventDAG) GetReceiveNode

func (d *EventDAG) GetReceiveNode(e *Event) (*Event, bool)

func (*EventDAG) GetSendNode

func (d *EventDAG) GetSendNode(e *Event) (*Event, bool)

func (*EventDAG) GetTimeoutEnd

func (d *EventDAG) GetTimeoutEnd(e *Event) (*Event, bool)

func (*EventDAG) GetTimeoutStart

func (d *EventDAG) GetTimeoutStart(e *Event) (*Event, bool)

func (*EventDAG) MarshalJSON

func (d *EventDAG) MarshalJSON() ([]byte, error)

type EventNode

type EventNode struct {
	Event *Event `json:"event"`

	Parents  *EventNodeSet `json:"parents"`
	Children *EventNodeSet `json:"children"`
	// contains filtered or unexported fields
}

func NewEventNode

func NewEventNode(e *Event) *EventNode

func (*EventNode) AddParents

func (n *EventNode) AddParents(parents []*EventNode)

func (*EventNode) Clone

func (n *EventNode) Clone() *EventNode

func (*EventNode) GetNext

func (n *EventNode) GetNext() uint64

func (*EventNode) GetPrev

func (n *EventNode) GetPrev() uint64

func (*EventNode) IsDirty

func (n *EventNode) IsDirty() bool

func (*EventNode) MarkClean

func (n *EventNode) MarkClean()

func (*EventNode) MarkDirty

func (n *EventNode) MarkDirty()

func (*EventNode) SetNext

func (n *EventNode) SetNext(next uint64)

func (*EventNode) SetPrev

func (n *EventNode) SetPrev(prev uint64)

type EventNodeSet

type EventNodeSet struct {
	// contains filtered or unexported fields
}

func NewEventNodeSet

func NewEventNodeSet() *EventNodeSet

func (*EventNodeSet) Add

func (d *EventNodeSet) Add(nid uint64)

func (*EventNodeSet) Clone

func (d *EventNodeSet) Clone() *EventNodeSet

func (*EventNodeSet) Exists

func (d *EventNodeSet) Exists(nid uint64) bool

func (*EventNodeSet) Iter

func (d *EventNodeSet) Iter() []uint64

func (*EventNodeSet) MarshalJSON

func (d *EventNodeSet) MarshalJSON() ([]byte, error)

func (*EventNodeSet) Size

func (d *EventNodeSet) Size() int

type EventNodeTag

type EventNodeTag struct {
	Event *Event
	Type  EventNodeTagType
	Min   *IntW
	Max   *IntW
}

func NewEventNodeTag

func NewEventNodeTag(e *Event) *EventNodeTag

type EventNodeTagType

type EventNodeTagType = string
var (
	MessageSend    EventNodeTagType = "MessageSend"
	MessageReceive EventNodeTagType = "MessageReceive"
	TimeoutStart   EventNodeTagType = "TimeoutStart"
	TimeoutEnd     EventNodeTagType = "TimeoutEnd"
	Other          EventNodeTagType = "Other"
)

type EventQueue

type EventQueue struct {
	*BaseService
	// contains filtered or unexported fields
}

EventQueue datastructure to store the messages in a FIFO queue

func NewEventQueue

func NewEventQueue(logger *log.Logger) *EventQueue

NewEventQueue returns an empty EventQueue

func (*EventQueue) Add

func (q *EventQueue) Add(m *Event)

Add adds a message to the queue

func (*EventQueue) Flush

func (q *EventQueue) Flush()

Flush clears the queue of all messages

func (*EventQueue) Restart

func (q *EventQueue) Restart() error

Restart implements Service

func (*EventQueue) Start

func (q *EventQueue) Start() error

Start implements Service

func (*EventQueue) Stop

func (q *EventQueue) Stop() error

Stop implements Service

func (*EventQueue) Subscribe

func (q *EventQueue) Subscribe(label string) chan *Event

Subscribe creates and returns a channel for the subscriber with the given label

type EventType

type EventType interface {
	// Clone copies the event type
	Clone() EventType
	// Type is a unique key for that event type
	Type() string
	// String should return a string representation of the event type
	String() string
}

EventType abstract type for representing different types of events

type GenericEventType

type GenericEventType struct {
	// Marshalled parameters
	Params map[string]string `json:"params"`
	// Type of event for reference
	// Eg: Commit
	T string `json:"type"`
}

GenericEventType is the event type published by a replica It can be specific to the algorithm that is implemented

func NewGenericEventType

func NewGenericEventType(params map[string]string, t string) *GenericEventType

NewGenericEventType instantiates GenericEventType

func (*GenericEventType) Clone

func (g *GenericEventType) Clone() EventType

Clone returns a copy of the current GenericEventType

func (*GenericEventType) String

func (g *GenericEventType) String() string

String returns a string representation of the event type

func (*GenericEventType) Type

func (g *GenericEventType) Type() string

Type returns a unique key for GenericEventType

type GlobalClock

type GlobalClock struct {
	// contains filtered or unexported fields
}

func NewGlobalClock

func NewGlobalClock(dag *EventDAG, messageStore *MessageStore) *GlobalClock

type IntW

type IntW struct {
	// contains filtered or unexported fields
}

func NewIntW

func NewIntW() *IntW

func (*IntW) Set

func (i *IntW) Set(v int)

func (*IntW) Undefined

func (i *IntW) Undefined() bool

type Message

type Message struct {
	From          ReplicaID     `json:"from"`
	To            ReplicaID     `json:"to"`
	Data          []byte        `json:"data"`
	Type          string        `json:"type"`
	ID            string        `json:"id"`
	Intercept     bool          `json:"intercept"`
	ParsedMessage ParsedMessage `json:"-"`
	Repr          string        `json:"repr"`
}

Message stores a message that has been interecepted between two replicas

func (*Message) Clone

func (m *Message) Clone() *Message

Clone to create a new Message object with the same attributes

func (*Message) Parse

func (m *Message) Parse(parser MessageParser) error

type MessageParser

type MessageParser interface {
	Parse([]byte) (ParsedMessage, error)
}

type MessageQueue

type MessageQueue struct {
	*BaseService
	// contains filtered or unexported fields
}

MessageQueue datastructure to store the messages in a FIFO queue

func NewMessageQueue

func NewMessageQueue(logger *log.Logger) *MessageQueue

NewMessageQueue returns an empty MessageQueue

func (*MessageQueue) Add

func (q *MessageQueue) Add(m *Message)

Add adds a message to the queue

func (*MessageQueue) Disable

func (q *MessageQueue) Disable()

Disable closes the queue and drops all incoming messages Use with caution

func (*MessageQueue) Enable

func (q *MessageQueue) Enable()

Enable enqueues the messages and feeds it to the subscribers if any

func (*MessageQueue) Flush

func (q *MessageQueue) Flush()

Flush clears the queue of all messages

func (*MessageQueue) Pop

func (q *MessageQueue) Pop() (*Message, bool)

func (*MessageQueue) Restart

func (q *MessageQueue) Restart() error

Restart implements Service

func (*MessageQueue) Start

func (q *MessageQueue) Start() error

Start implements Service

func (*MessageQueue) Stop

func (q *MessageQueue) Stop() error

Stop implements Service

func (*MessageQueue) Subscribe

func (q *MessageQueue) Subscribe(label string) chan *Message

Subscribe create and returns a channel for the subscriber with the specified label

type MessageReceiveEventType

type MessageReceiveEventType struct {
	// MessageID is the ID of the message received
	MessageID string
}

MessageReceiveEventType is the event type when a replica receives a message

func NewMessageReceiveEventType

func NewMessageReceiveEventType(messageID string) *MessageReceiveEventType

NewMessageReceiveEventType instantiates MessageReceiveEventType

func (*MessageReceiveEventType) Clone

Clone returns a copy of the current MessageReceiveEventType

func (*MessageReceiveEventType) String

func (r *MessageReceiveEventType) String() string

String returns a string representation of the event type

func (*MessageReceiveEventType) Type

func (r *MessageReceiveEventType) Type() string

Type returns a unique key for MessageReceiveEventType

type MessageSendEventType

type MessageSendEventType struct {
	// MessageID of the message that was sent
	MessageID string
}

MessageSendEventType is the event type where a message is sent from the replica

func NewMessageSendEventType

func NewMessageSendEventType(messageID string) *MessageSendEventType

NewMessageSendEventType instantiates MessageSendEventType

func (*MessageSendEventType) Clone

func (s *MessageSendEventType) Clone() EventType

Clone returns a copy of the current MessageSendEventType

func (*MessageSendEventType) String

func (s *MessageSendEventType) String() string

String returns a string representation of the event type

func (*MessageSendEventType) Type

func (s *MessageSendEventType) Type() string

Type returns a unique key for MessageSendEventType

type MessageStore

type MessageStore struct {
	// contains filtered or unexported fields
}

MessageStore to store the messages. Thread safe

func NewMessageStore

func NewMessageStore() *MessageStore

NewMessageStore creates a empty MessageStore

func (*MessageStore) Add

func (s *MessageStore) Add(m *Message) *Message

Add adds a message to the store Returns any old message with the same ID if it exists or nil if not

func (*MessageStore) Exists

func (s *MessageStore) Exists(id string) bool

Exists returns true if the message exists

func (*MessageStore) Get

func (s *MessageStore) Get(id string) (*Message, bool)

Get returns a message and bool indicating if the message exists

func (*MessageStore) Iter

func (s *MessageStore) Iter() []*Message

Iter returns a list of all the messages in the store

func (*MessageStore) Remove

func (s *MessageStore) Remove(id string) *Message

Remove returns and deleted the message from the store if it exists. Returns nil otherwise

func (*MessageStore) RemoveAll

func (s *MessageStore) RemoveAll()

RemoveAll empties the message store

func (*MessageStore) Size

func (s *MessageStore) Size() int

Size returns the size of the message store

type ParsedMessage

type ParsedMessage interface {
	String() string
	Clone() ParsedMessage
	Marshal() ([]byte, error)
}

type Replica

type Replica struct {
	ID    ReplicaID              `json:"id"`
	Ready bool                   `json:"ready"`
	Info  map[string]interface{} `json:"info"`
	Addr  string                 `json:"addr"`
}

Replica immutable representation of the attributes of a replica

type ReplicaID

type ReplicaID string

ReplicaID is an identifier for the replica encoded as a string

type ReplicaLog

type ReplicaLog struct {
	// Params is a marhsalled params of the log message
	Params map[string]string `json:"params"`
	// Message is the message that was logged
	Message string `json:"message"`
	// Timestamp of the log
	Timestamp int64 `json:"timestamp"`
	// Replica which posted the log
	Replica ReplicaID `json:"replica"`
}

ReplicaLog encapsulates a log message with the necessary attributes

func (*ReplicaLog) Clone

func (l *ReplicaLog) Clone() Clonable

Clone implements Clonable

type ReplicaLogQueue

type ReplicaLogQueue struct {
	*BaseService
	// contains filtered or unexported fields
}

ReplicaLogQueue is the queue of log messages

func NewReplicaLogQueue

func NewReplicaLogQueue(logger *log.Logger) *ReplicaLogQueue

NewReplicaLogQueue instantiates ReplicaLogQueue

func (*ReplicaLogQueue) Add

func (q *ReplicaLogQueue) Add(log *ReplicaLog)

Add adds to the queue

func (*ReplicaLogQueue) Flush

func (q *ReplicaLogQueue) Flush()

Flush erases the contents of the queue

func (*ReplicaLogQueue) Start

func (q *ReplicaLogQueue) Start()

Start implements Service

func (*ReplicaLogQueue) Stop

func (q *ReplicaLogQueue) Stop()

Stop implements Service

func (*ReplicaLogQueue) Subscribe

func (q *ReplicaLogQueue) Subscribe(label string) chan *ReplicaLog

Subscribe creates and returns a channel for the subscriber

type ReplicaLogStore

type ReplicaLogStore struct {
	// contains filtered or unexported fields
}

ReplicaLogStore stores the logs as a map indexed by the replica ID

func NewReplicaLogStore

func NewReplicaLogStore() *ReplicaLogStore

NewReplicaLogStore instantiates a ReplicaLogStore

func (*ReplicaLogStore) Add

func (store *ReplicaLogStore) Add(log *ReplicaLog)

Add adds to the log store

func (*ReplicaLogStore) GetLogs

func (store *ReplicaLogStore) GetLogs(replica ReplicaID, from, to int) ([]*ReplicaLog, int)

GetLogs returns the list of logs for a replica where from <=index<to

func (*ReplicaLogStore) Reset

func (store *ReplicaLogStore) Reset()

type ReplicaState

type ReplicaState struct {
	State     string    `json:"state"`
	Timestamp int64     `json:"timestamp"`
	Replica   ReplicaID `json:"replica"`
}

type ReplicaStateStore

type ReplicaStateStore struct {
	// contains filtered or unexported fields
}

func NewReplicaStateStore

func NewReplicaStateStore() *ReplicaStateStore

type ReplicaStore

type ReplicaStore struct {
	// contains filtered or unexported fields
}

ReplicaStore to store all replica information, thread safe

func NewReplicaStore

func NewReplicaStore(size int) *ReplicaStore

NewReplicaStore creates an empty ReplicaStore

func (*ReplicaStore) Add

func (s *ReplicaStore) Add(p *Replica)

Add adds or updates a replica to the store

func (*ReplicaStore) Cap

func (s *ReplicaStore) Cap() int

Cap returns the set of replicas used for the test

func (*ReplicaStore) Count

func (s *ReplicaStore) Count() int

Count returns the total number of replicas

func (*ReplicaStore) Get

func (s *ReplicaStore) Get(id ReplicaID) (p *Replica, ok bool)

Get returns the replica and a bool indicating if it exists or not

func (*ReplicaStore) Iter

func (s *ReplicaStore) Iter() []*Replica

Iter returns a list of the existing replicas

func (*ReplicaStore) NumReady

func (s *ReplicaStore) NumReady() int

NumReady returns the number of replicas with Ready attribute set to true

func (*ReplicaStore) ResetReady

func (s *ReplicaStore) ResetReady()

ResetReady sets the Ready attribute of all replicas to false

type ReplicaTimeout

type ReplicaTimeout struct {
	Replica  ReplicaID     `json:"replica"`
	Type     string        `json:"type"`
	Duration time.Duration `json:"duration"`
}

func TimeoutFromParams

func TimeoutFromParams(replica ReplicaID, params map[string]string) (*ReplicaTimeout, bool)

func (*ReplicaTimeout) Eq

func (t *ReplicaTimeout) Eq(other *ReplicaTimeout) bool

func (*ReplicaTimeout) MarshalJSON

func (t *ReplicaTimeout) MarshalJSON() ([]byte, error)

type RestartableService

type RestartableService interface {
	Service
	// Restart restarts the service
	Restart() error
}

RestartableService is a service which can be restarted

type Service

type Service interface {
	// Name of the service
	Name() string
	// Start to start the service
	Start() error
	// Running to indicate if the service is running
	Running() bool
	// Stop to stop the service
	Stop() error
	// Quit returns a channel which will be closed once the service stops running
	QuitCh() <-chan struct{}
}

Service is any entity which runs on a separate thread

type Subscriber

type Subscriber struct {
	Ch chan interface{}
}

Generic subscriber to maintain state of the subsciber

type TimeoutContext

type TimeoutContext struct {
	PendingTimeouts map[string]*timeoutWrapper
	PendingReceives map[string]*Event
	// contains filtered or unexported fields
}

func NewTimeoutContext

func NewTimeoutContext(d *EventDAG) *TimeoutContext

func (*TimeoutContext) AddEvent

func (t *TimeoutContext) AddEvent(e *Event)

func (*TimeoutContext) CanDeliverMessages

func (t *TimeoutContext) CanDeliverMessages(messages []*Message)

type TimeoutEndEventType

type TimeoutEndEventType struct {
	Timeout *ReplicaTimeout
}

func NewTimeoutEndEventType

func NewTimeoutEndEventType(timeout *ReplicaTimeout) *TimeoutEndEventType

func (*TimeoutEndEventType) Clone

func (te *TimeoutEndEventType) Clone() EventType

func (*TimeoutEndEventType) String

func (te *TimeoutEndEventType) String() string

func (*TimeoutEndEventType) Type

func (te *TimeoutEndEventType) Type() string

type TimeoutStartEventType

type TimeoutStartEventType struct {
	Timeout *ReplicaTimeout
}

func NewTimeoutStartEventType

func NewTimeoutStartEventType(timeout *ReplicaTimeout) *TimeoutStartEventType

func (*TimeoutStartEventType) Clone

func (ts *TimeoutStartEventType) Clone() EventType

func (*TimeoutStartEventType) String

func (ts *TimeoutStartEventType) String() string

func (*TimeoutStartEventType) Type

func (ts *TimeoutStartEventType) Type() string

type TimeoutStore

type TimeoutStore struct {
	*BaseService
	// contains filtered or unexported fields
}

func NewTimeoutStore

func NewTimeoutStore(logger *log.Logger) *TimeoutStore

func (*TimeoutStore) AddTimeout

func (s *TimeoutStore) AddTimeout(t *ReplicaTimeout)

func (*TimeoutStore) Reset

func (s *TimeoutStore) Reset()

func (*TimeoutStore) Start

func (s *TimeoutStore) Start() error

func (*TimeoutStore) Stop

func (s *TimeoutStore) Stop() error

func (*TimeoutStore) ToDispatch

func (s *TimeoutStore) ToDispatch() []*ReplicaTimeout

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL