app

package
v0.0.0-...-7323fb3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 6, 2021 License: Apache-2.0 Imports: 21 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func GenerateCPUTimeRecords

func GenerateCPUTimeRecords(recordChan chan *tipb.CPUTimeRecord)

func GeneratePlanMeta

func GeneratePlanMeta(planMetaChan chan *tipb.PlanMeta)

func GenerateSQLMeta

func GenerateSQLMeta(sqlMetaChan chan *tipb.SQLMeta)

func QueryInfluxDB

func QueryInfluxDB()

func QueryTiDB

func QueryTiDB(dsn string, workers, queryNum int, randomQuery bool)

func SendRequest

func SendRequest()

func StartMonkeyServer

func StartMonkeyServer(proxyAddress string)

func WriteInfluxDB

func WriteInfluxDB()

func WriteTiDB

func WriteTiDB(dsn string)

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

func NewClient

func NewClient(
	cpuTimeRecordStream tipb.TopSQLAgent_ReportCPUTimeRecordsClient,
	sqlMetaStream tipb.TopSQLAgent_ReportSQLMetaClient,
	planMetaStream tipb.TopSQLAgent_ReportPlanMetaClient,
) *Client

func (*Client) Start

func (s *Client) Start()

Start starts a goroutine, which sends tidb-server's last minute's data to the gRPC server

type DiskWAL

type DiskWAL struct {
}

func NewDiskWAL

func NewDiskWAL() *DiskWAL

func (*DiskWAL) ReadNext

func (wal *DiskWAL) ReadNext() interface{}

ReadNext reads the next record from the current segment file

func (*DiskWAL) WriteMulti

func (wal *DiskWAL) WriteMulti(data []interface{})

WriteMulti writes multiple encoded bytes to the current segment file

type MemStore

type MemStore struct {
	// contains filtered or unexported fields
}

MemStore is for testing purposes.

func NewMemStore

func NewMemStore() *MemStore

func (*MemStore) InitSchema

func (s *MemStore) InitSchema() error

func (*MemStore) WriteCPUTimeRecord

func (s *MemStore) WriteCPUTimeRecord(record *tipb.CPUTimeRecord, instanceID string) error

func (*MemStore) WritePlanMeta

func (s *MemStore) WritePlanMeta(meta *tipb.PlanMeta) error

func (*MemStore) WriteSQLMeta

func (s *MemStore) WriteSQLMeta(meta *tipb.SQLMeta) error

type MemWAL

type MemWAL struct {
	// contains filtered or unexported fields
}

func NewMemWAL

func NewMemWAL() *MemWAL

func (*MemWAL) Acknowledge

func (wal *MemWAL) Acknowledge(count uint32)

func (*MemWAL) ReadMulti

func (wal *MemWAL) ReadMulti(count uint32) interface{}

func (*MemWAL) WriteMulti

func (wal *MemWAL) WriteMulti(data interface{})

type Prefetcher

type Prefetcher struct {
	// contains filtered or unexported fields
}

func NewPrefetcher

func NewPrefetcher(wal WAL, config PrefetcherConfig) *Prefetcher

TODO: load from WAL. Currently we write to the prefetch buffer directly from the receiver.

func (*Prefetcher) ReadOneCPUTimeRecord

func (p *Prefetcher) ReadOneCPUTimeRecord() *tipb.CPUTimeRecord

func (*Prefetcher) ReadOnePlanMetaOrNil

func (p *Prefetcher) ReadOnePlanMetaOrNil() *tipb.PlanMeta

func (*Prefetcher) ReadOneSQLMetaOrNil

func (p *Prefetcher) ReadOneSQLMetaOrNil() *tipb.SQLMeta

func (*Prefetcher) WriteOneCPUTimeRecordOrDrop

func (p *Prefetcher) WriteOneCPUTimeRecordOrDrop(record *tipb.CPUTimeRecord)

func (*Prefetcher) WriteOnePlanMetaOrDrop

func (p *Prefetcher) WriteOnePlanMetaOrDrop(meta *tipb.PlanMeta)

func (*Prefetcher) WriteOneSQLMetaOrDrop

func (p *Prefetcher) WriteOneSQLMetaOrDrop(meta *tipb.SQLMeta)

type PrefetcherConfig

type PrefetcherConfig struct {
	CPUTimeRecordCount uint32
	SQLMetaCount       uint32
	PlanMetaCount      uint32
}

type Receiver

type Receiver struct {
	// contains filtered or unexported fields
}

TODO: add back pressure to ensure memory usage does not grow without bound.

func NewReceiver

func NewReceiver(wal WAL, prefetcher *Prefetcher) *Receiver

type Sender

type Sender struct {
	// contains filtered or unexported fields
}

func NewSender

func NewSender(prefetcher *Prefetcher, store Store) *Sender

func (*Sender) Start

func (s *Sender) Start()

type Store

type Store interface {
	WriteCPUTimeRecord(record *tipb.CPUTimeRecord, instanceID string) error
	WriteSQLMeta(*tipb.SQLMeta) error
	WritePlanMeta(*tipb.PlanMeta) error
	InitSchema() error
}

type TiDBStore

type TiDBStore struct {
	// contains filtered or unexported fields
}

TiDBStore uses TiDB as the storage backend.

func NewTiDBStore

func NewTiDBStore(dsn string, clusterID uint64) *TiDBStore

func (*TiDBStore) InitSchema

func (s *TiDBStore) InitSchema() error

func (*TiDBStore) WriteCPUTimeRecord

func (s *TiDBStore) WriteCPUTimeRecord(record *tipb.CPUTimeRecord, instanceID string) error

func (*TiDBStore) WritePlanMeta

func (s *TiDBStore) WritePlanMeta(meta *tipb.PlanMeta) error

func (*TiDBStore) WriteSQLMeta

func (s *TiDBStore) WriteSQLMeta(meta *tipb.SQLMeta) error

type TopSQLAgentServer

type TopSQLAgentServer struct {
	// contains filtered or unexported fields
}

TopSQLAgentServer is the main struct which controls the Top SQL metrics collecting and reporting.

In general, there are several parts in the agent which are working together:

  • gRPC server receives raw data from tidb-server/tikv-server
  • receiver writes gRPC data to WAL, which typically resides on disk. There are also in-memory implementations, which are used in testing.
  • WAL-buffer-prefetcher prefetches WAL data to an in-memory buffer, which is a sliding-window-like in-memory view of the WAL data. The prefetch buffer has an upper limit, which can be seen as the max window size of the view.
  • sender reads data from the prefetch buffer, and asynchronously sends it to the remote store.

When data is received and sent quickly enough, it is written to the WAL only once and served to the sender directly from the receiver in memory, which is I/O efficient. After the agent process crashes and restarts, it reconstructs the prefetch buffer and resumes sending data to the store as usual. In all cases, the prefetch buffer is the only source from which data leaves the agent process.

func NewAgentServer

func NewAgentServer(wal WAL, store Store) *TopSQLAgentServer

func StartGrpcServer

func StartGrpcServer(addr string, store Store) *TopSQLAgentServer

func (*TopSQLAgentServer) ReportCPUTimeRecords

func (as *TopSQLAgentServer) ReportCPUTimeRecords(stream tipb.TopSQLAgent_ReportCPUTimeRecordsServer) error

func (*TopSQLAgentServer) ReportPlanMeta

func (as *TopSQLAgentServer) ReportPlanMeta(stream tipb.TopSQLAgent_ReportPlanMetaServer) error

func (*TopSQLAgentServer) ReportSQLMeta

func (as *TopSQLAgentServer) ReportSQLMeta(stream tipb.TopSQLAgent_ReportSQLMetaServer) error

func (*TopSQLAgentServer) Start

func (as *TopSQLAgentServer) Start()

type Type

type Type uint8
const (
	// TODO: this needs to be rearranged to fit any collecting interval larger than 1s
	TypeCPUTimeRecord Type = iota
	TypeSQLMeta
	TypePlanMeta
)

type WAL

type WAL interface {
	WriteMulti(interface{})
	ReadMulti(count uint32) interface{}
	Acknowledge(count uint32)
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL