client

package
v1.0.3 Latest
Warning

This package is not in the latest version of its module.

Published: May 16, 2024 License: Apache-2.0, BSD-2-Clause, BSD-3-Clause, + 2 more Imports: 26 Imported by: 98

README

tRPC-Go Client Package

Background

Users invoke RPC requests through stub code. The request then enters the client package, which is responsible for service discovery, interceptor execution, serialization, and compression, and is finally sent to the network via the transport package. Upon receiving a network response, the client package performs decompression, deserialization, and interceptor execution, and ultimately returns the response to the user. Each step in the client package can be customized, allowing users to define their own service discovery methods, interceptors, serialization, compression, etc.

Client Options

Users can specify different client options when making RPC requests. These options include the target address and network type, and they can also select the implementation used for each step, such as the service discovery method and the serialization method.

proxy := pb.NewGreeterClientProxy()
rsp, err := proxy.Hello(
    context.Background(),
    &pb.HelloRequest{Msg: "world"},
    client.WithTarget("ip://127.0.0.1:9000"), // Specify client options
)

Commonly used options include:

  • WithTarget(target string): Set the server address.

  • WithNetwork(network string): Set the network type.

  • WithNamespace(ns string): Set the server namespace (Production/Development).

  • WithServiceName(name string): Set the server service name, which is used for service discovery.

  • WithTimeout(timeout time.Duration): Set the request timeout.

  • WithNamedFilter(name string, f filter.ClientFilter): Set client filter.

  • WithSerializationType(t int): Set the serialization type. Built-in serialization methods include protobuf, JSON, flatbuffer, and custom serialization types can be registered using codec.RegisterSerializer.

  • WithCompressType(t int): Set the compression type. Built-in compression methods include gzip, snappy, zlib, and custom compression types can be registered using codec.RegisterCompressor.

  • WithProtocol(s string): Set a custom protocol type (default is trpc), which can be registered using codec.Register.

Client Configs

Besides passing client options when making RPC requests, users can define client configs in a configuration file. Client options and client configs partially overlap in purpose; when both are present, the options take precedence and override the configs. Client configs have the advantage that settings can be changed without modifying code.

client: # Client configs
  timeout: 1000 # Timeout(ms) for all requests
  namespace: Development # Server environment for all requests
  filter: # Filters for all requests
    - debuglog # Use debuglog to log request and response
  service: # Configs for requests to specific services
    - callee: trpc.test.helloworld.Greeter # callee name in the pb protocol file, can be omitted if it matches 'name' below
      name: trpc.test.helloworld.Greeter1 # Service name for service discovery
      target: ip://127.0.0.1:8000 # Server address, e.g., ip://ip:port or polaris://servicename, can be omitted if using naming discovery with name
      network: tcp # Network type for the request (tcp or udp)
      protocol: trpc # Application-layer protocol (trpc or http)
      timeout: 800 # Request timeout(ms)
      serialization: 0 # Serialization type (0-pb, 2-json, 3-flatbuffer); no need to configure by default
      compression: 1 # Compression type (1-gzip, 2-snappy, 3-zlib); no need to configure by default
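The precedence rule described above (per-call options override values from the configuration file) can be illustrated with a minimal, self-contained sketch. The `settings`, `option`, `withTarget`, `withTimeout`, and `resolve` names are hypothetical stand-ins invented for this example; they are not the framework's types, and the real merge logic may differ.

```go
package main

import (
	"fmt"
	"time"
)

// settings is a hypothetical, simplified stand-in for the resolved client settings.
type settings struct {
	Target  string
	Network string
	Timeout time.Duration
}

// option mutates settings; it has the same shape as a functional option.
type option func(*settings)

func withTarget(t string) option         { return func(s *settings) { s.Target = t } }
func withTimeout(d time.Duration) option { return func(s *settings) { s.Timeout = d } }

// resolve starts from the values read from the config file and then applies
// the per-call options, so an option always wins over the file.
func resolve(fromFile settings, opts ...option) settings {
	s := fromFile
	for _, o := range opts {
		o(&s)
	}
	return s
}

func main() {
	fromFile := settings{Target: "ip://127.0.0.1:8000", Network: "tcp", Timeout: 800 * time.Millisecond}
	s := resolve(fromFile, withTimeout(200*time.Millisecond))
	fmt.Println(s.Target, s.Network, s.Timeout) // prints "ip://127.0.0.1:8000 tcp 200ms"
}
```

Only the timeout was passed as an option, so the target and network keep the values from the file.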

Difference between callee and name:

callee refers to the service name from the pb protocol file, formatted as pbpackage.service.

For example, if the pb protocol is:

package trpc.a.b;
service Greeter {
    rpc SayHello(Request) returns (Reply);
}

Then callee would be trpc.a.b.Greeter.

name refers to the service name registered in the naming service. It corresponds to the server.service.name field in the target service's configuration file.

In most cases, callee and name are the same, and you only need to configure one of them. However, in scenarios like storage services where multiple instances of the same pb file are deployed, the service name registered in the naming service may differ from the pb service name. In such cases, you must configure both callee and name.

client:
  service:
    - callee: pbpackage.service # Must configure both callee and name; callee is the pb service name used for matching client proxy and configuration
      name: polaris-service-name # Service name in the naming service used for addressing
      protocol: trpc

Client stub code generated from protobuf includes the pb service name by default. Therefore, when the client searches for configurations, it matches them based on the "callee" key, which is the pb service name.

On the other hand, clients generated using constructs like redis.NewClientProxy("trpc.a.b.c") (including all plugins under the "database" category and HTTP) use the user-provided string as the default service name. Consequently, when searching for configurations, the client utilizes the input parameter of NewClientProxy as the key (e.g., trpc.a.b.c).

Additionally, the framework supports finding configurations using both "callee" and "name" as keys. For example, in the following two client configurations, they share the same "callee" but have different "name":

client:
  service:
    - callee: pbpackage.service # "callee" is the pb service name
      name: polaris-service-name1 # The service name registered in the naming service for addressing
      network: tcp
    - callee: pbpackage.service # "callee" is the pb service name
      name: polaris-service-name2 # Another service name registered in the naming service for addressing
      network: udp

In your code, you can use client.WithServiceName to find configurations using both callee and name as keys:

// proxy1 uses the first configuration, using TCP
proxy1 := pb.NewClientProxy(client.WithServiceName("polaris-service-name1"))
// proxy2 uses the second configuration, using UDP
proxy2 := pb.NewClientProxy(client.WithServiceName("polaris-service-name2"))
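To make the two-key lookup concrete, here is a minimal sketch of an index keyed both by callee alone and by the (callee, name) pair. It is an illustration under assumptions, not the framework's implementation: the `backend`, `registry`, `newRegistry`, and `lookup` names and the "/" key separator are all hypothetical.

```go
package main

import "fmt"

// backend is a hypothetical, trimmed-down per-service client config.
type backend struct {
	Callee  string
	Name    string
	Network string
}

// registry indexes configs twice: by callee alone and by callee plus name.
type registry struct {
	byCallee     map[string]*backend
	byCalleeName map[string]*backend
}

func newRegistry(cfgs []*backend) *registry {
	r := &registry{byCallee: map[string]*backend{}, byCalleeName: map[string]*backend{}}
	for _, c := range cfgs {
		r.byCallee[c.Callee] = c // a later entry with the same callee replaces an earlier one
		r.byCalleeName[c.Callee+"/"+c.Name] = c
	}
	return r
}

// lookup prefers the (callee, name) pair and falls back to callee alone.
func (r *registry) lookup(callee, name string) *backend {
	if c, ok := r.byCalleeName[callee+"/"+name]; ok {
		return c
	}
	return r.byCallee[callee]
}

func main() {
	r := newRegistry([]*backend{
		{Callee: "pbpackage.service", Name: "polaris-service-name1", Network: "tcp"},
		{Callee: "pbpackage.service", Name: "polaris-service-name2", Network: "udp"},
	})
	fmt.Println(r.lookup("pbpackage.service", "polaris-service-name1").Network) // prints "tcp"
	fmt.Println(r.lookup("pbpackage.service", "polaris-service-name2").Network) // prints "udp"
}
```

In this sketch a lookup by callee alone returns the last entry registered for that callee.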

Client Invocation Workflow

  1. The user submits a request using stub code to invoke an RPC call.
  2. The request enters the client package.
  3. Client configurations are completed based on the specified options and configuration file information.
  4. Service discovery is performed to obtain the actual service address based on the service name.
  5. Interceptors are invoked, executing the pre-interceptor phase.
  6. The request body is serialized, resulting in binary data.
  7. The request body is compressed.
  8. The complete request is packaged, including the protocol header.
  9. The transport package sends a network request.
  10. The transport package receives the network response.
  11. The response is unpacked to obtain the protocol header and response body.
  12. The response body is decompressed.
  13. The response is deserialized to obtain the response structure.
  14. Interceptors are invoked, executing the post-interceptor phase.
  15. The response is returned to the user.
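Steps 6 to 8 and 11 to 13 of the workflow above can be sketched with the standard library alone. This is a toy round trip under stated assumptions: JSON stands in for the serializer, gzip for the compressor, and a 4-byte length prefix for the protocol header. The `helloRequest`, `pack`, and `unpack` names are hypothetical, and the real tRPC frame format is different.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"encoding/binary"
	"encoding/json"
	"errors"
	"fmt"
	"io"
)

type helloRequest struct {
	Msg string `json:"msg"`
}

// pack serializes v, compresses the bytes, and prepends a 4-byte length header.
func pack(v interface{}) ([]byte, error) {
	body, err := json.Marshal(v) // step 6: serialize
	if err != nil {
		return nil, err
	}
	var zipped bytes.Buffer
	zw := gzip.NewWriter(&zipped) // step 7: compress
	if _, err := zw.Write(body); err != nil {
		return nil, err
	}
	if err := zw.Close(); err != nil {
		return nil, err
	}
	frame := make([]byte, 4, 4+zipped.Len()) // step 8: add a toy header
	binary.BigEndian.PutUint32(frame, uint32(zipped.Len()))
	return append(frame, zipped.Bytes()...), nil
}

// unpack reverses pack: check the header, decompress, then deserialize into v.
func unpack(frame []byte, v interface{}) error {
	if len(frame) < 4 {
		return errors.New("frame too short")
	}
	n := binary.BigEndian.Uint32(frame[:4]) // step 11: unpack
	if int(n) != len(frame)-4 {
		return errors.New("length mismatch")
	}
	zr, err := gzip.NewReader(bytes.NewReader(frame[4:])) // step 12: decompress
	if err != nil {
		return err
	}
	defer zr.Close()
	body, err := io.ReadAll(zr)
	if err != nil {
		return err
	}
	return json.Unmarshal(body, v) // step 13: deserialize
}

func main() {
	frame, err := pack(helloRequest{Msg: "world"})
	if err != nil {
		panic(err)
	}
	var got helloRequest
	if err := unpack(frame, &got); err != nil {
		panic(err)
	}
	fmt.Println(got.Msg) // prints "world"
}
```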

Documentation

Overview

Package client is the tRPC-Go client-side implementation, including network transport, resolving, routing, etc.

Index

Constants

This section is empty.

Variables

var DefaultClient = New()

DefaultClient is the default global client. It's thread-safe.

var (
	// DefaultSelectorFilterName is the default name of selector filter.
	// It can be modified if conflict exists.
	DefaultSelectorFilterName = "selector"
)
var DefaultStream = NewStream()

DefaultStream is the default client Stream.

var New = func() Client {
	return &client{}
}

New creates a client that uses default client transport.

var NewStream = func() Stream {
	return &stream{}
}

NewStream is the function that returns a Stream.

Functions

func DefaultClientConfig

func DefaultClientConfig() map[string]*BackendConfig

DefaultClientConfig returns the default client config.

Note: if multiple client configs with same callee and different service name exist in trpc_go.yaml, this function will only return the last config for the same callee key.

func IsOptionsImmutable

func IsOptionsImmutable(ctx context.Context) bool

IsOptionsImmutable reports whether the options carried by ctx are marked immutable.

func LoadClientConfig

func LoadClientConfig(path string, opts ...config.LoadOption) error

LoadClientConfig loads client config by path.

func RegisterClientConfig

func RegisterClientConfig(callee string, conf *BackendConfig) error

RegisterClientConfig is called to replace the backend config of a single callee service, identified by name.

func RegisterConfig

func RegisterConfig(conf map[string]*BackendConfig) error

RegisterConfig is called to replace the global backend config, allowing updating backend config regularly.

func RegisterStreamFilter

func RegisterStreamFilter(name string, filter StreamFilter)

RegisterStreamFilter registers a StreamFilter by name.

func WithOptionsImmutable

func WithOptionsImmutable(ctx context.Context) context.Context

WithOptionsImmutable marks options of outermost layer immutable. Cloning options is needed for modifying options in lower layers.

It should only be used by filters that call the next filter concurrently.
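The idea of marking options immutable through the context and cloning before modification can be sketched as follows. This is a minimal illustration, not the package's code: the `options`, `immutableKey`, `withImmutable`, `isImmutable`, `editable`, and `run` names are hypothetical.

```go
package main

import (
	"context"
	"fmt"
)

// options is a hypothetical one-field stand-in for client options.
type options struct{ Target string }

// immutableKey is an unexported context key type, so it cannot collide with other keys.
type immutableKey struct{}

func withImmutable(ctx context.Context) context.Context {
	return context.WithValue(ctx, immutableKey{}, true)
}

func isImmutable(ctx context.Context) bool {
	v, _ := ctx.Value(immutableKey{}).(bool)
	return v
}

// editable returns opts itself when it may be changed in place, or a copy
// when the context says the outer options must stay untouched.
func editable(ctx context.Context, opts *options) *options {
	if !isImmutable(ctx) {
		return opts
	}
	clone := *opts
	return &clone
}

// run modifies the options in a "lower layer" and reports both targets afterwards.
func run(immutable bool) (outerTarget, innerTarget string) {
	ctx := context.Background()
	if immutable {
		ctx = withImmutable(ctx)
	}
	outer := &options{Target: "ip://127.0.0.1:8000"}
	inner := editable(ctx, outer)
	inner.Target = "ip://127.0.0.1:9000"
	return outer.Target, inner.Target
}

func main() {
	fmt.Println(run(true))  // outer target unchanged, inner target modified
	fmt.Println(run(false)) // both point at the same, modified options
}
```

A filter that fans out to the next filter from several goroutines would otherwise have all of them writing to the same options value, which is why a clone per branch is needed.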

Types

type Attachment

type Attachment struct {
	// contains filtered or unexported fields
}

Attachment stores the Attachment of tRPC requests/responses.

func NewAttachment

func NewAttachment(request io.Reader) *Attachment

NewAttachment returns a new Attachment whose response Attachment is a NoopAttachment.

func (*Attachment) Response

func (a *Attachment) Response() io.Reader

Response returns Response Attachment.

type BackendConfig

type BackendConfig struct {
	// Callee is the name of the backend service.
	// The config file uses it as the key to set the parameters.
	// Usually, it is the proto name of the callee service defined in proto stub file,
	// and it is the same as ServiceName below.
	Callee      string `yaml:"callee"`   // Name of the backend service.
	ServiceName string `yaml:"name"`     // Backend service name.
	EnvName     string `yaml:"env_name"` // Env name of the callee.
	SetName     string `yaml:"set_name"` // "Set" name of the callee.

	// DisableServiceRouter, despite its inherent inappropriate and vague nomenclature,
	// is an option for naming service that denotes the de-facto meaning of disabling
	// out-rule routing for the source service.
	DisableServiceRouter bool              `yaml:"disable_servicerouter"`
	Namespace            string            `yaml:"namespace"`       // Namespace of the callee: Production/Development.
	CalleeMetadata       map[string]string `yaml:"callee_metadata"` // Set callee metadata.

	Target   string `yaml:"target"`   // Polaris by default, generally no need to configure this.
	Password string `yaml:"password"` // Password for authentication.

	// Naming service four swordsmen.
	// Discovery.List => ServiceRouter.Filter => Loadbalancer.Select => Circuitbreaker.Report
	Discovery      string `yaml:"discovery"`      // Discovery for the backend service.
	ServiceRouter  string `yaml:"servicerouter"`  // Service router for the backend service.
	Loadbalance    string `yaml:"loadbalance"`    // Load balancing algorithm.
	Circuitbreaker string `yaml:"circuitbreaker"` // Circuit breaker configuration.

	Network   string `yaml:"network"`   // Transport protocol type: tcp or udp.
	Timeout   int    `yaml:"timeout"`   // Client timeout in milliseconds.
	Protocol  string `yaml:"protocol"`  // Business protocol type: trpc, http, http_no_protocol, etc.
	Transport string `yaml:"transport"` // Transport type.

	// Serialization type. Use a pointer to check if it has been set (0 means pb).
	Serialization *int `yaml:"serialization"`
	Compression   int  `yaml:"compression"` // Compression type.

	TLSKey  string `yaml:"tls_key"`  // Client TLS key.
	TLSCert string `yaml:"tls_cert"` // Client TLS certificate.
	// CA certificate used to validate the server cert when calling a TLS service (e.g., an HTTPS server).
	CACert string `yaml:"ca_cert"`
	// Server name used to validate the server (default: hostname) when calling an HTTPS server.
	TLSServerName string `yaml:"tls_server_name"`

	Filter       []string `yaml:"filter"`        // Filters for the backend service.
	StreamFilter []string `yaml:"stream_filter"` // Stream filters for the backend service.

	// Report any error to the selector if this value is true.
	ReportAnyErrToSelector bool `yaml:"report_any_err_to_selector"`
}

BackendConfig defines the configuration needed to call the backend service. It's empty by default and can be replaced.
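The comment on the Serialization field explains that a pointer is used so that "not set" can be told apart from the value 0. The sketch below shows the same technique on a hypothetical two-field struct; JSON is used instead of YAML only because it is in the standard library, and `serviceConfig` and `describe` are names invented for the example.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// serviceConfig is a hypothetical two-field excerpt of a backend config.
type serviceConfig struct {
	Serialization *int `json:"serialization"` // nil means "not configured"
	Compression   int  `json:"compression"`   // 0 is ambiguous: unset or explicitly 0
}

// describe decodes raw and reports whether serialization was configured.
func describe(raw string) (string, error) {
	var c serviceConfig
	if err := json.Unmarshal([]byte(raw), &c); err != nil {
		return "", err
	}
	if c.Serialization == nil {
		return "serialization unset", nil
	}
	return fmt.Sprintf("serialization=%d", *c.Serialization), nil
}

func main() {
	for _, raw := range []string{`{}`, `{"serialization": 0}`} {
		s, err := describe(raw)
		if err != nil {
			panic(err)
		}
		fmt.Println(s)
	}
	// prints:
	// serialization unset
	// serialization=0
}
```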

func Config

func Config(callee string) *BackendConfig

Config returns BackendConfig by callee service name.

type Client

type Client interface {
	// Invoke performs a unary RPC.
	Invoke(ctx context.Context, reqBody interface{}, rspBody interface{}, opt ...Option) error
}

Client is the interface that initiates RPCs and sends request messages to a server.

type ClientStream

type ClientStream interface {
	// RecvMsg receives messages.
	RecvMsg(m interface{}) error
	// SendMsg sends messages.
	SendMsg(m interface{}) error
	// CloseSend closes sender.
	// No more sending messages,
	// but it's still allowed to continue to receive messages.
	CloseSend() error
	// Context gets the Context.
	Context() context.Context
}

ClientStream is the interface returned to users to call its methods.

type ClientStreamDesc

type ClientStreamDesc struct {
	// StreamName is the name of the stream, corresponding to Method of unary RPC.
	StreamName string
	// ClientStreams indicates whether it's client streaming.
	ClientStreams bool
	// ServerStreams indicates whether it's server streaming.
	ServerStreams bool
}

ClientStreamDesc is the client stream description.

type Option

type Option func(*Options)

Option sets client options.

func WithAttachment

func WithAttachment(attachment *Attachment) Option

WithAttachment returns an Option that sets attachment.

func WithBalancerName

func WithBalancerName(balancerName string) Option

WithBalancerName returns an Option that sets load balancer by name.

func WithCalleeEnvName

func WithCalleeEnvName(envName string) Option

WithCalleeEnvName returns an Option that sets env name of the callee service.

func WithCalleeMetadata

func WithCalleeMetadata(key string, val string) Option

WithCalleeMetadata returns an Option that sets metadata of callee. It should not be used for env/set as specific methods are provided for env/set.

func WithCalleeMethod

func WithCalleeMethod(method string) Option

WithCalleeMethod returns an Option that sets callee method name.

func WithCalleeSetName

func WithCalleeSetName(setName string) Option

WithCalleeSetName returns an Option that sets "Set" of the callee service.

func WithCallerEnvName

func WithCallerEnvName(envName string) Option

WithCallerEnvName returns an Option that sets env name of the caller service itself.

func WithCallerMetadata

func WithCallerMetadata(key string, val string) Option

WithCallerMetadata returns an Option that sets metadata of caller. It should not be used for env/set as specific methods are provided for env/set.

func WithCallerNamespace

func WithCallerNamespace(s string) Option

WithCallerNamespace returns an Option that sets namespace of the caller service itself.

func WithCallerServiceName

func WithCallerServiceName(s string) Option

WithCallerServiceName returns an Option that sets service name of the caller service itself.

func WithCallerSetName

func WithCallerSetName(setName string) Option

WithCallerSetName returns an Option that sets "Set" of the caller service itself.

func WithCircuitBreakerName

func WithCircuitBreakerName(name string) Option

WithCircuitBreakerName returns an Option that sets circuit breaker by name.

func WithClientStreamQueueSize

func WithClientStreamQueueSize(size int) Option

WithClientStreamQueueSize returns an Option that sets the size of client stream's buffer queue, that is, max number of received messages to put into the channel.

func WithCompressType

func WithCompressType(t int) Option

WithCompressType returns an Option that sets compression type of backend service. Generally, only WithCompressType will be used as WithCurrentCompressType is used for reverse proxy.

func WithConnectionMode

func WithConnectionMode(connMode transport.ConnectionMode) Option

WithConnectionMode returns an Option that sets whether the connection mode is connected. In connected mode, UDP isolates packets that do not come from the same path.

func WithCurrentCompressType

func WithCurrentCompressType(t int) Option

WithCurrentCompressType returns an Option that sets compression type of caller itself. WithCompressType should be used to set compression type of backend service.

func WithCurrentSerializationType

func WithCurrentSerializationType(t int) Option

WithCurrentSerializationType returns an Option that sets serialization type of caller itself. WithSerializationType should be used to set serialization type of backend service.

func WithDialTimeout

func WithDialTimeout(dur time.Duration) Option

WithDialTimeout returns an Option that sets the dial timeout.

func WithDisableConnectionPool

func WithDisableConnectionPool() Option

WithDisableConnectionPool returns an Option that disables connection pool.

func WithDisableFilter

func WithDisableFilter() Option

WithDisableFilter returns an Option that sets whether to disable filters. It's used when a plugin is being set up and needs a client to send requests before the filters have been initialized.

func WithDisableServiceRouter

func WithDisableServiceRouter() Option

WithDisableServiceRouter returns an Option that disables service router.

func WithDiscoveryName

func WithDiscoveryName(name string) Option

WithDiscoveryName returns an Option that sets service discovery by name.

func WithEnvKey

func WithEnvKey(key string) Option

WithEnvKey returns an Option that sets env key.

func WithFilter

func WithFilter(f filter.ClientFilter) Option

WithFilter returns an Option that appends a client filter to the client filter chain. A ClientFilter can run before encoding or after decoding. The selector filter is built in and sits at the end of the client filter chain by default. Its position can also be set through the yaml config file.
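How a filter chain wraps the final call, with each filter running code before and after the next one, can be sketched with local types. The `handleFunc` and `clientFilter` types are only loosely modeled on a "filter plus next handler" shape and are assumptions for this example, as are `chain`, `tracer`, and `run`; the real filter package may differ.

```go
package main

import (
	"context"
	"fmt"
)

type handleFunc func(ctx context.Context, req, rsp interface{}) error
type clientFilter func(ctx context.Context, req, rsp interface{}, next handleFunc) error

// chain wraps final with filters so that filters[0] runs outermost.
func chain(filters []clientFilter, final handleFunc) handleFunc {
	h := final
	for i := len(filters) - 1; i >= 0; i-- {
		f, next := filters[i], h
		h = func(ctx context.Context, req, rsp interface{}) error {
			return f(ctx, req, rsp, next)
		}
	}
	return h
}

// tracer returns a filter that records when it runs before and after next.
func tracer(name string, log *[]string) clientFilter {
	return func(ctx context.Context, req, rsp interface{}, next handleFunc) error {
		*log = append(*log, name+":pre")
		err := next(ctx, req, rsp)
		*log = append(*log, name+":post")
		return err
	}
}

// run executes a two-filter chain around a stand-in for the network call.
func run() []string {
	var log []string
	send := func(ctx context.Context, req, rsp interface{}) error {
		log = append(log, "send")
		return nil
	}
	h := chain([]clientFilter{tracer("a", &log), tracer("b", &log)}, send)
	_ = h(context.Background(), "req", nil)
	return log
}

func main() {
	fmt.Println(run()) // prints "[a:pre b:pre send b:post a:post]"
}
```

The pre phases run in registration order and the post phases in reverse, which matches the pre-interceptor and post-interceptor steps in the invocation workflow.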

func WithFilters

func WithFilters(fs []filter.ClientFilter) Option

WithFilters returns an Option that appends multiple client filters to the client filter chain.

func WithKey

func WithKey(key string) Option

WithKey returns an Option that sets the hash key of stateful routing.

func WithLocalAddr

func WithLocalAddr(addr string) Option

WithLocalAddr returns an Option that sets the local address. When the host has multiple NICs, one is picked at random by default.

For a non-persistent connection, both IP and port can be specified: client.WithLocalAddr("127.0.0.1:8080")
For a connection pool or multiplexed connection, only the IP can be specified: client.WithLocalAddr("127.0.0.1:")

func WithMaxWindowSize

func WithMaxWindowSize(s uint32) Option

WithMaxWindowSize returns an Option that sets max size of receive window. Client as the receiver will notify the sender of the window.

func WithMetaData

func WithMetaData(key string, val []byte) Option

WithMetaData returns an Option that sets transparent transmitted metadata.

func WithMultiplexed

func WithMultiplexed(enable bool) Option

WithMultiplexed returns an Option that sets whether multiplexing is enabled. WithMultiplexedPool should be used to supply a custom multiplexed pool.

func WithMultiplexedPool

func WithMultiplexedPool(p multiplexed.Pool) Option

WithMultiplexedPool returns an Option that sets multiplexed pool. Calling this method enables multiplexing.

func WithNamedFilter

func WithNamedFilter(name string, f filter.ClientFilter) Option

WithNamedFilter returns an Option that adds a named filter.

func WithNamespace

func WithNamespace(namespace string) Option

WithNamespace returns an Option that sets namespace of backend service: Production/Development.

func WithNetwork

func WithNetwork(s string) Option

WithNetwork returns an Option that sets dial network: tcp/udp, tcp by default.

func WithPassword

func WithPassword(s string) Option

WithPassword returns an Option that sets dial password.

func WithPool

func WithPool(pool connpool.Pool) Option

WithPool returns an Option that sets dial pool.

func WithProtocol

func WithProtocol(s string) Option

WithProtocol returns an Option that sets protocol of backend service like trpc.

func WithRecvControl

func WithRecvControl(rc RecvControl) Option

WithRecvControl returns an Option that sets recv control.

func WithReplicas

func WithReplicas(r int) Option

WithReplicas returns an Option that sets node replicas of stateful routing.

func WithReqHead

func WithReqHead(h interface{}) Option

WithReqHead returns an Option that sets the req head. By default, the req head is cloned from the server req head of the source request.

func WithRspHead

func WithRspHead(h interface{}) Option

WithRspHead returns an Option that sets rsp head. Usually used for gateway service.

func WithSelectorNode

func WithSelectorNode(n *registry.Node) Option

WithSelectorNode returns an Option that records the selected node. It's usually used for debugging.

func WithSendControl

func WithSendControl(sc SendControl) Option

WithSendControl returns an Option that sets send control.

func WithSendOnly

func WithSendOnly() Option

WithSendOnly returns an Option that sets CallType SendOnly. Generally it's used for udp async sending.

func WithSerializationType

func WithSerializationType(t int) Option

WithSerializationType returns an Option that sets serialization type of backend service. Generally, only WithSerializationType will be used as WithCurrentSerializationType is used for reverse proxy.

func WithServiceName

func WithServiceName(s string) Option

WithServiceName returns an Option that sets service name of backend service.

func WithServiceRouterName

func WithServiceRouterName(name string) Option

WithServiceRouterName returns an Option that sets service router by name.

func WithShouldErrReportToSelector

func WithShouldErrReportToSelector(f func(error) bool) Option

WithShouldErrReportToSelector returns an Option that sets the function deciding whether an error should be reported to the selector.

func WithStreamFilter

func WithStreamFilter(sf StreamFilter) Option

WithStreamFilter returns an Option that appends a client stream filter to the client stream filter chain.

func WithStreamFilters

func WithStreamFilters(sfs ...StreamFilter) Option

WithStreamFilters returns an Option that appends multiple client stream filters to the client stream filter chain. StreamFilter processing could be before or after stream's establishing, before or after sending data, before or after receiving data.

func WithStreamTransport

func WithStreamTransport(st transport.ClientStreamTransport) Option

WithStreamTransport returns an Option that sets client stream transport.

func WithTLS

func WithTLS(certFile, keyFile, caFile, serverName string) Option

WithTLS returns an Option that sets client tls files. If caFile="none", no server cert validation. If caFile="root", local ca cert will be used to validate server. certFile is only used for mTLS or should be empty. serverName is used to validate the name of server. hostname by default for https.
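One way to read the caFile conventions described for WithTLS is sketched below using crypto/tls. This is an interpretation, not the framework's code: `buildTLS` is a hypothetical helper, and mapping "root" to the host's default root CA set (by leaving RootCAs nil) is an assumption.

```go
package main

import (
	"crypto/tls"
	"crypto/x509"
	"errors"
	"fmt"
	"os"
)

// buildTLS is a hypothetical helper that turns the four string parameters
// into a *tls.Config.
func buildTLS(certFile, keyFile, caFile, serverName string) (*tls.Config, error) {
	cfg := &tls.Config{ServerName: serverName}
	switch caFile {
	case "none":
		cfg.InsecureSkipVerify = true // no server certificate validation
	case "root":
		// Assumption: leave RootCAs nil so crypto/tls uses the host's root CA set.
	default:
		pem, err := os.ReadFile(caFile)
		if err != nil {
			return nil, err
		}
		pool := x509.NewCertPool()
		if !pool.AppendCertsFromPEM(pem) {
			return nil, errors.New("no certificates found in CA file")
		}
		cfg.RootCAs = pool
	}
	if certFile != "" { // client certificate, only needed for mTLS
		cert, err := tls.LoadX509KeyPair(certFile, keyFile)
		if err != nil {
			return nil, err
		}
		cfg.Certificates = []tls.Certificate{cert}
	}
	return cfg, nil
}

func main() {
	cfg, err := buildTLS("", "", "none", "example.com")
	if err != nil {
		panic(err)
	}
	fmt.Println(cfg.InsecureSkipVerify, cfg.ServerName) // prints "true example.com"
}
```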

func WithTarget

func WithTarget(t string) Option

WithTarget returns an Option that sets target address using URI scheme://endpoint. e.g. ip://ip_addr:port
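The scheme://endpoint form of a target can be split with a few lines of standard-library code. The `splitTarget` function below is a hypothetical helper for illustration; how the framework itself parses and validates targets is not shown here.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// splitTarget separates "scheme://endpoint" into its two parts and rejects
// inputs where either part is missing.
func splitTarget(target string) (scheme, endpoint string, err error) {
	i := strings.Index(target, "://")
	if i <= 0 || i+3 >= len(target) {
		return "", "", errors.New("target must look like scheme://endpoint")
	}
	return target[:i], target[i+3:], nil
}

func main() {
	for _, t := range []string{"ip://127.0.0.1:9000", "polaris://servicename", "127.0.0.1:9000"} {
		scheme, endpoint, err := splitTarget(t)
		if err != nil {
			fmt.Println(t, "->", err)
			continue
		}
		fmt.Println(t, "->", scheme, endpoint)
	}
}
```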

func WithTimeout

func WithTimeout(t time.Duration) Option

WithTimeout returns an Option that sets timeout.

func WithTransport

func WithTransport(t transport.ClientTransport) Option

WithTransport returns an Option that sets client transport plugin.

type Options

type Options struct {
	ServiceName       string        // Backend service name.
	CallerServiceName string        // Service name of caller itself.
	CalleeMethod      string        // Callee method name, usually used for metrics.
	Timeout           time.Duration // Timeout.

	// Target is address of backend service: name://endpoint,
	// also compatible with old addressing like ip://ip:port
	Target string

	Network           string
	Protocol          string
	CallType          codec.RequestType           // Type of request, referring to transport.RequestType.
	CallOptions       []transport.RoundTripOption // Options for client transport to call server.
	Transport         transport.ClientTransport
	EnableMultiplexed bool
	StreamTransport   transport.ClientStreamTransport

	SelectOptions        []selector.Option
	Selector             selector.Selector
	DisableServiceRouter bool

	CurrentSerializationType int
	CurrentCompressType      int
	SerializationType        int
	CompressType             int

	Codec                 codec.Codec
	MetaData              codec.MetaData
	ClientStreamQueueSize int // Size of client stream's queue.

	Filters       filter.ClientChain // Filter chain.
	FilterNames   []string           // The name of filters.
	DisableFilter bool               // Whether to disable filter.

	ReqHead interface{} // Allow custom req head.
	RspHead interface{} // Allow custom rsp head.
	Node    *onceNode   // For getting node info.

	MaxWindowSize uint32            // Max size of stream receiver's window.
	SControl      SendControl       // Sender's flow control.
	RControl      RecvControl       // Receiver's flow control.
	StreamFilters StreamFilterChain // Stream filter chain.
	// contains filtered or unexported fields
}

Options are clientside options.

func NewOptions

func NewOptions() *Options

NewOptions creates a new Options with fields set to default value.

func OptionsFromContext

func OptionsFromContext(ctx context.Context) *Options

OptionsFromContext returns options from context.

func (*Options) LoadNodeConfig

func (opts *Options) LoadNodeConfig(node *registry.Node)

LoadNodeConfig loads node config from config center.

type RecvControl

type RecvControl interface {
	OnRecv(n uint32) error
}

RecvControl is the interface used for receiver's flow control.

type SendControl

type SendControl interface {
	GetWindow(uint32) error
	UpdateWindow(uint32)
}

SendControl is the interface used for sender's flow control.
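A simple window-based implementation of an interface with this method set might look like the sketch below. The semantics are assumed rather than taken from the framework: here GetWindow reserves n units and returns an error when the window is exhausted (a real sender could block instead), and UpdateWindow returns units to the window. The `sendControl` and `window` names are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// sendControl mirrors the method set shown above with local, hypothetical naming.
type sendControl interface {
	GetWindow(uint32) error
	UpdateWindow(uint32)
}

// window tracks how many units the sender may still send.
type window struct {
	mu    sync.Mutex
	avail uint32
}

// GetWindow reserves n units or fails if fewer than n are available.
func (w *window) GetWindow(n uint32) error {
	w.mu.Lock()
	defer w.mu.Unlock()
	if n > w.avail {
		return errors.New("send window exhausted")
	}
	w.avail -= n
	return nil
}

// UpdateWindow gives n units back, for example after the receiver has consumed data.
func (w *window) UpdateWindow(n uint32) {
	w.mu.Lock()
	defer w.mu.Unlock()
	w.avail += n
}

func main() {
	var sc sendControl = &window{avail: 10}
	fmt.Println(sc.GetWindow(8)) // prints "<nil>"
	fmt.Println(sc.GetWindow(8)) // prints "send window exhausted"
	sc.UpdateWindow(8)
	fmt.Println(sc.GetWindow(8)) // prints "<nil>"
}
```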

type Stream

type Stream interface {
	// Send sends stream messages.
	Send(ctx context.Context, m interface{}) error
	// Recv receives stream messages.
	Recv(ctx context.Context) ([]byte, error)
	// Init initiates all stream related options.
	Init(ctx context.Context, opt ...Option) (*Options, error)
	// Invoke initiates the lower layer connection to build the stream.
	Invoke(ctx context.Context) error
	// Close closes the stream.
	Close(ctx context.Context) error
}

Stream is the interface that performs streaming RPCs.

type StreamFilter

type StreamFilter func(ctx context.Context, desc *ClientStreamDesc, streamer Streamer) (ClientStream, error)

StreamFilter is the client stream filter. StreamFilter processing happens before or after stream's establishing.

func GetStreamFilter

func GetStreamFilter(name string) StreamFilter

GetStreamFilter returns a StreamFilter by name.

type StreamFilterChain

type StreamFilterChain []StreamFilter

StreamFilterChain is a chain of client stream filters.

func (StreamFilterChain) Filter

func (c StreamFilterChain) Filter(ctx context.Context,
	desc *ClientStreamDesc, streamer Streamer) (ClientStream, error)

Filter implements StreamFilter for multi stream filters.
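How a slice of stream filters can itself act as a single filter is sketched below with local copies of the relevant types. The lowercase type names, `fakeStream`, and `run` are hypothetical, and the recursion shown is one possible way to chain filters, not necessarily how the package does it.

```go
package main

import (
	"context"
	"fmt"
)

type clientStreamDesc struct{ StreamName string }

type clientStream interface {
	Context() context.Context
}

type streamer func(ctx context.Context, desc *clientStreamDesc) (clientStream, error)
type streamFilter func(ctx context.Context, desc *clientStreamDesc, s streamer) (clientStream, error)
type streamFilterChain []streamFilter

// Filter runs c[0], handing it a streamer that continues with the rest of the chain.
func (c streamFilterChain) Filter(ctx context.Context, desc *clientStreamDesc, s streamer) (clientStream, error) {
	if len(c) == 0 {
		return s(ctx, desc)
	}
	return c[0](ctx, desc, func(ctx context.Context, desc *clientStreamDesc) (clientStream, error) {
		return c[1:].Filter(ctx, desc, s)
	})
}

// fakeStream is a stand-in for an established stream.
type fakeStream struct{ ctx context.Context }

func (f fakeStream) Context() context.Context { return f.ctx }

// run builds a two-filter chain and returns the order in which things ran.
func run() []string {
	var order []string
	mark := func(name string) streamFilter {
		return func(ctx context.Context, desc *clientStreamDesc, s streamer) (clientStream, error) {
			order = append(order, name)
			return s(ctx, desc)
		}
	}
	open := func(ctx context.Context, desc *clientStreamDesc) (clientStream, error) {
		order = append(order, "open "+desc.StreamName)
		return fakeStream{ctx: ctx}, nil
	}
	chain := streamFilterChain{mark("first"), mark("second")}
	if _, err := chain.Filter(context.Background(), &clientStreamDesc{StreamName: "/demo/Stream"}, open); err != nil {
		panic(err)
	}
	return order
}

func main() {
	fmt.Println(run()) // prints "[first second open /demo/Stream]"
}
```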

type Streamer

type Streamer func(ctx context.Context, desc *ClientStreamDesc) (ClientStream, error)

Streamer is the wrapper filter function used to filter all methods of ClientStream.

Directories

Path Synopsis
Package mockclient is a generated GoMock package.
