smudge

package module v0.2.0

This package is not in the latest version of its module.

Published: Jan 8, 2017 License: Apache-2.0 Imports: 13 Imported by: 0

README

Smudge

Introduction

Smudge is a minimalist Go implementation of the SWIM (Scalable Weakly-consistent Infection-style Membership) protocol for cluster node membership, status dissemination, and failure detection. SWIM was developed at Cornell University by Motivala et al. Smudge isn't a distributed data store in its own right, but rather a framework intended to facilitate the construction of such systems.

Smudge also extends the standard SWIM protocol. In addition to the standard membership status functionality, it allows the transmission of broadcasts containing a small amount of arbitrary content (256 bytes by default) to all present, healthy members. This maximum is related to the maximum safe UDP packet size imposed by RFC 791 and RFC 2460. Because some systems allow larger packets, the maximum payload size is configurable, although larger payloads risk fragmentation and dropped packets.

Smudge was conceived with space-sensitive systems (mobile, IoT, containers) in mind, and was therefore developed with a minimalist philosophy of doing a few things well. As such, its feature set is relatively small and mostly limited to adding and removing nodes and detecting status changes in the cluster.

Complete documentation is available from the associated Godoc.

Features

  • Uses a gossip (epidemic) protocol for dissemination, whose latency grows logarithmically with the number of members.
  • Low-bandwidth UDP-based failure detection and status dissemination.
  • Imposes a constant message load per group member, regardless of the number of members.
  • Member status changes are eventually detected by all non-faulty members of the cluster (strong completeness).
  • Supports transmission of short (256-byte) broadcasts that are propagated at most once to all present, healthy members.

Known issues

  • Broadcasts are limited to 256 bytes by default.
  • No WAN support: only local-network, private IPs are supported.
  • No multicast discovery.

Deviations from Motivala et al.
  • Dead nodes are not immediately removed, but are instead periodically re-tried (with exponential backoff) for a time before finally being removed.
  • Smudge allows the transmission of short, arbitrary-content broadcasts to all healthy nodes.

How to use

To use Smudge, specify a few configuration options (or use the defaults), create and add a node status change listener, and call smudge.Begin().

Configuring the node with environment variables

Perhaps the simplest way to direct the behavior of the SWIM driver is to set the appropriate system environment variables, which is useful when running Smudge inside a container.

The supported variables and their default values are:

Variable                   | Default | Description
-------------------------- | ------- | -------------------------------
SMUDGE_HEARTBEAT_MILLIS    |     250 | Milliseconds between heartbeats
SMUDGE_INITIAL_HOSTS       |         | Comma-delimited list of known members as IP or IP:PORT.
SMUDGE_LISTEN_PORT         |    9999 | UDP port to listen on
SMUDGE_MAX_BROADCAST_BYTES |     256 | Maximum byte length of broadcast payloads

Configuring the node with API calls

If you prefer to direct the behavior of the service through the API, the calls are straightforward. Values set this way override the corresponding environment variables.

smudge.SetListenPort(9999)
smudge.SetHeartbeatMillis(250)
smudge.SetMaxBroadcastBytes(256)

Creating and adding a status change listener

Creating a status change listener is straightforward:

type MyStatusListener struct {
	smudge.StatusListener
}

func (m MyStatusListener) OnChange(node *smudge.Node, status smudge.NodeStatus) {
	fmt.Printf("Node %s is now status %s\n", node.Address(), status)
}

func main() {
	smudge.AddStatusListener(MyStatusListener{})
}

Creating and adding a broadcast listener

Adding a broadcast listener is very similar to creating a status listener:

type MyBroadcastListener struct {
	smudge.BroadcastListener
}

func (m MyBroadcastListener) OnBroadcast(b *smudge.Broadcast) {
	fmt.Printf("Received broadcast from %v: %s\n",
		b.Origin().Address(),
		string(b.Bytes()))
}

func main() {
	smudge.AddBroadcastListener(MyBroadcastListener{})
}

Adding a new member to the "known nodes" list

Adding a new member to your known nodes list also makes that node aware of the adding server. Because this package doesn't yet support multicast discovery, joining an existing cluster currently requires using this method to add at least one of that cluster's healthy member nodes.

node, err := smudge.CreateNodeByAddress("localhost:10000")
if err == nil {
    smudge.AddNode(node)
}

Starting the server

Once everything else is configured, start the server by calling smudge.Begin(). Note that Begin() blocks, so make it the last call in main() or run it in its own goroutine.

Transmitting a broadcast

To transmit a broadcast to all healthy nodes currently in the cluster, use either BroadcastBytes(bytes []byte) or BroadcastString(str string).

Be aware of the following caveats:

  • Attempting to send a broadcast before the server has been started will cause a panic.
  • The broadcast will not be received by the originating member; BroadcastListeners on the originating member will not be triggered.
  • Nodes that join the cluster after the broadcast has been fully propagated will not receive it; nodes that join after the initial transmission but before complete propagation may or may not receive it.

Getting a list of nodes

AllNodes() returns all known nodes, including nodes marked "dead" that haven't yet been removed. HealthyNodes() works similarly but returns only healthy nodes (those with a status of "alive").

Everything in one place
package main

import (
	"fmt"

	"github.com/clockworksoul/smudge"
)

type MyStatusListener struct {
	smudge.StatusListener
}

func (m MyStatusListener) OnChange(node *smudge.Node, status smudge.NodeStatus) {
	fmt.Printf("Node %s is now status %s\n", node.Address(), status)
}

type MyBroadcastListener struct {
	smudge.BroadcastListener
}

func (m MyBroadcastListener) OnBroadcast(b *smudge.Broadcast) {
	fmt.Printf("Received broadcast from %s: %s\n",
		b.Origin().Address(),
		string(b.Bytes()))
}

func main() {
	heartbeatMillis := 500
	listenPort := 9999

	// Set configuration options
	smudge.SetListenPort(listenPort)
	smudge.SetHeartbeatMillis(heartbeatMillis)

	// Add the status listener
	smudge.AddStatusListener(MyStatusListener{})

	// Add the broadcast listener
	smudge.AddBroadcastListener(MyBroadcastListener{})

	// Add a new remote node. Currently, to join an existing cluster you must
	// add at least one of its healthy member nodes.
	node, err := smudge.CreateNodeByAddress("localhost:10000")
	if err == nil {
		smudge.AddNode(node)
	}

	// Start the server!
	smudge.Begin()
}

Documentation

Constants

const (
	// EnvVarHeartbeatMillis is the name of the environment variable that
	// sets the heartbeat frequency (in millis).
	EnvVarHeartbeatMillis = "SMUDGE_HEARTBEAT_MILLIS"

	// DefaultHeartbeatMillis is the default heartbeat frequency (in millis).
	DefaultHeartbeatMillis int = 250

	// EnvVarInitialHosts is the name of the environment variable that sets
	// the initial known hosts. The value it sets should be a comma-delimited
	// string of one or more IP:PORT pairs (port is optional if it matches the
	// value of SMUDGE_LISTEN_PORT).
	EnvVarInitialHosts = "SMUDGE_INITIAL_HOSTS"

	// DefaultInitialHosts is the default list of initially known hosts.
	DefaultInitialHosts string = ""

	// EnvVarListenPort is the name of the environment variable that sets
	// the UDP listen port.
	EnvVarListenPort = "SMUDGE_LISTEN_PORT"

	// DefaultListenPort is the default UDP listen port.
	DefaultListenPort int = 9999

	// EnvVarMaxBroadcastBytes is the name of the environment variable that
	// sets the maximum byte length for broadcast payloads. Note that increasing
	// this runs the risk of packet fragmentation and dropped messages.
	EnvVarMaxBroadcastBytes = "SMUDGE_MAX_BROADCAST_BYTES"

	// DefaultMaxBroadcastBytes is the default maximum byte length for
	// broadcast payloads. This is guided by the maximum safe UDP packet size
	// of 508 bytes, which must also contain status updates and additional
	// message overhead.
	DefaultMaxBroadcastBytes int = 256
)
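The 508-byte figure in the DefaultMaxBroadcastBytes comment follows from the usual conservative IPv4 calculation. RFC 791 requires every host to accept datagrams of at least 576 bytes. Subtracting a worst-case 60-byte IPv4 header and the 8-byte UDP header leaves 508 bytes. The constant names below are illustrative and are not exported by Smudge.

```go
package main

import "fmt"

// Illustrative constants; none of these names are exported by Smudge.
const (
	minDatagram         = 576 // IPv4 datagram size every host must accept (RFC 791)
	maxIPv4Header       = 60  // IPv4 header including the maximum 40 bytes of options
	udpHeader           = 8   // fixed UDP header
	defaultMaxBroadcast = 256 // value of DefaultMaxBroadcastBytes
)

// safeUDPPayload is the largest UDP payload that still fits in a 576-byte
// datagram carrying a worst-case IPv4 header.
const safeUDPPayload = minDatagram - maxIPv4Header - udpHeader

func main() {
	fmt.Println("safe UDP payload:", safeUDPPayload)                                // 508
	fmt.Println("left after a full broadcast:", safeUDPPayload-defaultMaxBroadcast) // 252
}
```

With the default broadcast limit, 252 bytes remain for status updates and message framing. This is why raising the limit with SetMaxBroadcastBytes increases the risk of fragmentation.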

Variables

This section is empty.

Functions

func AddBroadcastListener

func AddBroadcastListener(listener BroadcastListener)

AddBroadcastListener allows the submission of a BroadcastListener implementation whose OnBroadcast() function will be called whenever the node receives a broadcast from another cluster member.

func AddStatusListener

func AddStatusListener(listener StatusListener)

AddStatusListener allows the submission of a StatusListener implementation whose OnChange() function will be called whenever the node is notified of any change in the status of a cluster member.

func Begin

func Begin()

Begin starts the server by opening a UDP port and beginning the heartbeat. Note that this is a blocking function that does not return while the server runs, so call it last or run it in its own goroutine.

func BroadcastBytes

func BroadcastBytes(bytes []byte) error

BroadcastBytes allows a user to emit a short broadcast in the form of a byte slice, which will be transmitted at most once to all other healthy current members. Members that join after the broadcast has already propagated through the cluster will not receive the message. The maximum broadcast length is 256 bytes by default; see SetMaxBroadcastBytes().

func BroadcastString

func BroadcastString(str string) error

BroadcastString allows a user to emit a short broadcast in the form of a string, which will be transmitted at most once to all other healthy current members. Members that join after the broadcast has already propagated through the cluster will not receive the message. The maximum broadcast length is 256 bytes by default; see SetMaxBroadcastBytes().

func GetHeartbeatMillis

func GetHeartbeatMillis() int

GetHeartbeatMillis gets this host's heartbeat frequency in milliseconds.

func GetInitialHosts

func GetInitialHosts() []string

GetInitialHosts returns the list of initially known hosts.

func GetListenPort

func GetListenPort() int

GetListenPort returns the port that this host will listen on.

func GetLocalIP

func GetLocalIP() (net.IP, error)

GetLocalIP queries the host interfaces to determine this machine's local IPv4 address. If no local IPv4 address can be found, nil is returned. If the query to the underlying OS fails, an error is returned.
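Here is a minimal sketch of the documented contract, using the standard net package: it returns a nil address when none is found and an error only when the OS query fails. Skipping loopback addresses and returning the first match are assumptions of this sketch. localIPv4 is a hypothetical name, not Smudge's implementation.

```go
package main

import (
	"fmt"
	"net"
)

// localIPv4 returns the first non-loopback IPv4 address assigned to this
// host, nil if there is none, or an error if the OS query fails.
func localIPv4() (net.IP, error) {
	addrs, err := net.InterfaceAddrs()
	if err != nil {
		return nil, err
	}
	for _, a := range addrs {
		ipnet, ok := a.(*net.IPNet)
		if !ok || ipnet.IP.IsLoopback() {
			continue
		}
		if ip4 := ipnet.IP.To4(); ip4 != nil {
			return ip4, nil
		}
	}
	return nil, nil // no IPv4 address found; not an error
}

func main() {
	ip, err := localIPv4()
	switch {
	case err != nil:
		fmt.Println("query failed:", err)
	case ip == nil:
		fmt.Println("no local IPv4 address found")
	default:
		fmt.Println("local IPv4:", ip)
	}
}
```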

func GetMaxBroadcastBytes

func GetMaxBroadcastBytes() int

GetMaxBroadcastBytes returns the maximum byte length for broadcast payloads.

func GetNowInMillis

func GetNowInMillis() uint32

GetNowInMillis returns the current local time in milliseconds since the epoch.
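A uint32 cannot hold the full number of milliseconds since the epoch, because the current value is far above the uint32 maximum of 4,294,967,295. The value must therefore be reduced somehow. If it is truncated to its low 32 bits (an assumption), it wraps about every 49.7 days. Age calculations such as Node.Age() stay correct across a single wrap if they use unsigned subtraction. The helpers below are hypothetical illustrations of that arithmetic.

```go
package main

import (
	"fmt"
	"time"
)

// nowMillis32 truncates milliseconds since the epoch to 32 bits. Go's
// integer conversion keeps the low 32 bits, so the value wraps roughly
// every 49.7 days. Whether GetNowInMillis truncates this way is assumed.
func nowMillis32() uint32 {
	return uint32(time.Now().UnixNano() / int64(time.Millisecond))
}

// ageMillis computes now - then with unsigned arithmetic, which stays
// correct across one wraparound as long as the true age is below 2^32 ms.
func ageMillis(now, then uint32) uint32 {
	return now - then
}

func main() {
	then := uint32(4294967000) // 296 ms before the 32-bit counter wraps
	now := uint32(200)         // 200 ms after it wraps
	fmt.Println("age:", ageMillis(now, then), "ms")
	fmt.Println("now (truncated):", nowMillis32())
}
```

In the example, the timestamp is taken 296 ms before the wrap and the reading 200 ms after it. The computed age is still 496 ms.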

func PingNode

func PingNode(node *Node) error

PingNode can be used to explicitly ping a node. It calls the low-level doPingNode() and, if the ping fails, outputs a message and returns an error.

func SetHeartbeatMillis

func SetHeartbeatMillis(val int)

SetHeartbeatMillis sets this node's heartbeat frequency. Unlike SetListenPort(), calling this function after Begin() has been called will have an effect.

func SetListenPort

func SetListenPort(val int)

SetListenPort sets the UDP port to listen on. It has no effect once Begin() has been called.

func SetLogThreshold

func SetLogThreshold(level LogLevel)

SetLogThreshold allows the output noise level to be adjusted by setting the logging priority threshold.

func SetMaxBroadcastBytes

func SetMaxBroadcastBytes(val int)

SetMaxBroadcastBytes sets the maximum byte length for broadcast payloads. Note that increasing this beyond the default of 256 runs the risk of packet fragmentation and dropped messages.

func UpdateNodeStatus

func UpdateNodeStatus(node *Node, status NodeStatus)

UpdateNodeStatus assigns a new status for the specified node and adds it to the list of recently updated nodes. If the status is StatusDead, then the node will be moved from the live nodes list to the dead nodes list.
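The documented behavior of UpdateNodeStatus can be modeled with a toy registry. It keeps separate live and dead maps plus a list of recently updated nodes. The types and names here are hypothetical stand-ins. Only the StatusDead transition moves a node between maps, as the description states; other transitions are not documented here.

```go
package main

import "fmt"

// status is a hypothetical stand-in for NodeStatus.
type status byte

const (
	statusUnknown status = iota
	statusAlive
	statusDead
)

type node struct {
	addr   string
	status status
}

// registry is a toy membership table with live and dead lists and a list
// of recently updated nodes waiting to be gossiped.
type registry struct {
	live, dead map[string]*node
	recent     []*node
}

func newRegistry() *registry {
	return &registry{live: map[string]*node{}, dead: map[string]*node{}}
}

// updateStatus records the new status, queues the node as recently
// updated, and moves it to the dead list if the status is dead.
func (r *registry) updateStatus(n *node, s status) {
	n.status = s
	r.recent = append(r.recent, n)
	if s == statusDead {
		delete(r.live, n.addr)
		r.dead[n.addr] = n
	}
}

func main() {
	r := newRegistry()
	n := &node{addr: "10.0.0.2:9999", status: statusAlive}
	r.live[n.addr] = n
	r.updateStatus(n, statusDead)
	fmt.Println("live:", len(r.live), "dead:", len(r.dead), "recent:", len(r.recent))
}
```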

Types

type Broadcast

type Broadcast struct {
	// contains filtered or unexported fields
}

Broadcast represents a packet of bytes emitted across the cluster on top of the status update infrastructure. Its payload is limited to 256 bytes by default (see SetMaxBroadcastBytes()).

func (*Broadcast) Bytes

func (b *Broadcast) Bytes() []byte

Bytes returns a copy of this broadcast's bytes. Manipulating the contents of this slice will not be reflected in the contents of the broadcast.

func (*Broadcast) Index

func (b *Broadcast) Index() uint32

Index returns the origin message index for this broadcast. This value is incremented for each broadcast. The combination of originIP:originPort:Index is unique.

func (*Broadcast) Label

func (b *Broadcast) Label() string

Label returns a unique label string composed of originIP:originPort:Index.

func (*Broadcast) Origin

func (b *Broadcast) Origin() *Node

Origin returns the node that this broadcast originated from.

type BroadcastListener

type BroadcastListener interface {
	// OnBroadcast is called whenever the node receives a broadcast from
	// another cluster member.
	OnBroadcast(broadcast *Broadcast)
}

BroadcastListener is the interface that must be implemented to receive the broadcast notifications provided by the AddBroadcastListener() function.

type LogLevel

type LogLevel byte

LogLevel represents a logging level, used as the parameter passed to the SetLogThreshold() function.

const (
	// LogAll allows all log output of all levels to be emitted.
	LogAll LogLevel = iota

	// LogTrace restricts log output to trace level and above.
	LogTrace

	// LogDebug restricts log output to debug level and above.
	LogDebug

	// LogInfo restricts log output to info level and above.
	LogInfo

	// LogWarn restricts log output to warn level and above.
	LogWarn

	// LogError restricts log output to error level and above.
	LogError

	// LogFatal restricts log output to fatal level.
	LogFatal

	// LogOff prevents all log output entirely.
	LogOff
)
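The constants are declared with iota in increasing order of severity, so a threshold check can be a single comparison. The sketch redeclares the constants locally so it runs without the package. It assumes the rule implied by their comments: a message is emitted when its level is at or above the threshold. The enabled function is hypothetical.

```go
package main

import "fmt"

// LogLevel and its constants mirror the declaration order above; they are
// redeclared here so the sketch runs on its own.
type LogLevel byte

const (
	LogAll LogLevel = iota
	LogTrace
	LogDebug
	LogInfo
	LogWarn
	LogError
	LogFatal
	LogOff
)

// enabled reports whether a message at level msg passes the threshold.
// LogAll lets everything through; LogOff is above every message level,
// so nothing passes.
func enabled(msg, threshold LogLevel) bool {
	return msg >= threshold
}

func main() {
	threshold := LogWarn
	for _, lvl := range []LogLevel{LogDebug, LogInfo, LogWarn, LogError} {
		fmt.Println("level", lvl, "emitted:", enabled(lvl, threshold))
	}
}
```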

func (LogLevel) String

func (s LogLevel) String() string

type Node

type Node struct {
	// contains filtered or unexported fields
}

Node represents a single node in the cluster.

func AddNode

func AddNode(node *Node) (*Node, error)

AddNode can be used to explicitly add a node to the list of known live nodes. Updates the node timestamp but DOES NOT implicitly update the node's status; you need to do this explicitly.

func AllNodes

func AllNodes() []*Node

AllNodes will return a list of all nodes known at the time of the request, including nodes that have been marked as "dead" but haven't yet been removed from the registry.

func CreateNodeByAddress

func CreateNodeByAddress(address string) (*Node, error)

CreateNodeByAddress will create and return a new node when supplied with a node address ("ip:port" string). This doesn't add the node to the list of live nodes; use AddNode().

func CreateNodeByIP

func CreateNodeByIP(ip net.IP, port uint16) (*Node, error)

CreateNodeByIP will create and return a new node when supplied with an IP address and port number. This doesn't add the node to the list of live nodes; use AddNode().

func HealthyNodes

func HealthyNodes() []*Node

HealthyNodes will return a list of all nodes known at the time of the request with a healthy status.

func RemoveNode

func RemoveNode(node *Node) (*Node, error)

RemoveNode can be used to explicitly remove a node from the list of known live nodes. Updates the node timestamp but DOES NOT implicitly update the node's status; you need to do this explicitly.

func (*Node) Address

func (n *Node) Address() string

Address returns the address for this node in string format, which is simply the node's local IP and listen port. This is used as a unique identifier throughout the code base.

func (*Node) Age

func (n *Node) Age() uint32

Age returns the time since we last heard from this node, in milliseconds.

func (*Node) EmitCounter

func (n *Node) EmitCounter() int8

EmitCounter returns the number of times remaining that the current status will be emitted by this node to other nodes.
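The sketch below shows how such a counter can bound piggybacked gossip. Each outgoing message carries every pending update and decrements its counter. An update is dropped once its counter reaches zero. The update type, the piggyback function, and the initial counts are hypothetical, and the documentation doesn't say how Smudge initializes EmitCounter. A real implementation would also cap how many updates fit in one message.

```go
package main

import "fmt"

// update is a hypothetical pending status change with a countdown of how
// many more outgoing messages should carry it.
type update struct {
	addr        string
	emitCounter int8
}

// piggyback returns the addresses whose updates ride on the next outgoing
// message, decrementing each counter and dropping updates that reach zero.
func piggyback(pending []*update) (sent []string, remaining []*update) {
	for _, u := range pending {
		sent = append(sent, u.addr)
		u.emitCounter--
		if u.emitCounter > 0 {
			remaining = append(remaining, u)
		}
	}
	return sent, remaining
}

func main() {
	// Initial counts are arbitrary illustration values.
	pending := []*update{
		{addr: "10.0.0.2:9999", emitCounter: 2},
		{addr: "10.0.0.3:9999", emitCounter: 1},
	}
	for msg := 1; len(pending) > 0; msg++ {
		var sent []string
		sent, pending = piggyback(pending)
		fmt.Println("message", msg, "carries", sent)
	}
}
```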

func (*Node) IP

func (n *Node) IP() net.IP

IP returns the IP associated with this node.

func (*Node) Port

func (n *Node) Port() uint16

Port returns the port associated with this node.

func (*Node) Status

func (n *Node) Status() NodeStatus

Status returns this node's current status.

func (*Node) Timestamp

func (n *Node) Timestamp() uint32

Timestamp returns the timestamp of this node's last ping or status update, in milliseconds from the epoch.

func (*Node) Touch

func (n *Node) Touch()

Touch updates the timestamp to the local time in milliseconds.

type NodeStatus

type NodeStatus byte

NodeStatus represents the believed status of a member node.

const (
	// StatusUnknown is the default node status of newly-created nodes.
	StatusUnknown NodeStatus = iota

	// StatusAlive indicates that a node is alive and healthy.
	StatusAlive

	// StatusDead indicates that a node is dead and no longer healthy.
	StatusDead

	// StatusForwardTo is a pseudo status used by messages to indicate
	// the target of a ping request.
	StatusForwardTo
)

func (NodeStatus) String

func (s NodeStatus) String() string

type StatusListener

type StatusListener interface {
	// The OnChange() function is called whenever the node is notified of any
	// change in the status of a cluster member.
	OnChange(node *Node, status NodeStatus)
}

StatusListener is the interface that must be implemented to take advantage of the cluster member status update notification functionality provided by the AddStatusListener() function.
