supbnervous

package
v0.0.0-...-5fda629 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 24, 2022 License: MulanPSL-2.0 Imports: 0 Imported by: 0

README

Nervous说明文档

一、总线SDK使用说明

1. 安装

  • 将src/nervous目录拷贝到GOPATH的src目录下
  • 将nervous/config.json拷贝至某个目录下。config.json中KAFKA_BROKERS为broker地址,其他均为kafka中配置,详情见kafka文档,建议直接保持默认

二、总线 基础功能

1. 示例

     n,_:=NewNervous(context.Background(),"./config.json","test917")
        wg:=sync.WaitGroup{}
        wg.Add(1)
        n.Subscribe("test917t")
        go func(){
                msg,_:=n.Receive("test917t")
                fmt.Println(msg.Topic)
                fmt.Println(msg.Key)
                fmt.Println(msg.Value)
                wg.Done()
        }()
        n.Send("test917t","keyf","fff")
        wg.Wait()
        n.Close()

2. 接口说明

1. 初始化
/*
初始化总线
@param 上下文,配置文件目录,全局id
@return 总线指针,错误
*/
func NewNervous(ctx context.Context, configAddress string,guid string)(*Nervous,error)
2. 向topic中写入数据
    /*
发送消息
@param 发送消息的topic,消息的key(可以为""),消息的json字符串
@return 错误
*/
func (n *Nervous)Send(topic string, key string, value string) error 
3. 订阅topic
   /*
订阅topic
@param 要订阅的topic
@return 错误
*/
func (n *Nervous)Subscribe(topic string) error

/*
取消订阅topic
@param 要取消订阅的topic
@return 错误
*/
func (n *Nervous)Unsubscribe(topic string) error
4. 从topic中拉取数据
    /*
从某个topic获取一条消息
@param 要获取消息的topic
@return 获得的消息,错误
*/
func (n *Nervous)Receive(topic string) (NervousMessage,error)
5. 释放资源
   /*
关闭Nervous
@param
@return 错误
*/
func (n *Nervous)Close() error

三、总线 RPC 功能

1. 示例


        c,_:=NewNervous(context.Background(),"./config.json","testrpcclient")
        c.Run()
        s,_:=NewNervous(context.Background(),"./config.json","testrpcserver")
        s.Run()
        testAdd := func(toAdd ...interface{})(interface{},error){
                fmt.Println("in testAdd")
                return toAdd[0].(float64)+1,nil
        }
        s.RPCRegister("testAdd",testAdd)
        if result,err:=c.RPCCall("testrpcserver","testAdd",2);err!=nil{
                fmt.Printf("rpc call error:%v\n",err)
        }else{
                fmt.Printf("result: %d\n",int(result.(float64)))
        }
        if result,err:=c.RPCCall("testrpcserver","funcList");err!=nil{
                fmt.Printf("rpc call error:%v\n",err)
        }else{
                fmt.Println("rcp",result)
                var tmpR []interface{}
                tmpR = result.([]interface{})
                for i:=0;i<len(tmpR);i++{
                        fmt.Println(tmpR[i].(string))
                }
        }
        fmt.Println("close rpc client and server")
        c.Close()
        s.Close()


2. 接口说明

- 1. 初始化
   /*
初始化rpc server 和rpc client,server会开始监听rpc请求,并注册基础rpc函数;client则开始监听rpc请求的返回值
@param
@return 错误
*/
func (n *Nervous)Run()error
- 2. 注册方法
   /*
向rcp server注册函数
@param 注册的函数名,注册的函数
@return 错误
*/
func (n *Nervous)RPCRegister(registerName string, rpcProcess func(args ...interface{})(interface{},error))error

- 3. 查询方法
   /*
查看当前rpc server上所有注册了的函数
@param
@return 函数列表,错误
*/
func (n *Nervous)RPCList()([]string,error)

    /*
查看当前rpc server上是否注册了某个函数
@param 查询的函数名
@return 是否存在,错误
*/
func (n *Nervous)RPCContains(funcName string)(bool,error)

- 4. 同步请求
   /*
发起rpc请求
@param 目标rpc server的guid,调用的函数名,传入的参数,默认重试5次,每次间隔500ms
@return 返回值,错误
*/
func (n *nervous) RPCCall(targetGuid string, funcName string, params ...interface{}) (interface{}, error) 

/*
发起rpc请求
@param 目标rpc server的guid,重试的次数,每次重试等待的毫秒数,调用的函数名,传入的参数
@return 返回值,错误
*/
func (n *nervous) RPCCall(targetGuid string, tryTime int, tryInterval int, funcName string, params ...interface{}) (interface{}, error)

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Controller

type Controller interface {
	Subscribe(topic string) error
	Receive(topic string) (NervousMessage, error)
	ReceiveEncoder(topic string) (string, []byte, []byte, error)
	Send(topic string, key string, value string) error

	RPCRegister(registerName string, rpcProcess func(args ...interface{}) (interface{}, error)) error
	RPCRemove(registerName string) error

	RPCCall(targetGuid string, funcName string, params ...interface{}) (interface{}, error)
	RPCCallCustom(targetGuid string, tryTime int, tryInterval int, funcName string, params ...interface{}) (interface{}, error)
	RPCList() ([]string, error)
	RPCContains(funcName string) (bool, error)

	//Run() error
	Close() error
}

type NervousMessage

type NervousMessage struct {
	Topic string
	Key   string
	Value string
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL