drpc

package
v0.0.0-...-3a1d29c

This package is not in the latest version of its module.

Published: May 24, 2024 License: BSD-2-Clause-Patent Imports: 15 Imported by: 16

README

dRPC

dRPC is a means of communication between processes local to the same physical system, via a Unix Domain Socket. At any given time a process may act as a client, a server, or both, though each listening dRPC server needs its own Unix Domain Socket.

The server will fail to create the socket if something already exists at that location in the filesystem, even if it is an older incarnation of the socket. Optionally, your application may wish to unlink that filesystem location before creating the socket.
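
The optional unlink step can be sketched with the standard library alone. This is a minimal illustration, not the DAOS implementation: `removeStaleSocket` and `staleSocketDemo` are hypothetical helpers, the socket lives in a temporary directory, and the plain `"unix"` network is used for portability. The demo leaves a socket file behind on purpose, shows that binding fails while it exists, and then binds again after cleanup.

```go
package main

import (
	"errors"
	"fmt"
	"io/fs"
	"net"
	"os"
	"path/filepath"
)

// removeStaleSocket unlinks path if it exists and is a socket file.
// It refuses to delete anything that is not a socket.
func removeStaleSocket(path string) error {
	info, err := os.Lstat(path)
	if errors.Is(err, fs.ErrNotExist) {
		return nil // nothing to clean up
	}
	if err != nil {
		return err
	}
	if info.Mode()&fs.ModeSocket == 0 {
		return fmt.Errorf("%s exists and is not a socket", path)
	}
	return os.Remove(path)
}

// staleSocketDemo leaves a socket file behind, then reports whether binding
// failed before cleanup and whether it succeeded after cleanup.
func staleSocketDemo() (failedBefore, okAfter bool, err error) {
	dir, err := os.MkdirTemp("", "drpc")
	if err != nil {
		return false, false, err
	}
	defer os.RemoveAll(dir)
	addr := &net.UnixAddr{Name: filepath.Join(dir, "my_socket.sock"), Net: "unix"}

	old, err := net.ListenUnix("unix", addr)
	if err != nil {
		return false, false, err
	}
	old.SetUnlinkOnClose(false) // simulate a server that died without cleanup
	old.Close()

	// The socket file is still on disk, so a second bind is expected to fail.
	if l, err := net.ListenUnix("unix", addr); err != nil {
		failedBefore = true
	} else {
		l.Close()
	}

	if err := removeStaleSocket(addr.Name); err != nil {
		return failedBefore, false, err
	}
	fresh, err := net.ListenUnix("unix", addr)
	if err != nil {
		return failedBefore, false, err
	}
	defer fresh.Close()
	return failedBefore, true, nil
}

func main() {
	before, after, err := staleSocketDemo()
	fmt.Println("bind failed before cleanup:", before, "bind ok after cleanup:", after, "err:", err)
}
```

Note that unlinking unconditionally would also remove the socket of a server that is still running, so an application may want to confirm the old owner is gone first.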

dRPC calls are defined by module and method identifiers. The dRPC module can be thought of as a package of related functions. The dRPC method indicates a specific function to be executed by the server. If the method requires an input, it should be marshalled in the body of the dRPC call. The server will respond with a dRPC response structure, which may include a method-specific response in the body.

The DAOS dRPC implementation is dependent on Protocol Buffers to define the structures passed over the dRPC channel. Any structure to be sent via dRPC as part of a call or response must be defined in a .proto file.

Go API

In Go, the drpc package includes both client and server functionality, which is outlined below. For documentation of the C API, see here.

The dRPC call and response are represented by the Protobuf-generated drpc.Call and drpc.Response structures.

Go Client

The dRPC client is represented by the drpc.ClientConnection object.

Basic Client Workflow
  1. Create a new client connection with the path to the dRPC server's Unix Domain Socket:
    conn := drpc.NewClientConnection("/var/run/my_socket.sock")
    
  2. Connect to the dRPC server:
    err := conn.Connect(ctx) // ctx is a context.Context
    
  3. Create your drpc.Call and send it to the server:
    call := &drpc.Call{}
    // Set up the Call with module, method, and body
    resp, err := conn.SendMsg(ctx, call)
    
    An error indicates that the drpc.Call couldn't be sent, or an invalid drpc.Response was received. If there is no error returned, the content of the drpc.Response should still be checked for errors reported by the server.
  4. Send as many calls as desired.
  5. Close the connection when finished:
    conn.Close()
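
The two-level error check described in step 3 can be sketched as follows. To keep the example compilable without the DAOS module, `Status`, `Call`, `Response`, and the `sender` interface are local stand-ins that mirror the documented drpc types; `sendChecked`, `fakeClient`, and `demoSend` are hypothetical names introduced only for illustration.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Stand-ins that mirror the documented drpc.Status, drpc.Call and
// drpc.Response so this sketch compiles without the DAOS module.
type Status int32

const (
	StatusSuccess       Status = 0 // mirrors Status_SUCCESS
	StatusUnknownModule Status = 3 // mirrors Status_UNKNOWN_MODULE
)

type Call struct {
	Module, Method int32
	Sequence       int64
	Body           []byte
}

type Response struct {
	Sequence int64
	Status   Status
	Body     []byte
}

// sender is the part of drpc.DomainSocketClient this sketch relies on.
type sender interface {
	SendMsg(context.Context, *Call) (*Response, error)
}

// sendChecked sends call and treats both transport errors and non-success
// statuses as failures, returning the response body otherwise.
func sendChecked(ctx context.Context, c sender, call *Call) ([]byte, error) {
	resp, err := c.SendMsg(ctx, call)
	if err != nil {
		return nil, fmt.Errorf("sending dRPC call: %w", err)
	}
	if resp == nil {
		return nil, errors.New("nil dRPC response")
	}
	if resp.Status != StatusSuccess {
		return nil, fmt.Errorf("server reported dRPC status %d", resp.Status)
	}
	return resp.Body, nil
}

// fakeClient is a hypothetical test double that answers every call with a
// fixed status and the body "ok".
type fakeClient struct{ status Status }

func (f fakeClient) SendMsg(_ context.Context, call *Call) (*Response, error) {
	return &Response{Sequence: call.Sequence, Status: f.status, Body: []byte("ok")}, nil
}

// demoSend runs sendChecked against a fakeClient that replies with status.
func demoSend(status Status) (string, error) {
	call := &Call{Module: 1, Method: 1}
	body, err := sendChecked(context.Background(), fakeClient{status: status}, call)
	return string(body), err
}

func main() {
	fmt.Println(demoSend(StatusSuccess))
	fmt.Println(demoSend(StatusUnknownModule))
}
```

With the real package, the same helper shape would take a `drpc.DomainSocketClient` and compare against `drpc.Status_SUCCESS`; a method-specific status inside the body would still need its own check after unmarshalling.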
    
Go Server

The dRPC server is represented by the drpc.DomainSocketServer object.

Individual dRPC modules must be registered with the server in order to handle incoming dRPC calls for that module. To create a dRPC module, create an object that implements the drpc.Module interface. The module ID must be unique.

Basic Server Workflow
  1. Create the new DomainSocketServer with the server's Unix Domain Socket (file permissions 0600):
    drpcServer, err := drpc.NewDomainSocketServer(log, "/var/run/my_socket.sock", 0600)
    
  2. Register the dRPC modules that the server needs to handle:
    drpcServer.RegisterRPCModule(&MyExampleModule{})
    drpcServer.RegisterRPCModule(&AnotherExampleModule{})
    
  3. Start the server, which kicks off the goroutine that listens for and handles incoming connections:
    err = drpcServer.Start(ctx)
    
  4. When it is time to shut down the server, cancel the context that was passed to Start, which stops the listening goroutine:
    cancel() // the context.CancelFunc paired with ctx
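
In the API listed under Documentation, both Start and Listen take a context.Context. One common way to tie a listening goroutine's lifetime to a context is sketched below using only the standard library. Whether DomainSocketServer works exactly this way is an assumption; `startListener` and `lifecycleDemo` are hypothetical helpers, and the plain `"unix"` network is used for portability.

```go
package main

import (
	"context"
	"fmt"
	"net"
	"os"
	"path/filepath"
	"time"
)

// startListener binds a Unix domain socket at path and serves it from a
// goroutine until ctx is cancelled. The returned channel is closed once the
// accept loop has exited.
func startListener(ctx context.Context, path string) (<-chan struct{}, error) {
	lis, err := net.Listen("unix", path)
	if err != nil {
		return nil, err
	}
	// Closing the listener on cancellation unblocks Accept below.
	go func() {
		<-ctx.Done()
		lis.Close()
	}()
	done := make(chan struct{})
	go func() {
		defer close(done)
		for {
			conn, err := lis.Accept()
			if err != nil {
				return // listener closed or failed: stop serving
			}
			conn.Close() // a real server would hand conn to a session here
		}
	}()
	return done, nil
}

// lifecycleDemo starts a listener, cancels its context, and reports whether
// the accept loop exited within five seconds.
func lifecycleDemo() (bool, error) {
	dir, err := os.MkdirTemp("", "drpc")
	if err != nil {
		return false, err
	}
	defer os.RemoveAll(dir)

	ctx, cancel := context.WithCancel(context.Background())
	done, err := startListener(ctx, filepath.Join(dir, "my_socket.sock"))
	if err != nil {
		cancel()
		return false, err
	}
	cancel() // request shutdown
	select {
	case <-done:
		return true, nil
	case <-time.After(5 * time.Second):
		return false, nil
	}
}

func main() {
	stopped, err := lifecycleDemo()
	fmt.Println("listener stopped:", stopped, "err:", err)
}
```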
    

Documentation

Constants

const (
	// MethodNotifyReady is a ModuleSrv method
	MethodNotifyReady srvMethod = C.DRPC_METHOD_SRV_NOTIFY_READY
	// MethodGetPoolServiceRanks requests the service ranks for a pool
	MethodGetPoolServiceRanks srvMethod = C.DRPC_METHOD_SRV_GET_POOL_SVC
	// MethodPoolFindByLabel requests the service ranks and UUID for a pool
	MethodPoolFindByLabel srvMethod = C.DRPC_METHOD_SRV_POOL_FIND_BYLABEL
	// MethodClusterEvent notifies of a cluster event in the I/O Engine.
	MethodClusterEvent srvMethod = C.DRPC_METHOD_SRV_CLUSTER_EVENT
	// MethodCheckerListPools requests the list of pools from the MS
	MethodCheckerListPools srvMethod = C.DRPC_METHOD_CHK_LIST_POOL
	// MethodCheckerRegisterPool registers a pool with the MS
	MethodCheckerRegisterPool srvMethod = C.DRPC_METHOD_CHK_REG_POOL
	// MethodCheckerDeregisterPool deregisters a pool with the MS
	MethodCheckerDeregisterPool srvMethod = C.DRPC_METHOD_CHK_DEREG_POOL
	// MethodCheckerReport reports a checker finding to the MS
	MethodCheckerReport srvMethod = C.DRPC_METHOD_CHK_REPORT
)
const MaxMsgSize = 1 << 20

MaxMsgSize is the maximum dRPC message size that may be sent. A packet socket over the Unix domain socket delivers a whole message at a time, without revealing its size in advance. The maximum message size is therefore restricted so that a buffer large enough for any message can be preallocated. The corresponding C definition is found in include/daos/drpc.h.
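
The whole-message behaviour can be observed with a short, Linux-only sketch. It assumes the packet socket corresponds to Go's `"unixpacket"` network (SOCK_SEQPACKET); `maxMsgSize` mirrors the constant above, and `recvSizes` is a hypothetical helper. Each Read into the preallocated buffer returns exactly one message, even when several were written back to back.

```go
package main

import (
	"fmt"
	"net"
	"os"
	"path/filepath"
)

const maxMsgSize = 1 << 20 // mirrors drpc.MaxMsgSize

// recvSizes writes each message to a unixpacket connection and returns the
// number of bytes delivered by each Read on the receiving side.
func recvSizes(msgs ...[]byte) ([]int, error) {
	for _, m := range msgs {
		if len(m) > maxMsgSize {
			return nil, fmt.Errorf("message of %d bytes exceeds the %d-byte limit", len(m), maxMsgSize)
		}
	}
	dir, err := os.MkdirTemp("", "drpc")
	if err != nil {
		return nil, err
	}
	defer os.RemoveAll(dir)
	addr := &net.UnixAddr{Name: filepath.Join(dir, "my_socket.sock"), Net: "unixpacket"}

	lis, err := net.ListenUnix("unixpacket", addr)
	if err != nil {
		return nil, err
	}
	defer lis.Close()
	client, err := net.DialUnix("unixpacket", nil, addr)
	if err != nil {
		return nil, err
	}
	defer client.Close()
	server, err := lis.AcceptUnix()
	if err != nil {
		return nil, err
	}
	defer server.Close()

	for _, m := range msgs {
		if _, err := client.Write(m); err != nil {
			return nil, err
		}
	}

	// One buffer sized for the largest allowed message is reused for every read.
	buf := make([]byte, maxMsgSize)
	sizes := make([]int, 0, len(msgs))
	for range msgs {
		n, err := server.Read(buf)
		if err != nil {
			return nil, err
		}
		sizes = append(sizes, n)
	}
	return sizes, nil
}

func main() {
	sizes, err := recvSizes([]byte("hello"), []byte("world!"))
	fmt.Println(sizes, err)
}
```

On a stream socket the two writes could be coalesced into a single read, which is why a size limit alone would not be enough there.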

const (
	// MethodRequestCredentials is a ModuleSecurityAgent method
	MethodRequestCredentials securityAgentMethod = C.DRPC_METHOD_SEC_AGENT_REQUEST_CREDS
)
const (
	// MethodValidateCredentials is a ModuleSecurity method
	MethodValidateCredentials securityMethod = C.DRPC_METHOD_SEC_VALIDATE_CREDS
)

Variables

var (
	Status_name = map[int32]string{
		0: "SUCCESS",
		1: "SUBMITTED",
		2: "FAILURE",
		3: "UNKNOWN_MODULE",
		4: "UNKNOWN_METHOD",
		5: "FAILED_UNMARSHAL_CALL",
		6: "FAILED_UNMARSHAL_PAYLOAD",
		7: "FAILED_MARSHAL",
	}
	Status_value = map[string]int32{
		"SUCCESS":                  0,
		"SUBMITTED":                1,
		"FAILURE":                  2,
		"UNKNOWN_MODULE":           3,
		"UNKNOWN_METHOD":           4,
		"FAILED_UNMARSHAL_CALL":    5,
		"FAILED_UNMARSHAL_PAYLOAD": 6,
		"FAILED_MARSHAL":           7,
	}
)

Enum value maps for Status.

Functions

func FaultSocketFileInUse

func FaultSocketFileInUse(path string) *fault.Fault

FaultSocketFileInUse indicates that the dRPC socket file was already in use when we tried to start the dRPC server.

func Marshal

func Marshal(message proto.Message) ([]byte, error)

Marshal is a utility function that can be used by dRPC method handlers to marshal their method-specific response to be passed back to the ModuleService.

Types

type Call

type Call struct {
	Module   int32  `protobuf:"varint,1,opt,name=module,proto3" json:"module,omitempty"`     // ID of the module to process the call.
	Method   int32  `protobuf:"varint,2,opt,name=method,proto3" json:"method,omitempty"`     // ID of the method to be executed.
	Sequence int64  `protobuf:"varint,3,opt,name=sequence,proto3" json:"sequence,omitempty"` // Sequence number for matching a response to this call.
	Body     []byte `protobuf:"bytes,4,opt,name=body,proto3" json:"body,omitempty"`          // Input payload to be used by the method.
	// contains filtered or unexported fields
}

Call describes a function call to be executed over the dRPC channel.

func (*Call) Descriptor deprecated

func (*Call) Descriptor() ([]byte, []int)

Deprecated: Use Call.ProtoReflect.Descriptor instead.

func (*Call) GetBody

func (x *Call) GetBody() []byte

func (*Call) GetMethod

func (x *Call) GetMethod() int32

func (*Call) GetModule

func (x *Call) GetModule() int32

func (*Call) GetSequence

func (x *Call) GetSequence() int64

func (*Call) ProtoMessage

func (*Call) ProtoMessage()

func (*Call) ProtoReflect

func (x *Call) ProtoReflect() protoreflect.Message

func (*Call) Reset

func (x *Call) Reset()

func (*Call) String

func (x *Call) String() string

type ClientConnection

type ClientConnection struct {
	sync.Mutex
	// contains filtered or unexported fields
}

ClientConnection represents a client connection to a dRPC server

func (*ClientConnection) Close

func (c *ClientConnection) Close() error

Close shuts down the connection to the Unix Domain Socket

func (*ClientConnection) Connect

func (c *ClientConnection) Connect(ctx context.Context) error

Connect opens a connection to the internal Unix Domain Socket path

func (*ClientConnection) GetSocketPath

func (c *ClientConnection) GetSocketPath() string

GetSocketPath returns client dRPC socket file path.

func (*ClientConnection) IsConnected

func (c *ClientConnection) IsConnected() bool

IsConnected indicates whether the client connection is currently active

func (*ClientConnection) SendMsg

func (c *ClientConnection) SendMsg(ctx context.Context, msg *Call) (*Response, error)

SendMsg sends a message to the connected dRPC server, and returns the response to the caller.

type DomainSocketClient

type DomainSocketClient interface {
	sync.Locker
	IsConnected() bool
	Connect(context.Context) error
	Close() error
	SendMsg(context.Context, *Call) (*Response, error)
	GetSocketPath() string
}

DomainSocketClient is the interface to a dRPC client communicating over a Unix Domain Socket

func NewClientConnection

func NewClientConnection(socket string) DomainSocketClient

NewClientConnection creates a new dRPC client

type DomainSocketServer

type DomainSocketServer struct {
	// contains filtered or unexported fields
}

DomainSocketServer is the object that listens for incoming dRPC connections, maintains the connections for sessions, and manages the message processing.

func NewDomainSocketServer

func NewDomainSocketServer(log logging.Logger, sock string, sockMode os.FileMode) (*DomainSocketServer, error)

NewDomainSocketServer returns a new unstarted instance of a DomainSocketServer for the specified unix domain socket path.

func (*DomainSocketServer) Listen

func (d *DomainSocketServer) Listen(ctx context.Context)

Listen listens for incoming connections on the UNIX domain socket and creates individual sessions for each one.

func (*DomainSocketServer) RegisterRPCModule

func (d *DomainSocketServer) RegisterRPCModule(mod Module)

RegisterRPCModule takes a Module and associates it with the given DomainSocketServer so it can be used to process incoming dRPC calls.

func (*DomainSocketServer) Start

func (d *DomainSocketServer) Start(ctx context.Context) error

Start sets up the dRPC server socket and kicks off the listener goroutine.

type Failure

type Failure struct {
	// contains filtered or unexported fields
}

Failure represents a dRPC protocol failure.

func MarshalingFailure

func MarshalingFailure() Failure

MarshalingFailure creates a Failure for a failed attempt at marshaling a response.

func NewFailure

func NewFailure(status Status) Failure

NewFailure returns a Failure with the given status and a corresponding message.

func NewFailureWithMessage

func NewFailureWithMessage(message string) Failure

NewFailureWithMessage returns a generic failure with a custom message

func UnknownMethodFailure

func UnknownMethodFailure() Failure

UnknownMethodFailure creates a Failure for unknown dRPC method.

func UnknownModuleFailure

func UnknownModuleFailure() Failure

UnknownModuleFailure creates a Failure for unknown dRPC module.

func UnmarshalingCallFailure

func UnmarshalingCallFailure() Failure

UnmarshalingCallFailure creates a Failure for a failed attempt to unmarshal an incoming call.

func UnmarshalingPayloadFailure

func UnmarshalingPayloadFailure() Failure

UnmarshalingPayloadFailure creates a Failure for a failed attempt to unmarshal a call payload.

func (Failure) Error

func (e Failure) Error() string

Error provides a descriptive string associated with the failure.

func (Failure) GetStatus

func (e Failure) GetStatus() Status

GetStatus provides a dRPC status code associated with the failure.

type Method

type Method interface {
	ID() int32
	Module() ModuleID
	String() string
	IsValid() bool
}

type MgmtMethod

type MgmtMethod int32
const (
	// MethodPrepShutdown is a ModuleMgmt method
	MethodPrepShutdown MgmtMethod = C.DRPC_METHOD_MGMT_PREP_SHUTDOWN
	// MethodPingRank is a ModuleMgmt method
	MethodPingRank MgmtMethod = C.DRPC_METHOD_MGMT_PING_RANK
	// MethodSetRank is a ModuleMgmt method
	MethodSetRank MgmtMethod = C.DRPC_METHOD_MGMT_SET_RANK
	// MethodSetLogMasks is a ModuleMgmt method
	MethodSetLogMasks MgmtMethod = C.DRPC_METHOD_MGMT_SET_LOG_MASKS
	// MethodGetAttachInfo is a ModuleMgmt method
	MethodGetAttachInfo MgmtMethod = C.DRPC_METHOD_MGMT_GET_ATTACH_INFO
	// MethodPoolCreate is a ModuleMgmt method
	MethodPoolCreate MgmtMethod = C.DRPC_METHOD_MGMT_POOL_CREATE
	// MethodPoolDestroy is a ModuleMgmt method
	MethodPoolDestroy MgmtMethod = C.DRPC_METHOD_MGMT_POOL_DESTROY
	// MethodPoolEvict is a ModuleMgmt method
	MethodPoolEvict MgmtMethod = C.DRPC_METHOD_MGMT_POOL_EVICT
	// MethodPoolExclude is a ModuleMgmt method
	MethodPoolExclude MgmtMethod = C.DRPC_METHOD_MGMT_EXCLUDE
	// MethodPoolDrain is a ModuleMgmt method
	MethodPoolDrain MgmtMethod = C.DRPC_METHOD_MGMT_DRAIN
	// MethodPoolExtend is a ModuleMgmt method
	MethodPoolExtend MgmtMethod = C.DRPC_METHOD_MGMT_EXTEND
	// MethodPoolReintegrate is a ModuleMgmt method
	MethodPoolReintegrate MgmtMethod = C.DRPC_METHOD_MGMT_REINTEGRATE
	// MethodBioHealth is a ModuleMgmt method
	MethodBioHealth MgmtMethod = C.DRPC_METHOD_MGMT_BIO_HEALTH_QUERY
	// MethodSetUp is a ModuleMgmt method
	MethodSetUp MgmtMethod = C.DRPC_METHOD_MGMT_SET_UP
	// MethodSmdDevs is a ModuleMgmt method
	MethodSmdDevs MgmtMethod = C.DRPC_METHOD_MGMT_SMD_LIST_DEVS
	// MethodSmdPools is a ModuleMgmt method
	MethodSmdPools MgmtMethod = C.DRPC_METHOD_MGMT_SMD_LIST_POOLS
	// MethodPoolGetACL is a ModuleMgmt method
	MethodPoolGetACL MgmtMethod = C.DRPC_METHOD_MGMT_POOL_GET_ACL
	// MethodPoolOverwriteACL is a ModuleMgmt method
	MethodPoolOverwriteACL MgmtMethod = C.DRPC_METHOD_MGMT_POOL_OVERWRITE_ACL
	// MethodPoolUpdateACL is a ModuleMgmt method
	MethodPoolUpdateACL MgmtMethod = C.DRPC_METHOD_MGMT_POOL_UPDATE_ACL
	// MethodPoolDeleteACL is a ModuleMgmt method
	MethodPoolDeleteACL MgmtMethod = C.DRPC_METHOD_MGMT_POOL_DELETE_ACL
	// MethodSetFaultyState is a ModuleMgmt method
	MethodSetFaultyState MgmtMethod = C.DRPC_METHOD_MGMT_DEV_SET_FAULTY
	// MethodReplaceStorage is a ModuleMgmt method
	MethodReplaceStorage MgmtMethod = C.DRPC_METHOD_MGMT_DEV_REPLACE
	// MethodListContainers is a ModuleMgmt method
	MethodListContainers MgmtMethod = C.DRPC_METHOD_MGMT_LIST_CONTAINERS
	// MethodPoolQuery defines a method for querying a pool
	MethodPoolQuery MgmtMethod = C.DRPC_METHOD_MGMT_POOL_QUERY
	// MethodPoolQueryTarget defines a method for querying a pool engine's targets
	MethodPoolQueryTarget MgmtMethod = C.DRPC_METHOD_MGMT_POOL_QUERY_TARGETS
	// MethodPoolSetProp defines a method for setting a pool property
	MethodPoolSetProp MgmtMethod = C.DRPC_METHOD_MGMT_POOL_SET_PROP
	// MethodContSetOwner defines a method for setting the container's owner
	MethodContSetOwner MgmtMethod = C.DRPC_METHOD_MGMT_CONT_SET_OWNER
	// MethodGroupUpdate defines a method for updating the group map
	MethodGroupUpdate MgmtMethod = C.DRPC_METHOD_MGMT_GROUP_UPDATE
	// MethodNotifyPoolConnect defines a method to indicate a successful pool connect call
	MethodNotifyPoolConnect MgmtMethod = C.DRPC_METHOD_MGMT_NOTIFY_POOL_CONNECT
	// MethodNotifyPoolDisconnect defines a method to indicate a successful pool disconnect call
	MethodNotifyPoolDisconnect MgmtMethod = C.DRPC_METHOD_MGMT_NOTIFY_POOL_DISCONNECT
	// MethodNotifyExit defines a method for signaling a clean client shutdown
	MethodNotifyExit MgmtMethod = C.DRPC_METHOD_MGMT_NOTIFY_EXIT
	// MethodPoolGetProp defines a method for getting pool properties
	MethodPoolGetProp MgmtMethod = C.DRPC_METHOD_MGMT_POOL_GET_PROP
	// MethodCheckerStart defines a method for starting the checker
	MethodCheckerStart MgmtMethod = C.DRPC_METHOD_MGMT_CHK_START
	// MethodCheckerStop defines a method for stopping the checker
	MethodCheckerStop MgmtMethod = C.DRPC_METHOD_MGMT_CHK_STOP
	// MethodCheckerQuery defines a method for getting the checker status
	MethodCheckerQuery MgmtMethod = C.DRPC_METHOD_MGMT_CHK_QUERY
	// MethodCheckerProp defines a method for getting the checker properties
	MethodCheckerProp MgmtMethod = C.DRPC_METHOD_MGMT_CHK_PROP
	// MethodCheckerAction defines a method for specifying a checker action
	MethodCheckerAction MgmtMethod = C.DRPC_METHOD_MGMT_CHK_ACT
	// MethodPoolUpgrade defines a method for upgrading a pool
	MethodPoolUpgrade MgmtMethod = C.DRPC_METHOD_MGMT_POOL_UPGRADE
	// MethodLedManage defines a method to manage a VMD device LED state
	MethodLedManage MgmtMethod = C.DRPC_METHOD_MGMT_LED_MANAGE
	// MethodSetupClientTelemetry defines a method to setup client telemetry
	MethodSetupClientTelemetry MgmtMethod = C.DRPC_METHOD_MGMT_SETUP_CLIENT_TELEM
)

func (MgmtMethod) ID

func (m MgmtMethod) ID() int32

func (MgmtMethod) IsValid

func (m MgmtMethod) IsValid() bool

IsValid sanity-checks that the Method ID is within expected bounds.

func (MgmtMethod) Module

func (m MgmtMethod) Module() ModuleID

func (MgmtMethod) String

func (m MgmtMethod) String() string

type Module

type Module interface {
	HandleCall(context.Context, *Session, Method, []byte) ([]byte, error)
	ID() ModuleID
}

Module is an interface that a type must implement to provide the functionality needed by the ModuleService to process dRPC requests.
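
A minimal sketch of a type satisfying this interface is shown below. The `ModuleID`, `Session`, `Method`, and `Module` declarations are local stand-ins that copy the documented shapes so the example compiles without the DAOS module. The echo module, its ID of 100, and its two methods are hypothetical.

```go
package main

import (
	"context"
	"fmt"
)

// Stand-ins mirroring the documented drpc types.
type ModuleID int32

type Session struct{}

type Method interface {
	ID() int32
	Module() ModuleID
	String() string
	IsValid() bool
}

type Module interface {
	HandleCall(context.Context, *Session, Method, []byte) ([]byte, error)
	ID() ModuleID
}

// moduleEcho is a hypothetical module ID, chosen only for this sketch.
const moduleEcho ModuleID = 100

type echoMethod int32

const (
	methodEcho    echoMethod = 1
	methodReverse echoMethod = 2
)

func (m echoMethod) ID() int32        { return int32(m) }
func (m echoMethod) Module() ModuleID { return moduleEcho }
func (m echoMethod) IsValid() bool    { return m == methodEcho || m == methodReverse }
func (m echoMethod) String() string {
	switch m {
	case methodEcho:
		return "echo"
	case methodReverse:
		return "reverse"
	}
	return fmt.Sprintf("unknown (%d)", int32(m))
}

// echoModule dispatches on the method ID and returns the response payload.
type echoModule struct{}

var _ Module = echoModule{} // compile-time check that the interface is met

func (echoModule) ID() ModuleID { return moduleEcho }

func (echoModule) HandleCall(_ context.Context, _ *Session, method Method, body []byte) ([]byte, error) {
	switch method.ID() {
	case int32(methodEcho):
		return body, nil
	case int32(methodReverse):
		out := make([]byte, len(body))
		for i, b := range body {
			out[len(body)-1-i] = b
		}
		return out, nil
	default:
		return nil, fmt.Errorf("unknown method %d", method.ID())
	}
}

// handle invokes the module directly, the way a dispatcher might.
func handle(method echoMethod, body string) (string, error) {
	out, err := echoModule{}.HandleCall(context.Background(), nil, method, []byte(body))
	return string(out), err
}

func main() {
	fmt.Println(handle(methodEcho, "ping"))
	fmt.Println(handle(methodReverse, "abc"))
}
```

A module built on the real package would more likely return `drpc.UnknownMethodFailure()` from the default branch, so the caller receives the matching status code.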

type ModuleID

type ModuleID int32
const (
	// ModuleSecurityAgent is the dRPC module for security tasks in DAOS agent
	ModuleSecurityAgent ModuleID = C.DRPC_MODULE_SEC_AGENT
	// ModuleMgmt is the dRPC module for management service tasks
	ModuleMgmt ModuleID = C.DRPC_MODULE_MGMT
	// ModuleSrv is the dRPC module for tasks relating to server setup
	ModuleSrv ModuleID = C.DRPC_MODULE_SRV
	// ModuleSecurity is the dRPC module for security tasks in DAOS server
	ModuleSecurity ModuleID = C.DRPC_MODULE_SEC
)

func (ModuleID) GetMethod

func (id ModuleID) GetMethod(methodID int32) (Method, error)

func (ModuleID) ID

func (id ModuleID) ID() int32

func (ModuleID) String

func (id ModuleID) String() string

type ModuleService

type ModuleService struct {
	// contains filtered or unexported fields
}

ModuleService is the collection of Modules used by DomainSocketServer to process messages.

func NewModuleService

func NewModuleService(log logging.Logger) *ModuleService

NewModuleService creates an initialized ModuleService instance

func (*ModuleService) GetModule

func (r *ModuleService) GetModule(id ModuleID) (Module, bool)

GetModule fetches the module for the given ID. Returns true if found, false otherwise.

func (*ModuleService) ProcessMessage

func (r *ModuleService) ProcessMessage(ctx context.Context, session *Session, msgBytes []byte) ([]byte, error)

ProcessMessage is the main entry point into the ModuleService. It accepts a marshaled drpc.Call instance, processes it, calls the handler in the appropriate Module, and marshals the result into the body of a drpc.Response.

func (*ModuleService) RegisterModule

func (r *ModuleService) RegisterModule(mod Module)

RegisterModule will take in a type that implements the Module interface and ensure that no other module is already registered with that module identifier.
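
The uniqueness rule can be illustrated with a small map-backed registry. This is a sketch under stated assumptions, not the ModuleService code: `registry`, `stubModule`, and the trimmed `Module` interface are hypothetical, and returning an error on a duplicate ID is a choice made for the example. How the real RegisterModule reacts to a duplicate is not stated here.

```go
package main

import "fmt"

// Stand-ins for the documented types, trimmed to what the registry needs.
type ModuleID int32

type Module interface {
	ID() ModuleID
}

// registry maps module IDs to modules and rejects duplicate IDs.
type registry struct {
	modules map[ModuleID]Module
}

func newRegistry() *registry {
	return &registry{modules: make(map[ModuleID]Module)}
}

// get fetches the module for id and reports whether it was found.
func (r *registry) get(id ModuleID) (Module, bool) {
	mod, found := r.modules[id]
	return mod, found
}

// register stores mod unless another module already uses the same ID.
func (r *registry) register(mod Module) error {
	if _, found := r.get(mod.ID()); found {
		return fmt.Errorf("module with ID %d is already registered", mod.ID())
	}
	r.modules[mod.ID()] = mod
	return nil
}

// stubModule is a hypothetical module whose only property is its ID.
type stubModule ModuleID

func (s stubModule) ID() ModuleID { return ModuleID(s) }

func main() {
	r := newRegistry()
	fmt.Println(r.register(stubModule(1))) // first registration
	fmt.Println(r.register(stubModule(1))) // same ID again
}
```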

type Response

type Response struct {
	Sequence int64  `protobuf:"varint,1,opt,name=sequence,proto3" json:"sequence,omitempty"`              // Sequence number of the Call that triggered this response.
	Status   Status `protobuf:"varint,2,opt,name=status,proto3,enum=drpc.Status" json:"status,omitempty"` // High-level status of the RPC. If SUCCESS, method-specific status may be included in the body.
	Body     []byte `protobuf:"bytes,3,opt,name=body,proto3" json:"body,omitempty"`                       // Output payload produced by the method.
	// contains filtered or unexported fields
}

Response describes the result of a dRPC call.

func (*Response) Descriptor deprecated

func (*Response) Descriptor() ([]byte, []int)

Deprecated: Use Response.ProtoReflect.Descriptor instead.

func (*Response) GetBody

func (x *Response) GetBody() []byte

func (*Response) GetSequence

func (x *Response) GetSequence() int64

func (*Response) GetStatus

func (x *Response) GetStatus() Status

func (*Response) ProtoMessage

func (*Response) ProtoMessage()

func (*Response) ProtoReflect

func (x *Response) ProtoReflect() protoreflect.Message

func (*Response) Reset

func (x *Response) Reset()

func (*Response) String

func (x *Response) String() string

type Session

type Session struct {
	Conn net.Conn
	// contains filtered or unexported fields
}

Session represents an individual client connection to the Domain Socket Server.

func NewSession

func NewSession(conn net.Conn, svc *ModuleService) *Session

NewSession creates a new dRPC Session object

func (*Session) Close

func (s *Session) Close()

Close closes the session

func (*Session) ProcessIncomingMessage

func (s *Session) ProcessIncomingMessage(ctx context.Context) error

ProcessIncomingMessage listens for an incoming message on the session, calls its handler, and sends the response.

type Status

type Status int32

Status represents the valid values for a response status.

const (
	Status_SUCCESS                  Status = 0 // The method executed and provided a response payload, if needed. Otherwise, the method simply succeeded.
	Status_SUBMITTED                Status = 1 // The method has been queued for asynchronous execution.
	Status_FAILURE                  Status = 2 // The method has failed and did not provide a response payload.
	Status_UNKNOWN_MODULE           Status = 3 // The requested module does not exist.
	Status_UNKNOWN_METHOD           Status = 4 // The requested method does not exist.
	Status_FAILED_UNMARSHAL_CALL    Status = 5 // Could not unmarshal the incoming call.
	Status_FAILED_UNMARSHAL_PAYLOAD Status = 6 // Could not unmarshal the method-specific payload of the incoming call.
	Status_FAILED_MARSHAL           Status = 7 // Generated a response payload, but couldn't marshal it into the response.
)

func ErrorToStatus

func ErrorToStatus(err error) Status

ErrorToStatus translates an error to a dRPC Status. In practice it checks to see if it was a dRPC Failure error, and uses the Status if so. Otherwise it is assumed to be a generic failure.
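
The translation described above can be sketched with `errors.As`. The `Status` and `Failure` types are local stand-ins for the documented ones, and `errorToStatus` is a hypothetical re-implementation. Mapping a nil error to success and matching wrapped failures are assumptions of this sketch, not documented behaviour.

```go
package main

import (
	"errors"
	"fmt"
)

// Stand-ins mirroring the documented drpc.Status values used here.
type Status int32

const (
	StatusSuccess       Status = 0 // mirrors Status_SUCCESS
	StatusFailure       Status = 2 // mirrors Status_FAILURE
	StatusUnknownMethod Status = 4 // mirrors Status_UNKNOWN_METHOD
)

// Failure is a stand-in for drpc.Failure: an error that carries a Status.
type Failure struct {
	status  Status
	message string
}

func (f Failure) Error() string     { return f.message }
func (f Failure) GetStatus() Status { return f.status }

// errorToStatus returns the Status carried by a Failure anywhere in err's
// chain, and a generic failure for any other non-nil error.
func errorToStatus(err error) Status {
	if err == nil {
		return StatusSuccess // assumption: no error means success
	}
	var f Failure
	if errors.As(err, &f) {
		return f.GetStatus()
	}
	return StatusFailure
}

// Sample errors used to exercise the three branches.
var (
	errPlain   = errors.New("disk on fire")
	errUnknown = Failure{status: StatusUnknownMethod, message: "unknown method"}
	errWrapped = fmt.Errorf("handling call: %w", errUnknown)
)

func main() {
	fmt.Println(errorToStatus(nil), errorToStatus(errPlain),
		errorToStatus(errUnknown), errorToStatus(errWrapped))
}
```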

func (Status) Descriptor

func (Status) Descriptor() protoreflect.EnumDescriptor

func (Status) Enum

func (x Status) Enum() *Status

func (Status) EnumDescriptor deprecated

func (Status) EnumDescriptor() ([]byte, []int)

Deprecated: Use Status.Descriptor instead.

func (Status) Number

func (x Status) Number() protoreflect.EnumNumber

func (Status) String

func (x Status) String() string

func (Status) Type

func (Status) Type() protoreflect.EnumType
