telemetry

package
v0.3.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 23, 2024 License: Apache-2.0 Imports: 10 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// SizeLimit maximum incoming payload size from the vehicle
	SizeLimit = 1000000 // 1mb

)

Variables

View Source
var ErrMessageTooBig = fmt.Errorf("can't process message, size above 1mb")

ErrMessageTooBig handles error when incoming payload is too large

Functions

func BuildTopicName added in v0.0.5

func BuildTopicName(namespace, recordName string) string

BuildTopicName creates a topic from a namespace and a recordName

func ParseLocation added in v0.1.1

func ParseLocation(s string) (*protos.LocationValue, error)

ParseLocation parses a location string (such as "(37.412374 N, 122.145867 W)") into a *proto.Location type.

Types

type BinarySerializer

type BinarySerializer struct {
	DispatchRules   map[string][]Producer
	RequestIdentity *RequestIdentity
	// contains filtered or unexported fields
}

BinarySerializer serializes records

func NewBinarySerializer

func NewBinarySerializer(requestIdentity *RequestIdentity, dispatchRules map[string][]Producer, logger *logrus.Logger) *BinarySerializer

NewBinarySerializer returns a dedicated serializer for a current socket connection

func (*BinarySerializer) Ack

func (bs *BinarySerializer) Ack(record *Record) []byte

Ack returns an ack response

func (*BinarySerializer) Deserialize

func (bs *BinarySerializer) Deserialize(msg []byte, socketID string) (record *Record, err error)

Deserialize transforms a csv byte array into a Record

func (*BinarySerializer) Dispatch

func (bs *BinarySerializer) Dispatch(record *Record)

Dispatch pushes the record to kafka for every rule associated to it

func (*BinarySerializer) Error

func (bs *BinarySerializer) Error(err error, record *Record) []byte

Error returns an error response

func (*BinarySerializer) Logger

func (bs *BinarySerializer) Logger() *logrus.Logger

Logger returns logger for the serializer

type Dispatcher

type Dispatcher string

Dispatcher type of telemetry record dispatcher

const (
	// Pubsub registers a Google pubsub dispatcher
	Pubsub Dispatcher = "pubsub"
	// Kafka registers a kafka dispatcher
	Kafka Dispatcher = "kafka"
	// Kinesis registers a kinesis publisher
	Kinesis Dispatcher = "kinesis"
	// Logger registers a simple logger
	Logger Dispatcher = "logger"
	// ZMQ registers a zmq logger
	ZMQ Dispatcher = "zmq"
)

type NonAnonymizedError

type NonAnonymizedError struct {
}

NonAnonymizedError is an error struct representing mismatch ID

func (*NonAnonymizedError) Error

func (e *NonAnonymizedError) Error() string

Error returns an error string implementing the error interface

type Producer

type Producer interface {
	Produce(entry *Record)
	ProcessReliableAck(entry *Record)
	ReportError(message string, err error, logInfo logrus.LogInfo)
}

Producer handles dispatching data received from the vehicle

type Record

type Record struct {
	ProduceTime       time.Time
	ReceivedTimestamp int64
	Serializer        *BinarySerializer
	SocketID          string
	Timestamp         int64
	Txid              string
	TxType            string
	TripID            string
	Version           int
	Vin               string
	PayloadBytes      []byte
	RawBytes          []byte
	// contains filtered or unexported fields
}

Record is a structs that represents the telemetry records vehicles send to the backend vin is used as kafka produce partitioning key by default, can be configured to random

func NewRecord

func NewRecord(ts *BinarySerializer, msg []byte, socketID string, transmitDecodedRecords bool) (*Record, error)

NewRecord Sanitizes and instantiates a Record from a message !! caller expect *Record to not be nil !!

func (*Record) Ack

func (record *Record) Ack() []byte

Ack returns an ack response from the serializer

func (*Record) Dispatch

func (record *Record) Dispatch()

Dispatch uses the configuration to send records to the list of backends/data stores they belong

func (*Record) Encode

func (record *Record) Encode() ([]byte, error)

Encode encodes the records into bytes

func (*Record) Error

func (record *Record) Error(err error) []byte

Error returns an error response from the serializer

func (*Record) GetJSONPayload added in v0.1.9

func (record *Record) GetJSONPayload() ([]byte, error)

func (*Record) GetProtoMessage added in v0.1.9

func (record *Record) GetProtoMessage() (proto.Message, error)

GetProtoMessage converts the record to a proto Message

func (*Record) Length

func (record *Record) Length() int

Length gets the records byte size

func (*Record) LengthRawBytes added in v0.2.1

func (record *Record) LengthRawBytes() int

Length gets the records flatbuffer payload byte size

func (*Record) Metadata

func (record *Record) Metadata() map[string]string

Metadata converts record to metadata map

func (*Record) Payload

func (record *Record) Payload() []byte

Payload returns the bytes of the telemetry record gdata

func (*Record) Raw

func (record *Record) Raw() []byte

Raw returns the raw telemetry record

type RequestIdentity

type RequestIdentity struct {
	DeviceID string
	SenderID string
}

RequestIdentity stores identifiers for the socket connection

type UnauthorizedSenderIDError

type UnauthorizedSenderIDError struct {
	ExpectedSenderID string
	ReceivedSenderID string
}

UnauthorizedSenderIDError is an error struct representing mismatch ID

func (*UnauthorizedSenderIDError) Error

func (e *UnauthorizedSenderIDError) Error() string

Error returns an error string implementing the error interface

type UnknownMessageType

type UnknownMessageType struct {
	Bytes       []byte
	Txid        string
	GuessedType byte
}

UnknownMessageType is an error struct representing a message that cannot be parsed

func (*UnknownMessageType) Error

func (e *UnknownMessageType) Error() string

Error returns an error string implementing the error interface

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL