tasks

package
v0.25.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 25, 2024 License: Apache-2.0 Imports: 27 Imported by: 0

Documentation

Index

Constants

View Source
const MaxQueueRetries = 5
View Source
const MaxTokensFallback = 2048
View Source
const NerRetryMax = 3
View Source
const NerTimeout = 10 * time.Second
View Source
const SQLSubscriberPollInterval = 500 * time.Millisecond
View Source
const SummaryMaxOutputTokens = 1024
View Source
const TaskCountThrottle = 250 // messages per second
View Source
const TaskTimeout = 60 // seconds

Variables

View Source
var IntentStringRegex = regexp.MustCompile(`(?i)^\s*intent\W+\s+`)

Functions

func Initialize

func Initialize(ctx context.Context, appState *models.AppState, router models.TaskRouter)

func NewMessageNERTask

func NewMessageNERTask(appState *models.AppState) models.Task

func NewMessageSummaryNERTask

func NewMessageSummaryNERTask(appState *models.AppState) models.Task

func NewRetryableHTTPClient added in v0.19.0

func NewRetryableHTTPClient(retryMax int, timeout time.Duration) *http.Client

NewRetryableHTTPClient returns a new retryable HTTP client with the given retryMax and timeout. The retryable HTTP transport is wrapped in an OpenTelemetry transport.

func NewSQLQueuePublisher

func NewSQLQueuePublisher(db *sql.DB, logger watermill.LoggerAdapter) (message.Publisher, error)

func NewSQLQueueSubscriber

func NewSQLQueueSubscriber(db *sql.DB, logger watermill.LoggerAdapter) (message.Subscriber, error)

func RunTaskRouter

func RunTaskRouter(ctx context.Context, appState *models.AppState, db *sql.DB)

func TaskHandler

func TaskHandler(task models.Task) message.NoPublishHandlerFunc

TaskHandler returns a message handler function for the given task. Handlers are NoPublishHandlerFuncs, i.e., they do not publish messages.

Types

type BaseTask

type BaseTask struct {
	// contains filtered or unexported fields
}

func (*BaseTask) Execute

func (b *BaseTask) Execute(
	_ context.Context,
	_ *message.Message,
) error

func (*BaseTask) HandleError

func (b *BaseTask) HandleError(err error)

type DocumentEmbedderTask

type DocumentEmbedderTask struct {
	BaseTask
}

func NewDocumentEmbedderTask

func NewDocumentEmbedderTask(
	appState *models.AppState,
) *DocumentEmbedderTask

func (*DocumentEmbedderTask) Execute

func (dt *DocumentEmbedderTask) Execute(
	ctx context.Context,
	msg *message.Message,
) error

func (*DocumentEmbedderTask) Process

func (dt *DocumentEmbedderTask) Process(
	ctx context.Context,
	collectionName string,
	docTasks []models.DocEmbeddingTask,
) error

type IntentPromptTemplateData

type IntentPromptTemplateData struct {
	Input string
}

type MessageEmbedderTask

type MessageEmbedderTask struct {
	BaseTask
}

func NewMessageEmbedderTask

func NewMessageEmbedderTask(appState *models.AppState) *MessageEmbedderTask

func (*MessageEmbedderTask) Execute

func (t *MessageEmbedderTask) Execute(
	ctx context.Context,
	msg *message.Message,
) error

func (*MessageEmbedderTask) Process

func (t *MessageEmbedderTask) Process(
	ctx context.Context,
	sessionID string,
	msgs []models.Message,
) error

type MessageIntentTask

type MessageIntentTask struct {
	BaseTask
}

func NewMessageIntentTask

func NewMessageIntentTask(appState *models.AppState) *MessageIntentTask

func (*MessageIntentTask) Execute

func (mt *MessageIntentTask) Execute(
	ctx context.Context,
	msg *message.Message,
) error

type MessageNERTask

type MessageNERTask struct {
	BaseTask
}

func (*MessageNERTask) Execute

func (n *MessageNERTask) Execute(
	ctx context.Context,
	msg *message.Message,
) error

type MessageSummaryEmbedderTask

type MessageSummaryEmbedderTask struct {
	BaseTask
}

func NewMessageSummaryEmbedderTask

func NewMessageSummaryEmbedderTask(appState *models.AppState) *MessageSummaryEmbedderTask

func (*MessageSummaryEmbedderTask) Execute

func (*MessageSummaryEmbedderTask) HandleError

func (t *MessageSummaryEmbedderTask) HandleError(err error)

func (*MessageSummaryEmbedderTask) Process

func (t *MessageSummaryEmbedderTask) Process(
	ctx context.Context,
	sessionID string,
	summary *models.Summary,
) error

type MessageSummaryNERTask

type MessageSummaryNERTask struct {
	BaseTask
}

func (*MessageSummaryNERTask) Execute

func (n *MessageSummaryNERTask) Execute(
	ctx context.Context,
	msg *message.Message,
) error

type MessageSummaryTask

type MessageSummaryTask struct {
	BaseTask
}

MessageSummaryTask gets a list of messages created since the last SummaryPoint, determines if the message count exceeds the configured message window, and if so:
- determines the new SummaryPoint index, which will be one message older than message_window / 2
- summarizes the messages from this new SummaryPoint to the oldest message not yet summarized.

When summarizing, it adds context from these messages to an existing summary if there is one.

func NewMessageSummaryTask

func NewMessageSummaryTask(appState *models.AppState) *MessageSummaryTask

func (*MessageSummaryTask) Execute

func (t *MessageSummaryTask) Execute(
	ctx context.Context,
	msg *message.Message,
) error

func (*MessageSummaryTask) HandleError

func (t *MessageSummaryTask) HandleError(err error)

type MessageTokenCountTask

type MessageTokenCountTask struct {
	BaseTask
}

func NewMessageTokenCountTask

func NewMessageTokenCountTask(appState *models.AppState) *MessageTokenCountTask

func (*MessageTokenCountTask) Execute

func (mt *MessageTokenCountTask) Execute(
	ctx context.Context,
	msg *message.Message,
) error

func (*MessageTokenCountTask) HandleError

func (mt *MessageTokenCountTask) HandleError(err error)

type SQLSchema

type SQLSchema struct {
	wsql.DefaultPostgreSQLSchema
}

func (SQLSchema) SubscribeIsolationLevel

func (s SQLSchema) SubscribeIsolationLevel() sql.IsolationLevel

type SummaryPromptTemplateData

type SummaryPromptTemplateData struct {
	PrevSummary    string
	MessagesJoined string
}

type TaskPublisher

type TaskPublisher struct {
	// contains filtered or unexported fields
}

func NewTaskPublisher

func NewTaskPublisher(db *sql.DB) *TaskPublisher

func (*TaskPublisher) Close

func (t *TaskPublisher) Close() error

func (*TaskPublisher) Publish

func (t *TaskPublisher) Publish(
	taskType models.TaskTopic,
	metadata map[string]string,
	payload any,
) error

Publish publishes a message to the given topic. Payload must be a struct that can be marshalled to JSON.

func (*TaskPublisher) PublishMessage

func (t *TaskPublisher) PublishMessage(
	metadata map[string]string,
	payload []models.MessageTask,
) error

PublishMessage publishes a slice of Messages to all Message topics.

type TaskRouter

type TaskRouter struct {
	*message.Router

	Subscribers map[string]message.Subscriber
	// contains filtered or unexported fields
}

TaskRouter is a wrapper around watermill's Router that adds some functionality for managing tasks and handlers. TaskRouter uses a SQLQueueSubscriber for all handlers.

func NewTaskRouter

func NewTaskRouter(appState *models.AppState, db *sql.DB) (*TaskRouter, error)

NewTaskRouter creates a new TaskRouter. Note that db should not be a bun.DB instance as bun runs at an isolation level that is incompatible with watermill's SQLQueueSubscriber.

func (*TaskRouter) AddTask

func (tr *TaskRouter) AddTask(
	_ context.Context,
	name string,
	taskType models.TaskTopic,
	task models.Task,
)

AddTask adds a task handler to the router.

func (*TaskRouter) Close

func (tr *TaskRouter) Close() (err error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL