Documentation ¶
Index ¶
- Constants
- Variables
- func AcquireWorker(jobOp *AppJobOp) error
- func DeleteBrokenReferences(isDeleted func(parent skyhook.ExecParent) bool)
- func DeleteReferencesToDataset(dataset *DBDataset)
- func DeleteReferencesToNode(node *DBExecNode, newOutputs []skyhook.ExecOutput)
- func GetKeyFromFilename(fname string) string
- func GetNodeByGraphID(id skyhook.GraphID) skyhook.Node
- func HandleUpload(w http.ResponseWriter, r *http.Request, ...)
- func ImportDataset(path string, opts ImportOptions) error
- func ImportURL(url string, opts ImportOptions, f func(path string) error) error
- func IncorporateIntoGraph(graph skyhook.ExecutionGraph, subgraph skyhook.ExecutionGraph)
- func InitDB(init bool)
- func ReleaseWorker()
- func RunNode(targetNode *DBExecNode, opts RunNodeOptions) error
- func ToSkyhookInputDatasets(datasets map[string][]*DBDataset) map[string][]skyhook.Dataset
- func ToSkyhookOutputDatasets(datasets map[string]*DBDataset) map[string]skyhook.Dataset
- func UncacheDB(fname string)
- func UnzipThen(fname string, f func(path string) error) error
- type AnnotateDatasetUpdate
- type AnnotateResponse
- type AppJobOp
- func (op *AppJobOp) Cleanup()
- func (op *AppJobOp) Encode() string
- func (op *AppJobOp) IsStopping() bool
- func (op *AppJobOp) ReadFrom(r io.Reader)
- func (op *AppJobOp) SetCleanupFunc(f func())
- func (op *AppJobOp) SetDone(err error)
- func (op *AppJobOp) Stop() error
- func (op *AppJobOp) Update(lines []string)
- type AppJobState
- type ContainerInfo
- type DBAnnotateDataset
- type DBDataset
- func (ds *DBDataset) AddExecRef(nodeID int)
- func (ds *DBDataset) AddItem(item skyhook.Item) (*DBItem, error)
- func (ds *DBDataset) Clear()
- func (ds *DBDataset) Delete()
- func (ds *DBDataset) DeleteExecRef(nodeID int)
- func (ds *DBDataset) Export(outFname string, opts ImportOptions) error
- func (ds *DBDataset) ExportFiles(outFname string, opts ImportOptions) error
- func (ds *DBDataset) GetItem(key string) *DBItem
- func (ds *DBDataset) ImportDir(path string, opts ImportOptions) error
- func (ds *DBDataset) ImportFiles(fnames []string, opts ImportOptions) error
- func (ds *DBDataset) ImportIntoFileDataset(fnames []string, opts ImportOptions) error
- func (ds *DBDataset) ListItems() []*DBItem
- func (ds *DBDataset) SetDone(done bool)
- func (ds *DBDataset) Update(req DatasetUpdate)
- func (ds *DBDataset) WriteItem(key string, data interface{}, metadata skyhook.DataMetadata) (*DBItem, error)
- type DBExecNode
- func (node *DBExecNode) DatasetRefs() []int
- func (node *DBExecNode) Delete()
- func (node *DBExecNode) GetComputedKeys() map[string]bool
- func (node *DBExecNode) GetDatasets(create bool) (map[string]*DBDataset, bool)
- func (node *DBExecNode) GetGraph() skyhook.ExecutionGraph
- func (node *DBExecNode) GetGraphID() skyhook.GraphID
- func (node *DBExecNode) GetVirtualDatasets(vnode *skyhook.VirtualNode) map[string]*DBDataset
- func (node *DBExecNode) Hash() string
- func (node *DBExecNode) Incremental(opts IncrementalOptions) error
- func (node *DBExecNode) IsDone() bool
- func (node *DBExecNode) PrepareRun(opts ExecRunOptions) (*RunData, error)
- func (node *DBExecNode) Update(req ExecNodeUpdate)
- type DBItem
- type DBJob
- type DBPytorchArch
- type DBPytorchComponent
- type DBWorkspace
- type Database
- type DatasetUpdate
- type ExecNodeUpdate
- type ExecRunOptions
- type ImportOptions
- type IncrementalOptions
- type MultiExecJobOp
- func (op *MultiExecJobOp) ChangeJob(job skyhook.Job)
- func (op *MultiExecJobOp) ChangePlan(plan []*skyhook.VirtualNode, planIndex int)
- func (op *MultiExecJobOp) Encode() string
- func (op *MultiExecJobOp) SetPlanFromGraph(graph skyhook.ExecutionGraph, ready map[skyhook.GraphID]map[string]*DBDataset, ...)
- func (op *MultiExecJobOp) SetPlanFromMap(nodes map[int]*DBExecNode, done map[int]bool, curID int)
- func (op *MultiExecJobOp) Stop() error
- func (op *MultiExecJobOp) Update(lines []string)
- type MultiExecJobState
- type ProgressJobOp
- type PytorchArchUpdate
- type PytorchComponentUpdate
- type Result
- type Row
- type Rows
- type RunData
- type RunNodeOptions
- type Tx
Constants ¶
const AnnotateDatasetQuery = "" /* 149-byte string literal not displayed */
const DatasetQuery = "SELECT id, name, type, data_type, metadata, hash, done FROM datasets"
const DbDebug bool = false
const ExecNodeQuery = "SELECT id, name, op, params, parents, workspace FROM exec_nodes"
const ItemQuery = "SELECT k, ext, format, metadata, provider, provider_info FROM items"
const JobQuery = "SELECT id, name, type, op, metadata, start_time, done, error FROM jobs"
const PytorchArchQuery = "SELECT id, params FROM pytorch_archs"
const PytorchComponentQuery = "SELECT id, params FROM pytorch_components"
Variables ¶
var Config struct { // URL where main program can be reached. // This is used when telling workers which coordinator a request came from, // so that we can get back responses or serve any API calls worker needs to make. CoordinatorURL string // URL where the worker is, which runs as a separate program. // Can also point to a worker pool. WorkerURL string // Optional instance ID. // If set, the worker should launch container in a subdirectory with this name. InstanceID string }
Global config object, set by main.go
var Router = mux.NewRouter()
var SetupFuncs []func(*socketio.Server)
Functions ¶
func AcquireWorker ¶
Acquires a worker and returns nil, or returns an error if interrupted (i.e., the job was terminated by the user).
func DeleteBrokenReferences ¶
func DeleteBrokenReferences(isDeleted func(parent skyhook.ExecParent) bool)
Delete ExecParent references that match with an isDeleted function.
func DeleteReferencesToDataset ¶
func DeleteReferencesToDataset(dataset *DBDataset)
Delete broken ExecParent references to a dataset that has been deleted.
func DeleteReferencesToNode ¶
func DeleteReferencesToNode(node *DBExecNode, newOutputs []skyhook.ExecOutput)
Delete broken ExecParent references to a node when the node is deleted or its outputs have changed.
func GetKeyFromFilename ¶
func GetNodeByGraphID ¶
Retrieves the Node (VirtualNode or Dataset) based on GraphID.
func HandleUpload ¶
func HandleUpload(w http.ResponseWriter, r *http.Request, f func(fname string, cleanupFunc func()) error)
Handles the common parts of a standard upload, where we save to a temporary file with the same extension as the uploaded file.
func ImportDataset ¶
func ImportDataset(path string, opts ImportOptions) error
Import a local path that contains a Skyhook dataset. If symlink is true, we will copy the db.sqlite3 but symlink all the other
files in the dataset. We do not symlink the whole directory currently because we don't want to modify it.
If symlink is false, we just copy all the files.
func ImportURL ¶
func ImportURL(url string, opts ImportOptions, f func(path string) error) error
Import from a URL. Calls handler function after URL is downloaded and unzipped. Updates opts with progress.
func IncorporateIntoGraph ¶
func IncorporateIntoGraph(graph skyhook.ExecutionGraph, subgraph skyhook.ExecutionGraph)
Incorporate a subgraph (graph of new nodes) into an existing execution graph. The subgraph may present new dependencies that don't exist yet in the graph, so we need to search those.
func InitDB ¶
func InitDB(init bool)
Initialize the database on startup with cleanup operations. If init is true, we also first initialize the schema and populate certain tables.
func ReleaseWorker ¶
func ReleaseWorker()
func RunNode ¶
func RunNode(targetNode *DBExecNode, opts RunNodeOptions) error
func ToSkyhookInputDatasets ¶
func ToSkyhookOutputDatasets ¶
Types ¶
type AnnotateDatasetUpdate ¶
type AnnotateResponse ¶
type AnnotateResponse struct { // The key that we're labeling. // May be an existing key in the destination dataset, or a new key. Key string IsExisting bool }
Info needed to annotate one item, which may or may not already be present in the destination dataset.
type AppJobOp ¶
type AppJobOp struct { Job *DBJob TailOp *skyhook.TailJobOp WrappedJobOps map[string]skyhook.JobOp LastWrappedDatas map[string]string // stopping support Stopping bool Stopped bool // function to call when stopping a job // we also call this on SetDone CleanupFunc func() // contains filtered or unexported fields }
A JobOp that wraps a TailOp for console, plus arbitrary number of other JobOps. It also provides functionality for stopping via mutex/condition.
func (*AppJobOp) IsStopping ¶
func (*AppJobOp) SetCleanupFunc ¶
func (op *AppJobOp) SetCleanupFunc(f func())
type AppJobState ¶
type ContainerInfo ¶
Allocate a container on the worker. The caller is responsible for acquiring the worker.
func AcquireContainer ¶
func AcquireContainer(node skyhook.Runnable, jobOp *AppJobOp) (ContainerInfo, error)
type DBAnnotateDataset ¶
type DBAnnotateDataset struct { skyhook.AnnotateDataset InputDatasets []skyhook.Dataset // contains filtered or unexported fields }
func GetAnnotateDataset ¶
func GetAnnotateDataset(id int) *DBAnnotateDataset
func ListAnnotateDatasets ¶
func ListAnnotateDatasets() []*DBAnnotateDataset
func NewAnnotateDataset ¶
func NewAnnotateDataset(dataset skyhook.Dataset, inputs []skyhook.ExecParent, tool string, params string) (*DBAnnotateDataset, error)
func (*DBAnnotateDataset) Delete ¶
func (s *DBAnnotateDataset) Delete()
func (*DBAnnotateDataset) Load ¶
func (s *DBAnnotateDataset) Load()
func (*DBAnnotateDataset) SampleMissingKey ¶
func (s *DBAnnotateDataset) SampleMissingKey() string
Samples a key that is present in all input datasets but not yet labeled in this annotate dataset. TODO: have a sampler object so that hash tables can be stored in memory instead of loaded from the db each time.
func (*DBAnnotateDataset) Update ¶
func (s *DBAnnotateDataset) Update(req AnnotateDatasetUpdate)
type DBDataset ¶
func ExecParentToDataset ¶
func ExecParentToDataset(parent skyhook.ExecParent) (*DBDataset, error)
Resolves an ExecParent to a dataset. If the dataset is unavailable, returns an error.
func FindDataset ¶
func GetDataset ¶
func ListDatasets ¶
func ListDatasets() []*DBDataset
func NewDataset ¶
func (*DBDataset) AddExecRef ¶
func (*DBDataset) DeleteExecRef ¶
func (*DBDataset) Export ¶
func (ds *DBDataset) Export(outFname string, opts ImportOptions) error
Export a dataset into the Skyhook .zip format, writing the archive to outFname. Returns an error if the export fails.
func (*DBDataset) ExportFiles ¶
func (ds *DBDataset) ExportFiles(outFname string, opts ImportOptions) error
Export files in a file dataset. Unlike Export, this produces a .zip file where items in the dataset are named based on their filenames specified in FileMetadata.
func (*DBDataset) ImportFiles ¶
func (ds *DBDataset) ImportFiles(fnames []string, opts ImportOptions) error
func (*DBDataset) ImportIntoFileDataset ¶
func (ds *DBDataset) ImportIntoFileDataset(fnames []string, opts ImportOptions) error
Specialized function for importing files when the dataset is File type. In this case, the metadata specifies the original filename. We also want to scan recursively, since filenames in subdirectories should be imported too.
func (*DBDataset) Update ¶
func (ds *DBDataset) Update(req DatasetUpdate)
type DBExecNode ¶
func GetExecNode ¶
func GetExecNode(id int) *DBExecNode
func ListExecNodes ¶
func ListExecNodes() []*DBExecNode
func NewExecNode ¶
func NewExecNode(name string, op string, params string, parents map[string][]skyhook.ExecParent, workspace string) *DBExecNode
func (*DBExecNode) DatasetRefs ¶
func (node *DBExecNode) DatasetRefs() []int
func (*DBExecNode) Delete ¶
func (node *DBExecNode) Delete()
func (*DBExecNode) GetComputedKeys ¶
func (node *DBExecNode) GetComputedKeys() map[string]bool
Helper function to compute the keys already computed at a node. This only works for incremental nodes, which must produce the same keys across all output datasets.
func (*DBExecNode) GetDatasets ¶
func (node *DBExecNode) GetDatasets(create bool) (map[string]*DBDataset, bool)
Get datasets for each output of this node. If create=true, creates new datasets to cover missing ones. Also returns bool, which is true if all datasets exist.
func (*DBExecNode) GetGraph ¶
func (node *DBExecNode) GetGraph() skyhook.ExecutionGraph
Build the execution graph rooted at this node. Execution graph maps from GraphID to Node.
func (*DBExecNode) GetGraphID ¶
func (node *DBExecNode) GetGraphID() skyhook.GraphID
func (*DBExecNode) GetVirtualDatasets ¶
func (node *DBExecNode) GetVirtualDatasets(vnode *skyhook.VirtualNode) map[string]*DBDataset
Get dataset for a virtual node that comes from this node. If the datasets don't exist already, we create them.
func (*DBExecNode) Hash ¶
func (node *DBExecNode) Hash() string
func (*DBExecNode) Incremental ¶
func (node *DBExecNode) Incremental(opts IncrementalOptions) error
func (*DBExecNode) IsDone ¶
func (node *DBExecNode) IsDone() bool
Returns true if all the output datasets are done.
func (*DBExecNode) PrepareRun ¶
func (node *DBExecNode) PrepareRun(opts ExecRunOptions) (*RunData, error)
Prepare to run this node. Returns a RunData on success, or an error on failure. If the node is already done, returns a nil RunData along with an error.
func (*DBExecNode) Update ¶
func (node *DBExecNode) Update(req ExecNodeUpdate)
type DBItem ¶
func (*DBItem) SetMetadata ¶
func (item *DBItem) SetMetadata(format string, metadata skyhook.DataMetadata)
func (*DBItem) SetMetadataFromFile ¶
Set metadata based on the file.
type DBPytorchArch ¶
type DBPytorchArch struct{ skyhook.PytorchArch }
func GetPytorchArch ¶
func GetPytorchArch(id string) *DBPytorchArch
func GetPytorchArchByName ¶
func GetPytorchArchByName(id string) *DBPytorchArch
func ListPytorchArchs ¶
func ListPytorchArchs() []*DBPytorchArch
func NewPytorchArch ¶
func NewPytorchArch(id string) *DBPytorchArch
func (*DBPytorchArch) Delete ¶
func (arch *DBPytorchArch) Delete()
func (*DBPytorchArch) Update ¶
func (arch *DBPytorchArch) Update(req PytorchArchUpdate)
type DBPytorchComponent ¶
type DBPytorchComponent struct{ skyhook.PytorchComponent }
func GetPytorchComponent ¶
func GetPytorchComponent(id string) *DBPytorchComponent
func ListPytorchComponents ¶
func ListPytorchComponents() []*DBPytorchComponent
func NewPytorchComponent ¶
func NewPytorchComponent(id string) *DBPytorchComponent
func (*DBPytorchComponent) Delete ¶
func (c *DBPytorchComponent) Delete()
func (*DBPytorchComponent) Update ¶
func (c *DBPytorchComponent) Update(req PytorchComponentUpdate)
type DBWorkspace ¶
type DBWorkspace string
func GetWorkspace ¶
func GetWorkspace(wsName string) *DBWorkspace
func (DBWorkspace) Delete ¶
func (ws DBWorkspace) Delete()
func (DBWorkspace) ListExecNodes ¶
func (ws DBWorkspace) ListExecNodes() []*DBExecNode
type Database ¶
type Database struct {
// contains filtered or unexported fields
}
func GetCachedDB ¶
Get a cached database connection to the specified sqlite3 file. If initFunc is set, we create the sqlite3 file if it doesn't already exist, and
call initFunc each time a new connection is opened.
Otherwise, we do not create new files, and instead return nil.
func (*Database) Transaction ¶
type DatasetUpdate ¶
type ExecNodeUpdate ¶
type ExecRunOptions ¶
type ExecRunOptions struct { // If force, we run even if outputs were already available. Force bool // Whether to try incremental execution at this node. // If false, we throw error if parent datasets are not done. Incremental bool // If set, limit execution to these keys. // Only supported by incremental ops. LimitOutputKeys map[string]bool }
type ImportOptions ¶
type ImportOptions struct { Symlink bool // import will call Update and IsStopping on this AppJobOp if set AppJobOp *AppJobOp // import will call Increment if set ProgressJobOp *ProgressJobOp }
func (ImportOptions) CompletedTask ¶
func (opts ImportOptions) CompletedTask(line string, increment int) bool
Increments the ProgressJobOp, writes a line to the AppJobOp, and checks IsStopping.
func (ImportOptions) SetTasks ¶
func (opts ImportOptions) SetTasks(total int)
type IncrementalOptions ¶
type IncrementalOptions struct { // Number of random outputs to compute at this node. // Only one of Count or Keys should be specified. Count int // Compute outputs matching these keys. Keys []string // MultiExecJob to update during incremental execution. // For non-incremental ancestors, we pass this JobOp to RunNode. JobOp *MultiExecJobOp }
Get some number of incremental outputs from this node.
type MultiExecJobOp ¶
type MultiExecJobOp struct { Job *DBJob // current wrapped job (current ExecJob) CurJob *skyhook.Job // current execution plan // the field can change but the slice itself must not Plan []*skyhook.VirtualNode // which index in the plan are we executing next (or right now)? PlanIndex int // contains filtered or unexported fields }
A JobOp for running multiple ExecNodes.
func (*MultiExecJobOp) ChangeJob ¶
func (op *MultiExecJobOp) ChangeJob(job skyhook.Job)
func (*MultiExecJobOp) ChangePlan ¶
func (op *MultiExecJobOp) ChangePlan(plan []*skyhook.VirtualNode, planIndex int)
Set the plan. The plan must be immutable.
func (*MultiExecJobOp) Encode ¶
func (op *MultiExecJobOp) Encode() string
func (*MultiExecJobOp) SetPlanFromGraph ¶
func (op *MultiExecJobOp) SetPlanFromGraph(graph skyhook.ExecutionGraph, ready map[skyhook.GraphID]map[string]*DBDataset, needed map[skyhook.GraphID]skyhook.Node, cur *skyhook.VirtualNode)
Get a []*skyhook.VirtualNode plan based on current execution graph and related state.
func (*MultiExecJobOp) SetPlanFromMap ¶
func (op *MultiExecJobOp) SetPlanFromMap(nodes map[int]*DBExecNode, done map[int]bool, curID int)
Used in DBExecNode.Incremental.
func (*MultiExecJobOp) Stop ¶
func (op *MultiExecJobOp) Stop() error
func (*MultiExecJobOp) Update ¶
func (op *MultiExecJobOp) Update(lines []string)
type MultiExecJobState ¶
type MultiExecJobState struct { CurJob *skyhook.Job Plan []*skyhook.VirtualNode PlanIndex int }
type ProgressJobOp ¶
func (*ProgressJobOp) Encode ¶
func (op *ProgressJobOp) Encode() string
func (*ProgressJobOp) Increment ¶
func (op *ProgressJobOp) Increment()
func (*ProgressJobOp) SetProgressPercent ¶
func (op *ProgressJobOp) SetProgressPercent(percent int) bool
Set the progress to the specified percentage. Returns true if the percent was updated.
func (*ProgressJobOp) SetTotal ¶
func (op *ProgressJobOp) SetTotal(total int)
func (*ProgressJobOp) Stop ¶
func (op *ProgressJobOp) Stop() error
func (*ProgressJobOp) Update ¶
func (op *ProgressJobOp) Update(lines []string)
type PytorchArchUpdate ¶
type PytorchArchUpdate struct {
Params *skyhook.PytorchArchParams
}
type PytorchComponentUpdate ¶
type PytorchComponentUpdate struct {
Params *skyhook.PytorchComponentParams
}
type Result ¶
type Result struct {
// contains filtered or unexported fields
}
func (Result) LastInsertId ¶
func (Result) RowsAffected ¶
type RunData ¶
type RunData struct { Name string Node skyhook.Runnable Tasks []skyhook.ExecTask // whether we'll be done with the node after running Tasks // i.e., whether Tasks contains all pending tasks at this node WillBeDone bool // job-related things to update JobOp *AppJobOp ProgressJobOp *ProgressJobOp // Saved error if any Error error }
A RunData provides a Run function that executes a Runnable over the specified tasks.
type RunNodeOptions ¶
type RunNodeOptions struct { // If force, we run even if outputs were already available. Force bool // If NoRunTree, we do not run if the parents of targetNode are not available. NoRunTree bool // MultiExecJobOp to update with jobs for each ExecNode run. JobOp *MultiExecJobOp }
Run the specified node, while running ancestors first if needed.