primitive

package
v0.0.0-...-30a31b1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 3, 2019 License: Apache-2.0 Imports: 16 Imported by: 0

Documentation

Overview

* Define the ctx key and value type.

Index

Constants

View Source
// Method-name and consume-return-type constants.
const (

	// Producer method names, typed as CommunicationMode.
	SendSync   CommunicationMode = "SendSync"
	SendOneway CommunicationMode = "SendOneway"
	SendAsync  CommunicationMode = "SendAsync"
	// Consumer method names. Unlike the producer names above, these are
	// untyped string constants, not CommunicationMode values.
	ConsumerPush = "ConsumerPush"
	ConsumerPull = "ConsumerPull"

	// PropCtxType is an untyped string constant; the *Return constants below
	// are the ConsumeReturnType values.
	// NOTE(review): PropCtxType looks like the property key under which a
	// ConsumeReturnType is recorded — confirm against the consumer code.
	PropCtxType                       = "ConsumeContextType"
	SuccessReturn   ConsumeReturnType = "SUCCESS"
	TimeoutReturn   ConsumeReturnType = "TIMEOUT"
	ExceptionReturn ConsumeReturnType = "EXCEPTION"
	NullReturn      ConsumeReturnType = "RETURNNULL"
	FailedReturn    ConsumeReturnType = "FAILED"
)
View Source
// Property* are untyped string constants naming message properties.
// PropertyKeySeparator is a single space.
//
// NOTE(review): presumably these are the keys used with Message.WithProperty /
// Message.GetProperty, and PropertyKeySeparator joins the keys passed to
// Message.WithKeys — confirm against the implementation.
//
// The identifiers PropertyTranscationPreparedQueueOffset and
// PropertyTranscationCheckTimes are misspelt ("Transcation"); they are
// exported, so renaming them would break callers.
const (
	PropertyKeySeparator                   = " "
	PropertyKeys                           = "KEYS"
	PropertyTags                           = "TAGS"
	PropertyWaitStoreMsgOk                 = "WAIT"
	PropertyDelayTimeLevel                 = "DELAY"
	PropertyRetryTopic                     = "RETRY_TOPIC"
	PropertyRealTopic                      = "REAL_TOPIC"
	PropertyRealQueueId                    = "REAL_QID"
	PropertyTransactionPrepared            = "TRAN_MSG"
	PropertyProducerGroup                  = "PGROUP"
	PropertyMinOffset                      = "MIN_OFFSET"
	PropertyMaxOffset                      = "MAX_OFFSET"
	PropertyBuyerId                        = "BUYER_ID"
	PropertyOriginMessageId                = "ORIGIN_MESSAGE_ID"
	PropertyTransferFlag                   = "TRANSFER_FLAG"
	PropertyCorrectionFlag                 = "CORRECTION_FLAG"
	PropertyMQ2Flag                        = "MQ2_FLAG"
	PropertyReconsumeTime                  = "RECONSUME_TIME"
	PropertyMsgRegion                      = "MSG_REGION"
	PropertyTraceSwitch                    = "TRACE_ON"
	PropertyUniqueClientMessageIdKeyIndex  = "UNIQ_KEY"
	PropertyMaxReconsumeTimes              = "MAX_RECONSUME_TIMES"
	PropertyConsumeStartTime               = "CONSUME_START_TIME"
	PropertyTranscationPreparedQueueOffset = "TRAN_PREPARED_QUEUE_OFFSET"
	PropertyTranscationCheckTimes          = "TRANSACTION_CHECK_TIMES"
	PropertyCheckImmunityTimeInSeconds     = "CHECK_IMMUNITY_TIME_IN_SECONDS"
)

Variables

View Source
// Sentinel errors for name-server address handling.
//
// NOTE(review): presumably returned by NewNamesrvAddr and NamesrvAddr.Check —
// confirm. The messages are left untouched because callers may match on the
// text; note that ErrNoNameserver ends with a period, which Go style
// discourages for error strings.
var (
	ErrNoNameserver = errors.New("nameServerAddrs can't be empty.")
	ErrMultiIP      = errors.New("multiple IP addr does not support")
	ErrIllegalIP    = errors.New("IP addr error")
)
View Source
// Flag values, declared as package-level int variables.
//
// CompressedFlag and MultiTagsFlag are single bits (bit 0 and bit 1). The
// transaction types are the values 0..3 shifted left by two, so they occupy
// bits 2-3.
//
// NOTE(review): presumably these are the operands of ClearCompressedFlag,
// GetTransactionValue and ResetTransactionValue — confirm. They are variables
// rather than constants, so they can be reassigned at runtime.
var (
	// CompressedFlag is bit 0 (value 1).
	CompressedFlag = 1 << 0

	// MultiTagsFlag is bit 1 (value 2).
	MultiTagsFlag = 1 << 1

	// TransactionNotType is the zero transaction type (value 0).
	TransactionNotType = 0 << 2

	// TransactionPreparedType is transaction type 1 (value 4).
	TransactionPreparedType = 1 << 2

	// TransactionCommitType is transaction type 2 (value 8).
	TransactionCommitType = 2 << 2

	// TransactionRollbackType is transaction type 3 (value 12).
	TransactionRollbackType = 3 << 2
)

Functions

func ClearCompressedFlag

func ClearCompressedFlag(flag int) int

func CreateUniqID

func CreateUniqID() string

func GetTransactionValue

func GetTransactionValue(flag int) int

func Pid

func Pid() int16

func ResetTransactionValue

func ResetTransactionValue(flag int, typeFlag int) int

func WithConsumerCtx

func WithConsumerCtx(ctx context.Context, c *ConsumeMessageContext) context.Context

WithConsumerCtx sets the ConsumeMessageContext on ctx; it is used in PushConsumer.

func WithMethod

func WithMethod(ctx context.Context, m CommunicationMode) context.Context

WithMethod sets the call method name on ctx.

func WithProducerCtx

func WithProducerCtx(ctx context.Context, c *ProducerCtx) context.Context

Types

type AccessChannel

// AccessChannel selects which kind of cluster to connect to.
// The zero value is Local.
type AccessChannel int
const (
	// Local (0) connects to a private IDC cluster.
	Local AccessChannel = iota
	// Cloud (1) connects to a cloud service.
	Cloud
)

type CommunicationMode

type CommunicationMode string

func GetMethod

func GetMethod(ctx context.Context) CommunicationMode

GetMethod gets the call method name from ctx.

type ConsumeConcurrentlyContext

// ConsumeConcurrentlyContext is the value created by
// NewConsumeConcurrentlyContext and looked up by GetConcurrentlyCtx.
//
// NOTE(review): judging by the field names it carries the queue being
// consumed, the delay level to use for the next consume attempt and an ack
// index — confirm the exact semantics against the push consumer.
type ConsumeConcurrentlyContext struct {
	MQ                        MessageQueue
	DelayLevelWhenNextConsume int
	AckIndex                  int32
}

func GetConcurrentlyCtx

func GetConcurrentlyCtx(ctx context.Context) (*ConsumeConcurrentlyContext, bool)

func NewConsumeConcurrentlyContext

func NewConsumeConcurrentlyContext() *ConsumeConcurrentlyContext

type ConsumeMessageContext

// ConsumeMessageContext is the value attached to a context by WithConsumerCtx
// and read back by GetConsumerCtx. It groups a consumer group name, a batch of
// messages, the queue they belong to, and a success flag and status string.
type ConsumeMessageContext struct {
	ConsumerGroup string
	Msgs          []*MessageExt
	MQ            *MessageQueue
	Success       bool
	Status        string
	// Properties of the mq trace context.
	// NOTE(review): the original comment read "mqTractContext", presumably a
	// typo for "mqTraceContext" — confirm.
	Properties map[string]string
}

func GetConsumerCtx

func GetConsumerCtx(ctx context.Context) (*ConsumeMessageContext, bool)

GetConsumerCtx gets the ConsumeMessageContext from ctx. It is only valid in PushConsumer, so the bool return value indicates whether the context exists.

type ConsumeOrderlyContext

// ConsumeOrderlyContext is the value created by NewConsumeOrderlyContext and
// looked up by GetOrderlyCtx.
//
// NOTE(review): judging by the field names it carries the queue being
// consumed, whether offsets are committed automatically, and how long (in
// milliseconds) to suspend the current queue — confirm against the consumer.
type ConsumeOrderlyContext struct {
	MQ                            MessageQueue
	AutoCommit                    bool
	SuspendCurrentQueueTimeMillis int
}

func GetOrderlyCtx

func GetOrderlyCtx(ctx context.Context) (*ConsumeOrderlyContext, bool)

func NewConsumeOrderlyContext

func NewConsumeOrderlyContext() *ConsumeOrderlyContext

type ConsumeReturnType

type ConsumeReturnType string

func (ConsumeReturnType) Ordinal

func (c ConsumeReturnType) Ordinal() int

type Credentials

// Credentials holds an access key, a secret key and a security token.
// Use IsEmpty to test whether credentials have been provided.
// NOTE(review): which fields IsEmpty inspects is not visible here — confirm.
type Credentials struct {
	AccessKey     string
	SecretKey     string
	SecurityToken string
}

func (Credentials) IsEmpty

func (c Credentials) IsEmpty() bool

type CtxKey

type CtxKey int

type Interceptor

type Interceptor func(ctx context.Context, req, reply interface{}, next Invoker) error

Interceptor intercepts an invocation of a producer/consumer on messages. In a PushConsumer call, req is of type []*MessageExt and reply is a ConsumeResultHolder; use a type assertion to get the real type.

func ChainInterceptors

func ChainInterceptors(interceptors ...Interceptor) Interceptor

type Invoker

type Invoker func(ctx context.Context, req, reply interface{}) error

Invoker completes a message invocation on a producer/consumer.

type LocalTransactionState

// LocalTransactionState is the state returned by the TransactionListener
// methods. The named states start at 1, so the zero value of
// LocalTransactionState does not correspond to any named state.
type LocalTransactionState int
const (
	// CommitMessageState has the value 1.
	CommitMessageState LocalTransactionState = iota + 1
	// RollbackMessageState has the value 2.
	RollbackMessageState
	// UnknowState has the value 3. The name is misspelt ("Unknow") but is
	// exported, so renaming it would break callers.
	UnknowState
)

type Message

// Message is a message with a topic, a body, a flag, a transaction ID and a
// batch marker. Create one with NewMessage(topic, body); properties, keys and
// tags are accessed through the methods on *Message.
type Message struct {
	Topic         string
	Body          []byte
	Flag          int32
	TransactionId string
	Batch         bool
	// Queue is the queue the message will be sent to. Set it only when the
	// message must go to a specific queue; otherwise leave it unset.
	Queue *MessageQueue
	// contains filtered or unexported fields
}

func NewMessage

func NewMessage(topic string, body []byte) *Message

func (*Message) GetKeys

func (m *Message) GetKeys() string

func (*Message) GetProperty

func (m *Message) GetProperty(key string) string

func (*Message) GetTags

func (m *Message) GetTags() string

func (*Message) MarshallProperties

func (m *Message) MarshallProperties() string

func (*Message) RemoveProperty

func (m *Message) RemoveProperty(key string) string

func (*Message) String

func (m *Message) String() string

func (*Message) UnmarshalProperties

func (m *Message) UnmarshalProperties(data []byte)

UnmarshalProperties parses data into property key-value pairs.

func (*Message) WithDelayTimeLevel

func (m *Message) WithDelayTimeLevel(level int) *Message

WithDelayTimeLevel sets the delay before the message can be consumed. Delay levels start at 1 and map, in order, to the delays 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h. For example, level=1 gives a delay of 1s.

func (*Message) WithKeys

func (m *Message) WithKeys(keys []string) *Message

func (*Message) WithProperty

func (m *Message) WithProperty(key, value string)

func (*Message) WithTag

func (m *Message) WithTag(tags string) *Message

type MessageExt

// MessageExt embeds Message and adds further per-message fields; DecodeMessage
// returns a slice of *MessageExt.
//
// NOTE(review): the added fields look like metadata assigned when a message is
// stored (IDs, offsets, timestamps, hosts, CRC, reconsume count) — confirm the
// exact meaning of each against DecodeMessage.
type MessageExt struct {
	Message
	MsgId                     string
	QueueId                   int32
	StoreSize                 int32
	QueueOffset               int64
	SysFlag                   int32
	BornTimestamp             int64
	BornHost                  string
	StoreTimestamp            int64
	StoreHost                 string
	CommitLogOffset           int64
	BodyCRC                   int32
	ReconsumeTimes            int32
	PreparedTransactionOffset int64
}

func DecodeMessage

func DecodeMessage(data []byte) []*MessageExt

func (*MessageExt) GetRegionID

func (msgExt *MessageExt) GetRegionID() string

func (*MessageExt) GetTags

func (msgExt *MessageExt) GetTags() string

func (*MessageExt) IsTraceOn

func (msgExt *MessageExt) IsTraceOn() string

func (*MessageExt) String

func (msgExt *MessageExt) String() string

type MessageID

// MessageID is the decoded form of a message ID, as returned by
// UnmarshalMsgID: an address, a port and an offset.
// NOTE(review): presumably the address/port identify the storing broker and
// Offset is a commit-log offset — confirm against UnmarshalMsgID.
type MessageID struct {
	Addr   string
	Port   int
	Offset int64
}

func UnmarshalMsgID

func UnmarshalMsgID(msgID []byte) (*MessageID, error)

type MessageQueue

// MessageQueue identifies a queue by topic, broker name and queue ID.
// It is serialized to JSON with the keys "topic", "brokerName" and "queueId".
type MessageQueue struct {
	Topic      string `json:"topic"`
	BrokerName string `json:"brokerName"`
	QueueId    int    `json:"queueId"`
}

MessageQueue identifies a message queue by topic, broker name and queue ID.

func (MessageQueue) Equals

func (mq MessageQueue) Equals(queue *MessageQueue) bool

func (*MessageQueue) HashCode

func (mq *MessageQueue) HashCode() int

func (*MessageQueue) String

func (mq *MessageQueue) String() string

type MessageType

// MessageType classifies a message; it is the type of ProducerCtx.MsgType.
// The zero value is NormalMsg.
type MessageType int
const (
	// NormalMsg has the value 0.
	NormalMsg MessageType = iota
	// TransMsgHalf has the value 1.
	TransMsgHalf
	// TransMsgCommit has the value 2.
	TransMsgCommit
	// DelayMsg has the value 3.
	DelayMsg
)

type NamesrvAddr

type NamesrvAddr []string

func NewNamesrvAddr

func NewNamesrvAddr(s ...string) (NamesrvAddr, error)

func (NamesrvAddr) Check

func (addr NamesrvAddr) Check() error

type ProducerCtx

// ProducerCtx is the value attached to a context by WithProducerCtx and read
// back by GetProducerCtx.
//
// Message and MQ are held by value, so a ProducerCtx carries its own copies of
// them; SendResult is a pointer and may be nil.
// NOTE(review): presumably SendResult is only populated after the send has
// completed — confirm against the producer.
type ProducerCtx struct {
	ProducerGroup     string
	Message           Message
	MQ                MessageQueue
	BrokerAddr        string
	BornHost          string
	CommunicationMode CommunicationMode
	SendResult        *SendResult
	Props             map[string]string
	MsgType           MessageType
	Namespace         string
}

func GetProducerCtx

func GetProducerCtx(ctx context.Context) *ProducerCtx

type PullResult

// PullResult is the result of a pull: a PullStatus plus offset information.
// It also has unexported fields.
// NOTE(review): presumably the unexported fields are the body and decoded
// messages reached through GetBody/SetBody and
// GetMessageExts/SetMessageExts — confirm.
type PullResult struct {
	NextBeginOffset      int64
	MinOffset            int64
	MaxOffset            int64
	Status               PullStatus
	SuggestWhichBrokerId int64
	// contains filtered or unexported fields
}

PullResult is the result of a pull.

func (*PullResult) GetBody

func (result *PullResult) GetBody() []byte

func (*PullResult) GetMessageExts

func (result *PullResult) GetMessageExts() []*MessageExt

func (*PullResult) GetMessages

func (result *PullResult) GetMessages() []*Message

func (*PullResult) SetBody

func (result *PullResult) SetBody(data []byte)

func (*PullResult) SetMessageExts

func (result *PullResult) SetMessageExts(msgExts []*MessageExt)

func (*PullResult) String

func (result *PullResult) String() string

type PullStatus

type PullStatus int

PullStatus is the status of a pull.

// Predefined PullStatus values, numbered from 0 in declaration order.
const (
	// PullFound has the value 0 and is the zero value of PullStatus.
	PullFound PullStatus = iota
	// PullNoNewMsg has the value 1.
	PullNoNewMsg
	// PullNoMsgMatched has the value 2.
	PullNoMsgMatched
	// PullOffsetIllegal has the value 3.
	PullOffsetIllegal
	// PullBrokerTimeout has the value 4.
	PullBrokerTimeout
)

Predefined pull statuses.

type SendResult

// SendResult is the result of sending a message: a SendStatus together with
// message IDs, the target queue and its offset, a transaction ID, a region ID
// and a trace flag. MessageQueue is a pointer and may be nil.
type SendResult struct {
	Status        SendStatus
	MsgID         string
	MessageQueue  *MessageQueue
	QueueOffset   int64
	TransactionID string
	OffsetMsgID   string
	RegionID      string
	TraceOn       bool
}

SendResult is the result of sending a message to RocketMQ.

func (*SendResult) String

func (result *SendResult) String() string

String returns the detailed send result formatted as a string.

type SendStatus

type SendStatus int

SendStatus is the status of a message send.

// Predefined SendStatus values, numbered from 0 in declaration order, followed
// by two unrelated untyped constants that share this declaration group.
const (
	// SendOK has the value 0 and is the zero value of SendStatus.
	SendOK SendStatus = iota
	// SendFlushDiskTimeout has the value 1.
	SendFlushDiskTimeout
	// SendFlushSlaveTimeout has the value 2.
	SendFlushSlaveTimeout
	// SendSlaveNotAvailable has the value 3.
	SendSlaveNotAvailable
	// SendUnknownError has the value 4.
	SendUnknownError

	// FlagCompressed (1) and MsgIdLength (16) have explicit values, so they
	// are untyped integer constants and NOT SendStatus values.
	FlagCompressed = 0x1
	MsgIdLength    = 8 + 8
)

type TraceConfig

// TraceConfig is the configuration for message tracing: the trace topic, the
// access channel (Local or Cloud) and a list of name-server addresses.
type TraceConfig struct {
	TraceTopic   string
	Access       AccessChannel
	NamesrvAddrs []string
}

TraceConfig is the configuration for message tracing.

type TransactionListener

// TransactionListener is the callback interface for transactional messages.
// Both methods report the outcome as a LocalTransactionState.
type TransactionListener interface {
	// ExecuteLocalTransaction is invoked to execute the local transaction
	// after the transactional prepare (half) message has been sent
	// successfully.
	ExecuteLocalTransaction(Message) LocalTransactionState

	// CheckLocalTransaction is invoked to report the local transaction status
	// when the broker, having received no response to the prepare (half)
	// message, sends a check message.
	CheckLocalTransaction(MessageExt) LocalTransactionState
}

type TransactionSendResult

// TransactionSendResult embeds *SendResult and adds the LocalTransactionState
// of the transaction. The embedded pointer may be nil, in which case accessing
// the promoted SendResult fields panics.
type TransactionSendResult struct {
	*SendResult
	State LocalTransactionState
}

TransactionSendResult is the RocketMQ send result of a transactional message, together with the local transaction state.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL