baseagg

package
v1.8.8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 27, 2024 License: Apache-2.0 Imports: 5 Imported by: 0

Documentation

Index

Constants

View Source
const (
	MaxLogCount     = 1024
	MaxLogGroupSize = 3 * 1024 * 1024
)

Variables

This section is empty.

Functions

This section is empty.

Types

type AggregatorBase

type AggregatorBase struct {
	MaxLogGroupCount int    // the maximum log group count to trigger flush operation
	MaxLogCount      int    // the maximum log in a log group
	PackFlag         bool   // whether to add config name as a tag
	Topic            string // the output topic

	Lock *sync.Mutex
	// contains filtered or unexported fields
}

Other aggregators can use AggregatorBase as base aggregator.

For inner usage, note about following information. There is a quick flush design in AggregatorBase, which is implemented in Add method (search p.queue.Add in current file). Therefore, not all LogGroups are returned through Flush method. If you want to do some operations (such as adding tags) on LogGroups returned by AggregatorBase in your own aggregator, you should do some extra works, just see the sample code in doc.go.

func NewAggregatorBase

func NewAggregatorBase() *AggregatorBase

NewAggregatorBase create a default aggregator with default value.

func (*AggregatorBase) Add

func (p *AggregatorBase) Add(log *protocol.Log, ctx map[string]interface{}) error

Add adds @log to aggregator. It uses defaultLogGroup to store log groups which contain logs as following: defaultLogGroup => [LG1: log1->log2->log3] -> [LG2: log1->log2->log3] -> .. The last log group is set as nowLogGroup, @log will be appended to nowLogGroup if the size and log count of the log group don't exceed limits (MaxLogCount and MAX_LOG_GROUP_SIZE). When nowLogGroup exceeds limits, Add creates a new log group and switch nowLogGroup to it, then append @log to it. When the count of log group reaches MaxLogGroupCount, the first log group will be popped from defaultLogGroup list and add to queue (after adding pack_id tag). Add returns any error encountered, nil means success.

@return error. **For inner usage, must handle this error!!!!**

func (*AggregatorBase) Description

func (*AggregatorBase) Description() string

func (*AggregatorBase) Flush

func (p *AggregatorBase) Flush() []*protocol.LogGroup

Flush ...

func (*AggregatorBase) GetResult added in v1.4.0

func (p *AggregatorBase) GetResult(ctx pipeline.PipelineContext) error

GetResult the current aggregates to the accumulator.

func (*AggregatorBase) Init

func (p *AggregatorBase) Init(context pipeline.Context, que pipeline.LogGroupQueue) (int, error)

Init method would be trigger before working. 1. context store the metadata of this Logstore config 2. que is a transfer channel for flushing LogGroup when reaches the maximum in the cache.

func (*AggregatorBase) InitInner

func (p *AggregatorBase) InitInner(packFlag bool, packString string, lock *sync.Mutex, logstore string, topic string, maxLogCount int, maxLoggroupCount int)

InitInner initializes instance for other aggregators.

func (*AggregatorBase) Record added in v1.4.0

func (*AggregatorBase) Reset

func (p *AggregatorBase) Reset()

Reset ...

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL