rivermigrate

package
v0.6.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 21, 2024 License: MPL-2.0 Imports: 18 Imported by: 5

Documentation

Overview

Package rivermigrate provides a Go API for running migrations as alternative to migrating via the bundled CLI.

Example (Migrate)

Example_migrate demonstrates the use of River's Go migration API by migrating up and down.

package main

import (
	"context"
	"fmt"
	"strings"

	"github.com/jackc/pgx/v5/pgxpool"

	"github.com/riverqueue/river/internal/riverinternaltest"
	"github.com/riverqueue/river/riverdriver/riverpgxv5"
	"github.com/riverqueue/river/rivermigrate"
)

// Example_migrate demonstrates the use of River's Go migration API by migrating
// up and down.
func main() {
	ctx := context.Background()

	dbPool, err := pgxpool.NewWithConfig(ctx, riverinternaltest.DatabaseConfig("river_testdb_example"))
	if err != nil {
		panic(err)
	}
	defer dbPool.Close()

	tx, err := dbPool.Begin(ctx)
	if err != nil {
		panic(err)
	}
	defer tx.Rollback(ctx)

	migrator := rivermigrate.New(riverpgxv5.New(dbPool), nil)

	// Our test database starts with a full River schema. Drop it so that we can
	// demonstrate working migrations. This isn't necessary outside this test.
	dropRiverSchema(ctx, migrator, tx)

	printVersions := func(res *rivermigrate.MigrateResult) {
		for _, version := range res.Versions {
			fmt.Printf("Migrated [%s] version %d\n", strings.ToUpper(string(res.Direction)), version.Version)
		}
	}

	// Migrate to version 3. An actual call may want to omit all MigrateOpts,
	// which will default to applying all available up migrations.
	res, err := migrator.MigrateTx(ctx, tx, rivermigrate.DirectionUp, &rivermigrate.MigrateOpts{
		TargetVersion: 3,
	})
	if err != nil {
		panic(err)
	}
	printVersions(res)

	// Migrate down by three steps. Down migrating defaults to running only one
	// step unless overridden by an option like MaxSteps or TargetVersion.
	res, err = migrator.MigrateTx(ctx, tx, rivermigrate.DirectionDown, &rivermigrate.MigrateOpts{
		MaxSteps: 3,
	})
	if err != nil {
		panic(err)
	}
	printVersions(res)

}

func dropRiverSchema[TTx any](ctx context.Context, migrator *rivermigrate.Migrator[TTx], tx TTx) {
	_, err := migrator.MigrateTx(ctx, tx, rivermigrate.DirectionDown, &rivermigrate.MigrateOpts{
		TargetVersion: -1,
	})
	if err != nil {
		panic(err)
	}
}
Output:

Migrated [UP] version 1
Migrated [UP] version 2
Migrated [UP] version 3
Migrated [DOWN] version 3
Migrated [DOWN] version 2
Migrated [DOWN] version 1
Example (MigrateDatabaseSQL)

Example_migrateDatabaseSQL demonstrates the use of River's Go migration API through Go's built-in database/sql package.

ctx := context.Background()

dbPool, err := sql.Open("pgx", riverinternaltest.DatabaseURL("river_testdb_example"))
if err != nil {
	panic(err)
}
defer dbPool.Close()

tx, err := dbPool.BeginTx(ctx, nil)
if err != nil {
	panic(err)
}
defer tx.Rollback()

migrator := rivermigrate.New(riverdatabasesql.New(dbPool), nil)

// Our test database starts with a full River schema. Drop it so that we can
// demonstrate working migrations. This isn't necessary outside this test.
dropRiverSchema(ctx, migrator, tx)

printVersions := func(res *rivermigrate.MigrateResult) {
	for _, version := range res.Versions {
		fmt.Printf("Migrated [%s] version %d\n", strings.ToUpper(string(res.Direction)), version.Version)
	}
}

// Migrate to version 3. An actual call may want to omit all MigrateOpts,
// which will default to applying all available up migrations.
res, err := migrator.MigrateTx(ctx, tx, rivermigrate.DirectionUp, &rivermigrate.MigrateOpts{
	TargetVersion: 3,
})
if err != nil {
	panic(err)
}
printVersions(res)

// Migrate down by three steps. Down migrating defaults to running only one
// step unless overridden by an option like MaxSteps or TargetVersion.
res, err = migrator.MigrateTx(ctx, tx, rivermigrate.DirectionDown, &rivermigrate.MigrateOpts{
	MaxSteps: 3,
})
if err != nil {
	panic(err)
}
printVersions(res)
Output:

Migrated [UP] version 1
Migrated [UP] version 2
Migrated [UP] version 3
Migrated [DOWN] version 3
Migrated [DOWN] version 2
Migrated [DOWN] version 1

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	// Logger is the structured logger to use for logging purposes. If none is
	// specified, logs will be emitted to STDOUT with messages at warn level
	// or higher.
	Logger *slog.Logger
}

Config contains configuration for Migrator.

type Direction

type Direction string
const (
	DirectionDown Direction = "down"
	DirectionUp   Direction = "up"
)

type MigrateOpts

type MigrateOpts struct {
	DryRun bool

	// MaxSteps is the maximum number of migrations to apply either up or down.
	// When migrating in the up direction, migrates an unlimited number of steps
	// by default. When migrating in the down direction, migrates only a single
	// step by default (set TargetVersion to -1 to apply unlimited steps down).
	// Set to -1 to apply no migrations (for testing/checking purposes).
	MaxSteps int

	// TargetVersion is a specific migration version to apply migrations to. The
	// version must exist and it must be in the possible list of migrations to
	// apply. e.g. If requesting an up migration with version 3, version 3 must
	// not already be applied.
	//
	// When applying migrations up, migrations are applied including the target
	// version, so when starting at version 0 and requesting version 3, versions
	// 1, 2, and 3 would be applied. When applying migrations down, down
	// migrations are applied excluding the target version, so when starting at
	// version 5 an requesting version 3, down migrations for versions 5 and 4
	// would be applied, leaving the final schema at version 3.
	//
	// When migrating down, TargetVersion can be set to the special value of -1
	// to apply all down migrations (i.e. River schema is removed completely).
	TargetVersion int
}

MigrateOpts are options for a migrate operation.

type MigrateResult

type MigrateResult struct {
	// Direction is the direction that migration occurred (up or down).
	Direction Direction

	// Versions are migration versions that were added (for up migrations) or
	// removed (for down migrations) for this run.
	Versions []MigrateVersion
}

MigrateResult is the result of a migrate operation.

type MigrateVersion

type MigrateVersion struct {
	// Duration is the amount of time it took to apply the migration.
	Duration time.Duration

	// SQL is the SQL that was applied along with the migration.
	SQL string

	// Version is the version of the migration applied.
	Version int
}

MigrateVersion is the result for a single applied migration.

type Migration added in v0.1.0

type Migration struct {
	// SQLDown is the s SQL for the migration's down direction.
	SQLDown string

	// SQLUp is the s SQL for the migration's up direction.
	SQLUp string

	// Version is the integer version number of this migration.
	Version int
}

Migration is a bundled migration containing a version (e.g. 1, 2, 3), and SQL for up and down directions.

type Migrator

type Migrator[TTx any] struct {
	baseservice.BaseService
	// contains filtered or unexported fields
}

Migrator is a database migration tool for River which can run up or down migrations in order to establish the schema that the queue needs to run.

func New

func New[TTx any](driver riverdriver.Driver[TTx], config *Config) *Migrator[TTx]

New returns a new migrator with the given database driver and configuration. The config parameter may be omitted as nil.

Two drivers are supported for migrations, one for Pgx v5 and one for the built-in database/sql package for use with migration frameworks like Goose. See packages riverpgxv5 and riverdatabasesql respectively.

The function takes a generic parameter TTx representing a transaction type, but it can be omitted because it'll generally always be inferred from the driver. For example:

import "github.com/riverqueue/river/riverdriver/riverpgxv5"
import "github.com/riverqueue/rivermigrate"

...

dbPool, err := pgxpool.New(ctx, os.Getenv("DATABASE_URL"))
if err != nil {
	// handle error
}
defer dbPool.Close()

migrator := rivermigrate.New(riverpgxv5.New(dbPool), nil)

func (*Migrator[TTx]) AllVersions added in v0.1.0

func (m *Migrator[TTx]) AllVersions() []Migration

AllVersions gets information on all known migration versions.

func (*Migrator[TTx]) GetVersion added in v0.1.0

func (m *Migrator[TTx]) GetVersion(version int) (Migration, error)

GetVersion gets information about a specific migration version. An error is returned if a versions is requested that doesn't exist.

func (*Migrator[TTx]) Migrate

func (m *Migrator[TTx]) Migrate(ctx context.Context, direction Direction, opts *MigrateOpts) (*MigrateResult, error)

Migrate migrates the database in the given direction (up or down). The opts parameter may be omitted for convenience.

By default, applies all outstanding migrations when moving in the up direction, but for safety, only one step when moving in the down direction. To migrate more than one step down, MigrateOpts.MaxSteps or MigrateOpts.TargetVersion are available. Setting MigrateOpts.TargetVersion to -1 will apply every available downstep so that River's schema is removed completely.

res, err := migrator.Migrate(ctx, rivermigrate.DirectionUp, nil)
if err != nil {
	// handle error
}

func (*Migrator[TTx]) MigrateTx

func (m *Migrator[TTx]) MigrateTx(ctx context.Context, tx TTx, direction Direction, opts *MigrateOpts) (*MigrateResult, error)

Migrate migrates the database in the given direction (up or down). The opts parameter may be omitted for convenience.

By default, applies all outstanding migrations when moving in the up direction, but for safety, only one step when moving in the down direction. To migrate more than one step down, MigrateOpts.MaxSteps or MigrateOpts.TargetVersion are available. Setting MigrateOpts.TargetVersion to -1 will apply every available downstep so that River's schema is removed completely.

res, err := migrator.MigrateTx(ctx, tx, rivermigrate.DirectionUp, nil)
if err != nil {
	// handle error
}

This variant lets a caller run migrations within a transaction. Postgres DDL is transactional, so migration changes aren't visible until the transaction commits, and are rolled back if the transaction rolls back.

func (*Migrator[TTx]) Validate added in v0.0.17

func (m *Migrator[TTx]) Validate(ctx context.Context) (*ValidateResult, error)

Validate validates the current state of migrations, returning an unsuccessful validation and usable message in case there are migrations that haven't yet been applied.

func (*Migrator[TTx]) ValidateTx added in v0.0.17

func (m *Migrator[TTx]) ValidateTx(ctx context.Context, tx TTx) (*ValidateResult, error)

Validate validates the current state of migrations, returning an unsuccessful validation and usable message in case there are migrations that haven't yet been applied.

This variant lets a caller validate within a transaction.

type ValidateResult added in v0.0.17

type ValidateResult struct {
	// Messages contain informational messages of what wasn't valid in case of a
	// failed validation. Always empty if OK is true.
	Messages []string

	// OK is true if validation completed with no problems.
	OK bool
}

ValidateResult is the result of a validation operation.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL