Documentation ¶
Overview ¶
Trigger processor for monitoring data
Index ¶
- Constants
- func CouchbaseSaverBucketKey(hostKey string, itemKey string, timestamp uint64) string
- func SetConfigDefaults(cfg *Config) error
- func StartHttp(rh RequestHandler, cfg Config, metricsNamespace string) error
- func StatesEq(a *State, b *State) bool
- type Cmd
- type CmdBuffer
- type CmdBufferProcessor
- type CmdBufferProcessorImpl
- type Config
- type CouchbaseDataReader
- type CouchbaseSaver
- type CouchbaseStateKeeper
- type CouchbaseTriggersGetter
- type CounterMetric
- type DataReader
- type DataTimeSorter
- type Executer
- type HttpServer
- type IncomingData
- type IncomingHostData
- type IncomingMessage
- type IncomingValueData
- type MetricSet
- type MetricType
- type MySQLDataReader
- type MySQLSaver
- type MySQLStateKeeper
- type MySQLTriggersGetter
- type RequestHandler
- type RequestHandlerImpl
- type ResponseMessage
- type SPExecuter
- type SendBuffer
- type SendBufferImpl
- type State
- type StateElem
- type StateKeeper
- type StateKeeperAnswer
- type TimerMetric
- type TimerMetricObservation
- type TriggersGetter
- type WorkerProcessInput
- type WorkerProcessOutput
Constants ¶
const CouchbaseDataBucketQuantum = 7200 // 2 hours
Variables ¶
This section is empty.
Functions ¶
func CouchbaseSaverBucketKey ¶
func SetConfigDefaults ¶
Setup defaults for empty values in configs Returns an error if mandatory field omited
Types ¶
type CmdBufferProcessor ¶
Iterface for command commiter Returns data for next processing stage or error
type CmdBufferProcessorImpl ¶
type CmdBufferProcessorImpl struct { // Send buffer SBuffer SendBuffer // Data saver Saver SendBuffer // FIXME Other commands CmdOutput io.Writer }
type Config ¶
type Config struct { // Http interface for incoming data IncomingHttp struct { // Addr for incomming-data // e.g. "0.0.0.0:4000" or ":4000" for ipv6 Addr string } // Http interface for data requests DataHttp struct { // Addr for data request // e.g. "0.0.0.0:4000" or ":4000" for ipv6 Addr string // Http prefix // e.g. /data (default) Prefix string } // Logging options Log struct { // Files for logging (stderr if empty) // For hammyd daemon HammyDFile string // For hammydatad daemon HammyDataDFile string // FIXME CmdOutput CmdOutputFile string } // Debug and statistics Debug struct { // Addrs for debug and statistic information // e.g. "localhost:6060" (default) // For hammyd daemon HammyDAddr string // For hammydatad daemon HammyDataDAddr string } // Workers Workers struct { // Count of workers PoolSize uint // Worker cmd CmdLine string // Worker live limit MaxIter uint // Worker timeout (before kill) Timeout uint } // Send buffer SendBuffer struct { SleepTime float32 } // Coucbase for triggers configuration CouchbaseTriggers struct { // Use this implementation Active bool // e.g. "http://dev-couchbase.example.com:8091/" ConnectTo string // e.g. "default" Pool string // e.g. "default" Bucket string } // Coucbase for state storage CouchbaseStates struct { // Use this implementation Active bool // e.g. "http://dev-couchbase.example.com:8091/" ConnectTo string // e.g. "default" Pool string // e.g. "default" Bucket string // TTL in seconds, default 86400 (day) Ttl int } // Data saver CouchbaseSaver struct { // Use this implementation Active bool // e.g. "http://dev-couchbase.example.com:8091/" ConnectTo string // e.g. "default" Pool string // e.g. "default" Bucket string // Internal write queue size QueueSize uint // Connections for saving SavePoolSize uint // TTL in seconds, default 259200 (3 days) Ttl int } // Data reader CouchbaseDataReader struct { // Use this implementation Active bool // e.g. "http://dev-couchbase.example.com:8091/" ConnectTo string // e.g. "default" Pool string // e.g. "default" Bucket string } // MySQL for triggers configuration MySQLTriggers struct { // Use this implementation Active bool // Database to connect Database string // DB user User string // DB user password Password string // table that contains triggers (host, trigger) Table string // Limit for parallel connections MaxConn int } // MySQL for states MySQLStates struct { // Use this implementation Active bool // Database to connect Database string // DB user User string // DB user password Password string // table that contains states (host, state, cas) Table string // Limit for parallel connections MaxConn int } // MySQL historical data saver MySQLSaver struct { // Use this implementation Active bool // Database to connect Database string // DB user User string // DB user password Password string // table that contains numeric history Table string // table that contains text history LogTable string // table that contains hosts HostTable string // table that contains items ItemTable string // Limit for parallel connections MaxConn int } // MySQL historical data reader MySQLDataReader struct { // Use this implementation Active bool // Database to connect Database string // DB user User string // DB user password Password string // table that contains numeric history Table string // table that contains text history LogTable string // table that contains hosts HostTable string // table that contains items ItemTable string // Limit for parallel connections MaxConn int } }
Programm configuration
type CouchbaseDataReader ¶
type CouchbaseDataReader struct {
// contains filtered or unexported fields
}
Reads data from write cache (couchbase-based)
func NewCouchbaseDataReader ¶
func NewCouchbaseDataReader(cfg Config) (*CouchbaseDataReader, error)
Create new saver
func (*CouchbaseDataReader) Read ¶
func (cr *CouchbaseDataReader) Read(hostKey string, itemKey string, from uint64, to uint64) (data []IncomingValueData, err error)
type CouchbaseSaver ¶
type CouchbaseSaver struct { Ttl uint32 // contains filtered or unexported fields }
Saves historical data to write chache (based on couchbase)
func NewCouchbaseSaver ¶
func NewCouchbaseSaver(cfg Config) (*CouchbaseSaver, error)
Create new saver
func (*CouchbaseSaver) Push ¶
func (s *CouchbaseSaver) Push(data *IncomingData)
Enqueue data for saving
type CouchbaseStateKeeper ¶
type CouchbaseStateKeeper struct { Client *couchbase.Client Pool *couchbase.Pool Bucket *couchbase.Bucket Ttl uint32 }
func NewCouchbaseStateKeeper ¶
func NewCouchbaseStateKeeper(cfg Config) (*CouchbaseStateKeeper, error)
func (*CouchbaseStateKeeper) Get ¶
func (sk *CouchbaseStateKeeper) Get(key string) StateKeeperAnswer
func (*CouchbaseStateKeeper) MGet ¶
func (sk *CouchbaseStateKeeper) MGet(keys []string) (states map[string]StateKeeperAnswer)
type CouchbaseTriggersGetter ¶
type CouchbaseTriggersGetter struct { Client *couchbase.Client Pool *couchbase.Pool Bucket *couchbase.Bucket }
func NewCouchbaseTriggersGetter ¶
func NewCouchbaseTriggersGetter(cfg Config) (*CouchbaseTriggersGetter, error)
type CounterMetric ¶
type CounterMetric struct {
// contains filtered or unexported fields
}
func (*CounterMetric) Add ¶
func (m *CounterMetric) Add(n uint64)
type DataReader ¶
type DataReader interface {
Read(hostKey string, itemKey string, from uint64, to uint64) (data []IncomingValueData, err error)
}
Reads data from write cache or storage
type DataTimeSorter ¶
type DataTimeSorter struct {
Data *[]IncomingValueData
}
Struct for sorting []IncomingValueData slice by Timestamp
func (*DataTimeSorter) Len ¶
func (ds *DataTimeSorter) Len() int
func (*DataTimeSorter) Less ¶
func (ds *DataTimeSorter) Less(i, j int) bool
func (*DataTimeSorter) Swap ¶
func (ds *DataTimeSorter) Swap(i, j int)
type Executer ¶
type Executer interface { // Process trigger for one host ProcessTrigger(key string, trigger string, state *State, data IncomingHostData) (newState *State, cmdb *CmdBuffer, err error) }
Interface for trigger executer
type HttpServer ¶
type HttpServer struct { // Request handler object RHandler RequestHandler // contains filtered or unexported fields }
Http server object InitMetric must be called before use
func (*HttpServer) InitMetrics ¶
func (h *HttpServer) InitMetrics(metricsNamespace string)
Initialize metric objects
func (*HttpServer) ServeHTTP ¶
func (h *HttpServer) ServeHTTP(w http.ResponseWriter, r *http.Request)
Request handler
type IncomingData ¶
type IncomingData map[string]IncomingHostData
Type for incoming monitoring data Format (in json notation):
{ "host1": { "key1.1": [{ "Timestamp": 1361785778, "Value": 3.14 }] }, "host2": { "key2.1": [{ "Timestamp": 1361785817, "Value": "test string" }], "key2.2": [{ "Timestamp": 1361785858, "Value": 12345 }, { "Timestamp": 1361785927, "Value": 999.3 }] } }
type IncomingHostData ¶
type IncomingHostData map[string][]IncomingValueData
type IncomingMessage ¶
type IncomingMessage struct { // Incoming monitoring data Data IncomingData // Processing level (0 for new data) // Increments after each resend Level uint32 }
Type for incoming monitoring data request
type IncomingValueData ¶
type IncomingValueData struct { Timestamp uint64 Value interface{} }
type MetricSet ¶
type MetricSet struct {
// contains filtered or unexported fields
}
Namespaced set of metrics
func NewMetricSet ¶
Creates new metric set or panic if namespace exists
func (*MetricSet) NewCounter ¶
func (ms *MetricSet) NewCounter(name string) *CounterMetric
func (*MetricSet) NewTimer ¶
func (ms *MetricSet) NewTimer(name string) *TimerMetric
type MySQLDataReader ¶
type MySQLDataReader struct {
// contains filtered or unexported fields
}
Driver for retriving historical data from MySQL database See mysql_saver.go for details about schema
func NewMySQLDataReader ¶
func NewMySQLDataReader(cfg Config) (dr *MySQLDataReader, err error)
func (*MySQLDataReader) Read ¶
func (dr *MySQLDataReader) Read(hostKey string, itemKey string, from uint64, to uint64) (data []IncomingValueData, err error)
type MySQLSaver ¶
type MySQLSaver struct {
// contains filtered or unexported fields
}
Driver for saving historical data in MySQL database It's assumes the tables structure like this:
CREATE TABLE `history_host` ( `id` int(11) NOT NULL AUTO_INCREMENT, `name` varchar(255) NOT NULL, PRIMARY KEY (`id`), UNIQUE KEY `by_name` (`name`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8 CREATE TABLE `history_item` ( `id` int(11) NOT NULL AUTO_INCREMENT, `host_id` int(11) NOT NULL, `name` varchar(255) NOT NULL, PRIMARY KEY (`id`), UNIQUE KEY `by_name` (`host_id`, `name`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8 CREATE TABLE `history` ( `item_id` int(11) NOT NULL, `timestamp` DATETIME NOT NULL, `value` DOUBLE NOT NULL, PRIMARY KEY (`item_id`, `timestamp`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8 CREATE TABLE `history_log` ( `item_id` int(11) NOT NULL, `timestamp` DATETIME NOT NULL, `value` TEXT NOT NULL, PRIMARY KEY (`item_id`, `timestamp`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8
func NewMySQLSaver ¶
func NewMySQLSaver(cfg Config) (s *MySQLSaver, err error)
func (*MySQLSaver) Push ¶
func (s *MySQLSaver) Push(data *IncomingData)
type MySQLStateKeeper ¶
type MySQLStateKeeper struct {
// contains filtered or unexported fields
}
Driver for retriving and saving state in MySQL database It's assumes the table structure like this:
CREATE TABLE `states` ( `host` varchar(255) NOT NULL, `state` text, `cas` BIGINT NOT NULL DEFAULT 0, PRIMARY KEY (`host`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8
func NewMySQLStateKeeper ¶
func NewMySQLStateKeeper(cfg Config) (sk *MySQLStateKeeper, err error)
func (*MySQLStateKeeper) Get ¶
func (sk *MySQLStateKeeper) Get(key string) (ans StateKeeperAnswer)
func (*MySQLStateKeeper) MGet ¶
func (sk *MySQLStateKeeper) MGet(keys []string) (states map[string]StateKeeperAnswer)
type MySQLTriggersGetter ¶
type MySQLTriggersGetter struct {
// contains filtered or unexported fields
}
Driver for retriving triggers from MySQL database It's assumes the table structure like this:
CREATE TABLE `triggers` ( `host` varchar(255) NOT NULL, `trigger` text, PRIMARY KEY (`host`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8
func NewMySQLTriggersGetter ¶
func NewMySQLTriggersGetter(cfg Config) (tg *MySQLTriggersGetter, err error)
type RequestHandler ¶
type RequestHandler interface {
Handle(data IncomingData) map[string]error
}
Interface for incoming data handler
type RequestHandlerImpl ¶
type RequestHandlerImpl struct { // Interface for triggers retriving TGetter TriggersGetter // Interface for state storage SKeeper StateKeeper // Interface for executer Exec Executer // Interface for command commiter CBProcessor CmdBufferProcessor // contains filtered or unexported fields }
Main data processor implementation You must call InitMetrics before use new object
func (*RequestHandlerImpl) Handle ¶
func (rh *RequestHandlerImpl) Handle(data IncomingData) (errs map[string]error)
func (*RequestHandlerImpl) InitMetrics ¶
func (rh *RequestHandlerImpl) InitMetrics(metricsNamespace string)
Initializes statistical metrics
type SPExecuter ¶
type SPExecuter struct {
// contains filtered or unexported fields
}
Executer implementation for subprocesses with MessagePack-based RPC
func NewSPExecuter ¶
func NewSPExecuter(cfg Config, metricNamespace string) *SPExecuter
Create new instance of SPExecutor per process
func (*SPExecuter) ProcessTrigger ¶
func (e *SPExecuter) ProcessTrigger(key string, trigger string, state *State, data IncomingHostData) (newState *State, cmdb *CmdBuffer, err error)
type SendBuffer ¶
type SendBuffer interface {
Push(data *IncomingData)
}
Interface for internal sendbuffers and for external data savers
type SendBufferImpl ¶
type SendBufferImpl struct {
// contains filtered or unexported fields
}
Buffer for reprocessed data
func NewSendBufferImpl ¶
func NewSendBufferImpl(rh RequestHandler, cfg Config, metricsNamespace string) (sb *SendBufferImpl)
Creates and initialize new SendBuffer
func (*SendBufferImpl) Push ¶
func (sb *SendBufferImpl) Push(data *IncomingData)
Enqueue data for reprocessing
type State ¶
Type for state of an host maps keys to values with timestamps of last update Format (in json notation):
{ "key1": { "Value": 10.3, "LastUpdate": 1361785927 }, "key2": { "Value": "booo!", "LastUpdate": 1361785778 }
type StateElem ¶
type StateElem struct { LastUpdate uint64 Value interface{} }
Type for State element
type StateKeeper ¶
type StateKeeper interface { Get(key string) StateKeeperAnswer MGet(keys []string) map[string]StateKeeperAnswer Set(key string, data State, cas *uint64) (retry bool, err error) }
Interface for state storage
type StateKeeperAnswer ¶
Answer of StateKeeper's get requests
type TimerMetric ¶
type TimerMetric struct {
// contains filtered or unexported fields
}
func (*TimerMetric) Add ¶
func (m *TimerMetric) Add(τ time.Duration)
func (*TimerMetric) NewObservation ¶
func (m *TimerMetric) NewObservation() *TimerMetricObservation
type TimerMetricObservation ¶
type TimerMetricObservation struct {
// contains filtered or unexported fields
}
func (*TimerMetricObservation) End ¶
func (τ *TimerMetricObservation) End()
type TriggersGetter ¶
Interface for trigger configuration
type WorkerProcessInput ¶
type WorkerProcessInput struct { Hostname string Trigger string State *State IData IncomingHostData }
type WorkerProcessOutput ¶
Source Files ¶
- cmd_buffer.go
- cmd_buffer_processor.go
- config.go
- couchbase_datareader.go
- couchbase_saver.go
- couchbase_state.go
- couchbase_triggers.go
- data_time_sorter.go
- drivers.go
- executer.go
- http.go
- metrics.go
- mysql_datareader.go
- mysql_saver.go
- mysql_state.go
- mysql_triggers.go
- overview.go
- request.go
- request_impl.go
- send_buffer.go
- spexecuter.go