Documentation ¶
Overview ¶
Package redeo provides a toolkit for building redis-protocol compatible services optimised for high thoughput and low latency.
A simple server example with two commands:
srv := redeo.NewServer(nil) // Define handlers srv.HandleFunc("ping", func(w resp.ResponseWriter, _ *resp.Command) { w.AppendInlineString("PONG") }) srv.HandleFunc("info", func(w resp.ResponseWriter, _ *resp.Command) { w.AppendBulkString(srv.Info().String()) }) // More handlers; demo usage of redeo.WrapperFunc srv.Handle("echo", redeo.WrapperFunc(func(c *resp.Command) interface{} { if c.ArgN() != 1 { return redeo.ErrWrongNumberOfArgs(c.Name) } return c.Arg(0) })) // Open a new listener lis, err := net.Listen("tcp", ":9736") if err != nil { panic(err) } defer lis.Close() // Start serving (blocking) srv.Serve(lis)
Index ¶
- Constants
- Variables
- func ErrUnknownCommand(cmd string) error
- func ErrWrongNumberOfArgs(cmd string) error
- func UnknownCommand(cmd string) string
- func WrongNumberOfArgs(cmd string) string
- type Callback
- type CallbackFunc
- type Client
- func (c *Client) AddResponses(rsp interface{}) error
- func (c *Client) Close()
- func (c *Client) Conn() net.Conn
- func (c *Client) Context() context.Context
- func (c *Client) ID() uint64
- func (c *Client) RemoteAddr() net.Addr
- func (c *Client) Responses() chan interface{}
- func (c *Client) SetContext(ctx context.Context)
- func (c *Client) WaitClose()
- type ClientInfo
- type CommandDescription
- type CommandDescriptions
- type Config
- type Contextable
- type Handler
- type HandlerFunc
- type PubSubBroker
- type Server
- func (srv *Server) Close(lis net.Listener)
- func (srv *Server) GetClient(id uint64) (*Client, bool)
- func (srv *Server) Handle(name string, h Handler)
- func (srv *Server) HandleCallback(cb Callback)
- func (srv *Server) HandleCallbackFunc(cbf CallbackFunc)
- func (srv *Server) HandleFunc(name string, fn HandlerFunc)
- func (srv *Server) HandleStream(name string, h StreamHandler)
- func (srv *Server) HandleStreamFunc(name string, fn StreamHandlerFunc)
- func (srv *Server) Info() *ServerInfo
- func (srv *Server) Release()
- func (srv *Server) Serve(lis net.Listener) error
- func (srv *Server) ServeAsync(lis net.Listener) error
- func (srv *Server) ServeClient(cli *Client, syncs ...bool) error
- func (srv *Server) ServeForeignClient(cn net.Conn, syncs ...bool) error
- type ServerInfo
- func (i *ServerInfo) Client(clientID uint64) (*Client, bool)
- func (i *ServerInfo) ClientInfo() []*ClientInfo
- func (i *ServerInfo) Clients() []*Client
- func (i *ServerInfo) Fetch(name string) *info.Section
- func (i *ServerInfo) Find(name string) *info.Section
- func (i *ServerInfo) NumClients() int
- func (i *ServerInfo) String() string
- func (i *ServerInfo) TotalCommands() int64
- func (i *ServerInfo) TotalConnections() int64
- type StreamHandler
- type StreamHandlerFunc
- type SubCommands
- type WrapperFunc
Examples ¶
Constants ¶
const (
ASYNC_CB_NAME = "callback"
)
Variables ¶
var (
ErrClientClosed = errors.New("client closed")
)
Functions ¶
func ErrUnknownCommand ¶
ErrUnknownCommand returns an unknown command error
func ErrWrongNumberOfArgs ¶
ErrWrongNumberOfArgs returns an unknown command error
func UnknownCommand ¶
UnknownCommand returns an unknown command error string
func WrongNumberOfArgs ¶
WrongNumberOfArgs returns an unknown command error string
Types ¶
type Callback ¶
type Callback interface { // ServeRedeoStream serves a streaming request. ServeCallback(w resp.ResponseWriter, r interface{}) }
StreamHandler is an interface for responding to streaming commands
type CallbackFunc ¶
type CallbackFunc func(w resp.ResponseWriter, r interface{})
StreamHandlerFunc is a callback function, implementing Handler.
func (CallbackFunc) ServeCallback ¶
func (f CallbackFunc) ServeCallback(w resp.ResponseWriter, r interface{})
ServeRedeoStream calls f(w, c).
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client contains information about a client connection
Example ¶
package main import ( "github.com/mason-leap-lab/redeo" "github.com/mason-leap-lab/redeo/resp" ) func main() { srv := redeo.NewServer(nil) srv.HandleFunc("myip", func(w resp.ResponseWriter, cmd *resp.Command) { client := redeo.GetClient(cmd.Context()) if client == nil { w.AppendNil() return } w.AppendInlineString(client.RemoteAddr().String()) }) }
Output:
func GetClient ¶
GetClient retrieves the client from a the context. This function may return nil if a client is not set.
func (*Client) AddResponses ¶ added in v1.1.0
Add asynchronize response, error if the client is closed.
func (*Client) Close ¶
func (c *Client) Close()
Close will disconnect as soon as all pending replies have been written to the client
func (*Client) RemoteAddr ¶
RemoteAddr return the remote client address
func (*Client) Responses ¶
func (c *Client) Responses() chan interface{}
Obsoleted. Don't use responses channel directly, use AddResponses instead.
func (*Client) SetContext ¶
SetContext sets the client's context
type ClientInfo ¶
type ClientInfo struct { // ID is the internal client ID ID uint64 // RemoteAddr is the remote address string RemoteAddr string // LastCmd is the last command called by this client LastCmd string // CreateTime returns the time at which the client has // connected to the server CreateTime time.Time // AccessTime returns the time of the last access AccessTime time.Time // contains filtered or unexported fields }
ClientInfo contains client stats
type CommandDescription ¶
type CommandDescription struct { // Name is the command name, returned as a lowercase string. Name string // Arity is the command arity specification. // https://redis.io/commands/command#command-arity. // It follows a simple pattern: // positive if command has fixed number of required arguments. // negative if command has minimum number of required arguments, but may have more. Arity int64 // Flags is an enumeration of command flags. // https://redis.io/commands/command#flags. Flags []string // FirstKey is the position of first key in argument list. // https://redis.io/commands/command#first-key-in-argument-list FirstKey int64 // LastKey is the position of last key in argument list. // https://redis.io/commands/command#last-key-in-argument-list LastKey int64 // KeyStepCount is the step count for locating repeating keys. // https://redis.io/commands/command#step-count KeyStepCount int64 }
CommandDescription describes supported commands
type CommandDescriptions ¶
type CommandDescriptions []CommandDescription
CommandDescriptions returns a command handler. https://redis.io/commands/command
Example ¶
package main import ( "github.com/mason-leap-lab/redeo" ) func main() { srv := redeo.NewServer(nil) srv.Handle("command", redeo.CommandDescriptions{ {Name: "get", Arity: 2, Flags: []string{"readonly", "fast"}, FirstKey: 1, LastKey: 1, KeyStepCount: 1}, {Name: "randomkey", Arity: 1, Flags: []string{"readonly", "random"}}, {Name: "mset", Arity: -3, Flags: []string{"write", "denyoom"}, FirstKey: 1, LastKey: -1, KeyStepCount: 2}, {Name: "quit", Arity: 1}, }) }
Output:
func (CommandDescriptions) ServeRedeo ¶
func (s CommandDescriptions) ServeRedeo(w resp.ResponseWriter, c *resp.Command)
type Config ¶
type Config struct { // Timeout represents the per-request socket read/write timeout. // Default: 0 (disabled) Timeout time.Duration // IdleTimeout forces servers to close idle connection once timeout is reached. // Default: 0 (disabled) IdleTimeout time.Duration // If non-zero, use SO_KEEPALIVE to send TCP ACKs to clients in absence // of communication. This is useful for two reasons: // 1) Detect dead peers. // 2) Take the connection alive from the point of view of network // equipment in the middle. // On Linux, the specified value (in seconds) is the period used to send ACKs. // Note that to close the connection the double of the time is needed. // On other kernels the period depends on the kernel configuration. // Default: 0 (disabled) TCPKeepAlive time.Duration }
Config holds the server configuration
type Contextable ¶ added in v1.1.5
type Contextable interface { // Context retrieve context Context() context.Context // SetContext reset context SetContext(context.Context) }
Contextable interface for types own context
type Handler ¶
type Handler interface { // ServeRedeo serves a request. ServeRedeo(w resp.ResponseWriter, c *resp.Command) }
Handler is an abstract handler interface for responding to commands
func Info ¶
Info returns an info handler. https://redis.io/commands/info
Example ¶
package main import ( "github.com/mason-leap-lab/redeo" ) func main() { srv := redeo.NewServer(nil) srv.Handle("info", redeo.Info(srv)) }
Output:
func Ping ¶
func Ping() Handler
Ping returns a ping handler. https://redis.io/commands/ping
Example ¶
package main import ( "github.com/mason-leap-lab/redeo" ) func main() { srv := redeo.NewServer(nil) srv.Handle("ping", redeo.Ping()) }
Output:
type HandlerFunc ¶
type HandlerFunc func(w resp.ResponseWriter, c *resp.Command)
HandlerFunc is a callback function, implementing Handler.
Example ¶
package main import ( "sync" "github.com/mason-leap-lab/redeo" "github.com/mason-leap-lab/redeo/resp" ) func main() { mu := sync.RWMutex{} data := make(map[string]string) srv := redeo.NewServer(nil) srv.HandleFunc("set", func(w resp.ResponseWriter, c *resp.Command) { if c.ArgN() != 2 { w.AppendError(redeo.WrongNumberOfArgs(c.Name)) return } key := c.Arg(0).String() val := c.Arg(1).String() mu.Lock() data[key] = val mu.Unlock() w.AppendInt(1) }) srv.HandleFunc("get", func(w resp.ResponseWriter, c *resp.Command) { if c.ArgN() != 1 { w.AppendError(redeo.WrongNumberOfArgs(c.Name)) return } key := c.Arg(0).String() mu.RLock() val, ok := data[key] mu.RUnlock() if ok { w.AppendBulkString(val) return } w.AppendNil() }) }
Output:
func (HandlerFunc) ServeRedeo ¶
func (f HandlerFunc) ServeRedeo(w resp.ResponseWriter, c *resp.Command)
ServeRedeo calls f(w, c).
type PubSubBroker ¶
type PubSubBroker struct {
// contains filtered or unexported fields
}
PubSubBroker can be used to emulate redis' native pub/sub functionality
Example ¶
package main import ( "github.com/mason-leap-lab/redeo" ) func main() { broker := redeo.NewPubSubBroker() srv := redeo.NewServer(nil) srv.Handle("publish", broker.Publish()) srv.Handle("subscribe", broker.Subscribe()) }
Output:
func NewPubSubBroker ¶
func NewPubSubBroker() *PubSubBroker
NewPubSubBroker inits a new pub-sub broker
func (*PubSubBroker) Publish ¶
func (b *PubSubBroker) Publish() Handler
Publish acts as a publish handler
func (*PubSubBroker) PublishMessage ¶
func (b *PubSubBroker) PublishMessage(name, msg string) int64
PublishMessage allows to publish a message to the broker outside the command-cycle. Returns the number of subscribers
func (*PubSubBroker) Subscribe ¶
func (b *PubSubBroker) Subscribe() Handler
Subscribe returns a subscribe handler
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
Server configuration
Example ¶
package main import ( "net" "github.com/mason-leap-lab/redeo" "github.com/mason-leap-lab/redeo/resp" ) func main() { srv := redeo.NewServer(nil) // Define handlers srv.HandleFunc("ping", func(w resp.ResponseWriter, _ *resp.Command) { w.AppendInlineString("PONG") }) srv.HandleFunc("info", func(w resp.ResponseWriter, _ *resp.Command) { w.AppendBulkString(srv.Info().String()) }) // More handlers; demo usage of redeo.WrapperFunc srv.Handle("echo", redeo.WrapperFunc(func(c *resp.Command) interface{} { if c.ArgN() != 1 { return redeo.ErrWrongNumberOfArgs(c.Name) } return c.Arg(0) })) // Open a new listener lis, err := net.Listen("tcp", ":9736") if err != nil { panic(err) } defer lis.Close() // Start serving (blocking) srv.Serve(lis) }
Output:
func (*Server) HandleCallback ¶
Callback registers a handler for a callback command.
func (*Server) HandleCallbackFunc ¶
func (srv *Server) HandleCallbackFunc(cbf CallbackFunc)
CallbackFunc registers a handler func for a command
func (*Server) HandleFunc ¶
func (srv *Server) HandleFunc(name string, fn HandlerFunc)
HandleFunc registers a handler func for a command.
func (*Server) HandleStream ¶
func (srv *Server) HandleStream(name string, h StreamHandler)
HandleStream registers a handler for a streaming command.
func (*Server) HandleStreamFunc ¶
func (srv *Server) HandleStreamFunc(name string, fn StreamHandlerFunc)
HandleStreamFunc registers a handler func for a command
func (*Server) Serve ¶
Serve accepts incoming connections on a listener, creating a new service goroutine for each.
func (*Server) ServeClient ¶ added in v1.1.3
Lambda facing serve client
type ServerInfo ¶
type ServerInfo struct {
// contains filtered or unexported fields
}
ServerInfo contains server stats
func (*ServerInfo) Client ¶
func (i *ServerInfo) Client(clientID uint64) (*Client, bool)
Client returns connected clients by id
func (*ServerInfo) ClientInfo ¶
func (i *ServerInfo) ClientInfo() []*ClientInfo
ClientInfo returns details about connected clients
func (*ServerInfo) Clients ¶
func (i *ServerInfo) Clients() []*Client
Clients returns all connected clients
func (*ServerInfo) Fetch ¶
func (i *ServerInfo) Fetch(name string) *info.Section
Fetch finds or creates an info section. This method is not thread-safe.
func (*ServerInfo) Find ¶
func (i *ServerInfo) Find(name string) *info.Section
Find finds an info section by name.
func (*ServerInfo) NumClients ¶
func (i *ServerInfo) NumClients() int
NumClients returns the number of connected clients
func (*ServerInfo) TotalCommands ¶
func (i *ServerInfo) TotalCommands() int64
TotalCommands returns the total number of commands executed since the start of the server.
func (*ServerInfo) TotalConnections ¶
func (i *ServerInfo) TotalConnections() int64
TotalConnections returns the total number of connections made since the start of the server.
type StreamHandler ¶
type StreamHandler interface { // ServeRedeoStream serves a streaming request. ServeRedeoStream(w resp.ResponseWriter, c *resp.CommandStream) }
StreamHandler is an interface for responding to streaming commands
type StreamHandlerFunc ¶
type StreamHandlerFunc func(w resp.ResponseWriter, c *resp.CommandStream)
StreamHandlerFunc is a callback function, implementing Handler.
func (StreamHandlerFunc) ServeRedeoStream ¶
func (f StreamHandlerFunc) ServeRedeoStream(w resp.ResponseWriter, c *resp.CommandStream)
ServeRedeoStream calls f(w, c).
type SubCommands ¶
SubCommands returns a handler that is parsing sub-commands
Example ¶
package main import ( "github.com/mason-leap-lab/redeo" ) func main() { srv := redeo.NewServer(nil) srv.Handle("custom", redeo.SubCommands{ "ping": redeo.Ping(), "echo": redeo.Echo(), }) }
Output:
func (SubCommands) ServeRedeo ¶
func (s SubCommands) ServeRedeo(w resp.ResponseWriter, c *resp.Command)
type WrapperFunc ¶
WrapperFunc implements Handler, accepts a command and must return one of the following types:
nil error string []byte bool float32, float64 int, int8, int16, int32, int64 uint, uint8, uint16, uint32, uint64 resp.CustomResponse instances slices of any of the above typs maps containing keys and values of any of the above types
Example ¶
package main import ( "sync" "github.com/mason-leap-lab/redeo" "github.com/mason-leap-lab/redeo/resp" ) func main() { mu := sync.RWMutex{} data := make(map[string]string) srv := redeo.NewServer(nil) srv.Handle("set", redeo.WrapperFunc(func(c *resp.Command) interface{} { if c.ArgN() != 2 { return redeo.ErrWrongNumberOfArgs(c.Name) } key := c.Arg(0).String() val := c.Arg(1).String() mu.Lock() data[key] = val mu.Unlock() return 1 })) srv.Handle("get", redeo.WrapperFunc(func(c *resp.Command) interface{} { if c.ArgN() != 1 { return redeo.ErrWrongNumberOfArgs(c.Name) } key := c.Arg(0).String() mu.RLock() val, ok := data[key] mu.RUnlock() if ok { return val } return nil })) }
Output:
func (WrapperFunc) ServeRedeo ¶
func (f WrapperFunc) ServeRedeo(w resp.ResponseWriter, c *resp.Command)
ServeRedeo implements Handler
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
Package client implements a minimalist client for working with redis servers.
|
Package client implements a minimalist client for working with redis servers. |
cmd
|
|
Package resp implements low-level primitives for dealing with RESP (REdis Serialization Protocol).
|
Package resp implements low-level primitives for dealing with RESP (REdis Serialization Protocol). |