package core
import "sync"
type Broadcaster[T any] struct {
name string
listeners map[chan T]bool
mutex sync.RWMutex
}
// NewBroadcaster creates a new Broadcaster instance for the specified type T.
// It initializes a map to hold the listeners.
// The name parameter is used to identify the broadcaster.
// It returns a pointer to the new Broadcaster instance.
func NewBroadcaster[T any](name string) *Broadcaster[T] {
return &Broadcaster[T]{
name: name,
listeners: make(map[chan T]bool),
}
}
// Subscribe adds a new listener channel to the broadcaster.
// It returns a channel of type T that can be used to receive messages.
// The channel is buffered (100) to allow for non-blocking sends.
func (b *Broadcaster[T]) Subscribe() chan T {
ch := make(chan T, 100)
b.mutex.Lock()
b.listeners[ch] = true
b.mutex.Unlock()
return ch
}
// Unsubscribe removes a listener channel from the broadcaster.
// It closes the channel to signal that no more messages will be sent.
// If the channel does not exist in the listeners map, it does nothing.
func (b *Broadcaster[T]) Unsubscribe(ch chan T) {
b.mutex.Lock()
if _, ok := b.listeners[ch]; ok {
delete(b.listeners, ch)
close(ch)
}
b.mutex.Unlock()
}
// Broadcast sends a message of type T to all subscribed listeners.
// It takes a snapshot of the listeners so the lock is not held while sending.
// If a listener's channel is full, it skips sending the message to avoid blocking.
// This is a non-blocking send operation.
func (b *Broadcaster[T]) Broadcast(msg T) {
// Create a snapshot of listeners to avoid holding lock during send
b.mutex.RLock()
listeners := make([]chan T, 0, len(b.listeners))
for ch := range b.listeners {
listeners = append(listeners, ch)
}
b.mutex.RUnlock()
// Send to all listeners without holding the lock
for _, ch := range listeners {
select {
case ch <- msg:
default:
// If the channel is full, we skip sending the message
// to avoid blocking the broadcaster.
// This is a non-blocking send.
}
}
}
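// exampleBroadcasterUsage is a minimal usage sketch (not part of the original API surface;
// the payload type and values are illustrative): it subscribes a listener, broadcasts one
// message, receives it and unsubscribes again.
func exampleBroadcasterUsage() string {
	b := NewBroadcaster[string]("events")
	ch := b.Subscribe()
	b.Broadcast("hello")
	msg := <-ch
	b.Unsubscribe(ch)
	return msg
}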
package core
import (
"context"
"fmt"
"sync"
)
type Listener[T any] struct {
broadcaster *Broadcaster[T]
waitgroup *sync.WaitGroup
}
// NewListener creates a new Listener instance for the specified type T.
// It stores the provided broadcaster and initializes a waitgroup to manage concurrent processing of notifications.
// It returns a pointer to the new Listener instance.
func NewListener[T any](broadcaster *Broadcaster[T]) (*Listener[T], error) {
if broadcaster == nil {
return nil, fmt.Errorf("broadcaster cannot be nil")
}
return &Listener[T]{
broadcaster: broadcaster,
waitgroup: &sync.WaitGroup{},
}, nil
}
// Listen listens for notifications on the broadcaster's channel.
// It takes a context for cancellation, a ready channel to signal readiness,
// and a notifyFunction that will be called with the data received.
// The listener will process notifications in a separate goroutine to avoid blocking.
// If the context is done, it will stop listening and return.
// The ready channel is closed as soon as the listener has subscribed to signal that the listener is ready.
func (l *Listener[T]) Listen(ctx context.Context, ready chan struct{}, notifyFunction func(data T)) {
	if notifyFunction == nil {
		return
	}
	channel := l.broadcaster.Subscribe()
	// Signal readiness as soon as the subscription exists so callers can start notifying without losing messages.
	close(ready)
	for {
		select {
		case <-ctx.Done():
			return
		case data := <-channel:
			l.waitgroup.Add(1)
			go func(d T) {
				defer l.waitgroup.Done()
				notifyFunction(d)
			}(data)
		}
	}
}
// Notify sends a notification with the provided data to all listeners.
// It uses the broadcaster to broadcast the data to all subscribed channels.
// This method is typically called when an event occurs that needs to be communicated to all listeners.
// Because Broadcast is non-blocking, Notify returns immediately and does not wait for listeners to process the notification.
func (l *Listener[T]) Notify(data T) {
l.broadcaster.Broadcast(data)
}
// WaitForNotificationsProcessed waits for all notifications to be processed.
// It blocks until all goroutines that were started to process notifications have completed.
// This is useful to ensure that all notifications have been handled before proceeding with further operations.
// It is typically called after calling Listen and Notify to ensure that all processing is complete.
func (l *Listener[T]) WaitForNotificationsProcessed() {
l.waitgroup.Wait()
}
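// exampleListenerUsage is a minimal usage sketch (the payload type and message are
// illustrative): it wires a Listener to a Broadcaster, waits until the listener is
// ready, sends one notification and waits for it to be processed.
func exampleListenerUsage() error {
	broadcaster := NewBroadcaster[string]("events")
	listener, err := NewListener(broadcaster)
	if err != nil {
		return err
	}
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	ready := make(chan struct{})
	go listener.Listen(ctx, ready, func(data string) {
		fmt.Println("received:", data)
	})
	<-ready
	listener.Notify("hello")
	listener.WaitForNotificationsProcessed()
	return nil
}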
package core
import (
"fmt"
"time"
"github.com/siherrmann/queuer/model"
)
type Retryer struct {
function func() error
sleep time.Duration
options *model.OnError
}
// NewRetryer creates a new Retryer instance.
// It initializes the retryer with a function to execute and options for retry behavior;
// the initial sleep duration between retries is derived from the RetryDelay option (in seconds).
// It returns a pointer to the new Retryer instance or an error if the options are invalid.
func NewRetryer(function func() error, options *model.OnError) (*Retryer, error) {
if options == nil || options.MaxRetries <= 0 || options.RetryDelay < 0 {
return nil, fmt.Errorf("no valid retry options provided")
}
return &Retryer{
function: function,
sleep: time.Duration(options.RetryDelay) * time.Second,
options: options,
}, nil
}
// Retry attempts to execute the function up to MaxRetries times.
// It sleeps for the specified duration between retries.
// The retry behavior is determined by the RetryBackoff option.
// If the function returns an error, it will retry according to the specified backoff strategy.
// If all retries fail, it returns the last error encountered.
//
// The function is executed in a loop until it succeeds or the maximum number of retries is reached.
// If the function succeeds, it returns nil.
//
// The backoff strategies are:
// - RETRY_BACKOFF_NONE: No backoff, retries immediately.
// - RETRY_BACKOFF_LINEAR: Increases the sleep duration linearly by the initial delay.
// - RETRY_BACKOFF_EXPONENTIAL: Doubles the sleep duration after each retry.
func (r *Retryer) Retry() (err error) {
for i := 0; i < r.options.MaxRetries; i++ {
err = r.function()
if err != nil {
time.Sleep(r.sleep)
if r.options.RetryBackoff == model.RETRY_BACKOFF_NONE {
continue
} else if r.options.RetryBackoff == model.RETRY_BACKOFF_LINEAR {
r.sleep += time.Duration(r.options.RetryDelay) * time.Second
continue
} else if r.options.RetryBackoff == model.RETRY_BACKOFF_EXPONENTIAL {
r.sleep *= 2
continue
}
}
break
}
return err
}
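// exampleRetryerUsage is a minimal usage sketch (the failing operation and the retry
// values are illustrative assumptions): it retries an operation up to three times with
// exponential backoff starting at one second.
func exampleRetryerUsage(operation func() error) error {
	retryer, err := NewRetryer(operation, &model.OnError{
		MaxRetries:   3,
		RetryDelay:   1,
		RetryBackoff: model.RETRY_BACKOFF_EXPONENTIAL,
	})
	if err != nil {
		return err
	}
	return retryer.Retry()
}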
package core
import (
"context"
"fmt"
"math"
"reflect"
"sync"
"time"
"github.com/siherrmann/queuer/helper"
"github.com/siherrmann/queuer/model"
)
type Runner struct {
cancel context.CancelFunc
cancelMu sync.RWMutex
Options *model.Options
Task interface{}
Parameters model.Parameters
// Result channel to return results
ResultsChannel chan []interface{}
ErrorChannel chan error
}
// NewRunner creates a new Runner instance for the specified task and parameters.
// It checks if the task and parameters are valid and returns a pointer to the new Runner instance.
// It returns an error if the task or parameters are invalid.
func NewRunner(options *model.Options, task interface{}, parameters ...interface{}) (*Runner, error) {
taskInputParameters, err := helper.GetInputParametersFromTask(task)
if err != nil {
return nil, helper.NewError("getting task input parameters", err)
} else if len(taskInputParameters) != len(parameters) {
return nil, fmt.Errorf("task expects %d parameters, got %d", len(taskInputParameters), len(parameters))
}
for i, param := range parameters {
// Convert json float to int if the parameter is int
if taskInputParameters[i].Kind() == reflect.Int && reflect.TypeOf(param).Kind() == reflect.Float64 {
parameters[i] = int(param.(float64))
} else if taskInputParameters[i].Kind() != reflect.TypeOf(param).Kind() {
return nil, fmt.Errorf("parameter %d of task must be of type %s, got %s", i, taskInputParameters[i].Kind(), reflect.TypeOf(param).Kind())
}
}
err = helper.CheckValidTaskWithParameters(task, parameters...)
if err != nil {
return nil, fmt.Errorf("error checking task: %v", err)
}
return &Runner{
Task: task,
Parameters: model.Parameters(parameters),
Options: options,
ResultsChannel: make(chan []interface{}, 1),
ErrorChannel: make(chan error, 1),
}, nil
}
// NewRunnerFromJob creates a new Runner instance from a job.
// It initializes the runner with the job's task and parameters.
// It returns a pointer to the new Runner instance or an error if the job is invalid.
func NewRunnerFromJob(task *model.Task, job *model.Job) (*Runner, error) {
if task == nil || job == nil {
return nil, fmt.Errorf("task and job cannot be nil")
}
	parameters := make([]interface{}, len(job.Parameters))
	copy(parameters, job.Parameters)
	runner, err := NewRunner(job.Options, task.Task, parameters...)
if err != nil {
return nil, fmt.Errorf("error creating runner from job: %v", err)
}
return runner, nil
}
// Run executes the task with the provided parameters.
// It will return results on ResultsChannel and errors on ErrorChannel.
// If the task panics, it will send the panic value to ErrorChannel.
// The main intended use of this function is to run the task in a separate goroutine with panic recovery.
// It uses a context to manage cancellation and timeout.
// If the context is done before the task completes, the context error is sent on ErrorChannel.
// The context's timeout is set based on the OnError options if provided, otherwise it uses a cancelable context.
func (r *Runner) Run(ctx context.Context) {
var cancel context.CancelFunc
if r.Options != nil && r.Options.OnError != nil && r.Options.OnError.Timeout > 0 {
ctx, cancel = context.WithTimeout(
ctx,
time.Duration(math.Round(r.Options.OnError.Timeout*1000))*time.Millisecond,
)
} else {
ctx, cancel = context.WithCancel(ctx)
}
// Store the cancel function safely
r.cancelMu.Lock()
r.cancel = cancel
r.cancelMu.Unlock()
resultsChannel := make(chan []interface{}, 1)
errorChannel := make(chan error, 1)
	// Buffered so the goroutine can always report a panic, even if Run has already returned.
	panicChan := make(chan interface{}, 1)
go func() {
defer func() {
if p := recover(); p != nil {
panicChan <- p
}
}()
taskFunc := reflect.ValueOf(r.Task)
results := taskFunc.Call(r.Parameters.ToReflectValues())
resultValues := []interface{}{}
for _, result := range results {
resultValues = append(resultValues, result.Interface())
}
outputParameters, err := helper.GetOutputParametersFromTask(r.Task)
if err != nil {
errorChannel <- helper.NewError("getting task output parameters", err)
return
}
		// If the last return value is the task's error (or a declared error return that is nil), strip it from the results.
		var ok bool
		if len(resultValues) > 0 && len(outputParameters) > 0 {
			if err, ok = resultValues[len(resultValues)-1].(error); ok || (outputParameters[len(outputParameters)-1].String() == "error" && resultValues[len(resultValues)-1] == nil) {
				resultValues = resultValues[:len(resultValues)-1]
			}
		}
if err != nil {
errorChannel <- err
} else {
resultsChannel <- resultValues
}
}()
for {
select {
case p := <-panicChan:
r.ErrorChannel <- fmt.Errorf("panic running task: %v", p)
return
case err := <-errorChannel:
r.ErrorChannel <- fmt.Errorf("error running task: %v", err)
return
case results := <-resultsChannel:
r.ResultsChannel <- results
return
case <-ctx.Done():
r.ErrorChannel <- fmt.Errorf("error running task: %v", ctx.Err())
return
}
}
}
// Cancel stops the running task.
// It first calls any provided onCancel callbacks and then cancels the task's context if Run has set one.
func (r *Runner) Cancel(onCancel ...func()) {
if len(onCancel) > 0 {
for _, cancelFunc := range onCancel {
if cancelFunc != nil {
cancelFunc()
}
}
}
// Cancel the context if it exists
r.cancelMu.RLock()
cancel := r.cancel
r.cancelMu.RUnlock()
if cancel != nil {
cancel()
}
}
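// exampleRunnerUsage is a minimal usage sketch (the task function and parameters are
// illustrative): it runs a task once in the background and waits for either a result
// or an error.
func exampleRunnerUsage(ctx context.Context) ([]interface{}, error) {
	add := func(a int, b int) (int, error) {
		return a + b, nil
	}
	runner, err := NewRunner(nil, add, 2, 3)
	if err != nil {
		return nil, err
	}
	go runner.Run(ctx)
	select {
	case results := <-runner.ResultsChannel:
		return results, nil
	case err := <-runner.ErrorChannel:
		return nil, err
	}
}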
package core
import (
"context"
"log"
"reflect"
"time"
"github.com/siherrmann/queuer/helper"
"github.com/siherrmann/queuer/model"
)
type Scheduler struct {
Task interface{}
Parameters model.Parameters
StartTime *time.Time
}
// NewScheduler creates a new Scheduler instance for the specified task and parameters.
// It checks if the task and parameters are valid and returns a pointer to the new Scheduler instance.
// It returns an error if the task or parameters are invalid.
func NewScheduler(startTime *time.Time, task interface{}, parameters ...interface{}) (*Scheduler, error) {
err := helper.CheckValidTaskWithParameters(task, parameters...)
if err != nil {
return nil, helper.NewError("checking task with parameters", err)
}
return &Scheduler{
Task: task,
Parameters: model.Parameters(parameters),
StartTime: startTime,
}, nil
}
// Go starts the scheduler to run the task at the specified start time.
// It creates a new Runner instance whose task schedules the actual task to run once the start time is reached.
// If the start time is nil or in the past, the task runs immediately.
// The provided context is passed to the runner and is used to cancel the wrapping runner.
func (s *Scheduler) Go(ctx context.Context) {
var duration time.Duration
if s.StartTime != nil {
duration = time.Until(*s.StartTime)
}
runner, err := NewRunner(
nil,
func() {
time.AfterFunc(duration, func() {
reflect.ValueOf(s.Task).Call(s.Parameters.ToReflectValues())
})
},
)
if err != nil {
log.Printf("Error creating runner: %v", err)
return
}
go runner.Run(ctx)
}
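// exampleSchedulerUsage is a minimal usage sketch (the task, parameter and delay are
// illustrative): it schedules a task to run five seconds from now.
func exampleSchedulerUsage(ctx context.Context) error {
	startTime := time.Now().Add(5 * time.Second)
	scheduler, err := NewScheduler(&startTime, func(msg string) {
		log.Printf("scheduled task ran: %s", msg)
	}, "hello")
	if err != nil {
		return err
	}
	scheduler.Go(ctx)
	return nil
}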
package core
import (
"context"
"fmt"
"log"
"time"
"github.com/siherrmann/queuer/helper"
)
// Ticker represents a recurring task runner.
type Ticker struct {
interval time.Duration
runner *Runner
}
// NewTicker creates and returns a new Ticker instance.
// It initializes the ticker with a specified interval and a task to run.
// The task must be valid and compatible with the provided parameters.
// It returns a pointer to the new Ticker instance or an error if the interval, task or parameters are invalid.
func NewTicker(interval time.Duration, task interface{}, parameters ...interface{}) (*Ticker, error) {
if interval <= 0 {
return nil, fmt.Errorf("ticker interval must be greater than zero")
}
err := helper.CheckValidTaskWithParameters(task, parameters...)
if err != nil {
return nil, helper.NewError("checking task with parameters", err)
}
runner, err := NewRunner(nil, task, parameters...)
if err != nil {
return nil, helper.NewError("creating runner", err)
}
return &Ticker{
interval: interval,
runner: runner,
}, nil
}
// Go starts the Ticker. It runs the task once immediately and then at the specified interval
// until the provided context is cancelled.
// Each run happens in a separate goroutine to avoid blocking the ticker.
// If the task returns an error, the error is logged and the ticker stops.
func (t *Ticker) Go(ctx context.Context) {
go t.runner.Run(ctx)
ticker := time.NewTicker(t.interval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
		case <-ticker.C:
			go t.runner.Run(ctx)
		case <-t.runner.ResultsChannel:
			// Drain results so repeated runs do not block on the buffered results channel.
		case err := <-t.runner.ErrorChannel:
			if err != nil {
				// TODO also implement error channel?
				log.Printf("Error running task: %v", err)
				return
			}
}
}
}
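// exampleTickerUsage is a minimal usage sketch (the interval and task are illustrative):
// it logs a heartbeat every thirty seconds until the context is cancelled.
func exampleTickerUsage(ctx context.Context) error {
	ticker, err := NewTicker(30*time.Second, func() {
		log.Println("heartbeat")
	})
	if err != nil {
		return err
	}
	go ticker.Go(ctx)
	return nil
}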
package database
import (
"context"
"database/sql"
"fmt"
"log"
"time"
"github.com/google/uuid"
"github.com/lib/pq"
"github.com/siherrmann/queuer/helper"
"github.com/siherrmann/queuer/model"
loadSql "github.com/siherrmann/queuerSql"
)
// JobDBHandlerFunctions defines the interface for Job database operations.
type JobDBHandlerFunctions interface {
CheckTablesExistance() (bool, error)
CreateTable() error
DropTables() error
InsertJob(job *model.Job) (*model.Job, error)
InsertJobTx(tx *sql.Tx, job *model.Job) (*model.Job, error)
BatchInsertJobs(jobs []*model.Job) error
UpdateJobsInitial(worker *model.Worker) ([]*model.Job, error)
UpdateJobFinal(job *model.Job) (*model.Job, error)
UpdateStaleJobs() (int, error)
DeleteJob(rid uuid.UUID) error
SelectJob(rid uuid.UUID) (*model.Job, error)
SelectAllJobs(lastID int, entries int) ([]*model.Job, error)
SelectAllJobsByWorkerRID(workerRid uuid.UUID, lastID int, entries int) ([]*model.Job, error)
SelectAllJobsBySearch(search string, lastID int, entries int) ([]*model.Job, error)
// Job Archive
AddRetentionArchive(retention time.Duration) error
RemoveRetentionArchive() error
SelectJobFromArchive(rid uuid.UUID) (*model.Job, error)
SelectAllJobsFromArchive(lastID int, entries int) ([]*model.Job, error)
SelectAllJobsFromArchiveBySearch(search string, lastID int, entries int) ([]*model.Job, error)
}
// JobDBHandler implements JobDBHandlerFunctions and holds the database connection.
type JobDBHandler struct {
db *helper.Database
EncryptionKey string
}
// NewJobDBHandler creates a new instance of JobDBHandler.
// It initializes the database connection and optionally drops existing tables.
// If withTableDrop is true, it will drop the existing job tables before creating new ones.
func NewJobDBHandler(dbConnection *helper.Database, withTableDrop bool, encryptionKey ...string) (*JobDBHandler, error) {
if dbConnection == nil {
return nil, helper.NewError("database connection validation", fmt.Errorf("database connection is nil"))
}
jobDbHandler := &JobDBHandler{
db: dbConnection,
}
if len(encryptionKey) > 0 {
jobDbHandler.EncryptionKey = encryptionKey[0]
}
if withTableDrop {
err := dbConnection.DropFunctionsFromPublicSchema(loadSql.JobFunctions)
if err != nil {
return nil, helper.NewError("drop job functions", err)
}
err = jobDbHandler.DropTables()
if err != nil {
return nil, helper.NewError("drop tables", err)
}
}
err := loadSql.LoadJobSql(jobDbHandler.db.Instance, false)
if err != nil {
return nil, helper.NewError("load job sql", err)
}
err = loadSql.LoadNotifySql(jobDbHandler.db.Instance, false)
if err != nil {
return nil, helper.NewError("load notify sql", err)
}
err = jobDbHandler.CreateTable()
if err != nil {
return nil, helper.NewError("create table", err)
}
return jobDbHandler, nil
}
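// exampleJobEnqueue is a minimal usage sketch (the task name, parameters and status value
// are assumptions, and Options is left at its zero value): it creates a handler on an
// existing connection and inserts a single queued job.
func exampleJobEnqueue(db *helper.Database) (*model.Job, error) {
	handler, err := NewJobDBHandler(db, false)
	if err != nil {
		return nil, err
	}
	return handler.InsertJob(&model.Job{
		TaskName:   "my.task",
		Parameters: model.Parameters{1, "hello"},
		Status:     model.JobStatusQueued,
	})
}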
// CheckTablesExistance checks if the 'job' and 'job_archive' tables exist in the database.
// It returns true if both tables exist, otherwise false.
func (r JobDBHandler) CheckTablesExistance() (bool, error) {
jobExists, err := r.db.CheckTableExistance("job")
if err != nil {
return false, helper.NewError("job table", err)
}
jobArchiveExists, err := r.db.CheckTableExistance("job_archive")
if err != nil {
return false, helper.NewError("job_archive table", err)
}
return jobExists && jobArchiveExists, err
}
// CreateTable creates the 'job' and 'job_archive' tables in the database.
// If the tables already exist, it does not create them again.
// It also creates a trigger for notifying events on the table and all necessary indexes.
func (r JobDBHandler) CreateTable() error {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
// Use the SQL init() function to create all tables, triggers, and indexes
_, err := r.db.Instance.ExecContext(ctx, `SELECT init_job();`)
if err != nil {
log.Panicf("error initializing job tables: %#v", err)
}
r.db.Logger.Info("Checked/created table job")
return nil
}
// DropTables drops the 'job' and 'job_archive' tables from the database.
func (r JobDBHandler) DropTables() error {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
query := `DROP TABLE IF EXISTS job`
_, err := r.db.Instance.ExecContext(ctx, query)
if err != nil {
return helper.NewError("job table", err)
}
query = `DROP TABLE IF EXISTS job_archive`
_, err = r.db.Instance.ExecContext(ctx, query)
if err != nil {
return helper.NewError("job_archive table", err)
}
r.db.Logger.Info("Dropped table job")
return nil
}
// InsertJob inserts a new job record into the database.
func (r JobDBHandler) InsertJob(job *model.Job) (*model.Job, error) {
newJob := &model.Job{}
row := r.db.Instance.QueryRow(
`SELECT * FROM insert_job($1, $2, $3, $4, $5, $6, $7)`,
job.Options,
job.TaskName,
job.Parameters,
job.ParametersKeyed,
job.Status,
job.ScheduledAt,
job.ScheduleCount,
)
err := row.Scan(
&newJob.ID,
&newJob.RID,
&newJob.WorkerID,
&newJob.WorkerRID,
&newJob.Options,
&newJob.TaskName,
&newJob.Parameters,
&newJob.ParametersKeyed,
&newJob.Status,
&newJob.ScheduledAt,
&newJob.ScheduleCount,
&newJob.Attempts,
&newJob.CreatedAt,
&newJob.UpdatedAt,
)
if err != nil {
return nil, helper.NewError("scan", err)
}
return newJob, nil
}
// InsertJobTx inserts a new job record into the database within a transaction.
func (r JobDBHandler) InsertJobTx(tx *sql.Tx, job *model.Job) (*model.Job, error) {
newJob := &model.Job{}
row := tx.QueryRow(
`SELECT * FROM insert_job($1, $2, $3, $4, $5, $6, $7)`,
job.Options,
job.TaskName,
job.Parameters,
job.ParametersKeyed,
job.Status,
job.ScheduledAt,
job.ScheduleCount,
)
err := row.Scan(
&newJob.ID,
&newJob.RID,
&newJob.WorkerID,
&newJob.WorkerRID,
&newJob.Options,
&newJob.TaskName,
&newJob.Parameters,
&newJob.ParametersKeyed,
&newJob.Status,
&newJob.ScheduledAt,
&newJob.ScheduleCount,
&newJob.Attempts,
&newJob.CreatedAt,
&newJob.UpdatedAt,
)
if err != nil {
return nil, helper.NewError("scan", err)
}
return newJob, nil
}
// BatchInsertJobs inserts multiple job records into the database in a single transaction.
func (r JobDBHandler) BatchInsertJobs(jobs []*model.Job) error {
if len(jobs) == 0 {
return nil
}
	tx, err := r.db.Instance.Begin()
	if err != nil {
		return helper.NewError("transaction start", err)
	}
	// Roll back on any early return; Rollback after a successful Commit is a no-op.
	defer func() {
		_ = tx.Rollback()
	}()
	stmt, err := tx.Prepare(pq.CopyIn("job", "options", "task_name", "parameters", "parameters_keyed", "scheduled_at"))
	if err != nil {
		return helper.NewError("statement preparation", err)
	}
for _, job := range jobs {
var err error
optionsJSON := []byte("{}")
parametersJSON := []byte("[]")
parametersKeyedJSON := []byte("{}")
if job.Options != nil {
optionsJSON, err = job.Options.Marshal()
if err != nil {
return helper.NewError("marshaling job options", err)
}
}
if job.Parameters != nil {
parametersJSON, err = job.Parameters.Marshal()
if err != nil {
return helper.NewError("marshaling job parameters", err)
}
}
if job.ParametersKeyed != nil {
parametersKeyedJSON, err = job.ParametersKeyed.Marshal()
if err != nil {
return helper.NewError("marshaling job parameters_keyed", err)
}
}
_, err = stmt.Exec(
string(optionsJSON),
job.TaskName,
string(parametersJSON),
string(parametersKeyedJSON),
job.ScheduledAt,
)
if err != nil {
return helper.NewError("batch insert execution", err)
}
}
_, err = stmt.Exec()
if err != nil {
return helper.NewError("final batch insert execution", err)
}
err = stmt.Close()
if err != nil {
return helper.NewError("prepared statement close", err)
}
err = tx.Commit()
if err != nil {
return helper.NewError("transaction commit", err)
}
return nil
}
// UpdateJobsInitial updates existing queued, non-locked job records in the database.
//
// Checks if the job is in 'QUEUED' or 'FAILED' status and if the worker can handle the task.
// The worker must have the task in its available tasks and the next interval must be available if set.
// If the job is scheduled it must be scheduled within the next 10 minutes.
// It updates the job to 'RUNNING' status, increments the schedule count and attempts, and sets the started_at timestamp.
// It uses the `FOR UPDATE SKIP LOCKED` clause to avoid locking issues with concurrent updates.
//
// It returns the updated job records.
func (r JobDBHandler) UpdateJobsInitial(worker *model.Worker) ([]*model.Job, error) {
rows, err := r.db.Instance.Query(
`SELECT * FROM update_job_initial($1);`,
worker.ID,
)
if err != nil {
return nil, helper.NewError("query", err)
}
defer rows.Close()
var jobs []*model.Job
for rows.Next() {
job := &model.Job{}
err := rows.Scan(
&job.ID,
&job.RID,
&job.WorkerID,
&job.WorkerRID,
&job.Options,
&job.TaskName,
&job.Parameters,
&job.ParametersKeyed,
&job.Status,
&job.ScheduledAt,
&job.StartedAt,
&job.ScheduleCount,
&job.Attempts,
&job.CreatedAt,
&job.UpdatedAt,
)
if err != nil {
return nil, helper.NewError("scan", err)
}
jobs = append(jobs, job)
}
err = rows.Err()
if err != nil {
return []*model.Job{}, helper.NewError("rows error", err)
}
return jobs, nil
}
// UpdateJobFinal updates an existing job record in the database to state 'FAILED' or 'SUCCEEDED'.
//
// It deletes the job from the 'job' table and inserts it into the 'job_archive' table.
// The archived job will have the status set to the provided status, and it will include results and error information.
//
// It returns the archived job record.
func (r JobDBHandler) UpdateJobFinal(job *model.Job) (*model.Job, error) {
var row *sql.Row
if len(r.EncryptionKey) > 0 {
row = r.db.Instance.QueryRow(
`SELECT * FROM update_job_final_encrypted($1, $2, $3, $4, $5);`,
job.ID,
job.Status,
job.Results,
job.Error,
r.EncryptionKey,
)
} else {
row = r.db.Instance.QueryRow(
`SELECT * FROM update_job_final($1, $2, $3, $4);`,
job.ID,
job.Status,
job.Results,
job.Error,
)
}
archivedJob := &model.Job{}
err := row.Scan(
&archivedJob.ID,
&archivedJob.RID,
&archivedJob.WorkerID,
&archivedJob.WorkerRID,
&archivedJob.Options,
&archivedJob.TaskName,
&archivedJob.Parameters,
&archivedJob.ParametersKeyed,
&archivedJob.Status,
&archivedJob.ScheduledAt,
&archivedJob.StartedAt,
&archivedJob.ScheduleCount,
&archivedJob.Attempts,
&archivedJob.Results,
&archivedJob.Error,
&archivedJob.CreatedAt,
&archivedJob.UpdatedAt,
)
if err != nil {
return nil, helper.NewError("scan", err)
}
return archivedJob, nil
}
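// exampleJobClaimAndFinish is a minimal sketch of the claim/archive cycle (the success
// status is an assumption; a real worker would execute each task between the two calls):
// it claims queued jobs for a worker and archives each one.
func exampleJobClaimAndFinish(handler *JobDBHandler, worker *model.Worker) error {
	jobs, err := handler.UpdateJobsInitial(worker)
	if err != nil {
		return err
	}
	for _, job := range jobs {
		job.Status = model.JobStatusSucceeded
		if _, err := handler.UpdateJobFinal(job); err != nil {
			return err
		}
	}
	return nil
}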
// UpdateStaleJobs updates all jobs to QUEUED status where the assigned worker is STOPPED
// so they can be picked up by available workers again. It returns the number of jobs that were updated.
// Jobs are considered stale if their assigned worker has STOPPED status.
func (r JobDBHandler) UpdateStaleJobs() (int, error) {
var affectedRows int
err := r.db.Instance.QueryRow(
`SELECT update_stale_jobs($1, $2, $3, $4, $5)`,
model.JobStatusQueued,
model.JobStatusSucceeded,
model.JobStatusCancelled,
model.JobStatusFailed,
model.WorkerStatusStopped,
).Scan(&affectedRows)
if err != nil {
return 0, helper.NewError("update stale jobs", err)
}
return affectedRows, nil
}
// SelectJob retrieves a single job record from the database based on its RID.
func (r JobDBHandler) SelectJob(rid uuid.UUID) (*model.Job, error) {
row := r.db.Instance.QueryRow(
`SELECT * FROM select_job($1, $2)`,
r.EncryptionKey,
rid,
)
job := &model.Job{}
err := row.Scan(
&job.ID,
&job.RID,
&job.WorkerID,
&job.WorkerRID,
&job.Options,
&job.TaskName,
&job.Parameters,
&job.ParametersKeyed,
&job.Status,
&job.ScheduledAt,
&job.StartedAt,
&job.ScheduleCount,
&job.Attempts,
&job.Results,
&job.Error,
&job.CreatedAt,
&job.UpdatedAt,
)
if err != nil {
return nil, helper.NewError("scan", err)
}
return job, nil
}
// SelectAllJobs retrieves a paginated list of jobs for all workers.
// It returns jobs that were created before the specified lastID, or the newest jobs if lastID is 0.
func (r JobDBHandler) SelectAllJobs(lastID int, entries int) ([]*model.Job, error) {
rows, err := r.db.Instance.Query(
`SELECT * FROM select_all_jobs($1, $2, $3)`,
r.EncryptionKey,
lastID,
entries,
)
if err != nil {
return []*model.Job{}, helper.NewError("query", err)
}
defer rows.Close()
var jobs []*model.Job
for rows.Next() {
job := &model.Job{}
err := rows.Scan(
&job.ID,
&job.RID,
&job.WorkerID,
&job.WorkerRID,
&job.Options,
&job.TaskName,
&job.Parameters,
&job.ParametersKeyed,
&job.Status,
&job.ScheduledAt,
&job.StartedAt,
&job.ScheduleCount,
&job.Attempts,
&job.Results,
&job.Error,
&job.CreatedAt,
&job.UpdatedAt,
)
if err != nil {
return []*model.Job{}, helper.NewError("scan", err)
}
jobs = append(jobs, job)
}
err = rows.Err()
if err != nil {
return []*model.Job{}, helper.NewError("rows error", err)
}
return jobs, nil
}
// SelectAllJobsByWorkerRID retrieves a paginated list of jobs for a specific worker, filtered by worker RID.
// It returns jobs that were created before the specified lastID, or the newest jobs if lastID is 0.
func (r JobDBHandler) SelectAllJobsByWorkerRID(workerRid uuid.UUID, lastID int, entries int) ([]*model.Job, error) {
rows, err := r.db.Instance.Query(
`SELECT * FROM select_all_jobs_by_worker_rid($1, $2, $3, $4)`,
r.EncryptionKey,
workerRid,
lastID,
entries,
)
if err != nil {
return []*model.Job{}, helper.NewError("query", err)
}
defer rows.Close()
var jobs []*model.Job
for rows.Next() {
job := &model.Job{}
err := rows.Scan(
&job.ID,
&job.RID,
&job.WorkerID,
&job.WorkerRID,
&job.Options,
&job.TaskName,
&job.Parameters,
&job.ParametersKeyed,
&job.Status,
&job.ScheduledAt,
&job.StartedAt,
&job.ScheduleCount,
&job.Attempts,
&job.Results,
&job.Error,
&job.CreatedAt,
&job.UpdatedAt,
)
if err != nil {
return []*model.Job{}, helper.NewError("scan", err)
}
jobs = append(jobs, job)
}
err = rows.Err()
if err != nil {
return []*model.Job{}, helper.NewError("rows error", err)
}
return jobs, nil
}
// SelectAllJobsBySearch retrieves a paginated list of jobs for a worker, filtered by search string.
//
// It searches across 'rid', 'worker_id', and 'status' fields.
// The search is case-insensitive and uses ILIKE for partial matches.
//
// It returns jobs that were created before the specified lastID, or the newest jobs if lastID is 0.
func (r JobDBHandler) SelectAllJobsBySearch(search string, lastID int, entries int) ([]*model.Job, error) {
rows, err := r.db.Instance.Query(
`SELECT * FROM select_all_jobs_by_search($1, $2, $3, $4)`,
r.EncryptionKey,
search,
lastID,
entries,
)
if err != nil {
return []*model.Job{}, helper.NewError("query", err)
}
defer rows.Close()
var jobs []*model.Job
for rows.Next() {
job := &model.Job{}
err := rows.Scan(
&job.ID,
&job.RID,
&job.WorkerID,
&job.WorkerRID,
&job.Options,
&job.TaskName,
&job.Parameters,
&job.ParametersKeyed,
&job.Status,
&job.ScheduledAt,
&job.StartedAt,
&job.ScheduleCount,
&job.Attempts,
&job.Results,
&job.Error,
&job.CreatedAt,
&job.UpdatedAt,
)
if err != nil {
return []*model.Job{}, helper.NewError("scan", err)
}
jobs = append(jobs, job)
}
err = rows.Err()
if err != nil {
return []*model.Job{}, helper.NewError("rows error", err)
}
return jobs, nil
}
// Job Archive
// AddRetentionArchive sets the retention period (stored in days) for the job archive.
func (r JobDBHandler) AddRetentionArchive(retention time.Duration) error {
_, err := r.db.Instance.Exec(
`SELECT add_retention_archive($1)`,
int(retention.Hours()/24),
)
if err != nil {
return helper.NewError("exec", err)
}
return nil
}
// RemoveRetentionArchive removes the retention archive settings for the job archive.
func (r JobDBHandler) RemoveRetentionArchive() error {
_, err := r.db.Instance.Exec(
`SELECT remove_retention_archive()`,
)
if err != nil {
return helper.NewError("exec", err)
}
return nil
}
// DeleteJob deletes a job record from the job archive based on its RID.
// We only delete jobs from the archive as queued and running jobs should be cancelled first.
// Cancelling a job will move it to the archive with CANCELLED status.
func (r JobDBHandler) DeleteJob(rid uuid.UUID) error {
_, err := r.db.Instance.Exec(
`SELECT delete_job($1::UUID)`,
rid,
)
if err != nil {
return helper.NewError("exec", err)
}
return nil
}
// SelectJobFromArchive retrieves a single archived job record from the database based on its RID.
func (r JobDBHandler) SelectJobFromArchive(rid uuid.UUID) (*model.Job, error) {
row := r.db.Instance.QueryRow(
`SELECT * FROM select_job_from_archive($1, $2)`,
r.EncryptionKey,
rid,
)
job := &model.Job{}
err := row.Scan(
&job.ID,
&job.RID,
&job.WorkerID,
&job.WorkerRID,
&job.Options,
&job.TaskName,
&job.Parameters,
&job.ParametersKeyed,
&job.Status,
&job.ScheduledAt,
&job.StartedAt,
&job.ScheduleCount,
&job.Attempts,
&job.Results,
&job.Error,
&job.CreatedAt,
&job.UpdatedAt,
)
if err != nil {
return nil, helper.NewError("scan", err)
}
return job, nil
}
// SelectAllJobsFromArchive retrieves a paginated list of archived jobs.
// It returns jobs that were created before the specified lastID, or the newest jobs if lastID is 0.
func (r JobDBHandler) SelectAllJobsFromArchive(lastID int, entries int) ([]*model.Job, error) {
rows, err := r.db.Instance.Query(
`SELECT * FROM select_all_jobs_from_archive($1, $2, $3)`,
r.EncryptionKey,
lastID,
entries,
)
if err != nil {
return []*model.Job{}, helper.NewError("query", err)
}
defer rows.Close()
var jobs []*model.Job
for rows.Next() {
job := &model.Job{}
err := rows.Scan(
&job.ID,
&job.RID,
&job.WorkerID,
&job.WorkerRID,
&job.Options,
&job.TaskName,
&job.Parameters,
&job.ParametersKeyed,
&job.Status,
&job.ScheduledAt,
&job.StartedAt,
&job.ScheduleCount,
&job.Attempts,
&job.Results,
&job.Error,
&job.CreatedAt,
&job.UpdatedAt,
)
if err != nil {
return []*model.Job{}, helper.NewError("scan", err)
}
jobs = append(jobs, job)
}
err = rows.Err()
if err != nil {
return []*model.Job{}, helper.NewError("rows error", err)
}
return jobs, nil
}
// SelectAllJobsFromArchiveBySearch retrieves a paginated list of archived jobs filtered by search string.
// It searches across 'rid', 'worker_id', 'task_name', and 'status' fields.
// It returns jobs that were created before the specified lastID, or the newest jobs if lastID is 0.
func (r JobDBHandler) SelectAllJobsFromArchiveBySearch(search string, lastID int, entries int) ([]*model.Job, error) {
rows, err := r.db.Instance.Query(
`SELECT * FROM select_all_jobs_from_archive_by_search($1, $2, $3, $4)`,
r.EncryptionKey,
search,
lastID,
entries,
)
if err != nil {
return []*model.Job{}, helper.NewError("query", err)
}
defer rows.Close()
var jobs []*model.Job
for rows.Next() {
job := &model.Job{}
err := rows.Scan(
&job.ID,
&job.RID,
&job.WorkerID,
&job.WorkerRID,
&job.Options,
&job.TaskName,
&job.Parameters,
&job.ParametersKeyed,
&job.Status,
&job.ScheduledAt,
&job.StartedAt,
&job.ScheduleCount,
&job.Attempts,
&job.Results,
&job.Error,
&job.CreatedAt,
&job.UpdatedAt,
)
if err != nil {
return []*model.Job{}, helper.NewError("scan", err)
}
jobs = append(jobs, job)
}
err = rows.Err()
if err != nil {
return []*model.Job{}, helper.NewError("rows error", err)
}
return jobs, nil
}
package database
import (
"context"
"log"
"time"
"github.com/lib/pq"
"github.com/siherrmann/queuer/helper"
)
type QueuerListener struct {
Listener *pq.Listener
Channel string
}
// NewQueuerDBListener creates a new QueuerListener instance.
// It initializes a PostgreSQL listener for the specified channel using the provided database configuration.
// The listener will automatically reconnect if the connection is lost, waiting between 10 seconds and 1 minute between reconnection attempts.
// If an error occurs during the creation of the listener, it returns an error.
// The listener will log any errors encountered during listening.
func NewQueuerDBListener(dbConfig *helper.DatabaseConfiguration, channel string) (*QueuerListener, error) {
listener := pq.NewListener(dbConfig.DatabaseConnectionString(), 10*time.Second, time.Minute, func(ev pq.ListenerEventType, err error) {
if err != nil {
log.Printf("error creating postgres listener: %v", err)
}
})
err := listener.Listen(channel)
if err != nil {
return nil, helper.NewError("listen", err)
}
return &QueuerListener{
Listener: listener,
Channel: channel,
}, nil
}
// Listen listens for events on the specified channel and processes them.
// It takes a context for cancellation, a cancel function to stop listening,
// and a notifyFunction that will be called with the event data when an event is received.
// The listener will check the connection every 90 seconds and will cancel the context if an error occurs during the ping.
// The notifyFunction will be called in a separate goroutine to avoid blocking the listener.
// If the context is done, the listener will stop listening and return.
// It will log any errors encountered during the ping operation.
func (l *QueuerListener) Listen(ctx context.Context, cancel context.CancelFunc, notifyFunction func(data string)) {
l.ListenWithTimeout(ctx, cancel, notifyFunction, 90*time.Second)
}
// ListenWithTimeout is similar to Listen but allows configuring the ping timeout interval.
// This is primarily used for testing to avoid waiting the full 90 seconds.
func (l *QueuerListener) ListenWithTimeout(ctx context.Context, cancel context.CancelFunc, notifyFunction func(data string), pingTimeout time.Duration) {
for {
select {
case <-ctx.Done():
return
case n := <-l.Listener.Notify:
if n != nil {
go notifyFunction(n.Extra)
}
case <-time.After(pingTimeout):
// Checking connection at the specified interval
err := l.Listener.Ping()
if err != nil {
log.Printf("Error pinging listener: %v", err)
cancel()
return
}
}
}
}
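// exampleQueuerListenerUsage is a minimal usage sketch (the channel name "job_event" and
// the one-minute listening window are assumptions): it opens a listener and logs every
// notification payload until the context expires or a ping fails.
func exampleQueuerListenerUsage(dbConfig *helper.DatabaseConfiguration) error {
	listener, err := NewQueuerDBListener(dbConfig, "job_event")
	if err != nil {
		return err
	}
	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
	defer cancel()
	listener.Listen(ctx, cancel, func(data string) {
		log.Printf("notification received: %s", data)
	})
	return nil
}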
package database
import (
"context"
"database/sql"
"fmt"
"log"
"time"
"github.com/siherrmann/queuer/helper"
"github.com/siherrmann/queuer/model"
loadSql "github.com/siherrmann/queuerSql"
)
// MasterDBHandlerFunctions defines the interface for Master database operations.
type MasterDBHandlerFunctions interface {
CheckTableExistance() (bool, error)
CreateTable() error
DropTable() error
UpdateMaster(worker *model.Worker, settings *model.MasterSettings) (*model.Master, error)
SelectMaster() (*model.Master, error)
}
// MasterDBHandler implements MasterDBHandlerFunctions and holds the database connection.
type MasterDBHandler struct {
db *helper.Database
}
// NewMasterDBHandler creates a new instance of MasterDBHandler.
// It initializes the database connection and creates the master table if it does not exist.
func NewMasterDBHandler(dbConnection *helper.Database, withTableDrop bool) (*MasterDBHandler, error) {
if dbConnection == nil {
return nil, helper.NewError("check", fmt.Errorf("database connection is nil"))
}
masterDbHandler := &MasterDBHandler{
db: dbConnection,
}
if withTableDrop {
err := dbConnection.DropFunctionsFromPublicSchema(loadSql.MasterFunctions)
if err != nil {
return nil, helper.NewError("drop master functions", err)
}
err = masterDbHandler.DropTable()
if err != nil {
return nil, helper.NewError("drop master table", err)
}
}
err := loadSql.LoadMasterSql(masterDbHandler.db.Instance, false)
if err != nil {
return nil, helper.NewError("load master sql", err)
}
err = masterDbHandler.CreateTable()
if err != nil {
return nil, helper.NewError("create master table", err)
}
return masterDbHandler, nil
}
// CheckTableExistance checks if the 'master' table exists in the database.
// It returns true if the table exists, otherwise false.
func (r MasterDBHandler) CheckTableExistance() (bool, error) {
masterTableExists, err := r.db.CheckTableExistance("master")
if err != nil {
return false, helper.NewError("master table", err)
}
return masterTableExists, nil
}
// CreateTable creates the 'master' table in the database.
// If the table already exists, it does not create it again.
// It also creates a trigger for notifying events on the table and all necessary indexes.
func (r MasterDBHandler) CreateTable() error {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
// Use the SQL init_master() function to create the table and initial data
_, err := r.db.Instance.ExecContext(ctx, `SELECT init_master();`)
if err != nil {
log.Panicf("error initializing master table: %#v", err)
}
r.db.Logger.Info("Checked/created table master")
return nil
}
// DropTable drops the 'master' table from the database.
func (r MasterDBHandler) DropTable() error {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
query := `DROP TABLE IF EXISTS master`
_, err := r.db.Instance.ExecContext(ctx, query)
if err != nil {
return helper.NewError("drop", err)
}
r.db.Logger.Info("Dropped table master")
return nil
}
// UpdateMaster updates the master entry with the given worker's ID and settings.
// It locks the row for update to ensure that only one worker can update the master at a time.
// It returns the old master entry if it was successfully updated, or nil if no update was done.
func (r MasterDBHandler) UpdateMaster(worker *model.Worker, settings *model.MasterSettings) (*model.Master, error) {
row := r.db.Instance.QueryRow(
`SELECT * FROM update_master($1, $2, $3, $4)`,
worker.ID,
worker.RID,
settings,
int(settings.MasterLockTimeout.Minutes()),
)
master := &model.Master{}
err := row.Scan(
&master.ID,
&master.WorkerID,
&master.WorkerRID,
&master.Settings,
&master.CreatedAt,
&master.UpdatedAt,
)
if err != nil {
if err == sql.ErrNoRows {
return nil, nil
}
return nil, helper.NewError("scan", err)
}
return master, nil
}
// SelectMaster retrieves the current master entry from the database.
func (r MasterDBHandler) SelectMaster() (*model.Master, error) {
row := r.db.Instance.QueryRow(
`SELECT * FROM select_master()`,
)
master := &model.Master{}
err := row.Scan(
&master.ID,
&master.WorkerID,
&master.WorkerRID,
&master.Settings,
&master.CreatedAt,
&master.UpdatedAt,
)
if err != nil {
return nil, helper.NewError("scan", err)
}
return master, nil
}
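// exampleMasterElection is a minimal usage sketch (the lock timeout is an assumption and
// the other MasterSettings fields are left at their zero values): a worker tries to take
// or renew the master lock and checks whether it currently holds it.
func exampleMasterElection(handler *MasterDBHandler, worker *model.Worker) (bool, error) {
	settings := &model.MasterSettings{MasterLockTimeout: 5 * time.Minute}
	_, err := handler.UpdateMaster(worker, settings)
	if err != nil {
		return false, err
	}
	master, err := handler.SelectMaster()
	if err != nil {
		return false, err
	}
	return master.WorkerRID == worker.RID, nil
}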
package database
import (
"context"
"fmt"
"log"
"time"
"github.com/google/uuid"
"github.com/lib/pq"
"github.com/siherrmann/queuer/helper"
"github.com/siherrmann/queuer/model"
loadSql "github.com/siherrmann/queuerSql"
)
// WorkerDBHandlerFunctions defines the interface for Worker database operations.
type WorkerDBHandlerFunctions interface {
CheckTableExistance() (bool, error)
CreateTable() error
DropTable() error
InsertWorker(worker *model.Worker) (*model.Worker, error)
UpdateWorker(worker *model.Worker) (*model.Worker, error)
UpdateStaleWorkers(staleThreshold time.Duration) (int, error)
DeleteWorker(rid uuid.UUID) error
DeleteStaleWorkers(deleteThreshold time.Duration) (int, error)
SelectWorker(rid uuid.UUID) (*model.Worker, error)
SelectAllWorkers(lastID int, entries int) ([]*model.Worker, error)
SelectAllWorkersBySearch(search string, lastID int, entries int) ([]*model.Worker, error)
// Connections
SelectAllConnections() ([]*model.Connection, error)
}
// WorkerDBHandler implements WorkerDBHandlerFunctions and holds the database connection.
type WorkerDBHandler struct {
db *helper.Database
}
// NewWorkerDBHandler creates a new instance of WorkerDBHandler.
// It initializes the database connection and optionally drops the existing worker table.
// If withTableDrop is true, it will drop the existing worker table before creating a new one.
func NewWorkerDBHandler(dbConnection *helper.Database, withTableDrop bool) (*WorkerDBHandler, error) {
if dbConnection == nil {
return nil, helper.NewError("check", fmt.Errorf("database connection is nil"))
}
workerDbHandler := &WorkerDBHandler{
db: dbConnection,
}
if withTableDrop {
err := dbConnection.DropFunctionsFromPublicSchema(loadSql.WorkerFunctions)
if err != nil {
return nil, helper.NewError("drop worker functions", err)
}
err = workerDbHandler.DropTable()
if err != nil {
return nil, helper.NewError("drop worker table", err)
}
}
err := loadSql.LoadWorkerSql(dbConnection.Instance, withTableDrop)
if err != nil {
return nil, helper.NewError("load worker sql", err)
}
err = workerDbHandler.CreateTable()
if err != nil {
return nil, helper.NewError("create worker table", err)
}
return workerDbHandler, nil
}
// CheckTableExistance checks if the 'worker' table exists in the database.
// It returns true if the table exists, otherwise false.
func (r WorkerDBHandler) CheckTableExistance() (bool, error) {
workerTableExists, err := r.db.CheckTableExistance("worker")
if err != nil {
return false, helper.NewError("worker table", err)
}
return workerTableExists, nil
}
// CreateTable creates the 'worker' table in the database if it doesn't already exist.
// It defines the structure of the table with appropriate columns and types.
// If the table already exists, it will not create it again.
// It also creates necessary indexes for efficient querying.
func (r WorkerDBHandler) CreateTable() error {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
_, err := r.db.Instance.ExecContext(ctx, `SELECT init_worker();`)
if err != nil {
log.Panicf("error initializing worker table: %#v", err)
}
r.db.Logger.Info("Checked/created table worker")
return nil
}
// DropTable drops the 'worker' table from the database.
// It will remove the table and all its data.
// This operation is irreversible, so it should be used with caution.
// It is used during testing or when resetting the database schema.
// If the table does not exist, it will not return an error.
func (r WorkerDBHandler) DropTable() error {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
query := `DROP TABLE IF EXISTS worker`
_, err := r.db.Instance.ExecContext(ctx, query)
if err != nil {
return helper.NewError("worker table", err)
}
r.db.Logger.Info("Dropped table worker")
return nil
}
// InsertWorker inserts a new worker record with name, options and max concurrency into the database.
// It returns the newly created worker with an automatically generated RID.
// If the insertion fails, it returns an error.
func (r WorkerDBHandler) InsertWorker(worker *model.Worker) (*model.Worker, error) {
row := r.db.Instance.QueryRow(
`SELECT
output_id,
output_rid,
output_name,
output_options,
output_max_concurrency,
output_status,
output_created_at,
output_updated_at
FROM insert_worker($1, $2, $3);`,
worker.Name,
worker.Options,
worker.MaxConcurrency,
)
newWorker := &model.Worker{}
err := row.Scan(
&newWorker.ID,
&newWorker.RID,
&newWorker.Name,
&newWorker.Options,
&newWorker.MaxConcurrency,
&newWorker.Status,
&newWorker.CreatedAt,
&newWorker.UpdatedAt,
)
if err != nil {
return nil, helper.NewError("scan", err)
}
return newWorker, nil
}
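// exampleWorkerRegistration is a minimal usage sketch (the worker name and concurrency
// are assumptions; Options is left at its zero value): it registers a new worker row and
// returns it with its generated RID.
func exampleWorkerRegistration(handler *WorkerDBHandler) (*model.Worker, error) {
	return handler.InsertWorker(&model.Worker{
		Name:           "worker-1",
		MaxConcurrency: 4,
	})
}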
// UpdateWorker updates an existing worker record in the database based on its RID.
// It updates the worker's name, options, available tasks, next interval functions, max concurrency, and status.
// It returns the updated worker record with an automatically updated updated_at timestamp.
// If the update fails, it returns an error.
func (r WorkerDBHandler) UpdateWorker(worker *model.Worker) (*model.Worker, error) {
row := r.db.Instance.QueryRow(
`SELECT
output_id,
output_rid,
output_name,
output_options,
output_available_tasks,
output_available_next_interval,
output_max_concurrency,
output_status,
output_created_at,
output_updated_at
FROM update_worker($1, $2, $3, $4, $5, $6, $7);`,
worker.Name,
worker.Options,
pq.Array(worker.AvailableTasks),
pq.Array(worker.AvailableNextIntervalFuncs),
worker.MaxConcurrency,
worker.Status,
worker.RID,
)
updatedWorker := &model.Worker{}
err := row.Scan(
&updatedWorker.ID,
&updatedWorker.RID,
&updatedWorker.Name,
&updatedWorker.Options,
pq.Array(&updatedWorker.AvailableTasks),
pq.Array(&updatedWorker.AvailableNextIntervalFuncs),
&updatedWorker.MaxConcurrency,
&updatedWorker.Status,
&updatedWorker.CreatedAt,
&updatedWorker.UpdatedAt,
)
if err != nil {
return nil, helper.NewError("scan", err)
}
return updatedWorker, err
}
// UpdateStaleWorkers updates all stale workers to STOPPED status based on the provided threshold.
// It returns the number of workers that were updated.
// Workers are considered stale if they have READY or RUNNING status and their updated_at
// timestamp is older than the threshold.
func (r WorkerDBHandler) UpdateStaleWorkers(staleThreshold time.Duration) (int, error) {
cutoffTime := time.Now().UTC().Add(-staleThreshold)
var rowsAffected int
err := r.db.Instance.QueryRow(
`SELECT update_stale_workers($1, $2, $3, $4)`,
model.WorkerStatusStopped,
model.WorkerStatusReady,
model.WorkerStatusRunning,
cutoffTime,
).Scan(&rowsAffected)
if err != nil {
return 0, helper.NewError("update stale workers", err)
}
return rowsAffected, nil
}
// DeleteWorker deletes a worker record from the database based on its RID.
// It removes the worker from the database and returns an error if the deletion fails.
func (r WorkerDBHandler) DeleteWorker(rid uuid.UUID) error {
_, err := r.db.Instance.Exec(
`SELECT delete_worker($1);`,
rid,
)
if err != nil {
return helper.NewError("delete", err)
}
return nil
}
// DeleteStaleWorkers deletes workers that have been in STOPPED status for longer than the deleteThreshold.
// It returns the number of workers that were deleted.
func (r WorkerDBHandler) DeleteStaleWorkers(deleteThreshold time.Duration) (int, error) {
cutoffTime := time.Now().UTC().Add(-deleteThreshold)
var rowsAffected int
err := r.db.Instance.QueryRow(
`SELECT delete_stale_workers($1)`,
cutoffTime,
).Scan(&rowsAffected)
if err != nil {
return 0, helper.NewError("delete stale workers", err)
}
return rowsAffected, nil
}
// SelectWorker retrieves a single worker record from the database based on its RID.
// It returns the worker record.
// If the worker is not found or an error occurs during the query, it returns an error.
func (r WorkerDBHandler) SelectWorker(rid uuid.UUID) (*model.Worker, error) {
worker := &model.Worker{}
row := r.db.Instance.QueryRow(
`SELECT * FROM select_worker($1)`,
rid,
)
err := row.Scan(
&worker.ID,
&worker.RID,
&worker.Name,
&worker.Options,
pq.Array(&worker.AvailableTasks),
pq.Array(&worker.AvailableNextIntervalFuncs),
&worker.MaxConcurrency,
&worker.Status,
&worker.CreatedAt,
&worker.UpdatedAt,
)
if err != nil {
return nil, helper.NewError("scan", err)
}
return worker, nil
}
// SelectAllWorkers retrieves a paginated list of all workers.
// It returns a slice of worker records, ordered by creation date in descending order.
// It returns workers that were created before the specified lastID, or the newest workers if lastID is 0.
func (r WorkerDBHandler) SelectAllWorkers(lastID int, entries int) ([]*model.Worker, error) {
rows, err := r.db.Instance.Query(
`SELECT * FROM select_all_workers($1, $2)`,
lastID,
entries,
)
if err != nil {
return []*model.Worker{}, helper.NewError("query", err)
}
defer rows.Close()
var workers []*model.Worker
for rows.Next() {
worker := &model.Worker{}
err := rows.Scan(
&worker.ID,
&worker.RID,
&worker.Name,
&worker.Options,
pq.Array(&worker.AvailableTasks),
pq.Array(&worker.AvailableNextIntervalFuncs),
&worker.MaxConcurrency,
&worker.Status,
&worker.CreatedAt,
&worker.UpdatedAt,
)
if err != nil {
return []*model.Worker{}, helper.NewError("scan", err)
}
workers = append(workers, worker)
}
err = rows.Err()
if err != nil {
return []*model.Worker{}, helper.NewError("rows error", err)
}
return workers, nil
}
// SelectAllWorkersBySearch retrieves a paginated list of workers, filtered by search string.
// It searches across 'queue_name', 'name', and 'status' fields.
// The search is case-insensitive and uses ILIKE for partial matches.
// It returns a slice of worker records, ordered by creation date in descending order.
// It returns workers that were created before the specified lastID, or the newest workers if lastID is 0.
func (r WorkerDBHandler) SelectAllWorkersBySearch(search string, lastID int, entries int) ([]*model.Worker, error) {
rows, err := r.db.Instance.Query(
`SELECT * FROM select_all_workers_by_search($1, $2, $3)`,
search,
lastID,
entries,
)
if err != nil {
return []*model.Worker{}, helper.NewError("query", err)
}
defer rows.Close()
var workers []*model.Worker
for rows.Next() {
worker := &model.Worker{}
err := rows.Scan(
&worker.ID,
&worker.RID,
&worker.Name,
&worker.Options,
pq.Array(&worker.AvailableTasks),
pq.Array(&worker.AvailableNextIntervalFuncs),
&worker.MaxConcurrency,
&worker.Status,
&worker.CreatedAt,
&worker.UpdatedAt,
)
if err != nil {
return []*model.Worker{}, helper.NewError("scan", err)
}
workers = append(workers, worker)
}
err = rows.Err()
if err != nil {
return []*model.Worker{}, helper.NewError("rows error", err)
}
return workers, nil
}
// SelectAllConnections retrieves all active connections from the database.
// It returns a slice of Connection records.
// If the query fails, it returns an error.
func (r WorkerDBHandler) SelectAllConnections() ([]*model.Connection, error) {
rows, err := r.db.Instance.Query(
`SELECT * FROM select_all_connections()`,
)
if err != nil {
return nil, fmt.Errorf("error querying active connections: %w", err)
}
defer rows.Close()
var connections []*model.Connection
for rows.Next() {
conn := &model.Connection{}
err := rows.Scan(
&conn.PID,
&conn.Database,
&conn.Username,
&conn.ApplicationName,
&conn.Query,
&conn.State,
)
if err != nil {
return nil, helper.NewError("scan", err)
}
connections = append(connections, conn)
}
if err = rows.Err(); err != nil {
return nil, helper.NewError("rows error", err)
}
return connections, nil
}
package helper
import (
"context"
"database/sql"
"fmt"
"log"
"log/slog"
"os"
"strconv"
"strings"
"sync"
"time"
_ "github.com/joho/godotenv/autoload"
"github.com/lib/pq"
)
// Database represents a service that interacts with a database.
type Database struct {
Name string
Logger *slog.Logger
Instance *sql.DB
}
// NewDatabase creates a new Database instance with the given name and logger.
// If a configuration is provided, it connects to the database immediately and panics on connection failure;
// otherwise the Instance field is left nil.
func NewDatabase(name string, dbConfig *DatabaseConfiguration, logger *slog.Logger) *Database {
if dbConfig != nil {
db := &Database{Name: name, Logger: logger}
db.ConnectToDatabase(dbConfig, logger)
if db.Instance == nil {
panic("error connecting to database")
}
return db
} else {
return &Database{
Name: name,
Logger: logger,
Instance: nil,
}
}
}
// NewDatabaseWithDB wraps an existing *sql.DB connection in a Database instance.
func NewDatabaseWithDB(name string, dbConnection *sql.DB, logger *slog.Logger) *Database {
	return &Database{
		Name:     name,
		Logger:   logger,
		Instance: dbConnection,
	}
}
type DatabaseConfiguration struct {
Host string
Port string
Database string
Username string
Password string
Schema string
SSLMode string
WithTableDrop bool
}
// NewDatabaseConfiguration creates a new DatabaseConfiguration instance.
// It reads the database configuration from environment variables.
// It returns a pointer to the new DatabaseConfiguration instance or an error if any required environment variable is not set.
func NewDatabaseConfiguration() (*DatabaseConfiguration, error) {
config := &DatabaseConfiguration{
Host: os.Getenv("QUEUER_DB_HOST"),
Port: os.Getenv("QUEUER_DB_PORT"),
Database: os.Getenv("QUEUER_DB_DATABASE"),
Username: os.Getenv("QUEUER_DB_USERNAME"),
Password: os.Getenv("QUEUER_DB_PASSWORD"),
Schema: os.Getenv("QUEUER_DB_SCHEMA"),
SSLMode: os.Getenv("QUEUER_DB_SSLMODE"),
WithTableDrop: os.Getenv("QUEUER_DB_WITH_TABLE_DROP") == "true",
}
if len(strings.TrimSpace(config.Host)) == 0 || len(strings.TrimSpace(config.Port)) == 0 || len(strings.TrimSpace(config.Database)) == 0 || len(strings.TrimSpace(config.Username)) == 0 || len(strings.TrimSpace(config.Password)) == 0 || len(strings.TrimSpace(config.Schema)) == 0 {
return nil, fmt.Errorf("QUEUER_DB_HOST, QUEUER_DB_PORT, QUEUER_DB_DATABASE, QUEUER_DB_USERNAME, QUEUER_DB_PASSWORD and QUEUER_DB_SCHEMA environment variables must be set")
}
return config, nil
}
func (d *DatabaseConfiguration) DatabaseConnectionString() string {
sslMode := "require"
if len(strings.TrimSpace(d.SSLMode)) > 0 {
sslMode = d.SSLMode
}
return fmt.Sprintf("postgres://%s:%s@%s:%s/%s?application_name=queuer&sslmode=%s&search_path=%s", d.Username, d.Password, d.Host, d.Port, d.Database, sslMode, d.Schema)
}
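// exampleDatabaseSetup is a minimal usage sketch (it assumes the QUEUER_DB_* environment
// variables are already set): it reads the configuration from the environment and opens
// a connection with a text logger on stdout.
func exampleDatabaseSetup() (*Database, error) {
	config, err := NewDatabaseConfiguration()
	if err != nil {
		return nil, err
	}
	logger := slog.New(slog.NewTextHandler(os.Stdout, nil))
	return NewDatabase("queuer", config, logger), nil
}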
// Internal function for the service creation to connect to a database.
// The DatabaseConfiguration must contain host, port, database, username, password and schema.
// It initializes the database connection and sets the Instance field of the Database struct.
func (d *Database) ConnectToDatabase(dbConfig *DatabaseConfiguration, logger *slog.Logger) {
if len(strings.TrimSpace(dbConfig.Host)) == 0 || len(strings.TrimSpace(dbConfig.Port)) == 0 || len(strings.TrimSpace(dbConfig.Database)) == 0 || len(strings.TrimSpace(dbConfig.Username)) == 0 || len(strings.TrimSpace(dbConfig.Password)) == 0 || len(strings.TrimSpace(dbConfig.Schema)) == 0 {
panic("database configuration must contain uri, username and password")
}
var connectOnce sync.Once
var db *sql.DB
connectOnce.Do(func() {
dsn, err := pq.ParseURL(dbConfig.DatabaseConnectionString())
if err != nil {
log.Panicf("error parsing database connection string: %s", err.Error())
}
base, err := pq.NewConnector(dsn)
if err != nil {
log.Panic(err)
}
db = sql.OpenDB(pq.ConnectorWithNoticeHandler(base, func(notice *pq.Error) {
// log.Printf("Notice sent: %s", notice.Message)
}))
		// 0 means no limit on the number of open connections.
		db.SetMaxOpenConns(0)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
_, err = db.ExecContext(
ctx,
"CREATE EXTENSION IF NOT EXISTS pg_trgm;",
)
if err != nil {
logger.Error(err.Error())
}
pingErr := db.Ping()
if pingErr != nil {
log.Panicf("error connecting to database: %s", pingErr.Error())
}
logger.Info("Connected to db")
})
d.Instance = db
}
// CheckTableExistance checks if a table with the specified name exists in the database.
// It queries the information_schema.tables to check for the existence of the table.
// It returns true if the table exists, false otherwise, and an error if the query fails.
func (d *Database) CheckTableExistance(tableName string) (bool, error) {
exists := false
tableNameQuoted := pq.QuoteLiteral(tableName)
row := d.Instance.QueryRow(
fmt.Sprintf(`
SELECT EXISTS (
SELECT 1
FROM information_schema.tables
WHERE table_name = %s
) AS table_existence`,
tableNameQuoted,
),
)
err := row.Scan(
&exists,
)
if err != nil {
return false, err
}
return exists, nil
}
// CreateIndex creates an index on the specified column of the specified table.
// It uses the PostgreSQL CREATE INDEX statement to create the index.
// If the index already exists, it will not create a new one.
// It returns an error if the index creation fails.
func (d *Database) CreateIndex(tableName string, columnName string) error {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
tableNameQuoted := pq.QuoteIdentifier(tableName)
indexQuoted := pq.QuoteIdentifier("idx_" + tableName + "_" + columnName)
columnNameQuoted := pq.QuoteIdentifier(columnName)
_, err := d.Instance.ExecContext(
ctx,
fmt.Sprintf("CREATE INDEX IF NOT EXISTS %s ON %s(%s)", indexQuoted, tableNameQuoted, columnNameQuoted),
)
if err != nil {
return fmt.Errorf("error creating %s index: %#v", indexQuoted, err)
}
return nil
}
// CreateIndexes creates indexes on the specified columns of the specified table.
// It iterates over the column names and calls CreateIndex for each one.
// It returns an error if any of the index creations fail.
func (d *Database) CreateIndexes(tableName string, columnNames ...string) error {
for _, columnName := range columnNames {
err := d.CreateIndex(tableName, columnName)
if err != nil {
return err
}
}
return nil
}
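// Example (illustrative sketch, table and column names are hypothetical; db is a connected *Database):
// creating single-column indexes named idx_job_status and idx_job_worker_rid.
//
//	err := db.CreateIndexes("job", "status", "worker_rid")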
// CreateCombinedIndex creates a combined index on the specified columns of the specified table.
// It uses the PostgreSQL CREATE INDEX statement to create the index.
// If the index already exists, it will not create a new one.
// It returns an error if the index creation fails.
func (d *Database) CreateCombinedIndex(tableName string, columnName1 string, columnName2 string) error {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
tableNameQuoted := pq.QuoteIdentifier(tableName)
indexQuoted := pq.QuoteIdentifier("idx_" + tableName + "_" + columnName1 + "_" + columnName2)
columnName1Quoted := pq.QuoteIdentifier(columnName1)
columnName2Quoted := pq.QuoteIdentifier(columnName2)
_, err := d.Instance.ExecContext(
ctx,
fmt.Sprintf(`CREATE INDEX IF NOT EXISTS %s ON %s (%s, %s)`, indexQuoted, tableNameQuoted, columnName1Quoted, columnName2Quoted),
)
if err != nil {
return fmt.Errorf("error creating %s index: %#v", indexQuoted, err)
}
return nil
}
// CreateUniqueCombinedIndex creates a unique combined index on the specified columns of the specified table.
// It uses the PostgreSQL CREATE UNIQUE INDEX statement to create the index.
// If the index already exists, it will not create a new one.
// It returns an error if the index creation fails.
func (d *Database) CreateUniqueCombinedIndex(tableName string, columnName1 string, columnName2 string) error {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
tableNameQuoted := pq.QuoteIdentifier(tableName)
indexQuoted := pq.QuoteIdentifier("idx_" + tableName + "_" + columnName1 + "_" + columnName2)
columnName1Quoted := pq.QuoteIdentifier(columnName1)
columnName2Quoted := pq.QuoteIdentifier(columnName2)
_, err := d.Instance.ExecContext(
ctx,
fmt.Sprintf(`CREATE UNIQUE INDEX IF NOT EXISTS %s ON %s (%s, %s)`, indexQuoted, tableNameQuoted, columnName1Quoted, columnName2Quoted),
)
if err != nil {
return fmt.Errorf("error creating %s index: %#v", indexQuoted, err)
}
return nil
}
// DropIndex drops the index on the specified table and column.
// It uses the PostgreSQL DROP INDEX statement to drop the index.
// If the index does not exist, it will not return an error.
// It returns an error if the index dropping fails.
func (d *Database) DropIndex(tableName string, jsonMapKey string) error {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
indexQuoted := pq.QuoteIdentifier("idx_" + tableName + "_" + jsonMapKey)
_, err := d.Instance.ExecContext(
ctx,
fmt.Sprintf(`DROP INDEX IF EXISTS %s;`, indexQuoted),
)
if err != nil {
return fmt.Errorf("error dropping %s index: %#v", indexQuoted, err)
}
return nil
}
// Health checks the health of the database connection by pinging the database.
// It returns a map with keys indicating various health statistics.
func (d *Database) Health() map[string]string {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
stats := make(map[string]string)
// Ping the database
err := d.Instance.PingContext(ctx)
if err != nil {
stats["status"] = "down"
stats["error"] = fmt.Sprintf("db down: %v", err)
log.Panicf("db down: %v", err) // Log the error and terminate the program
return stats
}
// Database is up, add more statistics
stats["status"] = "up"
stats["message"] = "It's healthy"
// Get database stats (like open connections, in use, idle, etc.)
dbStats := d.Instance.Stats()
stats["open_connections"] = strconv.Itoa(dbStats.OpenConnections)
stats["in_use"] = strconv.Itoa(dbStats.InUse)
stats["idle"] = strconv.Itoa(dbStats.Idle)
stats["wait_count"] = strconv.FormatInt(dbStats.WaitCount, 10)
stats["wait_duration"] = dbStats.WaitDuration.String()
stats["max_idle_closed"] = strconv.FormatInt(dbStats.MaxIdleClosed, 10)
stats["max_lifetime_closed"] = strconv.FormatInt(dbStats.MaxLifetimeClosed, 10)
// Evaluate stats to provide a health message
if dbStats.OpenConnections > 40 { // Assuming a pool of around 50 connections; above 40 indicates heavy load
stats["message"] = "The database is experiencing heavy load."
}
if dbStats.WaitCount > 1000 {
stats["message"] = "The database has a high number of wait events, indicating potential bottlenecks."
}
if dbStats.MaxIdleClosed > int64(dbStats.OpenConnections)/2 {
stats["message"] = "Many idle connections are being closed, consider revising the connection pool settings."
}
if dbStats.MaxLifetimeClosed > int64(dbStats.OpenConnections)/2 {
stats["message"] = "Many connections are being closed due to max lifetime, consider increasing max lifetime or revising the connection usage pattern."
}
return stats
}
// Close closes the database connection.
// It logs a message indicating the disconnection from the specific database.
// If the connection is successfully closed, it returns nil.
// If an error occurs while closing the connection, it returns the error.
func (d *Database) Close() error {
log.Printf("Disconnected from database: %v", d.Instance)
return d.Instance.Close()
}
// DropFunctionsFromPublicSchema drops user-defined functions from the public schema.
// It queries pg_proc to find all overloaded versions of the specified functions,
// filters to only public schema functions, excludes extension-owned functions,
// and drops each by its full signature. This is SQL injection safe as it uses
// parameterized queries for function name lookup.
func (d *Database) DropFunctionsFromPublicSchema(functionNames []string) error {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
for _, functionName := range functionNames {
// Query for all overloaded versions, filtering to public schema and excluding extensions
rows, queryErr := d.Instance.QueryContext(ctx, `
SELECT pg_proc.oid::regprocedure::text
FROM pg_proc
JOIN pg_namespace ON pg_proc.pronamespace = pg_namespace.oid
WHERE pg_proc.proname = $1
AND pg_namespace.nspname = 'public'
AND NOT EXISTS (
SELECT 1 FROM pg_depend
WHERE objid = pg_proc.oid AND deptype = 'e'
)
`, functionName)
if queryErr != nil {
return NewError("query function overloads "+functionName, queryErr)
}
var signatures []string
for rows.Next() {
var signature string
if scanErr := rows.Scan(&signature); scanErr != nil {
rows.Close()
return NewError("scan function signature", scanErr)
}
signatures = append(signatures, signature)
}
if rowsErr := rows.Err(); rowsErr != nil {
rows.Close()
return NewError("iterating function signatures", rowsErr)
}
// Close the rows explicitly instead of deferring inside the loop,
// so rows from earlier iterations are released before the next query.
rows.Close()
// Drop each overloaded function by its full signature
for _, signature := range signatures {
dropQuery := fmt.Sprintf(`DROP FUNCTION IF EXISTS %s;`, signature)
_, err := d.Instance.ExecContext(ctx, dropQuery)
if err != nil {
return NewError("drop function "+signature, err)
}
}
}
return nil
}
package helper
import (
"context"
"fmt"
"log/slog"
"net/url"
"testing"
"time"
"github.com/testcontainers/testcontainers-go"
"github.com/testcontainers/testcontainers-go/modules/postgres"
"github.com/testcontainers/testcontainers-go/wait"
)
const (
dbName = "database"
dbUser = "user"
dbPwd = "password"
)
// MustStartPostgresContainer starts a PostgreSQL container for testing purposes.
// It uses the timescale/timescaledb image with PostgreSQL 17.
// It returns a function to terminate the container, the port on which the database is accessible,
// and an error if the container could not be started.
func MustStartPostgresContainer() (func(ctx context.Context, opts ...testcontainers.TerminateOption) error, string, error) {
ctx := context.Background()
pgContainer, err := postgres.Run(
ctx,
"timescale/timescaledb:latest-pg17",
postgres.WithDatabase(dbName),
postgres.WithUsername(dbUser),
postgres.WithPassword(dbPwd),
testcontainers.WithWaitStrategy(
wait.ForLog("database system is ready to accept connections").
WithOccurrence(2).WithStartupTimeout(5*time.Second),
),
)
if err != nil {
return nil, "", fmt.Errorf("error starting postgres container: %w", err)
}
connStr, err := pgContainer.ConnectionString(ctx, "sslmode=disable")
if err != nil {
return nil, "", fmt.Errorf("error getting connection string: %w", err)
}
u, err := url.Parse(connStr)
if err != nil {
return nil, "", fmt.Errorf("error parsing connection string: %v", err)
}
return pgContainer.Terminate, u.Port(), err
}
// NewTestDatabase creates a new Database instance for testing purposes.
// It initializes the database with the provided configuration and the name "test_db".
// It returns a pointer to the new Database instance.
func NewTestDatabase(config *DatabaseConfiguration) *Database {
return NewDatabase(
"test_db",
config,
slog.Default(),
)
}
// SetTestDatabaseConfigEnvs sets the environment variables for the test database configuration.
// It sets the host, port, database name, username, password, schema,
// and table drop options for the test database.
func SetTestDatabaseConfigEnvs(t *testing.T, port string) {
t.Setenv("QUEUER_DB_HOST", "localhost")
t.Setenv("QUEUER_DB_PORT", port)
t.Setenv("QUEUER_DB_DATABASE", dbName)
t.Setenv("QUEUER_DB_USERNAME", dbUser)
t.Setenv("QUEUER_DB_PASSWORD", dbPwd)
t.Setenv("QUEUER_DB_SCHEMA", "public")
t.Setenv("QUEUER_DB_SSLMODE", "disable")
t.Setenv("QUEUER_DB_WITH_TABLE_DROP", "true")
}
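// Example (illustrative sketch): a test that starts a throwaway Postgres container
// and points the QUEUER_DB_* configuration at it.
//
//	func TestWithDatabase(t *testing.T) {
//		terminate, port, err := MustStartPostgresContainer()
//		if err != nil {
//			t.Fatal(err)
//		}
//		t.Cleanup(func() { _ = terminate(context.Background()) })
//		SetTestDatabaseConfigEnvs(t, port)
//		// ... code under test can now read the QUEUER_DB_* variables ...
//	}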
package helper
import (
"fmt"
"path"
"runtime"
"strings"
)
// Error wraps an original error together with a trace of the call sites that wrapped it via NewError.
type Error struct {
Original error
Trace []string
}
// Error returns the original error message followed by the accumulated trace.
func (e Error) Error() string {
return e.Original.Error() + " | Trace: " + strings.Join(e.Trace, ", ")
}
// NewError wraps the given error with a trace entry made up of the calling
// function name and the provided trace string. If the original error is already
// an Error, the new trace entry is appended to its existing trace.
func NewError(trace string, original error) Error {
pc, _, _, ok := runtime.Caller(1)
details := runtime.FuncForPC(pc)
if ok && details != nil {
functionName := path.Base(details.Name())
trace = functionName + " - " + trace
}
if v, ok := original.(Error); ok {
return Error{
Original: v.Original,
Trace: append(v.Trace, trace),
}
}
return Error{
Original: original,
Trace: []string{trace},
}
}
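// Example (illustrative sketch, the function names in the output depend on the actual callers):
// wrapping an error twice accumulates trace entries, so Error() prints something like
// "connection refused | Trace: db.dial - dialing, db.open - opening database".
//
//	err := errors.New("connection refused")
//	wrapped := NewError("dialing", err)
//	wrapped = NewError("opening database", wrapped)
//	fmt.Println(wrapped.Error())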
package helper
import (
"context"
"encoding/json"
"io"
"log"
"log/slog"
"github.com/fatih/color"
)
// PrettyHandlerOptions configures the underlying slog handler used by PrettyHandler.
type PrettyHandlerOptions struct {
SlogOpts slog.HandlerOptions
}
// PrettyHandler is a slog.Handler that prints colorized, human-readable log lines.
type PrettyHandler struct {
slog.Handler
l *log.Logger
}
func (h *PrettyHandler) Handle(ctx context.Context, r slog.Record) error {
level := r.Level.String() + ":"
switch r.Level {
case slog.LevelDebug:
level = color.MagentaString(level)
case slog.LevelInfo:
level = color.BlueString(level)
case slog.LevelWarn:
level = color.YellowString(level)
case slog.LevelError:
level = color.RedString(level)
}
fields := make(map[string]interface{}, r.NumAttrs())
r.Attrs(func(a slog.Attr) bool {
fields[a.Key] = a.Value.Any()
return true
})
b, err := json.MarshalIndent(fields, "", " ")
if err != nil {
return err
}
timeStr := r.Time.Format("[15:04:05.000]")
msg := color.CyanString(r.Message)
h.l.Println(timeStr, level, msg, color.WhiteString(string(b)))
return nil
}
// NewPrettyHandler creates a PrettyHandler that writes colorized log output to the given writer.
func NewPrettyHandler(
out io.Writer,
opts PrettyHandlerOptions,
) *PrettyHandler {
h := &PrettyHandler{
Handler: slog.NewJSONHandler(out, &opts.SlogOpts),
l: log.New(out, "", 0),
}
return h
}
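// Example (illustrative sketch): wiring the pretty handler into slog so that records
// are printed with a colorized level and the attributes as indented JSON.
//
//	opts := PrettyHandlerOptions{SlogOpts: slog.HandlerOptions{Level: slog.LevelDebug}}
//	logger := slog.New(NewPrettyHandler(os.Stdout, opts))
//	logger.Info("job started", slog.String("task", "sendMail"))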
package helper
import (
"fmt"
"reflect"
"runtime"
)
// CheckValidTask checks if the provided task is a valid function.
// It returns an error if the task is nil, not a function, or if its value is nil.
func CheckValidTask(task interface{}) error {
if task == nil {
return fmt.Errorf("task must not be nil")
}
if reflect.ValueOf(task).Kind() != reflect.Func {
return fmt.Errorf("task must be a function, got %s", reflect.TypeOf(task).Kind())
}
if reflect.ValueOf(task).IsNil() {
return fmt.Errorf("task value must not be nil")
}
return nil
}
// CheckValidTaskWithParameters checks if the provided task and parameters are valid.
// It checks if the task is a valid function and if the parameters match the task's input types.
func CheckValidTaskWithParameters(task interface{}, parameters ...interface{}) error {
err := CheckValidTask(task)
if err != nil {
return err
}
taskType := reflect.TypeOf(task)
if taskType.NumIn() != len(parameters) {
return fmt.Errorf("task expects %d parameters, got %d", taskType.NumIn(), len(parameters))
}
for i, param := range parameters {
if !reflect.TypeOf(param).AssignableTo(taskType.In(i)) {
return fmt.Errorf("parameter %d of task must be of type %s, got %s", i, taskType.In(i).Kind(), reflect.TypeOf(param).Kind())
}
}
return nil
}
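// Example (illustrative sketch, the add function is hypothetical): validating a task
// function against the parameters it will later be called with.
//
//	add := func(a int, b int) int { return a + b }
//	err := CheckValidTaskWithParameters(add, 1, 2)    // nil
//	err = CheckValidTaskWithParameters(add, 1, "two") // error: parameter 1 must be int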
// GetTaskNameFromFunction retrieves the name of the function from the provided task.
// It checks if the task is a valid function and returns its name.
func GetTaskNameFromFunction(f interface{}) (string, error) {
err := CheckValidTask(f)
if err != nil {
return "", err
}
return runtime.FuncForPC(reflect.ValueOf(f).Pointer()).Name(), nil
}
// GetTaskNameFromInterface retrieves the name of the task from the provided interface.
// It checks if the task is a string or a function and returns its name accordingly.
func GetTaskNameFromInterface(task interface{}) (string, error) {
if taskNameString, ok := task.(string); ok {
return taskNameString, nil
}
return GetTaskNameFromFunction(task)
}
// GetInputParametersFromTask retrieves the input parameters of the provided task.
// It checks if the task is a valid function and returns its input parameter types.
func GetInputParametersFromTask(task interface{}) ([]reflect.Type, error) {
err := CheckValidTask(task)
if err != nil {
return nil, err
}
inputCount := reflect.TypeOf(task).NumIn()
inputParameters := []reflect.Type{}
for i := 0; i < inputCount; i++ {
inputParameters = append(inputParameters, reflect.TypeOf(task).In(i))
}
return inputParameters, nil
}
// GetOutputParametersFromTask retrieves the output parameters of the provided task.
// It checks if the task is a valid function and returns its output parameter types.
func GetOutputParametersFromTask(task interface{}) ([]reflect.Type, error) {
err := CheckValidTask(task)
if err != nil {
return nil, err
}
outputCount := reflect.TypeOf(task).NumOut()
outputParameters := []reflect.Type{}
for i := 0; i < outputCount; i++ {
outputParameters = append(outputParameters, reflect.TypeOf(task).Out(i))
}
return outputParameters, nil
}
package model
import (
"database/sql/driver"
"encoding/json"
"errors"
"fmt"
"reflect"
"strings"
"time"
"github.com/google/uuid"
"github.com/siherrmann/queuer/helper"
)
const (
// Job statuses before processing
JobStatusQueued = "QUEUED"
JobStatusScheduled = "SCHEDULED"
// Running status is used when the job is being processed by a worker.
JobStatusRunning = "RUNNING"
// Job statuses after processing
JobStatusFailed = "FAILED"
JobStatusSucceeded = "SUCCEEDED"
JobStatusCancelled = "CANCELLED"
)
type Parameters []interface{}
func (c Parameters) Value() (driver.Value, error) {
return c.Marshal()
}
func (c *Parameters) Scan(value interface{}) error {
return c.Unmarshal(value)
}
func (r Parameters) Marshal() ([]byte, error) {
return json.Marshal(r)
}
func (r *Parameters) Unmarshal(value interface{}) error {
if s, ok := value.(Parameters); ok {
*r = Parameters(s)
} else {
b, ok := value.([]byte)
if !ok {
return helper.NewError("byte assertion", errors.New("type assertion to []byte failed"))
}
return json.Unmarshal(b, r)
}
return nil
}
func (r *Parameters) ToReflectValues() []reflect.Value {
if r == nil {
return []reflect.Value{}
}
reflectValues := []reflect.Value{}
for _, p := range *r {
reflectValues = append(reflectValues, reflect.ValueOf(p))
}
return reflectValues
}
func (r *Parameters) ToInterfaceSlice() []interface{} {
if r == nil {
return []interface{}{}
}
interfaceSlice := make([]interface{}, len(*r))
copy(interfaceSlice, *r)
return interfaceSlice
}
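// Example (illustrative sketch, the greet function is hypothetical): converting stored
// parameters into reflect.Values to invoke a task function via reflection.
//
//	greet := func(name string) string { return "hello " + name }
//	params := Parameters{"world"}
//	out := reflect.ValueOf(greet).Call(params.ToReflectValues())
//	_ = out[0].String() // "hello world"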
type ParametersKeyed map[string]interface{}
func (c ParametersKeyed) Value() (driver.Value, error) {
return c.Marshal()
}
func (c *ParametersKeyed) Scan(value interface{}) error {
return c.Unmarshal(value)
}
func (r ParametersKeyed) Marshal() ([]byte, error) {
return json.Marshal(r)
}
func (r *ParametersKeyed) Unmarshal(value interface{}) error {
if s, ok := value.(ParametersKeyed); ok {
*r = ParametersKeyed(s)
} else {
b, ok := value.([]byte)
if !ok {
return helper.NewError("byte assertion", errors.New("type assertion to []byte failed"))
}
return json.Unmarshal(b, r)
}
return nil
}
// Job represents an assigned task to a worker.
// It contains all necessary information to execute the task,
// including OnError and Schedule options, parameters, and status.
//
// ID, RID, CreatedAt, and UpdatedAt are set automatically on creation.
//
// Status, ScheduledAt, StartedAt, ScheduleCount, Attempts,
// Results, Error, CreatedAt, and UpdatedAt are set automatically on update.
type Job struct {
ID int `json:"id"`
RID uuid.UUID `json:"rid"`
WorkerID int `json:"worker_id"`
WorkerRID uuid.UUID `json:"worker_rid"`
Options *Options `json:"options"`
TaskName string `json:"task_name"`
Parameters Parameters `json:"parameters"`
ParametersKeyed ParametersKeyed `json:"parameters_keyed"`
Status string `json:"status"`
ScheduledAt *time.Time `json:"scheduled_at"`
StartedAt *time.Time `json:"started_at"`
ScheduleCount int `json:"schedule_count"`
Attempts int `json:"attempts"`
Results Parameters `json:"result"`
Error string `json:"error"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
// NewJob creates a new Job instance with the provided task, options, and parameters.
// It validates the task name and options, and initializes the job status and scheduled time if applicable.
// It returns a pointer to the new Job instance or an error if something is invalid.
func NewJob(task interface{}, options *Options, parametersKeyed map[string]interface{}, parameters ...interface{}) (*Job, error) {
taskName, err := helper.GetTaskNameFromInterface(task)
if err != nil {
return nil, helper.NewError("getting task name", err)
}
if len(taskName) == 0 || len(taskName) > 100 {
return nil, helper.NewError("taskName check", fmt.Errorf("taskName must have a length between 1 and 100"))
}
if options != nil && options.OnError != nil {
err := options.OnError.IsValid()
if err != nil {
return nil, helper.NewError("OnError check", err)
}
}
if options != nil && options.Schedule != nil {
err := options.Schedule.IsValid()
if err != nil {
return nil, helper.NewError("Schedule check", err)
}
}
status := JobStatusQueued
var scheduledAt time.Time
if options != nil && options.Schedule != nil {
status = JobStatusScheduled
scheduledAt = options.Schedule.Start.UTC()
}
return &Job{
TaskName: taskName,
Status: status,
ScheduledAt: &scheduledAt,
Options: options,
Parameters: parameters,
ParametersKeyed: parametersKeyed,
}, nil
}
// JobFromNotification represents a job received from a notification.
// It contains all fields from Job, but with DBTime
// for time fields to handle database-specific time formats.
type JobFromNotification struct {
ID int `json:"id"`
RID uuid.UUID `json:"rid"`
WorkerID int `json:"worker_id"`
WorkerRID uuid.UUID `json:"worker_rid"`
Options *Options `json:"options"`
TaskName string `json:"task_name"`
Parameters Parameters `json:"parameters"`
ParametersKeyed ParametersKeyed `json:"parameters_keyed"`
Status string `json:"status"`
ScheduledAt DBTime `json:"scheduled_at"`
StartedAt DBTime `json:"started_at"`
Attempts int `json:"attempts"`
Results Parameters `json:"result"`
Error string `json:"error"`
CreatedAt DBTime `json:"created_at"`
UpdatedAt DBTime `json:"updated_at"`
}
// ToJob converts a JobFromNotification to a Job instance.
func (jn *JobFromNotification) ToJob() *Job {
return &Job{
ID: jn.ID,
RID: jn.RID,
WorkerID: jn.WorkerID,
WorkerRID: jn.WorkerRID,
Options: jn.Options,
TaskName: jn.TaskName,
Parameters: jn.Parameters,
ParametersKeyed: jn.ParametersKeyed,
Status: jn.Status,
ScheduledAt: &jn.ScheduledAt.Time,
StartedAt: &jn.StartedAt.Time,
Attempts: jn.Attempts,
Results: jn.Results,
Error: jn.Error,
CreatedAt: jn.CreatedAt.Time,
UpdatedAt: jn.UpdatedAt.Time,
}
}
// DBTime is a custom time type for handling database-specific time formats.
type DBTime struct {
time.Time
}
const dbTimeLayoutWithoutZeroes = "2006-01-02T15:04:05."
const dbTimeLayout = "2006-01-02T15:04:05.000000"
func (ct DBTime) MarshalJSON() ([]byte, error) {
if ct.Time.IsZero() {
return []byte("null"), nil
}
return []byte(fmt.Sprintf("\"%s\"", ct.Time.Format(dbTimeLayout))), nil
}
func (ct *DBTime) UnmarshalJSON(b []byte) error {
s := strings.Trim(string(b), "\"")
// Handle db null value and zero time
if s == "null" || s == "0001-01-01T00:00:00" {
ct.Time = time.Time{}
return nil
}
tSplit := strings.Split(s, ".")
if len(tSplit) != 2 {
return helper.NewError("splitting db time format", fmt.Errorf("invalid time format: %s", s))
}
var err error
ct.Time, err = time.Parse(dbTimeLayoutWithoutZeroes+strings.Repeat("0", len(tSplit[1])), s)
if err != nil {
return helper.NewError("parsing db time", fmt.Errorf("error parsing time: %s, error: %w", s, err))
}
return nil
}
func (ct *DBTime) IsSet() bool {
return !ct.IsZero()
}
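// Example (illustrative sketch): unmarshalling a database timestamp with a variable
// number of fractional digits.
//
//	var t DBTime
//	err := json.Unmarshal([]byte(`"2024-01-02T03:04:05.123456"`), &t)
//	// err == nil, t.IsSet() == true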
package model
import (
"database/sql/driver"
"encoding/json"
"errors"
"time"
"github.com/google/uuid"
"github.com/siherrmann/queuer/helper"
)
type Master struct {
ID int `json:"id"`
Settings MasterSettings `json:"settings"`
WorkerID int `json:"worker_id"`
WorkerRID uuid.UUID `json:"worker_rid"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
type MasterSettings struct {
// MasterLockTimeout is the duration after which a master lock
// is considered stale and a new worker can get master.
MasterLockTimeout time.Duration `json:"master_lock_timeout"`
// MasterPollInterval is the interval at which the master worker
// updates the master entry to stay master.
MasterPollInterval time.Duration `json:"master_poll_interval"`
// JobDeleteThreshold is the duration for which archived data is retained.
JobDeleteThreshold time.Duration `json:"retention_archive"`
// WorkerStaleThreshold is the duration after which a worker
// is considered stale if it hasn't updated its heartbeat
// and gets updated to status STOPPED.
WorkerStaleThreshold time.Duration `json:"worker_stale_threshold"`
// WorkerDeleteThreshold is the duration after which a stale worker
// is deleted from the database.
WorkerDeleteThreshold time.Duration `json:"worker_delete_threshold"`
// JobStaleThreshold is the duration after which a job
// is considered stale if it hasn't been updated
// and gets updated to status CANCELLED.
JobStaleThreshold time.Duration `json:"job_stale_threshold"`
}
func (c MasterSettings) Value() (driver.Value, error) {
return c.Marshal()
}
func (c *MasterSettings) Scan(value interface{}) error {
return c.Unmarshal(value)
}
func (r MasterSettings) Marshal() ([]byte, error) {
return json.Marshal(r)
}
func (r *MasterSettings) Unmarshal(value interface{}) error {
if o, ok := value.(MasterSettings); ok {
*r = o
} else {
b, ok := value.([]byte)
if !ok {
return helper.NewError("byte assertion", errors.New("type assertion to []byte failed"))
}
return json.Unmarshal(b, r)
}
return nil
}
func (r *MasterSettings) SetDefault() {
if r.MasterLockTimeout == 0 {
r.MasterLockTimeout = 5 * time.Minute
}
if r.MasterPollInterval == 0 {
r.MasterPollInterval = 1 * time.Minute
}
if r.WorkerStaleThreshold == 0 {
r.WorkerStaleThreshold = 5 * time.Minute
}
if r.WorkerDeleteThreshold == 0 {
r.WorkerDeleteThreshold = 24 * time.Hour
}
if r.JobStaleThreshold == 0 {
r.JobStaleThreshold = 1 * time.Hour
}
if r.JobDeleteThreshold == 0 {
r.JobDeleteThreshold = 7 * 24 * time.Hour
}
}
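// Example (illustrative sketch): fields left at their zero value are filled with the
// defaults above, so a partially populated MasterSettings is usable as-is.
//
//	settings := &MasterSettings{MasterPollInterval: 30 * time.Second}
//	settings.SetDefault()
//	// MasterPollInterval stays at 30s; all other thresholds get their default values.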
package model
import (
"database/sql/driver"
"encoding/json"
"errors"
"github.com/siherrmann/queuer/helper"
)
type Options struct {
OnError *OnError `json:"on_error,omitempty"`
Schedule *Schedule `json:"schedule,omitempty"`
}
func (c *Options) IsValid() error {
// nil options are considered valid
if c == nil {
return nil
}
if c.OnError != nil {
if err := c.OnError.IsValid(); err != nil {
return err
}
}
if c.Schedule != nil {
if err := c.Schedule.IsValid(); err != nil {
return err
}
}
return nil
}
func (c Options) Value() (driver.Value, error) {
return c.Marshal()
}
func (c *Options) Scan(value interface{}) error {
return c.Unmarshal(value)
}
func (r Options) Marshal() ([]byte, error) {
return json.Marshal(r)
}
func (r *Options) Unmarshal(value interface{}) error {
if o, ok := value.(Options); ok {
*r = o
} else {
b, ok := value.([]byte)
if !ok {
return helper.NewError("byte assertion", errors.New("type assertion to []byte failed"))
}
return json.Unmarshal(b, r)
}
return nil
}
package model
import (
"database/sql/driver"
"encoding/json"
"errors"
"github.com/siherrmann/queuer/helper"
)
const (
RETRY_BACKOFF_NONE = "none"
RETRY_BACKOFF_LINEAR = "linear"
RETRY_BACKOFF_EXPONENTIAL = "exponential"
)
// OnError represents the options to handle errors during job execution.
// It includes timeout, maximum retries, retry delay, and retry backoff strategy.
// It is used to define how the system should behave when a job fails.
//
// Parameters:
// - Timeout is the maximum time in seconds to wait for a job to complete before considering it failed.
// - MaxRetries is the maximum number of retries to attempt if a job fails.
// - RetryDelay is the delay in seconds before retrying a failed job.
// - RetryBackoff is the strategy to use for retrying failed jobs. It can be one of the following:
// - none: no retry, the job will fail immediately.
// - linear: retry with a fixed delay.
// - exponential: retry with an exponentially increasing delay.
// RetryBackoff must be set to one of these values; an empty value does not pass IsValid.
type OnError struct {
Timeout float64 `json:"timeout"`
MaxRetries int `json:"max_retries"`
RetryDelay float64 `json:"retry_delay"`
RetryBackoff string `json:"retry_backoff"`
}
// IsValid checks if the OnError options are valid.
// A nil receiver is considered valid.
// Timeout, MaxRetries, and RetryDelay must be non-negative.
// RetryBackoff must be one of the predefined strategies: none, linear, or exponential.
// It returns an error if any of the conditions are not met.
func (c *OnError) IsValid() error {
// nil options are considered valid
if c == nil {
return nil
}
if c.Timeout < 0 {
return errors.New("timeout cannot be negative")
}
if c.MaxRetries < 0 {
return errors.New("max retries cannot be negative")
}
if c.RetryDelay < 0 {
return errors.New("retry delay cannot be negative")
}
if c.RetryBackoff != RETRY_BACKOFF_NONE &&
c.RetryBackoff != RETRY_BACKOFF_LINEAR &&
c.RetryBackoff != RETRY_BACKOFF_EXPONENTIAL {
return errors.New("retry backoff must be one of: none, linear, exponential")
}
return nil
}
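// Example (illustrative sketch): a retry policy that allows two retries with an
// exponentially increasing delay, starting at one second, and a 30 second timeout.
//
//	onError := &OnError{
//		Timeout:      30,
//		MaxRetries:   2,
//		RetryDelay:   1,
//		RetryBackoff: RETRY_BACKOFF_EXPONENTIAL,
//	}
//	err := onError.IsValid() // nil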
func (c OnError) Value() (driver.Value, error) {
return c.Marshal()
}
func (c *OnError) Scan(value interface{}) error {
return c.Unmarshal(value)
}
func (r OnError) Marshal() ([]byte, error) {
return json.Marshal(r)
}
func (r *OnError) Unmarshal(value interface{}) error {
if o, ok := value.(OnError); ok {
*r = o
} else {
b, ok := value.([]byte)
if !ok {
return helper.NewError("byte assertion", errors.New("type assertion to []byte failed"))
}
return json.Unmarshal(b, r)
}
return nil
}
package model
import (
"database/sql/driver"
"encoding/json"
"errors"
"time"
"github.com/siherrmann/queuer/helper"
)
// NextIntervalFunc defines a function type that calculates the next interval
// based on the start time and the current count of executions.
type NextIntervalFunc func(start time.Time, currentCount int) time.Time
// Schedule represents the scheduling options for a job.
// It includes the start time, maximum count of executions, interval between executions,
// and an optional next interval function to calculate the next execution time.
// It is used to define how often a job should be executed.
//
// Parameters:
// - Start is the time when the job should start executing.
// - If MaxCount is 0, the job will run indefinitely.
// - If MaxCount is greater than 0, the job will run MaxCount times.
// - Interval is the duration between executions.
// - If MaxCount is greater than 1, Interval must be greater than zero or NextInterval must be provided.
// - NextInterval is the name of a registered NextIntervalFunc used to calculate the next execution time.
type Schedule struct {
Start time.Time `json:"start"`
MaxCount int `json:"max_count"`
Interval time.Duration `json:"interval"`
NextInterval string `json:"next_interval,omitempty"`
}
// IsValid checks if the Schedule options are valid.
// Start time must not be zero, MaxCount must be non-negative,
// Interval must be greater than zero if MaxCount is greater than 1,
// or NextInterval must be provided.
func (c *Schedule) IsValid() error {
if c.Start.IsZero() {
return helper.NewError("zero Start check", errors.New("start time cannot be zero"))
}
if c.MaxCount < 0 {
return helper.NewError("negative MaxCount check", errors.New("maxCount must be greater than or equal to 0"))
}
if c.MaxCount > 1 && c.Interval <= time.Duration(0) && c.NextInterval == "" {
return helper.NewError("invalid interval check", errors.New("interval must be greater than zero or nextInterval must be provided"))
}
return nil
}
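// Example (illustrative sketch): a schedule that runs a job three times, starting in
// ten seconds, with five seconds between executions.
//
//	schedule := &Schedule{
//		Start:    time.Now().Add(10 * time.Second),
//		MaxCount: 3,
//		Interval: 5 * time.Second,
//	}
//	err := schedule.IsValid() // nil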
func (c Schedule) Value() (driver.Value, error) {
return c.Marshal()
}
func (c *Schedule) Scan(value interface{}) error {
return c.Unmarshal(value)
}
func (r Schedule) Marshal() ([]byte, error) {
return json.Marshal(r)
}
func (r *Schedule) Unmarshal(value interface{}) error {
if o, ok := value.(Schedule); ok {
*r = o
} else {
b, ok := value.([]byte)
if !ok {
return helper.NewError("byte assertion", errors.New("type assertion to []byte failed"))
}
return json.Unmarshal(b, r)
}
return nil
}
package model
import (
"fmt"
"reflect"
"github.com/siherrmann/queuer/helper"
)
// Task represents a job task with its function, name, input parameters, and output parameters.
// It is used to define a job that can be executed by the queuer system.
//
// Parameters:
// - Task is the function that will be executed as a job.
// - Name is the name of the task, which should be unique and descriptive.
// - InputParameters is a slice of reflect.Type representing the types of input parameters for the task.
// - OutputParameters is a slice of reflect.Type representing the types of the task's return values.
type Task struct {
Task interface{}
Name string
InputParameters []reflect.Type
OutputParameters []reflect.Type
}
// NewTask creates a new Task from the given function, deriving the task name
// from the function name via reflection.
func NewTask(task interface{}) (*Task, error) {
taskName, err := helper.GetTaskNameFromFunction(task)
if err != nil {
return nil, helper.NewError("getting task name", err)
}
return NewTaskWithName(task, taskName)
}
// NewTaskWithName creates a new Task from the given function and explicit task name.
// It validates the task and the name and collects the input and output parameter types.
func NewTaskWithName(task interface{}, taskName string) (*Task, error) {
if len(taskName) == 0 || len(taskName) > 100 {
return nil, helper.NewError("taskName check", fmt.Errorf("taskName must have a length between 1 and 100"))
}
err := helper.CheckValidTask(task)
if err != nil {
return nil, err
}
inputParameters := []reflect.Type{}
inputCount := reflect.TypeOf(task).NumIn()
for i := 0; i < inputCount; i++ {
inputParameters = append(inputParameters, reflect.TypeOf(task).In(i))
}
outputParameters := []reflect.Type{}
outputCount := reflect.TypeOf(task).NumOut()
for i := 0; i < outputCount; i++ {
outputParameters = append(outputParameters, reflect.TypeOf(task).Out(i))
}
return &Task{
Task: task,
Name: taskName,
InputParameters: inputParameters,
OutputParameters: outputParameters,
}, nil
}
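// Example (illustrative sketch, the resize function is hypothetical): registering a
// function under an explicit task name and inspecting its reflected signature.
//
//	resize := func(path string, width int) (string, error) { return path, nil }
//	task, err := NewTaskWithName(resize, "image.resize")
//	// task.InputParameters  -> [string, int]
//	// task.OutputParameters -> [string, error]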
package model
import (
"fmt"
"time"
"github.com/google/uuid"
"github.com/siherrmann/queuer/helper"
)
const (
WorkerStatusReady = "READY"
WorkerStatusRunning = "RUNNING"
WorkerStatusFailed = "FAILED"
WorkerStatusStopped = "STOPPED"
)
// Worker represents a worker that can execute tasks.
// It includes the worker's ID, name, options for error handling, maximum concurrency,
// available tasks, and status.
//
// ID, RID, Status, CreatedAt, and UpdatedAt are set automatically on creation.
//
// Parameters:
// - Name is the name of the worker, which should be unique and descriptive.
// - Options is an optional field that can be used to specify error handling options.
// If the Job has options set, the Job options are used as primary options.
// - MaxConcurrency is the maximum number of tasks that can be executed concurrently by the worker.
// - AvailableTasks is a list of task names that the worker can execute.
// - AvailableNextIntervalFuncs is a list of next interval functions that the worker can use for scheduled jobs.
type Worker struct {
ID int `json:"id"`
RID uuid.UUID `json:"rid"`
Name string `json:"name"`
Options *OnError `json:"options,omitempty"`
MaxConcurrency int `json:"max_concurrency"`
AvailableTasks []string `json:"available_tasks,omitempty"`
AvailableNextIntervalFuncs []string `json:"available_next_interval,omitempty"`
Status string `json:"status"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
// NewWorker creates a new Worker with the specified name and maximum concurrency.
// It validates the name and maximum concurrency, and initializes the worker status to ready.
// It returns a pointer to the new Worker instance or an error if something is invalid.
func NewWorker(name string, maxConcurrency int) (*Worker, error) {
if len(name) == 0 || len(name) > 100 {
return nil, helper.NewError("name check", fmt.Errorf("name must have a length between 1 and 100"))
}
if maxConcurrency < 1 || maxConcurrency > 100 {
return nil, helper.NewError("maxConcurrency check", fmt.Errorf("maxConcurrency must be between 1 and 100"))
}
return &Worker{
Name: name,
MaxConcurrency: maxConcurrency,
Status: WorkerStatusReady,
}, nil
}
// NewWorkerWithOptions creates a new Worker with the specified name, maximum concurrency, and error handling options.
// It validates the name, maximum concurrency, and options, and initializes the worker status to ready.
// It returns a pointer to the new Worker instance or an error if something is invalid.
func NewWorkerWithOptions(name string, maxConcurrency int, options *OnError) (*Worker, error) {
if len(name) == 0 || len(name) > 100 {
return nil, helper.NewError("name check", fmt.Errorf("name must have a length between 1 and 100"))
}
if maxConcurrency < 1 || maxConcurrency > 1000 {
return nil, helper.NewError("maxConcurrency check", fmt.Errorf("maxConcurrency must be between 1 and 1000"))
}
err := options.IsValid()
if err != nil {
return nil, helper.NewError("options check", err)
}
return &Worker{
Name: name,
Options: options,
MaxConcurrency: maxConcurrency,
Status: WorkerStatusReady,
}, nil
}
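// Example (illustrative sketch): a worker that processes up to ten jobs concurrently
// and retries failed jobs once after a two second delay.
//
//	worker, err := NewWorkerWithOptions("mail-worker", 10, &OnError{
//		MaxRetries:   1,
//		RetryDelay:   2,
//		RetryBackoff: RETRY_BACKOFF_LINEAR,
//	})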
package queuer
import (
"context"
"database/sql"
"encoding/json"
"log"
"log/slog"
"os"
"strings"
"sync"
"time"
"github.com/siherrmann/queuer/core"
"github.com/siherrmann/queuer/database"
"github.com/siherrmann/queuer/helper"
"github.com/siherrmann/queuer/model"
)
// Queuer represents the main queuing system.
// It manages job scheduling, execution, and error handling.
// It provides methods to start, stop, and manage jobs and workers.
// It also handles database connections and listeners for job events.
type Queuer struct {
DB *sql.DB
JobPollInterval time.Duration
RetentionArchive time.Duration
// Context
ctx context.Context
cancel context.CancelFunc
// Runners
activeRunners sync.Map
// Logger
log *slog.Logger
// Worker
worker *model.Worker
workerMu sync.RWMutex
// DBs
dbConfig *helper.DatabaseConfiguration
dbJob database.JobDBHandlerFunctions
dbWorker database.WorkerDBHandlerFunctions
dbMaster database.MasterDBHandlerFunctions
// Job DB listeners
jobDbListener *database.QueuerListener
jobArchiveDbListener *database.QueuerListener
// Job listeners
jobInsertListener *core.Listener[*model.Job]
jobUpdateListener *core.Listener[*model.Job]
jobDeleteListener *core.Listener[*model.Job]
// Available functions
tasks map[string]*model.Task
nextIntervalFuncs map[string]model.NextIntervalFunc
}
// NewQueuer creates a new Queuer instance with the given name and max concurrency.
// It wraps NewQueuerWithDB to initialize the queuer without an external db config and encryption key.
// The encryption key for the database is taken from the QUEUER_ENCRYPTION_KEY environment variable;
// if it is not provided, results are stored unencrypted.
func NewQueuer(name string, maxConcurrency int, options ...*model.OnError) *Queuer {
return NewQueuerWithDB(name, maxConcurrency, os.Getenv("QUEUER_ENCRYPTION_KEY"), nil, options...)
}
// NewQueuerWithDB creates a new Queuer instance with the given name and max concurrency.
// It initializes the database connection and worker.
// If options are provided, it creates a worker with those options.
//
// It takes the db configuration from environment variables if dbConfig is nil.
// - QUEUER_DB_HOST (required)
// - QUEUER_DB_PORT (required)
// - QUEUER_DB_DATABASE (required)
// - QUEUER_DB_USERNAME (required)
// - QUEUER_DB_PASSWORD (required)
// - QUEUER_DB_SCHEMA (required)
// - QUEUER_DB_SSLMODE (optional, defaults to "require")
//
// If the encryption key is empty, results are stored unencrypted.
//
// If any error occurs during initialization, it logs a panic error and exits the program.
// It returns a pointer to the newly created Queuer instance.
func NewQueuerWithDB(name string, maxConcurrency int, encryptionKey string, dbConfig *helper.DatabaseConfiguration, options ...*model.OnError) *Queuer {
// Logger
opts := helper.PrettyHandlerOptions{
SlogOpts: slog.HandlerOptions{
Level: slog.LevelInfo,
},
}
logger := slog.New(helper.NewPrettyHandler(os.Stdout, opts))
// Database
var err error
var dbCon *helper.Database
if dbConfig != nil {
dbCon = helper.NewDatabase(
"queuer",
dbConfig,
logger,
)
} else {
var err error
dbConfig, err = helper.NewDatabaseConfiguration()
if err != nil {
log.Panicf("error creating database configuration: %s", err.Error())
}
dbCon = helper.NewDatabase(
"queuer",
dbConfig,
logger,
)
}
// DBs
dbJob, err := database.NewJobDBHandler(dbCon, dbConfig.WithTableDrop)
if err != nil {
log.Panicf("error creating job db handler: %s", err.Error())
}
dbWorker, err := database.NewWorkerDBHandler(dbCon, dbConfig.WithTableDrop)
if err != nil {
log.Panicf("error creating worker db handler: %s", err.Error())
}
dbMaster, err := database.NewMasterDBHandler(dbCon, dbConfig.WithTableDrop)
if err != nil {
log.Panicf("error creating master db handler: %s", err.Error())
}
// Inserting worker
var newWorker *model.Worker
if len(options) > 0 {
newWorker, err = model.NewWorkerWithOptions(name, maxConcurrency, options[0])
if err != nil {
log.Panicf("error creating new worker with options: %s", err.Error())
}
} else {
newWorker, err = model.NewWorker(name, maxConcurrency)
if err != nil {
log.Panicf("error creating new worker: %s", err.Error())
}
}
// Worker
worker, err := dbWorker.InsertWorker(newWorker)
if err != nil {
log.Panicf("error inserting worker: %s", err.Error())
}
logger.Info("Queuer with worker created", slog.String("worker_name", newWorker.Name), slog.String("worker_rid", worker.RID.String()))
return &Queuer{
log: logger,
worker: worker,
DB: dbCon.Instance,
dbConfig: dbConfig,
dbJob: dbJob,
dbWorker: dbWorker,
dbMaster: dbMaster,
JobPollInterval: 1 * time.Minute,
tasks: map[string]*model.Task{},
nextIntervalFuncs: map[string]model.NextIntervalFunc{},
}
}
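// Example (illustrative sketch, the environment values are hypothetical): creating a
// queuer from environment configuration.
//
//	// QUEUER_DB_HOST=localhost QUEUER_DB_PORT=5432 QUEUER_DB_DATABASE=queuer
//	// QUEUER_DB_USERNAME=user QUEUER_DB_PASSWORD=pass QUEUER_DB_SCHEMA=public
//	q := NewQueuer("worker-1", 5)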
// NewStaticQueuer creates a new Queuer instance without a worker.
// It initializes the database connection and other necessary components.
// If any error occurs during initialization, it logs a panic error and exits the program.
// It returns a pointer to the newly created Queuer instance.
// This queuer instance does not listen to the db nor does it run jobs.
// It is primarily used for static operations like adding jobs, getting job status etc.
func NewStaticQueuer(logLevel slog.Leveler, dbConfig *helper.DatabaseConfiguration) *Queuer {
// Logger
opts := helper.PrettyHandlerOptions{
SlogOpts: slog.HandlerOptions{
Level: logLevel,
},
}
logger := slog.New(helper.NewPrettyHandler(os.Stdout, opts))
// Database
var err error
var dbCon *helper.Database
if dbConfig != nil {
dbCon = helper.NewDatabase(
"queuer",
dbConfig,
logger,
)
} else {
var err error
dbConfig, err = helper.NewDatabaseConfiguration()
if err != nil {
log.Panicf("error creating database configuration: %s", err.Error())
}
dbCon = helper.NewDatabase(
"queuer",
dbConfig,
logger,
)
}
// DBs
dbJob, err := database.NewJobDBHandler(dbCon, dbConfig.WithTableDrop)
if err != nil {
log.Panicf("error creating job db handler: %s", err.Error())
}
dbWorker, err := database.NewWorkerDBHandler(dbCon, dbConfig.WithTableDrop)
if err != nil {
log.Panicf("error creating worker db handler: %s", err.Error())
}
dbMaster, err := database.NewMasterDBHandler(dbCon, dbConfig.WithTableDrop)
if err != nil {
log.Panicf("error creating master db handler: %s", err.Error())
}
return &Queuer{
log: logger,
DB: dbCon.Instance,
dbConfig: dbConfig,
dbJob: dbJob,
dbWorker: dbWorker,
dbMaster: dbMaster,
JobPollInterval: 1 * time.Minute,
tasks: map[string]*model.Task{},
nextIntervalFuncs: map[string]model.NextIntervalFunc{},
}
}
// Start starts the queuer by initializing the job listeners and starting the job poll ticker.
// If masterSettings are provided, it also starts the master poll ticker.
// If masterSettings contain zero values, those fields are set to their defaults.
// It checks if the queuer is initialized properly, and if not, it logs a panic error and exits the program.
// It runs the job processing in a separate goroutine and listens for job events.
//
// Detailed steps include:
// 1. Create job and job archive database listeners.
// 2. Create broadcasters for job insert, update, and delete events.
// 3. Start the job listeners to listen for job events.
// 4. Start the job poll ticker to periodically check for new jobs.
// 5. Wait for the queuer to be ready or log a panic error if it fails to start within 5 seconds.
func (q *Queuer) Start(ctx context.Context, cancel context.CancelFunc, masterSettings ...*model.MasterSettings) {
if ctx == nil || ctx == context.TODO() || cancel == nil {
panic("ctx and cancel must be set")
}
q.ctx = ctx
q.cancel = cancel
// DB listeners
var err error
q.jobDbListener, err = database.NewQueuerDBListener(q.dbConfig, "job")
if err != nil {
log.Panicf("error creating job insert listener: %s", err.Error())
}
q.log.Info("Added listener", slog.String("channel", "job"))
q.jobArchiveDbListener, err = database.NewQueuerDBListener(q.dbConfig, "job_archive")
if err != nil {
log.Panicf("error creating job archive listener: %s", err.Error())
}
q.log.Info("Added listener", slog.String("channel", "job_archive"))
// Broadcasters for job updates and deletes
broadcasterJobInsert := core.NewBroadcaster[*model.Job]("job.INSERT")
q.jobInsertListener, err = core.NewListener(broadcasterJobInsert)
if err != nil {
log.Panicf("error creating job insert listener: %s", err.Error())
}
broadcasterJobUpdate := core.NewBroadcaster[*model.Job]("job.UPDATE")
q.jobUpdateListener, err = core.NewListener(broadcasterJobUpdate)
if err != nil {
log.Panicf("error creating job update listener: %s", err.Error())
}
broadcasterJobDelete := core.NewBroadcaster[*model.Job]("job.DELETE")
q.jobDeleteListener, err = core.NewListener(broadcasterJobDelete)
if err != nil {
log.Panicf("error creating job delete listener: %s", err.Error())
}
// Update worker to running
q.workerMu.Lock()
q.worker.Status = model.WorkerStatusRunning
q.worker, err = q.dbWorker.UpdateWorker(q.worker)
q.workerMu.Unlock()
if err != nil {
log.Panicf("error updating worker status to running: %s", err.Error())
}
// Start pollers
ready := make(chan struct{})
go func() {
q.listen(ctx, cancel)
err = q.heartbeatTicker(ctx)
if err != nil && ctx.Err() == nil {
q.log.Error("Error starting heartbeat ticker", slog.String("error", err.Error()))
return
}
err := q.pollJobTicker(ctx)
if err != nil && ctx.Err() == nil {
q.log.Error("Error starting job poll ticker", slog.String("error", err.Error()))
return
}
if len(masterSettings) > 0 && masterSettings[0] != nil {
masterSettings[0].SetDefault()
err = q.pollMasterTicker(ctx, masterSettings[0])
if err != nil && ctx.Err() == nil {
q.log.Error("Error starting master poll ticker", slog.String("error", err.Error()))
return
}
}
close(ready)
<-ctx.Done()
err = q.Stop()
if err != nil {
q.log.Error("Error stopping queuer", slog.String("error", err.Error()))
}
}()
select {
case <-ready:
q.log.Info("Queuer started")
return
case <-time.After(5 * time.Second):
q.log.Error("Queuer failed to start within 5 seconds")
}
}
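// Example (illustrative sketch): starting the queuer with a cancellable context;
// cancelling the context stops the queuer and its tickers.
//
//	ctx, cancel := context.WithCancel(context.Background())
//	defer cancel()
//	q.Start(ctx, cancel)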
// StartWithoutWorker starts the queuer by initializing the job listeners and, if master settings are provided, the master poll ticker.
// It checks if the queuer is initialized properly, and if not, it logs a panic error and exits the program.
// It runs the listeners in a separate goroutine and listens for job events.
// If withoutListeners is true, the database listeners are not created.
//
// This version does not run the job processing, allowing the queuer to be started without a worker.
// It is useful if you want to run a queuer instance in a separate service without a worker,
// for example to handle listening to job events and providing a central frontend.
func (q *Queuer) StartWithoutWorker(ctx context.Context, cancel context.CancelFunc, withoutListeners bool, masterSettings ...*model.MasterSettings) {
q.ctx = ctx
q.cancel = cancel
// Job listeners
var err error
if !withoutListeners {
q.jobDbListener, err = database.NewQueuerDBListener(q.dbConfig, "job")
if err != nil {
log.Panicf("error creating job insert listener: %s", err.Error())
}
q.log.Info("Added listener", slog.String("channel", "job"))
q.jobArchiveDbListener, err = database.NewQueuerDBListener(q.dbConfig, "job_archive")
if err != nil {
log.Panicf("error creating job archive listener: %s", err.Error())
}
q.log.Info("Added listener", slog.String("channel", "job_archive"))
}
// Broadcasters for job updates and deletes
broadcasterJobInsert := core.NewBroadcaster[*model.Job]("job.INSERT")
q.jobInsertListener, err = core.NewListener(broadcasterJobInsert)
if err != nil {
log.Panicf("error creating job insert listener: %s", err.Error())
}
broadcasterJobUpdate := core.NewBroadcaster[*model.Job]("job.UPDATE")
q.jobUpdateListener, err = core.NewListener(broadcasterJobUpdate)
if err != nil {
log.Panicf("error creating job update listener: %s", err.Error())
}
broadcasterJobDelete := core.NewBroadcaster[*model.Job]("job.DELETE")
q.jobDeleteListener, err = core.NewListener(broadcasterJobDelete)
if err != nil {
log.Panicf("error creating job delete listener: %s", err.Error())
}
// Start job listeners
ready := make(chan struct{})
go func() {
if !withoutListeners {
q.listenWithoutRunning(ctx, cancel)
}
if len(masterSettings) > 0 && masterSettings[0] != nil {
err = q.pollMasterTicker(ctx, masterSettings[0])
if err != nil && ctx.Err() == nil {
q.log.Error("Error starting master poll ticker", slog.String("error", err.Error()))
return
}
}
close(ready)
<-ctx.Done()
err := q.Stop()
if err != nil {
q.log.Error("Error stopping queuer", slog.String("error", err.Error()))
}
}()
select {
case <-ready:
q.log.Info("Queuer without worker started")
return
case <-time.After(5 * time.Second):
q.log.Error("Queuer failed to start within 5 seconds")
}
}
// Stop stops the queuer by closing the job listeners, cancelling all queued and running jobs,
// and cancelling the context to stop the queuer.
func (q *Queuer) Stop() error {
// Close db listeners
if q.jobDbListener != nil {
err := q.jobDbListener.Listener.Close()
if err != nil && !strings.Contains(err.Error(), "Listener has been closed") {
return helper.NewError("closing job insert listener", err)
}
}
if q.jobArchiveDbListener != nil {
err := q.jobArchiveDbListener.Listener.Close()
if err != nil && !strings.Contains(err.Error(), "Listener has been closed") {
return helper.NewError("closing job archive listener", err)
}
}
// Update worker status to stopped and cancel its jobs (skipped for queuer instances without a worker)
var err error
q.workerMu.Lock()
if q.worker != nil {
q.worker.Status = model.WorkerStatusStopped
// Capture the RID before updating so a failed update cannot cause a nil dereference
workerRID := q.worker.RID
q.worker, err = q.dbWorker.UpdateWorker(q.worker)
q.workerMu.Unlock()
if err != nil {
return helper.NewError("updating worker status to stopped", err)
}
// Cancel all queued and running jobs
err = q.CancelAllJobsByWorker(workerRID, 100)
if err != nil {
return helper.NewError("cancelling all jobs by worker", err)
}
} else {
q.workerMu.Unlock()
}
// Cancel the context to stop the queuer
if q.ctx != nil {
q.cancel()
}
// Wait a moment for background goroutines to finish gracefully
time.Sleep(100 * time.Millisecond)
// Close database connection
if q.DB != nil {
q.log.Info("Closing database connection")
err = q.DB.Close()
if err != nil {
q.log.Error("error closing database connection", slog.String("error", err.Error()))
}
}
q.log.Info("Queuer stopped")
return nil
}
// Internal
// listen listens to job events and runs the initial job processing.
func (q *Queuer) listen(ctx context.Context, cancel context.CancelFunc) {
readyJob := make(chan struct{})
readyJobArchive := make(chan struct{})
go func() {
close(readyJob)
q.jobDbListener.Listen(ctx, cancel, func(data string) {
job := &model.JobFromNotification{}
err := json.Unmarshal([]byte(data), job)
if err != nil {
q.log.Error("Error unmarshalling job data", slog.String("error", err.Error()))
return
}
switch job.Status {
case model.JobStatusQueued, model.JobStatusScheduled:
q.jobInsertListener.Notify(job.ToJob())
err = q.runJobInitial()
if err != nil {
q.log.Error("Error running job", slog.String("error", err.Error()))
return
}
default:
q.jobUpdateListener.Notify(job.ToJob())
}
})
}()
<-readyJob
go func() {
close(readyJobArchive)
q.jobArchiveDbListener.Listen(ctx, cancel, func(data string) {
job := &model.JobFromNotification{}
err := json.Unmarshal([]byte(data), job)
if err != nil {
q.log.Error("Error unmarshalling job data", slog.String("error", err.Error()))
return
}
switch job.Status {
case model.JobStatusCancelled:
runner, ok := q.activeRunners.Load(job.RID)
if ok {
q.log.Info("Canceling running job", slog.String("job_id", job.RID.String()))
runner.(*core.Runner).Cancel()
q.activeRunners.Delete(job.RID)
}
default:
q.jobDeleteListener.Notify(job.ToJob())
}
})
}()
<-readyJobArchive
}
func (q *Queuer) listenWithoutRunning(ctx context.Context, cancel context.CancelFunc) {
readyJob := make(chan struct{})
readyJobArchive := make(chan struct{})
go func() {
close(readyJob)
q.jobDbListener.Listen(ctx, cancel, func(data string) {
job := &model.JobFromNotification{}
err := json.Unmarshal([]byte(data), job)
if err != nil {
q.log.Error("Error unmarshalling job data", slog.String("error", err.Error()))
return
}
if job.Status == model.JobStatusQueued || job.Status == model.JobStatusScheduled {
q.jobInsertListener.Notify(job.ToJob())
} else {
q.jobUpdateListener.Notify(job.ToJob())
}
})
}()
<-readyJob
go func() {
close(readyJobArchive)
q.jobArchiveDbListener.Listen(ctx, cancel, func(data string) {
job := &model.JobFromNotification{}
err := json.Unmarshal([]byte(data), job)
if err != nil {
q.log.Error("Error unmarshalling job data", slog.String("error", err.Error()))
return
}
q.jobDeleteListener.Notify(job.ToJob())
})
}()
<-readyJobArchive
}
func (q *Queuer) heartbeatTicker(ctx context.Context) error {
// Default heartbeat interval of 30 seconds
heartbeatInterval := 30 * time.Second
ticker, err := core.NewTicker(
heartbeatInterval,
func() {
q.log.Debug("Sending worker heartbeat...")
q.workerMu.RLock()
worker := q.worker
q.workerMu.RUnlock()
if worker == nil {
return
}
// Update worker to refresh the updated_at timestamp
updatedWorker, err := q.dbWorker.UpdateWorker(worker)
if err != nil {
q.log.Error("Error updating worker heartbeat", slog.String("error", err.Error()))
return
}
q.workerMu.Lock()
q.worker = updatedWorker
q.workerMu.Unlock()
},
)
if err != nil {
return helper.NewError("creating heartbeat ticker", err)
}
q.log.Info("Starting worker heartbeat ticker...")
go ticker.Go(ctx)
return nil
}
func (q *Queuer) pollJobTicker(ctx context.Context) error {
ticker, err := core.NewTicker(
q.JobPollInterval,
func() {
q.log.Info("Polling jobs...")
err := q.runJobInitial()
if err != nil {
q.log.Error("Error running job", slog.String("error", err.Error()))
}
},
)
if err != nil {
return helper.NewError("creating ticker", err)
}
q.log.Info("Starting job poll ticker...")
go ticker.Go(ctx)
return nil
}
func (q *Queuer) pollMasterTicker(ctx context.Context, masterSettings *model.MasterSettings) error {
ctxInner, cancel := context.WithCancel(ctx)
ticker, err := core.NewTicker(
masterSettings.MasterPollInterval,
func() {
q.log.Info("Polling master...")
q.workerMu.RLock()
worker := q.worker
workerRID := q.worker.RID
q.workerMu.RUnlock()
master, err := q.dbMaster.UpdateMaster(worker, masterSettings)
if err != nil {
q.log.Error("Error updating master", slog.String("error", err.Error()))
}
if master != nil {
q.log.Debug("New master", slog.String("worker_id", workerRID.String()))
err := q.masterTicker(ctx, master, masterSettings)
if err != nil {
q.log.Error("Error starting master ticker", slog.String("error", err.Error()))
} else {
cancel()
}
}
},
)
if err != nil {
return helper.NewError("creating ticker", err)
}
q.log.Info("Starting master poll ticker...")
go ticker.Go(ctxInner)
return nil
}
package queuer
import (
"database/sql"
"fmt"
"log/slog"
"strings"
"time"
"github.com/google/uuid"
"github.com/siherrmann/queuer/core"
"github.com/siherrmann/queuer/helper"
"github.com/siherrmann/queuer/model"
)
// AddJob adds a job to the queue with the given task and parameters.
// As a task you can either pass a function or a string with the task name
// (necessary if you want to use a task with a name set by you).
// It returns the created job or an error if something goes wrong.
func (q *Queuer) AddJob(task interface{}, parametersKeyed map[string]interface{}, parameters ...interface{}) (*model.Job, error) {
options := q.mergeOptions(nil)
job, err := q.addJob(task, options, parametersKeyed, parameters...)
if err != nil {
q.log.Error("Error adding job", slog.String("error", err.Error()))
return nil, helper.NewError("adding job", err)
}
q.log.Info("Job added", slog.String("job_rid", job.RID.String()))
return job, nil
}
// AddJobTx adds a job to the queue with the given task and parameters within a transaction.
// As a task you can either pass a function or a string with the task name
// (necessary if you want to use a task with a name set by you).
// It returns the created job or an error if something goes wrong.
func (q *Queuer) AddJobTx(tx *sql.Tx, task interface{}, parametersKeyed map[string]interface{}, parameters ...interface{}) (*model.Job, error) {
options := q.mergeOptions(nil)
job, err := q.addJobTx(tx, task, options, parametersKeyed, parameters...)
if err != nil {
q.log.Error("Error adding job with transaction", slog.String("error", err.Error()))
return nil, helper.NewError("adding job", err)
}
q.log.Info("Job added", slog.String("job_rid", job.RID.String()))
return job, nil
}
/*
AddJobWithOptions adds a job with the given task, options, and parameters.
As a task you can either pass a function or a string with the task name
(necessary if you want to use a task with a name set by you).
It returns the created job or an error if something goes wrong.
The options parameter allows you to specify additional options for the job,
such as scheduling, retry policies, and error handling.
If options are nil, the worker's default options will be used.
Example usage:
func AddJobExample(queuer *Queuer, param1 string, param2 int) {
options := &model.Options{
OnError: &model.OnError{
Timeout: 5,
MaxRetries: 2, // runs up to 3 times in total; the first run is not a retry
RetryDelay: 1,
RetryBackoff: model.RETRY_BACKOFF_NONE,
},
Schedule: &model.Schedule{
Start: time.Now().Add(10 * time.Second),
Interval: 5 * time.Second,
MaxCount: 3,
},
}
job, err := queuer.AddJobWithOptions(options, myTaskFunction, nil, param1, param2) // nil: no keyed parameters
if err != nil {
log.Fatalf("Failed to add job: %v", err)
}
}
*/
func (q *Queuer) AddJobWithOptions(options *model.Options, task interface{}, parametersKeyed map[string]interface{}, parameters ...interface{}) (*model.Job, error) {
options = q.mergeOptions(options)
job, err := q.addJob(task, options, parametersKeyed, parameters...)
if err != nil {
q.log.Error("Error adding job with options", slog.String("error", err.Error()))
return nil, helper.NewError("adding job", err)
}
q.log.Info("Job with options added", slog.String("job_rid", job.RID.String()))
return job, nil
}
// AddJobWithOptionsTx adds a job with the given task, options, and parameters within a transaction.
// As a task you can either pass a function or a string with the task name
// (necessary if you want to use a task with a name set by you).
// It returns the created job or an error if something goes wrong.
func (q *Queuer) AddJobWithOptionsTx(tx *sql.Tx, options *model.Options, task interface{}, parametersKeyed map[string]interface{}, parameters ...interface{}) (*model.Job, error) {
options = q.mergeOptions(options)
job, err := q.addJobTx(tx, task, options, parametersKeyed, parameters...)
if err != nil {
q.log.Error("Error adding job with transaction and options", slog.String("error", err.Error()))
return nil, helper.NewError("adding job", err)
}
q.log.Info("Job with options added", slog.String("job_rid", job.RID.String()))
return job, nil
}
// AddJobs adds a batch of jobs to the queue.
// It takes a slice of BatchJob, which contains the task, options, and parameters for each job.
// It returns an error if something goes wrong during the process.
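//
// Example usage (illustrative sketch; taskA and taskB are assumed functions already registered as tasks):
//
// err := q.AddJobs([]model.BatchJob{
// {Task: taskA, Parameters: []interface{}{1, "a"}},
// {Task: taskB, Options: &model.Options{OnError: &model.OnError{MaxRetries: 1, RetryDelay: 1}}},
// })
// if err != nil {
// log.Fatalf("failed to add batch jobs: %v", err)
// }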
func (q *Queuer) AddJobs(batchJobs []model.BatchJob) error {
var jobs []*model.Job
for _, batchJob := range batchJobs {
options := q.mergeOptions(batchJob.Options)
newJob, err := model.NewJob(batchJob.Task, options, batchJob.ParametersKeyed, batchJob.Parameters...)
if err != nil {
return fmt.Errorf("error creating job with job options: %v", err)
}
jobs = append(jobs, newJob)
}
err := q.dbJob.BatchInsertJobs(jobs)
if err != nil {
q.log.Error("Error inserting jobs", slog.String("error", err.Error()))
return helper.NewError("batch insert", err)
}
q.log.Info("Jobs added", slog.Int("count", len(jobs)))
return nil
}
// WaitForJobAdded waits for any job to be added to the queue and returns it.
// It listens for job insert events and returns the job when it is added to the queue.
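//
// Example usage (illustrative sketch; blocks until a job insert event arrives or the queuer context is done):
//
// go func() {
// if job := q.WaitForJobAdded(); job != nil {
// log.Printf("job %s was added", job.RID)
// }
// }()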
func (q *Queuer) WaitForJobAdded() *model.Job {
jobStarted := make(chan *model.Job, 1)
outerReady := make(chan struct{})
ready := make(chan struct{})
go func() {
close(outerReady)
q.jobInsertListener.Listen(q.ctx, ready, func(job *model.Job) {
jobStarted <- job
})
}()
<-outerReady
<-ready
for {
select {
case job := <-jobStarted:
return job
case <-q.ctx.Done():
return nil
}
}
}
// WaitForJobFinished waits for a job to finish and returns the job.
// It listens for job delete events and returns the job when it is finished.
// If timeout is reached before the job finishes, it returns nil.
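//
// Example usage (illustrative sketch; myTask is an assumed registered task function):
//
// job, err := q.AddJob(myTask, nil, 42)
// if err != nil {
// log.Fatalf("failed to add job: %v", err)
// }
// if finished := q.WaitForJobFinished(job.RID, 30*time.Second); finished == nil {
// log.Println("job did not finish within 30s (or the queuer was stopped)")
// }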
func (q *Queuer) WaitForJobFinished(jobRid uuid.UUID, timeout time.Duration) *model.Job {
// First check if the job is already finished (in archive)
if archivedJob, err := q.dbJob.SelectJobFromArchive(jobRid); err == nil && archivedJob != nil {
return archivedJob
}
jobFinished := make(chan *model.Job, 1)
outerReady := make(chan struct{})
ready := make(chan struct{})
go func() {
close(outerReady)
q.jobDeleteListener.Listen(q.ctx, ready, func(job *model.Job) {
if job.RID == jobRid {
jobFinished <- job
}
})
}()
<-outerReady
<-ready
timeoutChan := time.After(timeout)
for {
select {
case job := <-jobFinished:
return job
case <-timeoutChan:
return nil
case <-q.ctx.Done():
return nil
}
}
}
// CancelJob cancels a job with the given job RID.
// It retrieves the job from the database and cancels it.
// If the job is not found, it returns an error; if it is already finished or cancelled, nothing is changed.
func (q *Queuer) CancelJob(jobRid uuid.UUID) (*model.Job, error) {
job, err := q.dbJob.SelectJob(jobRid)
if err != nil {
return nil, helper.NewError("selecting job", err)
}
err = q.cancelJob(job)
if err != nil {
return nil, helper.NewError("cancelling job", err)
}
return job, nil
}
// CancelAllJobsByWorker cancels all jobs assigned to a specific worker by its RID.
// It retrieves all jobs assigned to the worker and cancels each one.
// It returns an error if something goes wrong during the process.
func (q *Queuer) CancelAllJobsByWorker(workerRid uuid.UUID, entries int) error {
jobs, err := q.dbJob.SelectAllJobsByWorkerRID(workerRid, 0, entries)
if err != nil {
return helper.NewError("selecting jobs", err)
}
for _, job := range jobs {
err := q.cancelJob(job)
if err != nil {
return helper.NewError("cancelling job", err)
}
}
return nil
}
// ReaddJobFromArchive re-adds a job from the archive back to the queue as a new job.
func (q *Queuer) ReaddJobFromArchive(jobRid uuid.UUID) (*model.Job, error) {
job, err := q.dbJob.SelectJobFromArchive(jobRid)
if err != nil {
return nil, helper.NewError("selecting job from archive", err)
}
// Readd the job to the queue
newJob, err := q.AddJobWithOptions(job.Options, job.TaskName, job.ParametersKeyed, job.Parameters...)
if err != nil {
return nil, helper.NewError("readding job", err)
}
q.log.Info("Job readded", slog.String("job_rid", newJob.RID.String()))
return newJob, nil
}
// DeleteJob deletes a job by its RID.
func (q *Queuer) DeleteJob(jobRid uuid.UUID) error {
err := q.dbJob.DeleteJob(jobRid)
if err != nil {
return helper.NewError("deleting job", err)
}
q.log.Info("Job deleted", slog.String("job_rid", jobRid.String()))
return nil
}
// GetJob retrieves a job by its RID.
func (q *Queuer) GetJob(jobRid uuid.UUID) (*model.Job, error) {
job, err := q.dbJob.SelectJob(jobRid)
if err != nil {
return nil, helper.NewError("selecting job", err)
}
return job, nil
}
// GetJobs retrieves all jobs in the queue.
func (q *Queuer) GetJobs(lastId int, entries int) ([]*model.Job, error) {
jobs, err := q.dbJob.SelectAllJobs(lastId, entries)
if err != nil {
return nil, helper.NewError("selecting all jobs", err)
}
return jobs, nil
}
// GetJobsBySearch retrieves jobs that match the given search term.
func (q *Queuer) GetJobsBySearch(search string, lastId int, entries int) ([]*model.Job, error) {
jobs, err := q.dbJob.SelectAllJobsBySearch(search, lastId, entries)
if err != nil {
return nil, helper.NewError("selecting jobs by search", err)
}
return jobs, nil
}
// GetJobsByWorkerRID retrieves jobs assigned to a specific worker by its RID.
func (q *Queuer) GetJobsByWorkerRID(workerRid uuid.UUID, lastId int, entries int) ([]*model.Job, error) {
jobs, err := q.dbJob.SelectAllJobsByWorkerRID(workerRid, lastId, entries)
if err != nil {
return nil, helper.NewError("selecting jobs by worker RID", err)
}
return jobs, nil
}
// GetJobEnded retrieves a job that has ended (succeeded, cancelled or failed) by its RID.
func (q *Queuer) GetJobEnded(jobRid uuid.UUID) (*model.Job, error) {
job, err := q.dbJob.SelectJobFromArchive(jobRid)
if err != nil {
return nil, helper.NewError("selecting ended job", err)
}
return job, nil
}
// GetJobsEnded retrieves all jobs that have ended (succeeded, cancelled or failed).
func (q *Queuer) GetJobsEnded(lastId int, entries int) ([]*model.Job, error) {
jobs, err := q.dbJob.SelectAllJobsFromArchive(lastId, entries)
if err != nil {
return nil, helper.NewError("selecting ended jobs", err)
}
return jobs, nil
}
// GetJobsEndedBySearch retrieves ended jobs that match the given search term.
func (q *Queuer) GetJobsEndedBySearch(search string, lastId int, entries int) ([]*model.Job, error) {
jobs, err := q.dbJob.SelectAllJobsFromArchiveBySearch(search, lastId, entries)
if err != nil {
return nil, helper.NewError("selecting ended jobs by search", err)
}
return jobs, nil
}
// Internal
// mergeOptions merges the worker options with optional job options.
func (q *Queuer) mergeOptions(options *model.Options) *model.Options {
q.workerMu.RLock()
workerOptions := q.worker.Options
q.workerMu.RUnlock()
if options != nil && options.OnError == nil {
options.OnError = workerOptions
} else if options == nil && workerOptions != nil {
options = &model.Options{OnError: workerOptions}
}
return options
}
// addJob adds a job to the queue with all necessary parameters.
func (q *Queuer) addJob(task interface{}, options *model.Options, parametersKeyed map[string]interface{}, parameters ...interface{}) (*model.Job, error) {
newJob, err := model.NewJob(task, options, parametersKeyed, parameters...)
if err != nil {
return nil, helper.NewError("creating job", err)
}
job, err := q.dbJob.InsertJob(newJob)
if err != nil {
return nil, helper.NewError("inserting job", err)
}
return job, nil
}
// addJobTx adds a job to the queue with all necessary parameters within a transaction.
func (q *Queuer) addJobTx(tx *sql.Tx, task interface{}, options *model.Options, parametersKeyed map[string]interface{}, parameters ...interface{}) (*model.Job, error) {
newJob, err := model.NewJob(task, options, parametersKeyed, parameters...)
if err != nil {
return nil, helper.NewError("creating job", err)
}
job, err := q.dbJob.InsertJobTx(tx, newJob)
if err != nil {
return nil, helper.NewError("inserting job", err)
}
return job, nil
}
// runJobInitial is called to run the next job in the queue.
func (q *Queuer) runJobInitial() error {
// Update job status to running with worker.
q.workerMu.RLock()
worker := q.worker
q.workerMu.RUnlock()
jobs, err := q.dbJob.UpdateJobsInitial(worker)
if err != nil {
return helper.NewError("updating jobs initial", err)
} else if len(jobs) == 0 {
return nil
}
for _, job := range jobs {
if job.Options != nil && job.Options.Schedule != nil && job.Options.Schedule.Start.After(time.Now()) {
scheduler, err := core.NewScheduler(
&job.Options.Schedule.Start,
q.runJob,
job,
)
if err != nil {
return helper.NewError("creating scheduler", err)
}
q.log.Info("Scheduling job", slog.String("job_rid", job.RID.String()), slog.String("schedule_start", job.Options.Schedule.Start.String()))
go scheduler.Go(q.ctx)
} else {
go q.runJob(job)
}
}
return nil
}
// waitForJob executes the job and returns the results or an error.
func (q *Queuer) waitForJob(job *model.Job) (results []interface{}, cancelled bool, err error) {
// Run job and update job status to completed with results
// TODO At this point the task should be available in the queuer,
// but we should probably still check if the task is available?
task := q.tasks[job.TaskName]
runner, err := core.NewRunnerFromJob(task, job)
if err != nil {
return nil, false, helper.NewError("creating runner", err)
}
q.activeRunners.Store(job.RID, runner)
// Always remove the runner from the active runners, also when the context is cancelled.
defer q.activeRunners.Delete(job.RID)
go runner.Run(q.ctx)
select {
case err = <-runner.ErrorChannel:
case results = <-runner.ResultsChannel:
case <-q.ctx.Done():
runner.Cancel()
return nil, true, nil
}
return results, false, err
}
// retryJob retries the job with the given job error.
func (q *Queuer) retryJob(job *model.Job, jobErr error) {
if job.Options == nil || job.Options.OnError == nil || job.Options.OnError.MaxRetries <= 0 {
q.failJob(job, jobErr)
return
}
var err error
var results []interface{}
retryer, err := core.NewRetryer(
func() error {
q.log.Debug("Trying/retrying job", slog.String("job_rid", job.RID.String()))
results, _, err = q.waitForJob(job)
if err != nil {
return helper.NewError("retrying job", err)
}
return nil
},
job.Options.OnError,
)
if err != nil {
q.failJob(job, helper.NewError("creating retryer", fmt.Errorf("%v, job error: %v", err, jobErr)))
return
}
err = retryer.Retry()
if err != nil {
q.failJob(job, helper.NewError("retrying job", fmt.Errorf("%v, job error: %v", err, jobErr)))
} else {
q.succeedJob(job, results)
}
}
// runJob runs the job, retries it on error and finalizes its status.
func (q *Queuer) runJob(job *model.Job) {
q.log.Info("Running job", slog.String("job_rid", job.RID.String()))
results, cancelled, err := q.waitForJob(job)
if cancelled {
return
} else if err != nil {
q.retryJob(job, err)
} else {
q.succeedJob(job, results)
}
}
func (q *Queuer) cancelJob(job *model.Job) error {
if job.Status == model.JobStatusRunning || job.Status == model.JobStatusScheduled || job.Status == model.JobStatusQueued {
job.Status = model.JobStatusCancelled
_, err := q.dbJob.UpdateJobFinal(job)
if err != nil && err.(helper.Error).Original != sql.ErrNoRows {
q.log.Error("Error updating job status to cancelled", slog.String("error", err.Error()))
} else if err == nil {
q.log.Info("Job cancelled", slog.String("job_rid", job.RID.String()))
}
}
return nil
}
// succeedJob updates the job status to succeeded and runs the next job if available.
func (q *Queuer) succeedJob(job *model.Job, results []interface{}) {
job.Status = model.JobStatusSucceeded
job.Results = results
q.endJob(job)
}
func (q *Queuer) failJob(job *model.Job, jobErr error) {
job.Status = model.JobStatusFailed
job.Error = jobErr.Error()
q.endJob(job)
}
func (q *Queuer) endJob(job *model.Job) {
q.workerMu.RLock()
workerID := q.worker.ID
q.workerMu.RUnlock()
if job.WorkerID != workerID {
return
}
endedJob, err := q.dbJob.UpdateJobFinal(job)
if err != nil {
if strings.Contains(err.Error(), sql.ErrNoRows.Error()) {
return
}
q.log.Error("Error updating finished job", slog.String("status", job.Status), slog.String("error", err.Error()))
} else {
q.log.Debug("Job ended", slog.String("status", endedJob.Status), slog.String("rid", endedJob.RID.String()))
// Readd scheduled jobs to the queue
if endedJob.Options != nil && endedJob.Options.Schedule != nil && endedJob.ScheduleCount < endedJob.Options.Schedule.MaxCount {
var newScheduledAt time.Time
if len(endedJob.Options.Schedule.NextInterval) > 0 {
// This worker should only have picked up the current job if it has the NextIntervalFunc available.
nextIntervalFunc, ok := q.nextIntervalFuncs[endedJob.Options.Schedule.NextInterval]
if !ok {
q.log.Error("NextIntervalFunc not found", slog.String("name", endedJob.Options.Schedule.NextInterval), slog.String("job_rid", endedJob.RID.String()))
return
}
newScheduledAt = nextIntervalFunc(*endedJob.ScheduledAt, endedJob.ScheduleCount)
} else {
newScheduledAt = endedJob.ScheduledAt.Add(time.Duration(endedJob.ScheduleCount) * endedJob.Options.Schedule.Interval)
}
endedJob.ScheduledAt = &newScheduledAt
endedJob.Status = model.JobStatusScheduled
job, err := q.dbJob.InsertJob(endedJob)
if err != nil {
q.log.Error("Error re-adding scheduled job", slog.String("job_rid", endedJob.RID.String()), slog.String("error", err.Error()))
} else {
q.log.Info("Job added for next iteration to the queue", slog.String("job_rid", job.RID.String()))
}
}
}
// Try to run the next job in the queue
err = q.runJobInitial()
if err != nil {
q.log.Error("Error running next job", slog.String("error", err.Error()))
}
}
package queuer
import (
"fmt"
"github.com/siherrmann/queuer/helper"
"github.com/siherrmann/queuer/model"
)
// ListenForJobUpdate listens for job update events and notifies the provided function when a job is updated.
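//
// Example usage (illustrative sketch):
//
// err := q.ListenForJobUpdate(func(job *model.Job) {
// log.Printf("job %s is now in status %v", job.RID, job.Status)
// })
// if err != nil {
// log.Fatalf("failed to listen for job updates: %v", err)
// }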
func (q *Queuer) ListenForJobUpdate(notifyFunction func(data *model.Job)) error {
if q == nil || q.ctx == nil {
return helper.NewError("queuer check", fmt.Errorf("cannot listen with uninitialized or not running Queuer"))
}
outerReady := make(chan struct{})
ready := make(chan struct{})
go func() {
close(outerReady)
q.jobUpdateListener.Listen(q.ctx, ready, notifyFunction)
}()
<-ready
<-outerReady
return nil
}
// ListenForJobDelete listens for job delete events and notifies the provided function when a job is finished and removed from the queue.
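//
// Example usage (illustrative sketch; delete events fire when a finished job leaves the queue):
//
// err := q.ListenForJobDelete(func(job *model.Job) {
// log.Printf("job %s ended with status %v", job.RID, job.Status)
// })
// if err != nil {
// log.Fatalf("failed to listen for job deletes: %v", err)
// }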
func (q *Queuer) ListenForJobDelete(notifyFunction func(data *model.Job)) error {
if q == nil || q.ctx == nil {
return helper.NewError("queuer check", fmt.Errorf("cannot listen with uninitialized or not running Queuer"))
}
outerReady := make(chan struct{})
ready := make(chan struct{})
go func() {
close(outerReady)
q.jobDeleteListener.Listen(q.ctx, ready, notifyFunction)
}()
<-ready
<-outerReady
return nil
}
package queuer
import (
"context"
"fmt"
"log/slog"
"time"
"github.com/siherrmann/queuer/core"
"github.com/siherrmann/queuer/helper"
"github.com/siherrmann/queuer/model"
)
func (q *Queuer) masterTicker(ctx context.Context, oldMaster *model.Master, masterSettings *model.MasterSettings) error {
if oldMaster == nil {
return helper.NewError("old master check", fmt.Errorf("old master is nil"))
}
if masterSettings.JobDeleteThreshold == 0 {
err := q.dbJob.RemoveRetentionArchive()
if err != nil {
return helper.NewError("removing retention archive", err)
}
} else if oldMaster.Settings.JobDeleteThreshold != masterSettings.JobDeleteThreshold {
err := q.dbJob.RemoveRetentionArchive()
if err != nil {
q.log.Warn("Error removing retention archive (expected if none existed)", slog.String("error", err.Error()))
}
err = q.dbJob.AddRetentionArchive(masterSettings.JobDeleteThreshold)
if err != nil {
return helper.NewError("adding retention archive", err)
}
}
ctxInner, cancel := context.WithCancel(ctx)
ticker, err := core.NewTicker(
masterSettings.MasterPollInterval,
func() {
q.workerMu.RLock()
worker := q.worker
q.workerMu.RUnlock()
_, err := q.dbMaster.UpdateMaster(worker, masterSettings)
if err != nil {
err := q.pollMasterTicker(ctx, masterSettings)
if err != nil {
q.log.Error("Error restarting poll master ticker", slog.String("error", err.Error()))
}
cancel()
return
}
err = q.checkStaleWorkers()
if err != nil {
q.log.Error("Error checking for stale workers", slog.String("error", err.Error()))
}
err = q.deleteStaleWorkers()
if err != nil {
q.log.Error("Error deleting stale workers", slog.String("error", err.Error()))
}
err = q.checkStaleJobs()
if err != nil {
q.log.Error("Error checking for stale jobs", slog.String("error", err.Error()))
}
// Here we can add any additional logic that needs to run periodically while the worker is master.
// This could include stale jobs, cleaning up the job database etc.
},
)
if err != nil {
return helper.NewError("creating ticker", err)
}
q.log.Info("Starting master poll ticker...")
go ticker.Go(ctxInner)
return nil
}
func (q *Queuer) checkStaleWorkers() error {
staleThreshold := 2 * time.Minute
staleCount, err := q.dbWorker.UpdateStaleWorkers(staleThreshold)
if err != nil {
return helper.NewError("updating stale workers", err)
}
if staleCount > 0 {
q.log.Info("Updated stale workers", slog.Int("count", staleCount))
}
return nil
}
func (q *Queuer) deleteStaleWorkers() error {
deleteThreshold := 10 * time.Minute
deletedCount, err := q.dbWorker.DeleteStaleWorkers(deleteThreshold)
if err != nil {
return helper.NewError("deleting stale workers", err)
}
if deletedCount > 0 {
q.log.Info("Deleted stale workers", slog.Int("count", deletedCount))
}
return nil
}
func (q *Queuer) checkStaleJobs() error {
staleCount, err := q.dbJob.UpdateStaleJobs()
if err != nil {
return helper.NewError("updating stale jobs", err)
}
if staleCount > 0 {
q.log.Info("Updated stale jobs", slog.Int("count", staleCount))
}
return nil
}
package queuer
import (
"log"
"log/slog"
"slices"
"github.com/siherrmann/queuer/helper"
"github.com/siherrmann/queuer/model"
)
// AddNextIntervalFunc adds a NextIntervalFunc to the worker's available next interval functions.
// It takes a NextIntervalFunc and adds it to the worker's AvailableNextIntervalFuncs.
// The function name is derived from the NextIntervalFunc interface using helper.GetTaskNameFromInterface.
// The function is meant to be used before starting the Queuer to ensure that the worker has access to the function.
// It checks if the function is nil or already exists in the worker's available next interval functions.
//
// If the function is nil or already exists, it panics.
// It returns the updated worker after adding the function.
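//
// Example usage (illustrative sketch; assumes model.NextIntervalFunc has the shape
// func(lastScheduledAt time.Time, scheduleCount int) time.Time, matching how it is called when rescheduling jobs):
//
// func EveryTenMinutes(lastScheduledAt time.Time, scheduleCount int) time.Time {
// return lastScheduledAt.Add(10 * time.Minute)
// }
//
// worker := q.AddNextIntervalFunc(EveryTenMinutes)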
func (q *Queuer) AddNextIntervalFunc(nif model.NextIntervalFunc) *model.Worker {
if nif == nil {
panic("NextIntervalFunc cannot be nil")
}
nifName, err := helper.GetTaskNameFromInterface(nif)
if err != nil {
log.Panicf("error getting function name: %s", err.Error())
}
if slices.Contains(q.worker.AvailableNextIntervalFuncs, nifName) {
log.Panicf("NextIntervalFunc already exists: %s", nifName)
}
q.nextIntervalFuncs[nifName] = nif
q.worker.AvailableNextIntervalFuncs = append(q.worker.AvailableNextIntervalFuncs, nifName)
worker, err := q.dbWorker.UpdateWorker(q.worker)
if err != nil {
log.Panicf("error updating worker: %s", err.Error())
}
q.log.Info("NextInterval function added", slog.String("name", nifName))
return worker
}
// AddNextIntervalFuncWithName adds a NextIntervalFunc to
// the worker's available next interval functions with a specific name.
// It takes a NextIntervalFunc and a name, checks if the function is nil
// or already exists in the worker's available next interval functions,
// and adds it to the worker's AvailableNextIntervalFuncs.
//
// This function is useful when you want to add a NextIntervalFunc
// with a specific name that you control, rather than deriving it from the function itself.
// It ensures that the function is not nil and that the name does not already exist
// in the worker's available next interval functions.
//
// If the function is nil or already exists, it panics.
// It returns the updated worker after adding the function with the specified name.
func (q *Queuer) AddNextIntervalFuncWithName(nif model.NextIntervalFunc, name string) *model.Worker {
if nif == nil {
panic("NextIntervalFunc cannot be nil")
}
if slices.Contains(q.worker.AvailableNextIntervalFuncs, name) {
log.Panicf("NextIntervalFunc with name already exists: %s", name)
}
q.nextIntervalFuncs[name] = nif
q.worker.AvailableNextIntervalFuncs = append(q.worker.AvailableNextIntervalFuncs, name)
worker, err := q.dbWorker.UpdateWorker(q.worker)
if err != nil {
log.Panicf("error updating worker: %s", err.Error())
}
q.log.Info("NextInterval function added", slog.String("name", name))
return worker
}
package queuer
import (
"log"
"log/slog"
"slices"
"github.com/siherrmann/queuer/model"
)
// AddTask adds a new task to the queuer.
// It creates a new task with the provided task interface, adds it to the worker's available tasks,
// and updates the worker in the database.
// The task name is automatically generated based on the task's function name (eg. main.TestTask).
//
// If the task creation fails, it logs a panic error and exits the program.
// It returns the newly created task.
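//
// Example usage (illustrative sketch; SendWelcomeMail is an assumed application function):
//
// func SendWelcomeMail(address string) error {
// // send the mail ...
// return nil
// }
//
// task := q.AddTask(SendWelcomeMail)
// // Jobs can reference the task by function or by its generated name:
// _, err := q.AddJob(task.Name, nil, "user@example.com")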
func (q *Queuer) AddTask(task interface{}) *model.Task {
newTask, err := model.NewTask(task)
if err != nil {
log.Panicf("error creating new task: %s", err.Error())
}
if slices.Contains(q.worker.AvailableTasks, newTask.Name) {
log.Panicf("task already exists: %s", newTask.Name)
}
q.tasks[newTask.Name] = newTask
q.worker.AvailableTasks = append(q.worker.AvailableTasks, newTask.Name)
// Update worker in DB
_, err = q.dbWorker.UpdateWorker(q.worker)
if err != nil {
log.Panicf("error updating worker: %s", err.Error())
}
q.log.Info("Task added", slog.String("task_name", newTask.Name))
return newTask
}
// AddTaskWithName adds a new task with a specific name to the queuer.
// It creates a new task with the provided task interface and name, adds it to the worker's available tasks,
// and updates the worker in the database.
//
// If task creation fails, it logs a panic error and exits the program.
// It returns the newly created task.
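//
// Example usage (illustrative sketch; SendWelcomeMail is an assumed application function):
//
// q.AddTaskWithName(SendWelcomeMail, "send_welcome_mail")
// // Jobs can then be added using the custom task name:
// _, err := q.AddJob("send_welcome_mail", nil, "user@example.com")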
func (q *Queuer) AddTaskWithName(task interface{}, name string) *model.Task {
newTask, err := model.NewTaskWithName(task, name)
if err != nil {
log.Panicf("error creating new task: %s", err.Error())
}
if slices.Contains(q.worker.AvailableTasks, name) {
log.Panicf("task already exists: %s", newTask.Name)
}
q.tasks[newTask.Name] = newTask
q.worker.AvailableTasks = append(q.worker.AvailableTasks, newTask.Name)
// Update worker in DB
_, err = q.dbWorker.UpdateWorker(q.worker)
if err != nil {
log.Panicf("error updating worker: %s", err.Error())
}
q.log.Info("Task added", slog.String("name", newTask.Name))
return newTask
}
package queuer
import (
"fmt"
"github.com/google/uuid"
"github.com/siherrmann/queuer/helper"
"github.com/siherrmann/queuer/model"
)
// GetWorker retrieves a worker by its RID (Resource Identifier).
func (q *Queuer) GetWorker(workerRid uuid.UUID) (*model.Worker, error) {
worker, err := q.dbWorker.SelectWorker(workerRid)
if err != nil {
return nil, helper.NewError("selecting worker", err)
}
return worker, nil
}
// GetWorkers retrieves a list of workers starting from the lastId and returning the specified number of entries.
func (q *Queuer) GetWorkers(lastId int, entries int) ([]*model.Worker, error) {
if lastId < 0 {
return nil, helper.NewError("lastId check", fmt.Errorf("lastId cannot be negative"))
}
if entries <= 0 {
return nil, helper.NewError("entries check", fmt.Errorf("entries must be greater than zero"))
}
workers, err := q.dbWorker.SelectAllWorkers(lastId, entries)
if err != nil {
return nil, helper.NewError("selecting workers", err)
}
return workers, nil
}
// GetWorkersBySearch retrieves workers that match the given search term.
func (q *Queuer) GetWorkersBySearch(search string, lastId int, entries int) ([]*model.Worker, error) {
workers, err := q.dbWorker.SelectAllWorkersBySearch(search, lastId, entries)
if err != nil {
return nil, helper.NewError("selecting workers by search", err)
}
return workers, nil
}
// GetConnections retrieves all connections from the database.
func (q *Queuer) GetConnections() ([]*model.Connection, error) {
connections, err := q.dbWorker.SelectAllConnections()
if err != nil {
return nil, helper.NewError("selecting connections", err)
}
return connections, nil
}