cluster

package
v0.1.4
Published: Jun 9, 2023 License: Apache-2.0 Imports: 8 Imported by: 13

Documentation

Overview

Package cluster exposes synchronization primitives to ensure correct behavior across multiple plugin instances in a Mattermost cluster.

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Job

type Job struct {
	// contains filtered or unexported fields
}

Job is a scheduled job whose callback function is executed on a configured interval by at most one plugin instance at a time.

Use scheduled jobs to perform background activity on a regular interval without having to explicitly coordinate with other instances of the same plugin that might repeat that effort.

func Schedule

func Schedule(pluginAPI JobPluginAPI, key string, nextWaitInterval NextWaitInterval, callback func()) (*Job, error)

Schedule creates a scheduled job.

Example
// Use p.API from your plugin instead.
pluginAPI := plugin.API(nil)

callback := func() {
	// periodic work to do
}

job, err := Schedule(pluginAPI, "key", MakeWaitForInterval(5*time.Minute), callback)
if err != nil {
	panic("failed to schedule job")
}

// main thread

defer job.Close()

func (*Job) Close

func (j *Job) Close() error

Close terminates a scheduled job, preventing it from being scheduled on this plugin instance.

type JobConfig

type JobConfig struct {
	// Interval is the period of execution for the job.
	Interval time.Duration
}

JobConfig defines the configuration of a scheduled job.

type JobMetadata added in v0.0.13

type JobMetadata struct {
	// LastFinished is the last time the job finished anywhere in the cluster.
	LastFinished time.Time
}

JobMetadata persists metadata about job execution.

type JobOnce added in v0.0.13

type JobOnce struct {
	// contains filtered or unexported fields
}

func (*JobOnce) Cancel added in v0.0.13

func (j *JobOnce) Cancel()

Cancel terminates a scheduled job, preventing it from being scheduled on this plugin instance. It also removes the job from the db, preventing it from being run in the future.

type JobOnceMetadata added in v0.0.13

type JobOnceMetadata struct {
	Key   string
	RunAt time.Time
	Props any
}

type JobOnceScheduler added in v0.0.13

type JobOnceScheduler struct {
	// contains filtered or unexported fields
}

func GetJobOnceScheduler added in v0.0.13

func GetJobOnceScheduler(pluginAPI JobPluginAPI) *JobOnceScheduler

GetJobOnceScheduler returns a scheduler which is ready to have its callback set. Repeated calls will return the same scheduler.

func (*JobOnceScheduler) Cancel added in v0.0.13

func (s *JobOnceScheduler) Cancel(key string)

Cancel cancels a job by its key. This is useful if the plugin lost the original *JobOnce, or is stopping a job found in ListScheduledJobs().

func (*JobOnceScheduler) ListScheduledJobs added in v0.0.13

func (s *JobOnceScheduler) ListScheduledJobs() ([]JobOnceMetadata, error)

ListScheduledJobs returns a list of the jobs in the db that have been scheduled. There is no guarantee that the list is still accurate by the time the caller reads it. For example, jobs in the list may have already run or been canceled, and new jobs may have been scheduled.

func (*JobOnceScheduler) ScheduleOnce added in v0.0.13

func (s *JobOnceScheduler) ScheduleOnce(key string, runAt time.Time, props any) (*JobOnce, error)

ScheduleOnce creates a scheduled job that will run once. When the clock reaches runAt, the callback will be called with key and props as its arguments.

If the job key already exists in the db, this will return an error. To reschedule a job, first cancel the original then schedule it again.

Example
package main

import (
	"log"
	"time"

	"github.com/mattermost/mattermost-plugin-api/cluster"

	"github.com/mattermost/mattermost-server/v6/plugin"
)

// HandleJobOnceCalls receives every fired job; dispatch on the key.
func HandleJobOnceCalls(key string, props any) {
	if key == "the key i'm watching for" {
		log.Println(props)
		// Work to do only once per cluster
	}
}

func main() {
	// Use p.API from your plugin instead.
	pluginAPI := plugin.API(nil)

	// Get the scheduler, which you can pass throughout the plugin...
	scheduler := cluster.GetJobOnceScheduler(pluginAPI)

	// Set the plugin's callback handler
	_ = scheduler.SetCallback(HandleJobOnceCalls)

	// Now start the scheduler, which starts the poller and schedules all waiting jobs.
	_ = scheduler.Start()

	// main thread...

	// add a job
	_, _ = scheduler.ScheduleOnce("the key i'm watching for", time.Now().Add(2*time.Hour), struct{ foo string }{"aasd"})

	// Maybe you want to check the scheduled jobs, or cancel them. This is completely optional--there
	// is no need to cancel jobs, even if you are shutting down. Call Cancel only when you want to
	// cancel a future job. Cancelling a job will prevent it from running in the future on this or
	// any server.
	jobs, _ := scheduler.ListScheduledJobs()
	defer func() {
		for _, j := range jobs {
			scheduler.Cancel(j.Key)
		}
	}()
}

func (*JobOnceScheduler) SetCallback added in v0.0.13

func (s *JobOnceScheduler) SetCallback(callback func(string, any)) error

SetCallback sets the scheduler's callback. When a job fires, the callback will be called with the job's key and props.

func (*JobOnceScheduler) Start added in v0.0.13

func (s *JobOnceScheduler) Start() error

Start starts the Scheduler. It finds all previous ScheduleOnce jobs and starts them running, and fires any jobs that have reached or exceeded their runAt time. Thus, even if a cluster goes down and is restarted, Start will restart previously scheduled jobs.

type JobPluginAPI

type JobPluginAPI interface {
	MutexPluginAPI
	KVGet(key string) ([]byte, *model.AppError)
	KVDelete(key string) *model.AppError
	KVList(page, count int) ([]string, *model.AppError)
}

JobPluginAPI is the plugin API interface required to schedule jobs.

type Mutex

type Mutex struct {
	// contains filtered or unexported fields
}

Mutex is similar to sync.Mutex, except usable by multiple plugin instances across a cluster.

Internally, a mutex relies on an atomic key-value set operation as exposed by the Mattermost plugin API.

Mutexes with different names are unrelated. Mutexes with the same name from different plugins are unrelated. Pick a unique name for each mutex your plugin requires.

A Mutex must not be copied after first use.
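To illustrate how an atomic key-value set can act as a lock, here is a minimal sketch using an in-memory store. Everything in it is hypothetical (`kvStore`, `setIfAbsent`, `tryLock`, `unlock`, and the "mutex_" key prefix); it shows the general technique only and omits the expiry, refresh, and retry logic a cluster-wide lock would need.

```go
package main

import (
	"fmt"
	"sync"
)

// kvStore is a hypothetical in-memory stand-in for a shared key-value store.
type kvStore struct {
	mu   sync.Mutex
	data map[string][]byte
}

// setIfAbsent atomically stores value under key only when key is unset,
// and reports whether the write happened.
func (s *kvStore) setIfAbsent(key string, value []byte) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, exists := s.data[key]; exists {
		return false
	}
	s.data[key] = value
	return true
}

// remove deletes key from the store.
func (s *kvStore) remove(key string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.data, key)
}

// tryLock acquires the named lock if nobody holds it. Whoever wins the
// atomic set owns the lock; everyone else sees false.
func tryLock(s *kvStore, name string) bool {
	return s.setIfAbsent("mutex_"+name, []byte{1})
}

// unlock releases the named lock by deleting its key.
func unlock(s *kvStore, name string) {
	s.remove("mutex_" + name)
}

func main() {
	store := &kvStore{data: map[string][]byte{}}

	fmt.Println(tryLock(store, "sync-users")) // first caller acquires the lock
	fmt.Println(tryLock(store, "sync-users")) // second caller is refused
	fmt.Println(tryLock(store, "cleanup"))    // a different name is an unrelated lock

	unlock(store, "sync-users")
	fmt.Println(tryLock(store, "sync-users")) // available again after unlock
}
```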

Example
package main

import (
	"github.com/mattermost/mattermost-plugin-api/cluster"

	"github.com/mattermost/mattermost-server/v6/plugin"
)

func main() {
	// Use p.API from your plugin instead.
	pluginAPI := plugin.API(nil)

	m, err := cluster.NewMutex(pluginAPI, "key")
	if err != nil {
		panic(err)
	}
	m.Lock()
	// critical section
	m.Unlock()
}

func NewMutex

func NewMutex(pluginAPI MutexPluginAPI, key string) (*Mutex, error)

NewMutex creates a mutex with the given key name.

Panics if key is empty.

func (*Mutex) Lock

func (m *Mutex) Lock()

Lock locks m. If the mutex is already locked by any plugin instance, including the current one, the calling goroutine blocks until the mutex can be locked.

func (*Mutex) LockWithContext added in v0.0.8

func (m *Mutex) LockWithContext(ctx context.Context) error

LockWithContext locks m unless the context is canceled. If the mutex is already locked by any plugin instance, including the current one, the calling goroutine blocks until the mutex can be locked, or the context is canceled.

The mutex is locked only if a nil error is returned.

func (*Mutex) Unlock

func (m *Mutex) Unlock()

Unlock unlocks m. It is a run-time error if m is not locked on entry to Unlock.

Just like sync.Mutex, a locked Mutex is not associated with a particular goroutine or plugin instance. One goroutine or plugin instance may lock a Mutex and then arrange for another goroutine or plugin instance to unlock it. In practice, ownership of the lock should remain within a single plugin instance.

type MutexPluginAPI

type MutexPluginAPI interface {
	KVSetWithOptions(key string, value []byte, options model.PluginKVSetOptions) (bool, *model.AppError)
	LogError(msg string, keyValuePairs ...interface{})
}

MutexPluginAPI is the plugin API interface required to manage mutexes.

type NextWaitInterval added in v0.0.8

type NextWaitInterval func(now time.Time, metadata JobMetadata) time.Duration

NextWaitInterval is a callback computing the next wait interval for a job.

func MakeWaitForInterval added in v0.0.8

func MakeWaitForInterval(interval time.Duration) NextWaitInterval

MakeWaitForInterval creates a function that schedules a job to run on the given interval relative to the last finished timestamp.

For example, if the job first starts at 12:01 PM, and is configured with interval 5 minutes, it will next run at:

12:06 PM, 12:11 PM, 12:16 PM, ...

If the job has not previously started, it will run immediately.

func MakeWaitForRoundedInterval added in v0.0.8

func MakeWaitForRoundedInterval(interval time.Duration) NextWaitInterval

MakeWaitForRoundedInterval creates a function that schedules a job to run on the nearest rounded interval relative to the last finished timestamp.

For example, if the job first starts at 12:04 PM and is configured with an interval of 5 minutes, runs are rounded to 5 minute boundaries and it will next run at:

12:05 PM, 12:10 PM, 12:15 PM, ...

If the job has not previously started, it will run immediately. Note that this wait interval strategy does not guarantee a minimum interval between runs, only that subsequent runs will be scheduled on the rounded interval.
