Documentation ¶
Index ¶
- Constants
- Variables
- func AddContentTypeMap(codecType CodecType, contentType string)
- func BadGateway(format string, args ...interface{}) error
- func BadRequest(format string, args ...interface{}) error
- func Conflict(format string, args ...interface{}) error
- func Decode(buf bytex.Buffer, ctype CodecType, msg interface{}) error
- func Encode(ctype CodecType, msg interface{}) (bytex.Buffer, error)
- func Forbidden(format string, args ...interface{}) error
- func GatewayTimeout(format string, args ...interface{}) error
- func GetContentType(ctype CodecType) string
- func InternalServerError(format string, args ...interface{}) error
- func MethodNotAllowed(format string, args ...interface{}) error
- func NewSeqID() uint32
- func NotFound(format string, args ...interface{}) error
- func NotImplemented(format string, args ...interface{}) error
- func RegisterCodec(c Codec)
- func RequestTimeout(format string, args ...interface{}) error
- func ServiceUnavailable(format string, args ...interface{}) error
- func Unauthorized(format string, args ...interface{}) error
- type AttributeMap
- type BaseConn
- func (c *BaseConn) Attributes() AttributeMap
- func (c *BaseConn) GetChain() FilterChain
- func (c *BaseConn) GetReadBuffer() bytex.Buffer
- func (c *BaseConn) GetWriter() *NetWriter
- func (c *BaseConn) ID() uint32
- func (c *BaseConn) Init(tran Tran, client bool, tag string)
- func (c *BaseConn) IsActive() bool
- func (c *BaseConn) IsClient() bool
- func (c *BaseConn) IsStatus(s Status) bool
- func (c *BaseConn) LocalAddr() string
- func (c *BaseConn) Protocol() interface{}
- func (c *BaseConn) RemoteAddr() string
- func (c *BaseConn) SetLocalAddr(addr string)
- func (c *BaseConn) SetProtocol(p interface{})
- func (c *BaseConn) SetRemoteAddr(addr string)
- func (c *BaseConn) SetStatus(s Status)
- func (c *BaseConn) Status() Status
- func (c *BaseConn) Tag() string
- func (c *BaseConn) Tran() Tran
- func (c *BaseConn) Write(buff WriterTo) error
- type BaseFilter
- type BaseTran
- type Body
- type BodyWriter
- type CallOption
- type CallOptions
- type Callback
- type Client
- type Codec
- type CodecType
- type Conn
- type Detector
- type DialCallback
- type DialFunc
- type Endpoint
- type Error
- type Executor
- type Factory
- type Filter
- type FilterChain
- type FilterCtx
- type Frame
- type FrameType
- type Group
- type Header
- type Identifier
- type ListenFunc
- type Listener
- type Method
- type Middleware
- type Module
- type MsgType
- type NetWriter
- type Option
- func WithDial(fn DialFunc) Option
- func WithDialCallback(fn DialCallback) Option
- func WithDialNonBlocking() Option
- func WithDialTimeout(v time.Duration) Option
- func WithExtra(key string, value interface{}) Option
- func WithListen(fn ListenFunc) Option
- func WithNetwork(v string) Option
- func WithTag(tag string) Option
- type Options
- type Packet
- type Params
- type Processor
- type Protocol
- type Recycler
- type Request
- type Response
- type RetryPolicy
- type Route
- type Router
- type RunFunc
- type Runnable
- type Server
- type Status
- type Tran
- type WriterTo
Constants ¶
const ( StatusInternalServerError = http.StatusInternalServerError StatusTimeout = http.StatusRequestTimeout )
const ( KeyGroupConn = "conn" KeyGroupFilter = "filter" )
unique key group define
const ( // CLOSED 关闭状态 CLOSED = Status(iota) // CONNECTING 连接中 CONNECTING // OPEN 已经建立连接 OPEN // CLOSING 关闭中 CLOSING )
const ( XLogId = "X-Log-Id" XTraceId = "X-Trace-Id" )
常见Header
const ( FrameTypeHeader = 0 // 消息头 FrameTypeData = 1 FrameTypeTrailer = 2 )
Variables ¶
var ( ErrConnOpened = errors.New("conn is opened.") ErrConnClosed = errors.New("conn is closed") ErrFilterIndexOverflow = errors.New("filter index overflow") ErrNotSupport = errors.New("not support") ErrClosed = errors.New("closed") ErrInvalidFrame = errors.New("invalid frame") ErrInvalidIdentifier = errors.New("invalid identifier") )
network error
var ( // ErrNotFoundCodec not found codec err ErrNotFoundCodec = errors.New("not found codec") )
Functions ¶
func AddContentTypeMap ¶
AddContentTypeMap 添加MIME ContentType 到CodecType映射
func BadGateway ¶
BadGateway generates a 502 error.
func BadRequest ¶
BadRequest generates a 400 error.
func GatewayTimeout ¶
GatewayTimeout generates a 504 error.
func GetContentType ¶
GetContentType codecType转换为contentType
func InternalServerError ¶
InternalServerError generates a 500 error.
func MethodNotAllowed ¶
MethodNotAllowed generates a 405 error.
func NotImplemented ¶
NotImplemented generates a 501 error.
func RequestTimeout ¶
RequestTimeout generates a 408 error.
func ServiceUnavailable ¶
ServiceUnavailable generates a 503 error.
func Unauthorized ¶
Unauthorized generates a 401 error.
Types ¶
type AttributeMap ¶
type AttributeMap interface { // 属性数目 Len() int // 是否包含key Contains(key unique.Key) bool // 获取属性,若不存在且creator不为nil,则会创建 Get(key unique.Key, creator func() interface{}) interface{} // 设置属性 Put(key unique.Key, value interface{}) // 删除属性 Remove(key unique.Key) interface{} // 清空属性 Clear() }
AttributeMap 存储key-value数据,线程安全
数据通常不多,底层使用有序数据存储
func NewAttributeMap ¶
func NewAttributeMap() AttributeMap
type BaseConn ¶
BaseConn 实现最基础的BaseConn功能
func (*BaseConn) Attributes ¶
func (c *BaseConn) Attributes() AttributeMap
func (*BaseConn) SetProtocol ¶
func (c *BaseConn) SetProtocol(p interface{})
type BaseFilter ¶
type BaseFilter struct { }
BaseFilter 默认实现Filter接口,便于使用者无须实现不必要的接口
func (*BaseFilter) HandleClose ¶
func (f *BaseFilter) HandleClose(ctx FilterCtx) error
HandleClose ...
func (*BaseFilter) HandleError ¶
func (f *BaseFilter) HandleError(ctx FilterCtx) error
HandleError ...
func (*BaseFilter) HandleWrite ¶
func (f *BaseFilter) HandleWrite(ctx FilterCtx) error
HandleWrite ...
type BaseTran ¶
type BaseTran struct {
// contains filtered or unexported fields
}
BaseTran basic netx
type CallOptions ¶
type CallOptions struct { DialTimeout time.Duration // 连接超时 CallTimeout time.Duration // Callback interface{} // 异步回调函数 RetryPolicy RetryPolicy // 重试策略 }
CallOptions Call时可选参数
type Client ¶
type Client interface { Call(ctx context.Context, req Request, opts ...CallOption) (Response, error) Close() error }
Client 客户端接口
type Codec ¶
type Codec interface { Type() CodecType Name() string Encode(p bytex.Buffer, msg interface{}) error Decode(p bytex.Buffer, msg interface{}) error }
Codec 用于消息中body的编解码,常见的格式为Json,Xml,Protobuf,Thrift
不同协议间可能有不同的标识,比如http使用ContextType标识
type CodecType ¶
type CodecType = uint
const ( CodecTypeUnknown CodecType = 0 CodecTypeText CodecType = 1 // text/plain CodecTypeBinary CodecType = 2 // application/octet-stream CodecTypeForm CodecType = 3 // application/x-www-form-urlencoded CodecTypeJson CodecType = 4 CodecTypeXml CodecType = 5 CodecTypeProtobuf CodecType = 6 CodecTypeThrift CodecType = 7 CodecTypeMsgpack CodecType = 8 CodecTypeAvro CodecType = 9 CodecTypeGob CodecType = 10 )
常见已知CodecType枚举
func GetCodecType ¶
GetCodecType contentType转换为codecType
type Conn ¶
type Conn interface { ID() uint32 // 唯一自增ID,由底层自动生成 Tag() string // 标签 Tran() Tran // Transport Status() Status // 当前状态 IsActive() bool // 是否已经建立好连接 IsClient() bool // 是否是通过Dial建立的Client连接 LocalAddr() string // 本地地址 RemoteAddr() string // 远程地址 Write(p WriterTo) error // 异步写数据,线程安全 Send(msg interface{}) error // 异步发消息,会触发Filter Write操作 Close() error // 调用后将不再接收任何读写操作,并等待所有发送完成后再安全关闭 Attributes() AttributeMap // 扩展属性 Protocol() interface{} // 绑定协议 SetProtocol(p interface{}) // 设置解析协议 }
Conn 异步Socket
type Detector ¶
type Detector interface { Detect(bytex.Peeker) Protocol // 服务端探测协议 Default() Protocol // 默认协议,客户端不需要 }
Detector 用于自动探测协议,某些协议有magic number,可以方便的感知协议类型,某些则不支持
服务端需要探测协议,但仅需要探测一次即可,便于自动识别http,dubbo,grpc等协议 客户端则不需要探测协议,因为调用方是知道使用哪种协议
type Endpoint ¶
Endpoint represent one method for calling from remote.
func Apply ¶
func Apply(endpoint Endpoint, mws []Middleware) Endpoint
Apply 将Endpoint添加middleware,并返回最终的Endpoint
type Executor ¶
Executor 用于执行任务
常见线程模型有 1:同步执行,runner.New() 2:异步单线程,有序执行,single.New() 3:异步线程池,无序但线程数不超过最大值,pooled.New() 4:异步hash线程,不同功能指定不同线程,hashing.New() 5:异步go routine,gorunner.New
type Filter ¶
type Filter interface { Name() string HandleRead(ctx FilterCtx) error HandleWrite(ctx FilterCtx) error HandleOpen(ctx FilterCtx) error HandleClose(ctx FilterCtx) error HandleError(ctx FilterCtx) error }
Filter 用于链式处理Conn各种回调 InBound: 从前向后执行,包括Read,Open,Error OutBound:从后向前执行,包括Write,Close
type FilterChain ¶
type FilterChain interface { Len() int // 长度 Front() Filter // 第一个 Back() Filter // 最后一个 Get(index int) Filter // 通过索引获取filter Index(name string) int // 通过名字查询索引 AddFirst(filters ...Filter) // 在前边插入 AddLast(filters ...Filter) // 在末尾插入 HandleOpen(conn Conn) // 建立连接 HandleClose(conn Conn) // 关闭连接 HandleError(conn Conn, err error) // 发生错误 HandleRead(conn Conn, msg interface{}) // 读事件 HandleWrite(conn Conn, msg interface{}) error // 写事件 }
FilterChain 管理Filter,并链式调用所有Filter Filter分为Inbound和Outbound InBound: 从前向后执行,包括Read,Open,Error OutBound:从后向前执行,包括Write,Close
type FilterCtx ¶
type FilterCtx interface { context.Context // Recycler // 可回收复用 Attributes() AttributeMap // 自定义数据,Ctx运行结束后,则会失效 Conn() Conn // Socket Connection Data() interface{} // 获取数据 SetData(data interface{}) // 设置数据 Error() error // 错误信息 SetError(err error) // 设置错误信息 IsAbort() bool // 是否已经强制终止 Abort() // 终止调用 Next() error // 调用下一个 Jump(index int) error // 跳转到指定位置,可以是负索引 JumpBy(name string) error // 通过名字跳转 Call() error // 开始执行,执行完成后会释放FilterCtx Clone() FilterCtx // 拷贝当前状态,可用于转移到其他协程中继续执行 }
FilterCtx Filter上下文,默认会自动调用Next,如需终止,需要主动调用Abort
type Frame ¶
type Frame interface { Recycler Type() FrameType EndFlag() bool StreamID() uint32 SetStreamID(id uint32) Identifier() *Identifier SetIdentifier(v *Identifier) Header() Header SetHeader(v Header) Trailer() Header SetTrailer(v Header) Payload() bytex.Buffer SetPayload(v bytex.Buffer) }
Frame 最底层消息帧,一个消息可以由一帧组成,也可以由多帧组成
一个消息通常由三部分组成,header,body,trailer EndFlag标记是否是消息的最后一帧 StreamID,用于组装成Message Header帧会包含Identifier,Header,Payload Data帧只会使用Payload Trailer帧只会使用Trailer Header和Trailer只能有一个,Data帧可以有多个
参考http2: https://hpbn.co/http2/ https://halfrost.com/http2-http-semantics/ https://www.cnblogs.com/yudar/p/4642603.html
type Group ¶
type Group interface { Group(prefix string, middlewares ...Middleware) Group Use(middlewares ...Middleware) CONNECT(path string, handler interface{}, middlewares ...Middleware) DELETE(path string, handler interface{}, middlewares ...Middleware) GET(path string, handler interface{}, middlewares ...Middleware) HEAD(path string, handler interface{}, middlewares ...Middleware) OPTIONS(path string, handler interface{}, middlewares ...Middleware) PATCH(path string, handler interface{}, middlewares ...Middleware) POST(path string, handler interface{}, middlewares ...Middleware) PUT(path string, handler interface{}, middlewares ...Middleware) TRACE(path string, handler interface{}, middlewares ...Middleware) }
Group .
type Identifier ¶
type Identifier struct { Version uint // 版本信息,对于http1则对应[09,10,11] IsResponse bool // 是否是应答消息 IsOneway bool // 是否需要应答 SeqID uint32 // 动态唯一ID,用于查询Response回调 CmdID uint32 // CmdID,用于查询Request回调 Method Method // http method Service string // http host URI string // http URI Codec uint32 // payload编码,区别于Content-Type,Codec只支持有限的编码方式 StatusCode int32 // response status code StatusInfo string // response status info Params Params // 从path中解析获得的参数 // contains filtered or unexported fields }
Identifier 消息标识,只会在header中使用
func NewIdentifier ¶
func NewIdentifier() *Identifier
func (*Identifier) MsgType ¶
func (ident *Identifier) MsgType() MsgType
func (*Identifier) URL ¶
func (ident *Identifier) URL() *url.URL
type ListenFunc ¶
ListenFunc 标准的Listen接口
type Middleware ¶
Middleware deal with input Endpoint and output Endpoint.
type MsgType ¶
type MsgType uint8
MsgType 消息类型,定义同thrift https://github.com/apache/thrift/blob/master/doc/specs/thrift-binary-protocol.md
type NetWriter ¶
type NetWriter struct {
// contains filtered or unexported fields
}
NetWriter 用于队列存储WriterTo,便于单独协程中发送消息
type Options ¶
type Options struct { Tag string // 额外标签 Network string // 默认TCP,某些场景下会使用unix socket DialTimeout time.Duration // 连接超时设置 DialCallback DialCallback // 连接回调 DialNonBlocking bool // 连接是否阻塞,默认阻塞 Listen ListenFunc // Listen Dial DialFunc // Dial Extra map[string]interface{} // 其他扩展配置 }
Options 可选参数
type Packet ¶
type Packet interface { Recycler Identifier() *Identifier SetIdentifier(v *Identifier) Header() Header SetHeader(v Header) Trailer() Header SetTrailer(v Header) Body() Body SetBody(v Body) }
Packet 完整消息,由多个Frame组合而成,
同Request和Response底层使用相同结构,不同的是Packet通常由底层系统使用,而另外两个对用户使用更方便 在通信模型上通常分为两种: 1: 消息模式:内容为特定编码的消息结构,用于RPC通信 2: Chunk模式:内容通常为文件内容,用于收发文件等
type Params ¶
type Params struct {
// contains filtered or unexported fields
}
Params path中参数,values长度可能比keys大,但不会小,用于cache value
type Processor ¶
Processor 用于接收Frame然后拼装成Packet,并调用回调函数 通常有以下几种模式:
1:普通消息,一次能收发完完整消息,处理比较简单,没有并发问题,比如ping-pong,oneway模式rpc 2:流式消息,一个包由多帧组成,有并发问题,需要保证同时只能有一个线程在处理,但可能会被多次触发 常见场景:a:文件分片传输,b:stream rpc
type Protocol ¶
type Protocol interface { Name() string Detect(data bytex.Peeker) bool Decode(conn Conn, data bytex.Buffer) (Frame, error) Encode(conn Conn, frame Frame) (bytex.Buffer, error) }
Protocol 通信协议编解码
类似HTTP2协议,这里区分了Frame和Message两个概念,但弱化了http2协议 Protocol只负责Frame的编解码,并不负责Frame到Message的组包和拆包
multiplexing https://github.com/apache/incubator-brpc/blob/master/docs/cn/baidu_std.md
type Request ¶
type Request interface { Recycler Version() uint SetVersion(v uint) SeqID() uint32 SetSeqID(v uint32) Codec() uint32 SetCodec(v uint32) IsOneway() bool SetOneway(v bool) CmdID() uint32 SetCmdID(v uint32) Service() string SetService(v string) URI() string SetURI(v string) URL() *url.URL Params() Params Method() Method SetMethod(v Method) Header() Header SetHeader(v Header) Trailer() Header SetTrailer(v Header) Body() Body SetBody(v Body) Encode(codecType CodecType, msg interface{}) error Decode(msg interface{}) error }
Request see http.Request
func NewRequest ¶
func NewRequest() Request
type Response ¶
type Response interface { Recycler Version() uint SetVersion(v uint) SeqID() uint32 SetSeqID(v uint32) Codec() uint32 SetCodec(v uint32) StatusCode() int32 StatusInfo() string SetStatus(code int32, info string) Header() Header SetHeader(v Header) Trailer() Header SetTrailer(v Header) Body() Body SetBody(v Body) Encode(codecType CodecType, msg interface{}) error Decode(msg interface{}) error }
Response see http.Response
func NewResponse ¶
func NewResponse() Response
type RetryPolicy ¶
RetryPolicy 重试策略,比如基于次数
type Route ¶
type Route struct { Name string // Method Method // Path string // CmdID uint // 不宜过大尽量保持在uint16以内,底层数组存储 Metadata map[string]string // 自定义字段,可用于服务发现中注册额外字段 Handler interface{} // 原始Handler,see handler.go中toEndpoint原型定义 Middlewares []Middleware // 中间件 Callback Callback // Handler经过middleware加工后,转换成callback }
Route 路由信息
type Router ¶
type Router interface { Routes() []*Route Register(route *Route) NoRoute(callback Callback) Find(packet Packet) Callback }
Router .
type Server ¶
type Server interface { Group Addr() net.Addr // 服务器监听地址 Register(route *Route) // 注册router NoRoute(handler interface{}, middlewares ...Middleware) Run() error Exit() }
Server 服务端接口
type Tran ¶
type Tran interface { String() string GetChain() FilterChain SetChain(chain FilterChain) AddFilters(filters ...Filter) Dial(addr string, opts ...Option) (Conn, error) Listen(addr string, opts ...Option) (Listener, error) Close() error }
Tran 创建Conn,可以是tcp,websocket等协议 不同的Tran可以配置不同的FilterChain
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
Package metadata is a way of defining message headers
|
Package metadata is a way of defining message headers |