scramjet

package module
v0.0.13 Latest
Warning

This package is not in the latest version of its module.

Published: Mar 22, 2021 License: MIT Imports: 15 Imported by: 0

README

Vivo Scramjet

module github.com/OIT-ADS-Web/scramjet

A persistent cache of arbitrary JSON objects, each associated with an id. The objects can be validated against a service and, once valid, examined for changes.

This makes it possible to gather entities for ingest into some kind of downstream store, and then send only the adds, updates, or deletes to that store.
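The following self-contained sketch illustrates the idea; it is not scramjet's own code. It assumes each entity has already been reduced to an id and a content hash. The 'diff' function, the 'Changes' type, and the hash strings are hypothetical placeholders. Comparing the stored hashes with the incoming ones yields the adds, updates, and deletes to pass on. Entities with an unchanged hash produce no write at all.

```go
package main

import (
	"fmt"
	"sort"
)

// Changes groups ids by the kind of write the downstream store needs.
type Changes struct {
	Adds, Updates, Deletes []string
}

// diff compares the hashes already stored (existing) with the hashes of the
// freshly gathered entities (incoming), both keyed by id.
func diff(existing, incoming map[string]string) Changes {
	var c Changes
	for id, newHash := range incoming {
		oldHash, found := existing[id]
		switch {
		case !found:
			c.Adds = append(c.Adds, id) // never seen before
		case oldHash != newHash:
			c.Updates = append(c.Updates, id) // content changed
		}
	}
	for id := range existing {
		if _, found := incoming[id]; !found {
			c.Deletes = append(c.Deletes, id) // gone from the source
		}
	}
	// Map iteration order is random; sort for stable output.
	sort.Strings(c.Adds)
	sort.Strings(c.Updates)
	sort.Strings(c.Deletes)
	return c
}

func main() {
	existing := map[string]string{"per0000001": "aaa", "per0000002": "bbb", "per0000003": "ccc"}
	incoming := map[string]string{"per0000001": "aaa", "per0000002": "xyz", "per0000004": "ddd"}
	c := diff(existing, incoming)
	fmt.Println("adds:", c.Adds, "updates:", c.Updates, "deletes:", c.Deletes)
	// adds: [per0000004] updates: [per0000002] deletes: [per0000003]
}
```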

** NOTE ** This is in very early development and likely to change substantially. For the time being it is here for instructional purposes only; I would not recommend using it in any project.

As a library (API)

  • Staging Table

import (
	sj "github.com/OIT-ADS-Web/scramjet"
)
...

	typeName := "person"
	// 1) add data
	person1 := TestPerson{Id: "per0000001", Name: "Test1"}
	person2 := TestPerson{Id: "per0000002", Name: "Test2"}
	// BulkAddStaging accepts anything implementing the 'Storeable' interface; sj.Packet does
	pass1 := sj.Packet{Id: sj.Identifier{Id: person1.Id, Type: typeName}, Obj: person1}
	pass2 := sj.Packet{Id: sj.Identifier{Id: person2.Id, Type: typeName}, Obj: person2}

	people := []sj.Storeable{pass1, pass2}
	err := sj.BulkAddStaging(people...)

	// 2) run through a 'validator' function - would likely
	//    be a JSON schema validator
	alwaysOkay := func(json string) bool { return true }
	valid, rejects, err := sj.FilterTypeStaging(typeName, alwaysOkay)

	err = sj.BatchMarkValidInStaging(valid)
	err = sj.BatchMarkInvalidInStaging(rejects)

	// 3) now the valid ones are marked and ready to go into
	//    the 'resources' table
	...
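Step 2 uses 'alwaysOkay', which accepts everything. The sketch below shows a slightly more realistic validator with the same func(string) bool shape as the documented ValidatorFunc. It is a stand-in for a real JSON Schema validator. It only checks that the document is a JSON object whose named fields are non-empty strings. The helper name 'requireFields' is hypothetical. The field names "Id" and "Name" assume TestPerson (whose definition is not shown) is serialized by encoding/json without struct tags.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ValidatorFunc has the same shape as scramjet's ValidatorFunc; real code
// would return sj.ValidatorFunc instead of this local copy.
type ValidatorFunc func(string) bool

// requireFields returns a validator that accepts a document only if it is a
// JSON object in which every named field is present and is a non-empty string.
func requireFields(fields ...string) ValidatorFunc {
	return func(doc string) bool {
		var obj map[string]interface{}
		if err := json.Unmarshal([]byte(doc), &obj); err != nil {
			return false // not JSON, or not a JSON object
		}
		for _, f := range fields {
			s, ok := obj[f].(string)
			if !ok || s == "" {
				return false
			}
		}
		return true
	}
}

func main() {
	valid := requireFields("Id", "Name")
	fmt.Println(valid(`{"Id":"per0000001","Name":"Test1"}`)) // true
	fmt.Println(valid(`{"Id":"per0000002"}`))                 // false
	fmt.Println(valid(`not json`))                            // false
}
```

Because the result is an ordinary function value, it can be passed wherever 'alwaysOkay' is used above, for example FilterTypeStaging(typeName, requireFields("Id", "Name")).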


    
  • Resources Table

import (
	sj "github.com/OIT-ADS-Web/scramjet"
)

...

	typeName := "person"
	list, err := sj.RetrieveValidStaging(typeName)
	err = sj.BulkMoveStagingTypeToResources(typeName, list...)

    ...

As an executable (CLI)

Nothing has been done with this so far.

General Idea

There are two tables, staging and resources:

  • staging: [id+type=uid]

    actions:

    • stash ->
    • validate ->
    • stash and validate ->
  • resources: [uri(type)=uid]

    actions:

    • move over valid records (these could be updates)
    • get only the actual updates
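The two-table lifecycle described above can be modeled in memory to make the flow concrete. In this sketch Go maps stand in for the Postgres tables, and both are keyed by an (id, type) pair for simplicity. The 'store', 'stash', 'validate', and 'moveValid' names are hypothetical and do not show how scramjet implements these steps. Valid rows are copied into resources, overwriting any earlier version, and then cleared from staging. Invalid rows stay behind for investigation, which matches the note on ClearStagingTypeValid further down.

```go
package main

import "fmt"

// Identifier mirrors scramjet's Identifier: id plus type forms the key.
type Identifier struct{ Id, Type string }

// stagingRow loosely mirrors StagingResource; nil IsValid means "not validated yet".
type stagingRow struct {
	Data    string
	IsValid *bool
}

type store struct {
	staging   map[Identifier]*stagingRow
	resources map[Identifier]string
}

func newStore() *store {
	return &store{staging: map[Identifier]*stagingRow{}, resources: map[Identifier]string{}}
}

// stash puts (or overwrites) a document in staging, unvalidated.
func (s *store) stash(id Identifier, data string) {
	s.staging[id] = &stagingRow{Data: data}
}

// validate marks every staged row of one type as valid or invalid.
func (s *store) validate(typeName string, ok func(string) bool) {
	for id, row := range s.staging {
		if id.Type == typeName {
			v := ok(row.Data)
			row.IsValid = &v
		}
	}
}

// moveValid copies valid rows into resources (possibly an update) and clears
// them from staging; deleting the current key during range is safe in Go.
func (s *store) moveValid(typeName string) {
	for id, row := range s.staging {
		if id.Type == typeName && row.IsValid != nil && *row.IsValid {
			s.resources[id] = row.Data
			delete(s.staging, id)
		}
	}
}

func main() {
	s := newStore()
	s.stash(Identifier{Id: "per0000001", Type: "person"}, `{"Name":"Test1"}`)
	s.stash(Identifier{Id: "per0000002", Type: "person"}, `broken`)
	s.validate("person", func(d string) bool { return d != "broken" })
	s.moveValid("person")
	fmt.Println(len(s.resources), len(s.staging)) // 1 1
}
```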

Operations

Moving an entire 'type' in bulk


import (
	sj "github.com/OIT-ADS-Web/scramjet"
)

	typeName := "person"
	// see above - gather data however it can be gathered
	//err := sj.BulkAddStaging(people...)
	err := sj.StashStaging(people...)
	// supply your own validator function ...
	alwaysOkay := func(json string) bool { return true }
	valid, rejects, err := sj.FilterTypeStaging(typeName, alwaysOkay)

	err = sj.BatchMarkValidInStaging(valid)
	err = sj.BatchMarkInvalidInStaging(rejects)

	list, err := sj.RetrieveValidStaging(typeName)
	err = sj.BulkMoveStagingTypeToResources(typeName, list...)


Moving by id (single items)


import (
	sj "github.com/OIT-ADS-Web/scramjet"
)

	typeName := "person"
	// see above - grab a single record however necessary
	// and stash it in the staging table
	err := sj.StashStaging(people...)
	// only the basic 'id' is needed to find the record to validate
	identifier := sj.Identifier{Id: id, Type: typeName}
	stub := sj.Stub{Id: identifier}
	// validate however you want
	alwaysOkay := func(json string) bool { return true }
	err = sj.ProcessSingleStaging(stub, alwaysOkay)

	// move it over
	staging, err := sj.RetrieveSingleStagingValid(id, typeName)
	err = sj.BulkMoveStagingTypeToResources(typeName, staging)

Moving by query (for instance per person)


import (
	sj "github.com/OIT-ADS-Web/scramjet"
)

	typeName := "person"
	// see above - get the person's records and stash them in the staging table
	err := sj.StashStaging(people...)
	// make a validator
	alwaysOkay := func(json string) bool { return true }
	// make a filter - a fairly crude single-field matcher at this point
	filter := sj.Filter{Field: "externalId", Value: "x200", Compare: sj.Eq}
	// validate only the records that match the filter
	valid, rejects, err := sj.FilterTypeStagingByQuery(typeName, filter, alwaysOkay)
	err = sj.BatchMarkValidInStaging(valid)
	err = sj.BatchMarkInvalidInStaging(rejects)
	// move over to resources, based on the same filter
	list2, err := sj.RetrieveValidStagingFiltered(typeName, filter)
	err = sj.BulkMoveStagingTypeToResources(typeName, list2...)


More configurable intake

For more advanced use, intake can be run as a series of chunks of input, driven by an IntakeListMaker and optional progress and inspection callbacks; see ChunkableIntakeConfig and IntakeInChunks.
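The sketch below shows what a chunked intake loop of this kind might look like; it is not IntakeInChunks itself, whose body is not shown here. Two simplifications are assumptions. First, the int passed to the list maker is treated as an offset. Second, plain strings stand in for Storeable so the example compiles on its own. The names 'intakeConfig', 'listMaker', and 'intakeInChunks' are hypothetical. Each chunk is fetched, handed to a stash callback, and followed by a progress report.

```go
package main

import "fmt"

// Hypothetical stand-ins for IntakeListMaker and ProgressChecker.
type listMaker func(offset int) ([]string, error)
type progressChecker func(int)

type intakeConfig struct {
	Count     int
	ChunkSize int
	ListMaker listMaker
	Progress  progressChecker
}

// intakeInChunks fetches Count items ChunkSize at a time, passes each chunk
// to stash, and reports how many items have been processed so far.
func intakeInChunks(cfg intakeConfig, stash func([]string) error) error {
	if cfg.ChunkSize <= 0 {
		return fmt.Errorf("chunk size must be positive, got %d", cfg.ChunkSize)
	}
	for offset := 0; offset < cfg.Count; offset += cfg.ChunkSize {
		items, err := cfg.ListMaker(offset)
		if err != nil {
			return fmt.Errorf("listing chunk at offset %d: %w", offset, err)
		}
		if err := stash(items); err != nil {
			return err
		}
		if cfg.Progress != nil {
			cfg.Progress(offset + len(items))
		}
	}
	return nil
}

func main() {
	source := []string{"a", "b", "c", "d", "e"}
	chunk := 2
	cfg := intakeConfig{
		Count:     len(source),
		ChunkSize: chunk,
		ListMaker: func(offset int) ([]string, error) {
			end := offset + chunk
			if end > len(source) {
				end = len(source)
			}
			return source[offset:end], nil
		},
		Progress: func(done int) { fmt.Println("processed", done) }, // 2, 4, 5
	}
	var stashed []string
	err := intakeInChunks(cfg, func(items []string) error {
		stashed = append(stashed, items...)
		return nil
	})
	fmt.Println(stashed, err) // [a b c d e] <nil>
}
```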

# Basic structure
![image of basic structure](docs/ScramjetBasic.png "A diagram of basic ideas")


# Tables
![image of tables](docs/ScramjetTables.png "A diagram of table structure")

Documentation

Index

Constants

This section is empty.

Variables

var DBPool *pgxpool.Pool
var Name string

Functions

func AddStagingResource

func AddStagingResource(obj interface{}, id string, typeName string) error

Only adds; assumes existence has already been checked.

func BatchDeleteResourcesFromResources

func BatchDeleteResourcesFromResources(resources ...Identifiable) error

func BatchDeleteStagingFromResources

func BatchDeleteStagingFromResources(resources ...Identifiable) error

func BatchMarkInvalidInStaging

func BatchMarkInvalidInStaging(resources []Identifiable) error

func BatchMarkValidInStaging

func BatchMarkValidInStaging(resources []Identifiable) error

func BulkAddStaging

func BulkAddStaging(items ...Storeable) error

func BulkAddStagingForDelete

func BulkAddStagingForDelete(items ...Identifiable) error

func BulkAddStagingResources

func BulkAddStagingResources(resources ...StagingResource) error

func BulkMoveStagingToResourcesByFilter added in v0.0.8

func BulkMoveStagingToResourcesByFilter(typeName string, filter Filter, items ...StagingResource) error

NOTE: 'typeName' is still needed to clear the moved records from staging.

func BulkMoveStagingTypeToResources

func BulkMoveStagingTypeToResources(typeName string, items ...StagingResource) error

NOTE: the 'typeName' parameter is only needed for clearing records out of staging.

func BulkRemoveResources

func BulkRemoveResources(items ...Identifiable) error

func BulkRemoveStagingDeletedFromResources

func BulkRemoveStagingDeletedFromResources(typeName string) error

func ClearAllResources

func ClearAllResources() error

func ClearAllStaging

func ClearAllStaging() error

func ClearDeletedFromStaging

func ClearDeletedFromStaging(id string, typeName string) error

func ClearFilteredDeletedFromStaging added in v0.0.13

func ClearFilteredDeletedFromStaging(filter Filter, typeName string) error

func ClearResourceType

func ClearResourceType(typeName string) error

func ClearStagingType

func ClearStagingType(typeName string) error

Should this only be called where valid = true (after transferring to resources)?

func ClearStagingTypeDeletes

func ClearStagingTypeDeletes(typeName string) error

func ClearStagingTypeValid

func ClearStagingTypeValid(typeName string) error

Leaves rows with is_valid = false in place for investigation.

func ClearStagingTypeValidByFilter added in v0.0.8

func ClearStagingTypeValidByFilter(typeName string, filter Filter) error

func DeleteFromStaging

func DeleteFromStaging(res StagingResource) error

func DropResources

func DropResources() error

func DropStaging

func DropStaging() error

func FilterTypeStaging

func FilterTypeStaging(typeName string, validator ValidatorFunc) ([]Identifiable, []Identifiable, error)

func FilterTypeStagingByQuery added in v0.0.6

func FilterTypeStagingByQuery(typeName string,
	filter Filter, validator ValidatorFunc) ([]Identifiable, []Identifiable, error)

NOTE: this needs a 'typeName' parameter because it assumes the validator is different per type.

func FlagDeletes added in v0.0.9

func FlagDeletes(sourceDataIds []string, existingData []Resource, config DiffProcessConfig) error

func GetDbName

func GetDbName() string

func GetMaxUpdatedAt

func GetMaxUpdatedAt(typeName string) time.Time

func GetPool

func GetPool() *pgxpool.Pool

func IntakeInChunks

func IntakeInChunks(ins ChunkableIntakeConfig) error

func MakeConnectionPool

func MakeConnectionPool(conf Config) error

NOTE: Prepared statements can be manually created with the Prepare method. However, this is rarely necessary because pgx includes an automatic statement cache by default

func MakeResourceSchema

func MakeResourceSchema()

NOTE: this calls Fatalf on errors.

func MakeStagingSchema

func MakeStagingSchema()

func MarkInvalidInStaging

func MarkInvalidInStaging(res Storeable) error

TODO: should probably batch these when validating, marking valid and invalid records in groups of 500 or so.

func MarkValidInStaging

func MarkValidInStaging(res StagingResource) error

func ProcessDiff added in v0.0.9

func ProcessDiff(config DiffProcessConfig) error

func ProcessOutake

func ProcessOutake(config OutakeProcessConfig) error

func ProcessSingleStaging

func ProcessSingleStaging(item Identifiable, validator ValidatorFunc) error

func ProcessTypeStaging

func ProcessTypeStaging(typeName string, validator ValidatorFunc) error

func ProcessTypeStagingFiltered added in v0.0.6

func ProcessTypeStagingFiltered(typeName string, filter Filter, validator ValidatorFunc) error

TODO: no test for this so far

func RemoveStagingDeletedFromResources

func RemoveStagingDeletedFromResources(id string, typeName string) error

func RemoveStagingDeletedFromResourcesFiltered added in v0.0.13

func RemoveStagingDeletedFromResourcesFiltered(filter Filter, typeName string) error

func ResourceCount

func ResourceCount(typeName string) int

func ResourceTableExists

func ResourceTableExists() bool

TODO: the 'table_catalog' changes

func SaveResource

func SaveResource(obj Storeable) error

Only saves one at a time (not typically used).

func SaveStagingResource

func SaveStagingResource(obj Storeable) error

Is there a need for this function?

func SaveStagingResourceDirect

func SaveStagingResourceDirect(res StagingResource, typeName string) error

func StagingCount added in v0.0.8

func StagingCount() int

Just for verification.

func StagingDeleteCount

func StagingDeleteCount(typeName string) int

NOTE: only used in test - for verification

func StagingResourceExists

func StagingResourceExists(id string, typeName string) bool

Returns false on error (maybe it should not).

func StagingTableExists

func StagingTableExists() bool

NOTE: could call Fatalf

func StashStaging

func StashStaging(docs ...Storeable) error

Types

type ChunkableIntakeConfig

type ChunkableIntakeConfig struct {
	Count     int
	ChunkSize int
	JustTest  bool
	TypeName  string
	ListMaker IntakeListMaker
	Progress  ProgressChecker
	Inspector JustTestingInspector
}

type CompareOpt added in v0.0.6

type CompareOpt string
const (
	Eq  CompareOpt = "="
	Gt  CompareOpt = ">"
	Lt  CompareOpt = "<"
	Gte CompareOpt = ">="
	Lte CompareOpt = "<="
	In  CompareOpt = "IN"
)

type Config

type Config struct {
	Database DatabaseInfo
}

more flexible?

type DatabaseInfo

type DatabaseInfo struct {
	Server         string
	Port           int
	Database       string
	User           string
	Password       string
	MaxConnections int
	AcquireTimeout int
}

type DeleteChecker

type DeleteChecker func([]string)

type DiffProcessConfig

type DiffProcessConfig struct {
	TypeName          string
	ExistingListMaker ExistingListMaker
	ListMaker         OutakeListMaker
	JustTest          bool
	Checker           DeleteChecker
	Inspector         JustTestingInspector
}

To look for diffs (by duid, for instance), both lists have to be sent in.
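Delete detection needs both lists. A record is a delete candidate when it exists in the store but its id no longer appears in the source data. The sketch below shows that set difference in a shape loosely modeled on FlagDeletes(sourceDataIds []string, existingData []Resource, ...). It is not the library's implementation. 'resource' is a pared-down stand-in for Resource, and 'missingIds', 'flagDeletes', and the 'flag' callback are hypothetical. The sketch also assumes a reading of JustTest and Checker: the Checker always sees the candidates, and JustTest turns the call into a dry run.

```go
package main

import (
	"fmt"
	"sort"
)

// resource is a pared-down stand-in for scramjet's Resource.
type resource struct{ Id, Type string }

// missingIds returns the ids of existing resources that no longer appear in
// the source data, i.e. the candidates for deletion, sorted.
func missingIds(sourceDataIds []string, existing []resource) []string {
	inSource := make(map[string]bool, len(sourceDataIds))
	for _, id := range sourceDataIds {
		inSource[id] = true
	}
	var missing []string
	for _, r := range existing {
		if !inSource[r.Id] {
			missing = append(missing, r.Id)
		}
	}
	sort.Strings(missing)
	return missing
}

// flagDeletes reports the candidates to checker and, unless justTest is set,
// passes them to flag (assumed dry-run semantics for justTest).
func flagDeletes(sourceIds []string, existing []resource, justTest bool,
	checker func([]string), flag func([]string) error) error {
	ids := missingIds(sourceIds, existing)
	if checker != nil {
		checker(ids)
	}
	if justTest || len(ids) == 0 {
		return nil
	}
	return flag(ids)
}

func main() {
	existing := []resource{
		{Id: "per0000001", Type: "person"},
		{Id: "per0000002", Type: "person"},
		{Id: "per0000003", Type: "person"},
	}
	source := []string{"per0000001", "per0000003"}
	_ = flagDeletes(source, existing, true,
		func(ids []string) { fmt.Println("would delete:", ids) }, nil)
	// would delete: [per0000002]
}
```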

type ExistingListMaker

type ExistingListMaker func() ([]Resource, error)

type Filter added in v0.0.6

type Filter struct {
	Field     string
	Value     string
	Compare   CompareOpt
	SubFilter *SubFilter
}
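This page does not show the SQL a Filter produces. As an illustration only, the sketch below renders a Filter as a parameterized condition on the 'data_b' JSONB column, since the Resource notes say 'data_b' is the column for searches. It uses PostgreSQL's ->> operator, which returns text. The 'whereClause' helper is hypothetical. Treating In's Value as a comma-separated list is an assumption. SubFilter is ignored.

```go
package main

import (
	"fmt"
	"strings"
)

// CompareOpt and Filter are copied from the documented types (SubFilter omitted).
type CompareOpt string

const (
	Eq  CompareOpt = "="
	Gt  CompareOpt = ">"
	Lt  CompareOpt = "<"
	Gte CompareOpt = ">="
	Lte CompareOpt = "<="
	In  CompareOpt = "IN"
)

type Filter struct {
	Field   string
	Value   string
	Compare CompareOpt
}

// whereClause returns a SQL fragment plus its arguments. Both the JSON key
// and the value(s) are passed as parameters rather than spliced into the SQL.
func whereClause(f Filter) (string, []interface{}, error) {
	args := []interface{}{f.Field}
	switch f.Compare {
	case Eq, Gt, Lt, Gte, Lte:
		// ->> yields text, so >, <, etc. compare as strings here.
		args = append(args, f.Value)
		return fmt.Sprintf("data_b->>$1::text %s $2", f.Compare), args, nil
	case In:
		var placeholders []string
		for i, v := range strings.Split(f.Value, ",") {
			args = append(args, strings.TrimSpace(v))
			placeholders = append(placeholders, fmt.Sprintf("$%d", i+2))
		}
		return fmt.Sprintf("data_b->>$1::text IN (%s)", strings.Join(placeholders, ", ")), args, nil
	default:
		return "", nil, fmt.Errorf("unsupported comparison %q", f.Compare)
	}
}

func main() {
	sql, args, err := whereClause(Filter{Field: "externalId", Value: "x200", Compare: Eq})
	fmt.Println(sql, args, err) // data_b->>$1::text = $2 [externalId x200] <nil>
}
```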

type Identifiable

type Identifiable interface {
	Identifier() Identifier
}

func RetrieveDeletedStaging

func RetrieveDeletedStaging(typeName string) ([]Identifiable, error)

type Identifier

type Identifier struct {
	Id, Type string
}

type IntakeListMaker

type IntakeListMaker func(int) ([]Storeable, error)

type JustTestingInspector

type JustTestingInspector func(...interface{})

type OutakeListMaker

type OutakeListMaker func() ([]string, error)

Maybe an interface instead of a func type in the struct?

type OutakeProcessConfig

type OutakeProcessConfig struct {
	TypeName  string
	ListMaker OutakeListMaker
	JustTest  bool
	Checker   DeleteChecker
	Inspector JustTestingInspector
}

NOTE: this is mostly the same as DiffProcessConfig

type Packet

type Packet struct {
	Id  Identifier
	Obj interface{} // this will be serialized
}

func (Packet) Identifier

func (p Packet) Identifier() Identifier

func (Packet) Object

func (ps Packet) Object() interface{}

type ProgressChecker

type ProgressChecker func(int)

type Resource

type Resource struct {
	Id        string       `db:"id"`
	Type      string       `db:"type"`
	Hash      string       `db:"hash"`
	Data      pgtype.JSON  `db:"data"`
	DataB     pgtype.JSONB `db:"data_b"`
	CreatedAt time.Time    `db:"created_at"`
	UpdatedAt time.Time    `db:"updated_at"`
}

This is the raw structure in the database, with two JSON columns:
  • 'data' can be used for change comparison with the hash
  • 'data_b' can be used for searches
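The 'Hash' field supports change comparison. This page does not say which hash function scramjet uses, so SHA-256 below is an assumption, and 'hashOf' is a hypothetical helper. The approach is to serialize the object deterministically and hash the bytes. encoding/json emits struct fields in declaration order and map keys in sorted order, so equal values give equal hashes. This also explains the two columns. PostgreSQL's json type keeps the input text exactly, while jsonb normalizes it, which presumably makes 'data' the natural column to hash and compare.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"fmt"
)

// hashOf serializes obj to JSON and returns the hex-encoded SHA-256 digest.
func hashOf(obj interface{}) (string, error) {
	data, err := json.Marshal(obj)
	if err != nil {
		return "", err
	}
	sum := sha256.Sum256(data)
	return hex.EncodeToString(sum[:]), nil
}

// person is a hypothetical record type.
type person struct {
	Id, Name string
}

func main() {
	old, _ := hashOf(person{Id: "per0000001", Name: "Test1"})
	same, _ := hashOf(person{Id: "per0000001", Name: "Test1"})
	changed, _ := hashOf(person{Id: "per0000001", Name: "Test One"})
	fmt.Println(old == same, old == changed) // true false
}
```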

func RetrieveSingleResource added in v0.0.3

func RetrieveSingleResource(id string, typeName string) (Resource, error)

func RetrieveTypeResources

func RetrieveTypeResources(typeName string) ([]Resource, error)

func RetrieveTypeResourcesByQuery added in v0.0.6

func RetrieveTypeResourcesByQuery(typeName string, filter Filter) ([]Resource, error)

func RetrieveTypeResourcesLimited

func RetrieveTypeResourcesLimited(typeName string, limit int) ([]Resource, error)

func ScanResources added in v0.0.6

func ScanResources(rows pgx.Rows) ([]Resource, error)

TODO: could just send in a date, leaving it up to the library user to determine how it is figured out.

func (Resource) Identifier

func (res Resource) Identifier() Identifier

type ResourceListMaker added in v0.0.9

type ResourceListMaker func() ([]Resource, error)

type StagingResource

type StagingResource struct {
	Id       string       `db:"id"`
	Type     string       `db:"type"`
	Data     []byte       `db:"data"`
	IsValid  sql.NullBool `db:"is_valid"`
	ToDelete sql.NullBool `db:"to_delete"`
}

NOTE: just making json []byte instead of pgtype.JSON

func RetrieveAllStaging added in v0.0.8

func RetrieveAllStaging() ([]StagingResource, error)

Just in case we need to look at all records in staging.

func RetrieveFilteredStagingDelete added in v0.0.13

func RetrieveFilteredStagingDelete(filter Filter, typeName string) ([]StagingResource, error)

func RetrieveInvalidStaging

func RetrieveInvalidStaging(typeName string) ([]StagingResource, error)

func RetrieveSingleStaging

func RetrieveSingleStaging(id string, typeName string) (StagingResource, error)

func RetrieveSingleStagingDelete

func RetrieveSingleStagingDelete(id string, typeName string) (StagingResource, error)

func RetrieveSingleStagingValid

func RetrieveSingleStagingValid(id string, typeName string) (StagingResource, error)

func RetrieveTypeStaging

func RetrieveTypeStaging(typeName string) ([]StagingResource, error)

func RetrieveTypeStagingFiltered added in v0.0.6

func RetrieveTypeStagingFiltered(typeName string, filter Filter) ([]StagingResource, error)

func RetrieveValidStaging

func RetrieveValidStaging(typeName string) ([]StagingResource, error)

func RetrieveValidStagingFiltered added in v0.0.6

func RetrieveValidStagingFiltered(typeName string, filter Filter) ([]StagingResource, error)

func ScanStaging added in v0.0.6

func ScanStaging(rows pgx.Rows) ([]StagingResource, error)

func (StagingResource) Identifier

func (res StagingResource) Identifier() Identifier

Kind of like a dual (composite) primary key.
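A Go struct whose fields are all comparable can be used directly as a map key. That gives Identifier the same semantics as a composite (id, type) key: the same Id under two Types is two entries, and re-stashing the same pair overwrites rather than duplicates. The sketch below demonstrates this with a local copy of Identifier. That the staging table's key really is (id, type) is inferred from this note and from "[id+type=uid]" above.

```go
package main

import "fmt"

// Identifier is copied from the documented type.
type Identifier struct {
	Id, Type string
}

func main() {
	staged := map[Identifier]string{}
	staged[Identifier{Id: "0001", Type: "person"}] = `{"Name":"Test1"}`
	staged[Identifier{Id: "0001", Type: "publication"}] = `{"Title":"Paper"}`
	// Same Id, different Type: two distinct keys.
	fmt.Println(len(staged)) // 2

	// Re-stashing the same Id+Type overwrites instead of duplicating.
	staged[Identifier{Id: "0001", Type: "person"}] = `{"Name":"Test1 (edited)"}`
	fmt.Println(len(staged)) // 2
}
```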

type Storeable

type Storeable interface {
	Identifier() Identifier
	Object() interface{}
}

type Stub

type Stub struct {
	Id Identifier
}

func (Stub) Identifier

func (s Stub) Identifier() Identifier

type SubFilter added in v0.0.7

type SubFilter struct {
	Typename    string
	MatchField  string
	Value       string
	ParentMatch string /* could be different than 'Field' */
}

type ValidatorFunc

type ValidatorFunc func(json string) bool

Directories

Path Synopsis
cmd
