Documentation ¶
Overview ¶
Package acomm is a library for asynchronous JSON-RPC-like communication between services.
Like JSON-RPC, requests specify the task to run, arguments for the task, and an id. Additionally, they specify a responseHook as well as success and error handlers. The response hook is where the response should be sent, and the hooks will run based on the type of response received. Responses are also similar to JSON-RPC, with the addition of a StreamURL field, used to indicate that additional data is available to stream directly. The request args and response result are left as json.RawMessage and can be unmarshalled into whatever struct the user desires.
The tracker provides request/response tracking. A request can be registered with the tracker, along with a timeout, before sending. When a response arrives, the request will be retrieved based on ID. Shutting down a tracker will wait for all open requests to be handled, whether it is a response arriving or a timeout occuring. The tracker also provides functionality for proxying requests that use an http response hook to one that uses a unix socket (provided by the tracker). It tracks the original request and returns a new request using its response listener as the response hook. When the response comes, it will then forward it along to the original response hook.
In a similar vein, the tracker can set up ad-hoc unix listeners for streaming data, as well as proxy it to http. It includes an HTTP handler func for handling http stream requests.
The UnixListener provides a wrapper around a unix socket, with connection tracking for graceful shutdown. Communication over a unix socket is done by sending a payload size header and then the JSON data; there are included methods for handling the sending and reading of such data.
Index ¶
- func ProxyStreamHandler(w http.ResponseWriter, r *http.Request)
- func ReplaceLocalhost(u *url.URL, replacement string) error
- func Send(addr *url.URL, payload interface{}) error
- func SendConnData(conn net.Conn, payload interface{}) error
- func Stream(dest io.Writer, addr *url.URL) error
- func UnmarshalConnData(conn net.Conn, dest interface{}) error
- type MultiRequest
- type Request
- func (req *Request) HandleResponse(resp *Response)
- func (req *Request) Respond(resp *Response) error
- func (req *Request) SetArgs(args interface{}) error
- func (req *Request) SetResponseHook(urlString string) error
- func (req *Request) SetStreamURL(urlString string) error
- func (req *Request) SetTaskURL(urlString string) error
- func (req *Request) UnmarshalArgs(dest interface{}) error
- func (req *Request) Validate() error
- type RequestOptions
- type Response
- type ResponseHandler
- type Tracker
- func (t *Tracker) Addr() string
- func (t *Tracker) HandleResponse(resp *Response)
- func (t *Tracker) NewStreamUnix(dir string, src io.ReadCloser) (*url.URL, error)
- func (t *Tracker) NumRequests() int
- func (t *Tracker) ProxyExternal(req *Request, timeout time.Duration) (*Request, error)
- func (t *Tracker) ProxyExternalHandler(w http.ResponseWriter, r *http.Request)
- func (t *Tracker) ProxyStreamHTTPURL(addr *url.URL) (*url.URL, error)
- func (t *Tracker) ProxyUnix(req *Request, timeout time.Duration) (*Request, error)
- func (t *Tracker) RemoveRequest(req *Request) bool
- func (t *Tracker) Start() error
- func (t *Tracker) Stop()
- func (t *Tracker) SyncRequest(dest *url.URL, opts RequestOptions, timeout time.Duration) (*Response, error)
- func (t *Tracker) TrackRequest(req *Request, timeout time.Duration) error
- func (t *Tracker) URL() *url.URL
- type UnixListener
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ProxyStreamHandler ¶
func ProxyStreamHandler(w http.ResponseWriter, r *http.Request)
ProxyStreamHandler is an HTTP HandlerFunc for simple proxy streaming.
func ReplaceLocalhost ¶
ReplaceLocalhost replaces localhost, 127.0.0.1, or ::1 with the specified host.
func SendConnData ¶
SendConnData marshals and writes payload JSON data to the Conn with appropriate headers.
func UnmarshalConnData ¶
UnmarshalConnData reads and unmarshals JSON data from the connection into the destination object.
Types ¶
type MultiRequest ¶
type MultiRequest struct {
// contains filtered or unexported fields
}
MultiRequest provides a way to manage multiple parallel requests
func NewMultiRequest ¶
func NewMultiRequest(tracker *Tracker, timeout time.Duration) *MultiRequest
NewMultiRequest creates and initializes a new MultiRequest.
func (*MultiRequest) AddRequest ¶
func (m *MultiRequest) AddRequest(name string, req *Request) error
AddRequest adds a request to the MultiRequest. Sending the request is still the responsibility of the caller.
func (*MultiRequest) RemoveRequest ¶
func (m *MultiRequest) RemoveRequest(req *Request)
RemoveRequest removes a request from the MultiRequest. Useful if the send fails.
func (*MultiRequest) Responses ¶
func (m *MultiRequest) Responses() map[string]*Response
Responses returns responses for all of the requests, keyed on the request name (as opposed to request id). Blocks until all requests are accounted for.
type Request ¶
type Request struct { ID string `json:"id"` Task string `json:"task"` TaskURL *url.URL `json:"taskURL"` ResponseHook *url.URL `json:"responseHook"` StreamURL *url.URL `json:"streamURL"` Args *json.RawMessage `json:"args"` SuccessHandler ResponseHandler `json:"-"` ErrorHandler ResponseHandler `json:"-"` // contains filtered or unexported fields }
Request is a request data structure for asynchronous requests. The ID is used to identify the request throught its life cycle. The ResponseHook is a URL where response data should be sent. SuccessHandler and ErrorHandler will be called appropriately to handle a response.
func NewRequest ¶
func NewRequest(opts RequestOptions) (*Request, error)
NewRequest creates a new Request instance.
func (*Request) HandleResponse ¶
HandleResponse determines whether a response indicates success or error and runs the appropriate handler. If the appropriate handler is not defined, it is assumed no handling is necessary and silently finishes.
func (*Request) SetResponseHook ¶
SetResponseHook is a convenience method to set the ResponseHook from a string url.
func (*Request) SetStreamURL ¶
SetStreamURL is a convenience method to set the StreamURL from a string url.
func (*Request) SetTaskURL ¶
SetTaskURL is a convenience method to set the TaskURL from a string url.
func (*Request) UnmarshalArgs ¶
UnmarshalArgs unmarshals the request args into the destination object.
type RequestOptions ¶
type RequestOptions struct { Task string TaskURL *url.URL TaskURLString string ResponseHook *url.URL ResponseHookString string StreamURL *url.URL StreamURLString string Args interface{} SuccessHandler ResponseHandler `json:"-"` ErrorHandler ResponseHandler `json:"-"` }
RequestOptions are properties and options used to create a new Request object. There are options to either directly specify a URL or provide a string that will be parsed.
type Response ¶
type Response struct { ID string `json:"id"` Result *json.RawMessage `json:"result"` StreamURL *url.URL `json:"streamURL"` Error error `json:"error"` }
Response is a response data structure for asynchronous requests. The ID should be the same as the Request it corresponds to. Result should be nil if Error is present and vice versa.
func NewResponse ¶
func NewResponse(req *Request, result interface{}, streamURL *url.URL, respErr error) (*Response, error)
NewResponse creates a new Response instance based on a Request.
func (*Response) MarshalJSON ¶
MarshalJSON marshals a Response into JSON.
func (*Response) UnmarshalJSON ¶
UnmarshalJSON unmarshals JSON data into a Response.
func (*Response) UnmarshalResult ¶
UnmarshalResult unmarshals the response result into the destination object.
type ResponseHandler ¶
ResponseHandler is a function to run when a request receives a response.
type Tracker ¶
type Tracker struct {
// contains filtered or unexported fields
}
Tracker keeps track of requests waiting on a response.
func NewTracker ¶
func NewTracker(socketPath string, httpStreamURL, externalProxyURL *url.URL, defaultTimeout time.Duration) (*Tracker, error)
NewTracker creates and initializes a new Tracker. If a socketPath is not provided, the response socket will be created in a temporary directory.
func (*Tracker) Addr ¶
Addr returns the string representation of the Tracker's response listener socket.
func (*Tracker) HandleResponse ¶
HandleResponse associates a response with a request and either forwards the response or calls the request's handler.
func (*Tracker) NewStreamUnix ¶
NewStreamUnix sets up an ad-hoc unix listner to stream data.
func (*Tracker) NumRequests ¶
NumRequests returns the number of tracked requests
func (*Tracker) ProxyExternal ¶
ProxyExternal proxies a request intended for an external destination
func (*Tracker) ProxyExternalHandler ¶
func (t *Tracker) ProxyExternalHandler(w http.ResponseWriter, r *http.Request)
ProxyExternalHandler is an HTTP HandlerFunc for proxying an external request.
func (*Tracker) ProxyStreamHTTPURL ¶
ProxyStreamHTTPURL generates the url for proxying streaming data from a unix socket.
func (*Tracker) ProxyUnix ¶
ProxyUnix proxies requests that have response hooks and stream urls of non-unix sockets. If the response hook and stream url are already unix sockets, it returns the original request. If the response hook is not, it tracks the original request and returns a new request with a unix socket response hook. If the stream url is not, it pipes the original stream through a new unix socket and updates the stream url. The purpose of this is so that there can be a single entry and exit point for external communication, while local services can reply directly to each other.
func (*Tracker) RemoveRequest ¶
RemoveRequest should be used to remove a tracked request. Use in cases such as sending failures, where there is no hope of a response being received.
func (*Tracker) Start ¶
Start activates the tracker. This allows tracking of requests as well as listening for and handling responses.
func (*Tracker) Stop ¶
func (t *Tracker) Stop()
Stop deactivates the tracker. It blocks until all active connections or tracked requests to finish.
func (*Tracker) SyncRequest ¶
func (t *Tracker) SyncRequest(dest *url.URL, opts RequestOptions, timeout time.Duration) (*Response, error)
SyncRequest is a convenience method for creating and sending a synchronous request.
func (*Tracker) TrackRequest ¶
TrackRequest tracks a request. This does not need to be called after using ProxyUnix.
type UnixListener ¶
type UnixListener struct {
// contains filtered or unexported fields
}
UnixListener is a wrapper for a unix socket. It handles creation and listening for new connections, as well as graceful shutdown.
func NewUnixListener ¶
func NewUnixListener(socketPath string, acceptLimit int) *UnixListener
NewUnixListener creates and initializes a new UnixListener. AcceptLimit controls how many connections it will listen for before stopping; 0 and below is unlimited.
func (*UnixListener) Addr ¶
func (ul *UnixListener) Addr() string
Addr returns the string representation of the unix address.
func (*UnixListener) DoneConn ¶
func (ul *UnixListener) DoneConn(conn net.Conn)
DoneConn completes the handling of a connection.
func (*UnixListener) NextConn ¶
func (ul *UnixListener) NextConn() net.Conn
NextConn blocks and returns the next connection. It will return nil when the listener is stopped and all existing connections have been handled. Connections should be handled in a go routine to take advantage of concurrency. When done, the connection MUST be finished with a call to DoneConn.
func (*UnixListener) Start ¶
func (ul *UnixListener) Start() error
Start prepares the listener and starts listening for new connections.
func (*UnixListener) Stop ¶
func (ul *UnixListener) Stop(timeout time.Duration)
Stop stops listening for new connections. It blocks until existing connections are handled and the listener closed.
func (*UnixListener) URL ¶
func (ul *UnixListener) URL() *url.URL
URL returns the URL representation of the unix address.