Documentation ¶
Index ¶
- Constants
- Variables
- func AddContentTypeMap(codecType CodecType, contentType string)
- func GetContentType(ctype CodecType) string
- func NewSeqID() uint32
- type AttributeMap
- type BaseConn
- func (c *BaseConn) Attributes() AttributeMap
- func (c *BaseConn) GetChain() FilterChain
- func (c *BaseConn) GetReadBuffer() bytex.Buffer
- func (c *BaseConn) GetWriter() *NetWriter
- func (c *BaseConn) ID() uint32
- func (c *BaseConn) Init(tran Tran, client bool, tag string)
- func (c *BaseConn) IsActive() bool
- func (c *BaseConn) IsClient() bool
- func (c *BaseConn) IsStatus(s Status) bool
- func (c *BaseConn) LocalAddr() string
- func (c *BaseConn) Protocol() interface{}
- func (c *BaseConn) RemoteAddr() string
- func (c *BaseConn) SetLocalAddr(addr string)
- func (c *BaseConn) SetProtocol(p interface{})
- func (c *BaseConn) SetRemoteAddr(addr string)
- func (c *BaseConn) SetStatus(s Status)
- func (c *BaseConn) Status() Status
- func (c *BaseConn) Tag() string
- func (c *BaseConn) Tran() Tran
- func (c *BaseConn) Write(buff WriterTo) error
- type BaseFilter
- type BaseTran
- type Body
- type BodyWriter
- type Callback
- type Codec
- type CodecType
- type Conn
- type Detector
- type DialCallback
- type DialFunc
- type Endpoint
- type Factory
- type Filter
- type FilterChain
- type FilterCtx
- type Frame
- type FrameType
- type Header
- type Identifier
- type ListenFunc
- type Listener
- type Method
- type Middleware
- type MsgType
- type NetWriter
- type Option
- func WithDial(fn DialFunc) Option
- func WithDialCallback(fn DialCallback) Option
- func WithDialNonBlocking() Option
- func WithDialTimeout(v time.Duration) Option
- func WithExtra(key string, value interface{}) Option
- func WithListen(fn ListenFunc) Option
- func WithNetwork(v string) Option
- func WithTag(tag string) Option
- type Options
- type Packet
- type Params
- type Processor
- type Protocol
- type Recycler
- type Request
- type Response
- type Status
- type Tran
- type WriterTo
Constants ¶
const ( StatusInternalServerError = http.StatusInternalServerError StatusTimeout = http.StatusRequestTimeout )
const ( KeyGroupConn = "conn" KeyGroupFilter = "filter" )
unique key group define
const ( // CLOSED 关闭状态 CLOSED = Status(iota) // CONNECTING 连接中 CONNECTING // OPEN 已经建立连接 OPEN // CLOSING 关闭中 CLOSING )
const ( XLogId = "X-Log-Id" XTraceId = "X-Trace-Id" )
常见Header
const ( FrameTypeHeader = 0 // 消息头 FrameTypeData = 1 FrameTypeTrailer = 2 )
Variables ¶
var ( ErrConnOpened = errors.New("conn is opened.") ErrConnClosed = errors.New("conn is closed") ErrFilterIndexOverflow = errors.New("filter index overflow") ErrNotSupport = errors.New("not support") ErrClosed = errors.New("closed") ErrInvalidFrame = errors.New("invalid frame") ErrInvalidIdentifier = errors.New("invalid identifier") )
network error
Functions ¶
func AddContentTypeMap ¶
AddContentTypeMap 添加MIME ContentType 到CodecType映射
func GetContentType ¶
GetContentType codecType转换为contentType
Types ¶
type AttributeMap ¶
type AttributeMap interface { // 属性数目 Len() int // 是否包含key Contains(key unique.Key) bool // 获取属性,若不存在且creator不为nil,则会创建 Get(key unique.Key, creator func() interface{}) interface{} // 设置属性 Put(key unique.Key, value interface{}) // 删除属性 Remove(key unique.Key) interface{} // 清空属性 Clear() }
AttributeMap 存储key-value数据,线程安全
数据通常不多,底层使用有序数据存储
func NewAttributeMap ¶
func NewAttributeMap() AttributeMap
type BaseConn ¶
BaseConn 实现最基础的BaseConn功能
func (*BaseConn) Attributes ¶
func (c *BaseConn) Attributes() AttributeMap
func (*BaseConn) SetProtocol ¶
func (c *BaseConn) SetProtocol(p interface{})
type BaseFilter ¶
type BaseFilter struct { }
BaseFilter 默认实现Filter接口,便于使用者无须实现不必要的接口
func (*BaseFilter) HandleClose ¶
func (f *BaseFilter) HandleClose(ctx FilterCtx) error
HandleClose ...
func (*BaseFilter) HandleError ¶
func (f *BaseFilter) HandleError(ctx FilterCtx) error
HandleError ...
func (*BaseFilter) HandleWrite ¶
func (f *BaseFilter) HandleWrite(ctx FilterCtx) error
HandleWrite ...
type BaseTran ¶
type BaseTran struct {
// contains filtered or unexported fields
}
BaseTran basic netx
type Body ¶
type Body interface { io.ReadCloser // 转换为Buffer,仅支持BufferBody,其他类型会返回NotSupport,用于rpc消息解码 Buffer() (bytex.Buffer, error) // 快速读取,直接返回buffer,返回io.EOF表明没有数据了 ReadFast(blocking bool) (bytex.Buffer, error) // 数据是否已读取完 End() bool }
Body 消息体接口,应用层只需要读操作
支持标准的io.Read接口,效率比较低,会额外拷贝一次内存,当数据不足时,会阻塞 ReadFast扩展了Read接口,直接返回底层的Buffer结构,更加高效,支持阻塞和非阻塞两种模式 当Read/ReadFast返回io.EOF时,表名没有数据了,但本次读取返回结果中,可能有数据,也可能没数据
type Codec ¶
type Codec interface { Type() CodecType Name() string Encode(p bytex.Buffer, msg interface{}) error Decode(p bytex.Buffer, msg interface{}) error }
Codec 用于消息中body的编解码,常见的格式为Json,Xml,Protobuf,Thrift
不同协议间可能有不同的标识,比如http使用ContextType标识
type CodecType ¶
type CodecType = uint
const ( CodecTypeUnknown CodecType = 0 CodecTypeText CodecType = 1 // text/plain CodecTypeBinary CodecType = 2 // application/octet-stream CodecTypeForm CodecType = 3 // application/x-www-form-urlencoded CodecTypeJson CodecType = 4 CodecTypeXml CodecType = 5 CodecTypeProtobuf CodecType = 6 CodecTypeThrift CodecType = 7 CodecTypeMsgpack CodecType = 8 CodecTypeAvro CodecType = 9 CodecTypeGob CodecType = 10 )
常见已知CodecType枚举
func GetCodecType ¶
GetCodecType contentType转换为codecType
type Conn ¶
type Conn interface { ID() uint32 // 唯一自增ID,由底层自动生成 Tag() string // 标签 Tran() Tran // Transport Status() Status // 当前状态 IsActive() bool // 是否已经建立好连接 IsClient() bool // 是否是通过Dial建立的Client连接 LocalAddr() string // 本地地址 RemoteAddr() string // 远程地址 Write(p WriterTo) error // 异步写数据,线程安全 Send(msg interface{}) error // 异步发消息,会触发Filter Write操作 Close() error // 调用后将不再接收任何读写操作,并等待所有发送完成后再安全关闭 Attributes() AttributeMap // 扩展属性 Protocol() interface{} // 绑定协议 SetProtocol(p interface{}) // 设置解析协议 }
Conn 异步Socket
type Detector ¶
type Detector interface { Detect(bytex.Peeker) Protocol // 服务端探测协议 Default() Protocol // 默认协议,客户端不需要 }
Detector 用于自动探测协议,某些协议有magic number,可以方便的感知协议类型,某些则不支持
服务端需要探测协议,但仅需要探测一次即可,便于自动识别http,dubbo,grpc等协议 客户端则不需要探测协议,因为调用方是知道使用哪种协议
type Endpoint ¶
Endpoint represent one method for calling from remote.
func Apply ¶
func Apply(endpoint Endpoint, mws []Middleware) Endpoint
Apply 将Endpoint添加middleware,并返回最终的Endpoint
type Filter ¶
type Filter interface { Name() string HandleRead(ctx FilterCtx) error HandleWrite(ctx FilterCtx) error HandleOpen(ctx FilterCtx) error HandleClose(ctx FilterCtx) error HandleError(ctx FilterCtx) error }
Filter 用于链式处理Conn各种回调 InBound: 从前向后执行,包括Read,Open,Error OutBound:从后向前执行,包括Write,Close
type FilterChain ¶
type FilterChain interface { Len() int // 长度 Front() Filter // 第一个 Back() Filter // 最后一个 Get(index int) Filter // 通过索引获取filter Index(name string) int // 通过名字查询索引 AddFirst(filters ...Filter) // 在前边插入 AddLast(filters ...Filter) // 在末尾插入 HandleOpen(conn Conn) // 建立连接 HandleClose(conn Conn) // 关闭连接 HandleError(conn Conn, err error) // 发生错误 HandleRead(conn Conn, msg interface{}) // 读事件 HandleWrite(conn Conn, msg interface{}) error // 写事件 }
FilterChain 管理Filter,并链式调用所有Filter Filter分为Inbound和Outbound InBound: 从前向后执行,包括Read,Open,Error OutBound:从后向前执行,包括Write,Close
type FilterCtx ¶
type FilterCtx interface { context.Context // Recycler // 可回收复用 Attributes() AttributeMap // 自定义数据,Ctx运行结束后,则会失效 Conn() Conn // Socket Connection Data() interface{} // 获取数据 SetData(data interface{}) // 设置数据 Error() error // 错误信息 SetError(err error) // 设置错误信息 IsAbort() bool // 是否已经强制终止 Abort() // 终止调用 Next() error // 调用下一个 Jump(index int) error // 跳转到指定位置,可以是负索引 JumpBy(name string) error // 通过名字跳转 Call() error // 开始执行,执行完成后会释放FilterCtx Clone() FilterCtx // 拷贝当前状态,可用于转移到其他协程中继续执行 }
FilterCtx Filter上下文,默认会自动调用Next,如需终止,需要主动调用Abort
type Frame ¶
type Frame interface { Recycler Type() FrameType EndFlag() bool StreamID() uint32 SetStreamID(id uint32) Identifier() *Identifier SetIdentifier(v *Identifier) Header() Header SetHeader(v Header) Trailer() Header SetTrailer(v Header) Payload() bytex.Buffer SetPayload(v bytex.Buffer) }
Frame 最底层消息帧,一个消息可以由一帧组成,也可以由多帧组成
一个消息通常由三部分组成,header,body,trailer EndFlag标记是否是消息的最后一帧 StreamID,用于组装成Message Header帧会包含Identifier,Header,Payload Data帧只会使用Payload Trailer帧只会使用Trailer Header和Trailer只能有一个,Data帧可以有多个
参考http2: https://hpbn.co/http2/ https://halfrost.com/http2-http-semantics/ https://www.cnblogs.com/yudar/p/4642603.html
type Identifier ¶
type Identifier struct { Version uint // 版本信息,对于http1则对应[09,10,11] IsResponse bool // 是否是应答消息 IsOneway bool // 是否需要应答 SeqID uint32 // 动态唯一ID,用于查询Response回调 CmdID uint32 // CmdID,用于查询Request回调 Method Method // http method Service string // http host URI string // http URI Codec uint32 // payload编码,区别于Content-Type,Codec只支持有限的编码方式 StatusCode int32 // response status code StatusInfo string // response status info Params Params // 从path中解析获得的参数 // contains filtered or unexported fields }
Identifier 消息标识,只会在header中使用
func NewIdentifier ¶
func NewIdentifier() *Identifier
func (*Identifier) MsgType ¶
func (ident *Identifier) MsgType() MsgType
func (*Identifier) URL ¶
func (ident *Identifier) URL() *url.URL
type ListenFunc ¶
ListenFunc 标准的Listen接口
type Middleware ¶
Middleware deal with input Endpoint and output Endpoint.
type MsgType ¶
type MsgType uint8
MsgType 消息类型,定义同thrift https://github.com/apache/thrift/blob/master/doc/specs/thrift-binary-protocol.md
type NetWriter ¶
type NetWriter struct {
// contains filtered or unexported fields
}
NetWriter 用于队列存储WriterTo,便于单独协程中发送消息
type Options ¶
type Options struct { Tag string // 额外标签 Network string // 默认TCP,某些场景下会使用unix socket DialTimeout time.Duration // 连接超时设置 DialCallback DialCallback // 连接回调 DialNonBlocking bool // 连接是否阻塞,默认阻塞 Listen ListenFunc // Listen Dial DialFunc // Dial Extra map[string]interface{} // 其他扩展配置 }
Options 可选参数
type Packet ¶
type Packet interface { Recycler Identifier() *Identifier SetIdentifier(v *Identifier) Header() Header SetHeader(v Header) Trailer() Header SetTrailer(v Header) Body() Body SetBody(v Body) }
Packet 完整消息,由多个Frame组合而成,
同Request和Response底层使用相同结构,不同的是Packet通常由底层系统使用,而另外两个对用户使用更方便 在通信模型上通常分为两种: 1: 消息模式:内容为特定编码的消息结构,用于RPC通信 2: Chunk模式:内容通常为文件内容,用于收发文件等
type Params ¶
type Params struct {
// contains filtered or unexported fields
}
Params path中参数,values长度可能比keys大,但不会小,用于cache value
type Processor ¶
Processor 用于接收Frame然后拼装成Packet,并调用回调函数 通常有以下几种模式:
1:普通消息,一次能收发完完整消息,处理比较简单,没有并发问题,比如ping-pong,oneway模式rpc 2:流式消息,一个包由多帧组成,有并发问题,需要保证同时只能有一个线程在处理,但可能会被多次触发 常见场景:a:文件分片传输,b:stream rpc
type Protocol ¶
type Protocol interface { Name() string Detect(data bytex.Peeker) bool Decode(conn Conn, data bytex.Buffer) (Frame, error) Encode(conn Conn, frame Frame) (bytex.Buffer, error) }
Protocol 通信协议编解码
类似HTTP2协议,这里区分了Frame和Message两个概念,但弱化了http2协议 Protocol只负责Frame的编解码,并不负责Frame到Message的组包和拆包
multiplexing https://github.com/apache/incubator-brpc/blob/master/docs/cn/baidu_std.md
type Request ¶
type Request interface { Recycler Version() uint SetVersion(v uint) SeqID() uint32 SetSeqID(v uint32) Codec() uint32 SetCodec(v uint32) IsOneway() bool SetOneway(v bool) CmdID() uint32 SetCmdID(v uint32) Service() string SetService(v string) URI() string SetURI(v string) URL() *url.URL Params() Params Method() Method SetMethod(v Method) Header() Header SetHeader(v Header) Trailer() Header SetTrailer(v Header) Body() Body SetBody(v Body) }
Request see http.Request
func NewRequest ¶
func NewRequest() Request
type Response ¶
type Response interface { Recycler Version() uint SetVersion(v uint) SeqID() uint32 SetSeqID(v uint32) Codec() uint32 SetCodec(v uint32) StatusCode() int32 StatusInfo() string SetStatus(code int32, info string) Header() Header SetHeader(v Header) Trailer() Header SetTrailer(v Header) Body() Body SetBody(v Body) }
Response see http.Response
func NewResponse ¶
func NewResponse() Response
type Tran ¶
type Tran interface { String() string GetChain() FilterChain SetChain(chain FilterChain) AddFilters(filters ...Filter) Dial(addr string, opts ...Option) (Conn, error) Listen(addr string, opts ...Option) (Listener, error) Close() error }
Tran 创建Conn,可以是tcp,websocket等协议 不同的Tran可以配置不同的FilterChain