core

package
v0.0.0-...-5e0ed37 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 21, 2021 License: Apache-2.0 Imports: 3 Imported by: 0

Documentation

Overview

Package core contains the core basic transforms which are available in PipeScript. It should be imported by default by basically all users of PipeScript

Index

Constants

This section is empty.

Variables

View Source
var I = &pipescript.Transform{
	Name:          "i",
	Description:   "Gives array index of the timeseries",
	Documentation: string(resources.MustAsset("docs/transforms/i.md")),
	OutputSchema: map[string]interface{}{
		"type": "integer",
	},
	Constructor: func(transform *pipescript.Transform, consts []interface{}, pipes []*pipescript.Pipe) (pipescript.TransformIterator, error) {
		return &iIter{}, nil
	},
}
View Source
var Map = &pipescript.Transform{
	Name:          "map",
	Description:   "Splits the timeseries by the first arg, and returns an object where each key is the result of running the pipe in the transform in the second arg on the split series",
	Documentation: string(resources.MustAsset("docs/transforms/map.md")),
	Args: []pipescript.TransformArg{
		{
			Description: "The value to split on. This must be something that can be converted to string.",
			Type:        pipescript.TransformArgType,
			Schema: map[string]interface{}{
				"type": "boolean",
			},
		},
		{
			Description: "The transform to instantiate for each different value of the first argument.",
			Type:        pipescript.PipeArgType,
		},
	},
	Constructor: pipescript.NewAggregator(func(e *pipescript.TransformEnv, consts []interface{}, pipes []*pipescript.Pipe, out *pipescript.Datapoint) (*pipescript.Datapoint, error) {

		data := make(map[string]interface{})

		cp := make(map[string]*mapChannel)

		defer func() {
			for _, p := range cp {
				p.cp.Close()
			}
		}()

		argarray := make([]*pipescript.Datapoint, 1)

		dp, args, err := e.Next(argarray)
		if err != nil || dp == nil {
			return nil, err
		}
		dp2 := dp
		out.Timestamp = dp.Timestamp
		for dp != nil {
			dp2 = dp
			key := args[0].ToString()
			p, ok := cp[key]
			if !ok {
				p = &mapChannel{pipescript.NewChannelPipe(pipes[0].Copy()), false}
				cp[key] = p
			}
			sentDP := false
			for !sentDP && !p.done {
				select {
				case res := <-p.cp.Receiver:
					if res.Err != nil {
						return nil, res.Err
					}
					if res.DP == nil {
						p.done = true
					} else {
						data[key] = res.DP.Data
					}
				case p.cp.Sender <- dp:
					sentDP = true
				}
			}

			dp, args, err = e.Next(argarray)
			if err != nil {
				return nil, err
			}
		}

		for key, p := range cp {
			sentDP := false
			for !sentDP && !p.done {
				select {
				case res := <-p.cp.Receiver:
					if res.Err != nil {
						return nil, res.Err
					}
					if res.DP == nil {
						p.done = true
					} else {
						data[key] = res.DP.Data
					}
				case p.cp.Sender <- nil:
					sentDP = true
				}
			}
		}

		for key, p := range cp {
			for !p.done {
				res := <-p.cp.Receiver
				if res.Err != nil {
					return nil, res.Err
				}
				if res.DP == nil {
					p.done = true
				} else {
					data[key] = res.DP.Data
				}
			}
		}

		out.Duration = dp2.Timestamp + dp2.Duration - out.Timestamp
		out.Data = data
		return out, nil
	}),
}
View Source
var Reduce = &pipescript.Transform{
	Name:        "reduce",
	Description: "Takes a json object, and considers each field to be a separate datapoint's data. It then runs the transform in its argument over the elements",
	Args: []pipescript.TransformArg{
		{
			Description: "The transform to instantiate for each datapoint's values.",
			Type:        pipescript.PipeArgType,
		},
	},
	Documentation: string(resources.MustAsset("docs/transforms/reduce.md")),
	Constructor: pipescript.NewBasic(nil, func(dp *pipescript.Datapoint, args []*pipescript.Datapoint, consts []interface{}, pipes []*pipescript.Pipe, out *pipescript.Datapoint) (*pipescript.Datapoint, error) {
		ai := &arrIterator{
			Timestamp: dp.Timestamp,
			Duration:  dp.Duration,
		}
		switch d := dp.Data.(type) {
		case []interface{}:
			ai.vals = d
		case map[string]interface{}:
			ai.vals = make([]interface{}, 0, len(d))
			for _, v := range d {
				ai.vals = append(ai.vals, v)
			}
		default:
			return nil, errors.New("Can't reduce non-object/array datapoint")
		}
		p := pipes[0].Copy()
		p.InputIterator(ai)

		var data interface{}
		for {
			dd, err := p.Next(out)
			if err != nil {
				return nil, err
			}
			if dd != nil {
				data = dd.Data
			} else {
				break
			}
		}
		out.Timestamp = dp.Timestamp
		out.Duration = dp.Duration
		out.Data = data
		return out, nil
	}),
}
View Source
var Where = &pipescript.Transform{
	Name:          "where",
	Description:   "Filters all datapoints that do not pass the given conditional",
	Documentation: string(resources.MustAsset("docs/transforms/where.md")),
	Args: []pipescript.TransformArg{
		{
			Description: "Statement to check for truth value",
			Type:        pipescript.TransformArgType,
			Schema: map[string]interface{}{
				"type": "boolean",
			},
		},
	},
	Constructor: func(transform *pipescript.Transform, consts []interface{}, pipes []*pipescript.Pipe) (pipescript.TransformIterator, error) {
		return whereIter{make([]*pipescript.Datapoint, 1)}, nil
	},
}
View Source
var While = &pipescript.Transform{
	Name:          "while",
	Description:   "Equivalent to a while loop that runs while the first argument is true. Restarts the loop when the argument is false.",
	Documentation: string(resources.MustAsset("docs/transforms/while.md")),
	Args: []pipescript.TransformArg{
		{
			Description: "The statement to check for truth",
			Type:        pipescript.TransformArgType,
			Schema: map[string]interface{}{
				"type": "boolean",
			},
		},
		{
			Description: "transform to run, and to reset when the first arg is false",
			Type:        pipescript.PipeArgType,
		},
	},
	Constructor: func(transform *pipescript.Transform, consts []interface{}, pipes []*pipescript.Pipe) (pipescript.TransformIterator, error) {
		return &whileIter{args: make([]*pipescript.Datapoint, 1), pipe: pipes[0]}, nil
	},
}

Functions

func Register

func Register()

Types

This section is empty.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL