package core

import (
	"log"
	"sync"
)

// Broadcaster fans messages of type T out to every subscribed listener channel.
// It is safe for concurrent use: Subscribe, Unsubscribe and Broadcast may be
// called from different goroutines.
type Broadcaster[T any] struct {
	name string

	// mu guards listeners. Broadcast only takes the read lock because it never
	// modifies the map; holding it also guarantees that Unsubscribe cannot
	// close a channel while a send to it is in flight.
	mu        sync.RWMutex
	listeners map[chan T]bool
}

// NewBroadcaster creates a new Broadcaster instance for the specified type T.
// It initializes the map that holds the listeners.
// The name parameter is used to identify the broadcaster in log output.
// It returns a pointer to the new Broadcaster instance.
func NewBroadcaster[T any](name string) *Broadcaster[T] {
	return &Broadcaster[T]{
		name:      name,
		listeners: make(map[chan T]bool),
	}
}

// Subscribe adds a new listener channel to the broadcaster.
// It returns a channel of type T that can be used to receive messages.
// The channel is buffered (100) to allow for non-blocking sends.
func (b *Broadcaster[T]) Subscribe() chan T {
	ch := make(chan T, 100)

	b.mu.Lock()
	defer b.mu.Unlock()
	// Allow a zero-value Broadcaster to be used without NewBroadcaster.
	if b.listeners == nil {
		b.listeners = make(map[chan T]bool)
	}
	b.listeners[ch] = true
	return ch
}

// Unsubscribe removes a listener channel from the broadcaster.
// It closes the channel to signal that no more messages will be sent.
// If the channel does not exist in the listeners map, it does nothing,
// so calling it twice for the same channel is safe.
func (b *Broadcaster[T]) Unsubscribe(ch chan T) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if _, ok := b.listeners[ch]; ok {
		delete(b.listeners, ch)
		close(ch)
	}
}

// Broadcast sends a message of type T to all subscribed listeners.
// It logs the number of listeners and the name of the broadcaster.
// If a listener's channel is full, the message is dropped for that listener
// so that a slow consumer can never block the broadcaster.
func (b *Broadcaster[T]) Broadcast(msg T) {
	b.mu.RLock()
	defer b.mu.RUnlock()

	log.Printf("Broadcasting message to %d listeners for %v", len(b.listeners), b.name)
	for ch := range b.listeners {
		select {
		case ch <- msg:
		default:
			// Channel buffer is full: skip this listener instead of blocking.
		}
	}
}
package core

import (
	"context"
	"fmt"
	"sync"
)

// Listener receives notifications from a Broadcaster and hands each one to a
// callback in its own goroutine. The waitgroup tracks those goroutines.
type Listener[T any] struct {
	broadcaster *Broadcaster[T]
	waitgroup   *sync.WaitGroup
}

// NewListener creates a new Listener instance for the specified type T.
// It stores the given broadcaster and creates a waitgroup to track concurrent
// processing of notifications.
// It returns an error if the broadcaster is nil.
func NewListener[T any](broadcaster *Broadcaster[T]) (*Listener[T], error) {
	if broadcaster == nil {
		return nil, fmt.Errorf("broadcaster cannot be nil")
	}

	return &Listener[T]{
		broadcaster: broadcaster,
		waitgroup:   &sync.WaitGroup{},
	}, nil
}

// Listen subscribes to the broadcaster and blocks until the context is done.
// It takes a context for cancellation, a ready channel to signal readiness,
// and a notifyFunction that will be called with the data received.
// Every notification is processed in a separate goroutine to avoid blocking
// the receive loop.
// The ready channel is closed exactly once, as soon as the subscription is
// active; a nil ready channel is ignored.
// If notifyFunction is nil, Listen returns immediately without subscribing
// and without closing ready.
//
// NOTE(review): the subscription is not removed from the broadcaster when
// Listen returns — confirm whether callers rely on that.
func (l *Listener[T]) Listen(ctx context.Context, ready chan struct{}, notifyFunction func(data T)) {
	if notifyFunction == nil {
		return
	}

	channel := l.broadcaster.Subscribe()

	// From here on broadcasts are buffered for this listener, so it is ready.
	if ready != nil {
		close(ready)
	}

	// Block until there is something to do. The select has no default case on
	// purpose: with one, the loop would spin at full CPU while idle.
	for {
		select {
		case <-ctx.Done():
			return
		case data, ok := <-channel:
			if !ok {
				// The subscription was closed; no more messages will arrive.
				return
			}
			l.waitgroup.Add(1)
			go func(d T) {
				defer l.waitgroup.Done()
				notifyFunction(d)
			}(data)
		}
	}
}

// Notify sends a notification with the provided data to all listeners.
// It uses the broadcaster to broadcast the data to all subscribed channels.
// This method is typically called when an event occurs that needs to be communicated to all listeners.
// As Broadcast is not blocking it does not block and will not wait for listeners to process the notification.
func (l *Listener[T]) Notify(data T) {
	l.broadcaster.Broadcast(data)
}

// WaitForNotificationsProcessed waits for all notifications to be processed.
// It blocks until all goroutines that were started to process notifications have completed.
// This is useful to ensure that all notifications have been handled before proceeding with further operations.
// It is typically called after calling Listen and Notify to ensure that all processing is complete.
func (l *Listener[T]) WaitForNotificationsProcessed() {
	l.waitgroup.Wait()
}
package core

import (
	"fmt"
	"time"

	"github.com/siherrmann/queuer/model"
)

// Retryer executes a function repeatedly until it succeeds or the configured
// number of attempts is exhausted.
type Retryer struct {
	function func() error
	// sleep is the initial delay between two attempts; Retry never modifies it.
	sleep   time.Duration
	options *model.OnError
}

// NewRetryer creates a new Retryer instance.
// It initializes the retryer with a function to execute, a sleep duration for retries
// (options.RetryDelay, interpreted as seconds), and options for retry behavior.
// It returns a pointer to the new Retryer instance, or an error if the function
// is nil or the options are invalid (nil, MaxRetries <= 0 or RetryDelay < 0).
func NewRetryer(function func() error, options *model.OnError) (*Retryer, error) {
	if function == nil {
		// A nil function would otherwise only surface as a panic inside Retry.
		return nil, fmt.Errorf("no function to retry provided")
	}
	if options == nil || options.MaxRetries <= 0 || options.RetryDelay < 0 {
		return nil, fmt.Errorf("no valid retry options provided")
	}

	return &Retryer{
		function: function,
		sleep:    time.Duration(options.RetryDelay) * time.Second,
		options:  options,
	}, nil
}

// Retry attempts to execute the function up to MaxRetries times.
// It sleeps between two attempts, but not after the last one.
// The retry behavior is determined by the RetryBackoff option.
// If all attempts fail, it returns the last error encountered.
// If the function succeeds, it returns nil.
//
// The backoff strategies are:
// - RETRY_BACKOFF_NONE: Always waits for the initial delay.
// - RETRY_BACKOFF_LINEAR: Increases the sleep duration by the initial delay after each attempt.
// - RETRY_BACKOFF_EXPONENTIAL: Doubles the sleep duration after each attempt.
//
// With any other RetryBackoff value the first error is returned without retrying.
// Every call starts again from the initial delay.
func (r *Retryer) Retry() (err error) {
	// Work on a copy so the backoff of one call does not leak into the next.
	sleep := r.sleep

	for attempt := 1; attempt <= r.options.MaxRetries; attempt++ {
		if err = r.function(); err == nil {
			return nil
		}
		if attempt == r.options.MaxRetries {
			// Nothing follows the last attempt, so waiting would only delay the caller.
			break
		}

		wait := sleep
		switch r.options.RetryBackoff {
		case model.RETRY_BACKOFF_NONE:
			// Keep the delay constant.
		case model.RETRY_BACKOFF_LINEAR:
			sleep += r.sleep
		case model.RETRY_BACKOFF_EXPONENTIAL:
			sleep *= 2
		default:
			// Unknown strategy: give up after the first failure.
			return err
		}
		time.Sleep(wait)
	}

	return err
}
package core

import (
	"context"
	"fmt"
	"math"
	"reflect"
	"time"

	"github.com/siherrmann/queuer/helper"
	"github.com/siherrmann/queuer/model"
)

// Runner executes a task function with a fixed set of parameters and reports
// the outcome on its channels.
type Runner struct {
	// cancel cancels the context of the most recent Run call.
	cancel     context.CancelFunc
	Options    *model.Options
	Task       interface{}
	Parameters model.Parameters
	// ResultsChannel receives the task's return values (without a trailing error).
	ResultsChannel chan []interface{}
	// ErrorChannel receives task errors, recovered panics and context errors.
	ErrorChannel chan error
}

// NewRunner creates a new Runner instance for the specified task and parameters.
// It checks that the number and kinds of the parameters match the task's inputs.
// A float64 parameter is converted to int where the task expects an int (this
// is how JSON-decoded numbers arrive); the conversion is written back into the
// parameters slice passed by the caller.
// It returns an error if the task or parameters are invalid, including a nil parameter.
func NewRunner(options *model.Options, task interface{}, parameters ...interface{}) (*Runner, error) {
	taskInputParameters, err := helper.GetInputParametersFromTask(task)
	if err != nil {
		return nil, fmt.Errorf("error getting input parameters of task: %w", err)
	}
	if len(taskInputParameters) != len(parameters) {
		return nil, fmt.Errorf("task expects %d parameters, got %d", len(taskInputParameters), len(parameters))
	}

	for i, param := range parameters {
		expected := taskInputParameters[i].Kind()
		if param == nil {
			// reflect.TypeOf(nil) is nil, so asking it for a Kind would panic.
			return nil, fmt.Errorf("parameter %d of task must be of type %s, got nil", i, expected)
		}

		actual := reflect.TypeOf(param).Kind()
		switch {
		case expected == reflect.Int && actual == reflect.Float64:
			// Convert json float to int if the parameter is int.
			parameters[i] = int(reflect.ValueOf(param).Float())
		case expected != actual:
			return nil, fmt.Errorf("parameter %d of task must be of type %s, got %s", i, expected, actual)
		}
	}

	if err = helper.CheckValidTaskWithParameters(task, parameters...); err != nil {
		return nil, fmt.Errorf("error checking task: %w", err)
	}

	return &Runner{
		Task:           task,
		Parameters:     model.Parameters(parameters),
		Options:        options,
		ResultsChannel: make(chan []interface{}, 1),
		ErrorChannel:   make(chan error, 1),
	}, nil
}

// NewRunnerFromJob creates a new Runner instance from a job.
// It initializes the runner with the job's options and a copy of the job's
// parameters, so the parameter conversion done by NewRunner does not modify the job.
// It returns a pointer to the new Runner instance or an error if the task or job is invalid.
func NewRunnerFromJob(task *model.Task, job *model.Job) (*Runner, error) {
	if task == nil || job == nil {
		return nil, fmt.Errorf("task and job cannot be nil")
	}

	parameters := make([]interface{}, len(job.Parameters))
	copy(parameters, job.Parameters)

	runner, err := NewRunner(job.Options, task.Task, parameters...)
	if err != nil {
		return nil, fmt.Errorf("error creating runner from job: %w", err)
	}

	return runner, nil
}

// Run executes the task with the provided parameters.
// It will return results on ResultsChannel and errors on ErrorChannel.
// If the task panics, the recovered panic value is reported on ErrorChannel.
// The task itself runs in a separate goroutine; Run blocks until the task
// finishes, fails, panics or the context is done, whichever happens first.
// If Options.OnError.Timeout (seconds) is greater than zero the context gets
// that timeout, otherwise a plain cancelable context is used.
// When the context ends first, the task goroutine is not interrupted; its
// outcome is discarded.
func (r *Runner) Run(ctx context.Context) {
	var cancel context.CancelFunc
	if r.Options != nil && r.Options.OnError != nil && r.Options.OnError.Timeout > 0 {
		ctx, cancel = context.WithTimeout(
			ctx,
			time.Duration(math.Round(r.Options.OnError.Timeout*1000))*time.Millisecond,
		)
	} else {
		ctx, cancel = context.WithCancel(ctx)
	}
	r.cancel = cancel
	// Release the derived context (and its timer) once this run is over.
	defer cancel()

	// All three channels are buffered so the task goroutine can always deliver
	// its outcome and exit, even if Run has already returned on ctx.Done.
	resultsChannel := make(chan []interface{}, 1)
	errorChannel := make(chan error, 1)
	panicChan := make(chan interface{}, 1)

	go func() {
		defer func() {
			if p := recover(); p != nil {
				panicChan <- p
			}
		}()

		results := reflect.ValueOf(r.Task).Call(r.Parameters.ToReflectValues())
		resultValues := make([]interface{}, 0, len(results))
		for _, result := range results {
			resultValues = append(resultValues, result.Interface())
		}

		outputParameters, err := helper.GetOutputParametersFromTask(r.Task)
		if err != nil {
			errorChannel <- fmt.Errorf("error getting output parameters of task: %v", err)
			return
		}

		// A trailing error return value is not part of the results: strip it,
		// whether it holds an error or is nil. The declared type of the LAST
		// output decides, so tasks with one or with many outputs both work.
		var taskErr error
		if n := len(resultValues); n > 0 {
			last := resultValues[n-1]
			asErr, isErr := last.(error)
			declaresErr := len(outputParameters) > 0 &&
				outputParameters[len(outputParameters)-1].String() == "error"
			if isErr || (declaresErr && last == nil) {
				resultValues = resultValues[:n-1]
			}
			taskErr = asErr
		}

		if taskErr != nil {
			errorChannel <- fmt.Errorf("runner failed with error: %w", taskErr)
			return
		}
		resultsChannel <- resultValues
	}()

	select {
	case p := <-panicChan:
		r.ErrorChannel <- fmt.Errorf("panic running task: %v", p)
	case err := <-errorChannel:
		r.ErrorChannel <- fmt.Errorf("error running task: %w", err)
	case results := <-resultsChannel:
		r.ResultsChannel <- results
	case <-ctx.Done():
		r.ErrorChannel <- fmt.Errorf("error running task: %w", ctx.Err())
	}
}

// Cancel first invokes every non-nil onCancel callback in order and then
// cancels the context of the current run, if Run has been started.
func (r *Runner) Cancel(onCancel ...func()) {
	for _, cancelFunc := range onCancel {
		if cancelFunc != nil {
			cancelFunc()
		}
	}

	// Cancel the context if it exists
	if r.cancel != nil {
		r.cancel()
	}
}
package core

import (
	"context"
	"fmt"
	"log"
	"reflect"
	"time"

	"github.com/siherrmann/queuer/helper"
	"github.com/siherrmann/queuer/model"
)

// Scheduler runs a task once, at a given start time.
type Scheduler struct {
	Task       interface{}
	Parameters model.Parameters
	// StartTime is the moment the task should run; nil means immediately.
	StartTime *time.Time
}

// NewScheduler creates a new Scheduler instance for the specified task and parameters.
// It checks if the task and parameters are valid and returns a pointer to the new Scheduler instance.
// It returns an error if the task or parameters are invalid.
func NewScheduler(startTime *time.Time, task interface{}, parameters ...interface{}) (*Scheduler, error) {
	if err := helper.CheckValidTaskWithParameters(task, parameters...); err != nil {
		return nil, fmt.Errorf("error checking task: %w", err)
	}

	return &Scheduler{
		Task:       task,
		Parameters: model.Parameters(parameters),
		StartTime:  startTime,
	}, nil
}

// Go starts the scheduler to run the task at the specified start time.
// It returns immediately; the waiting and the task itself happen in the
// background inside a Runner.
// If the start time is nil or already in the past, the task runs immediately.
// If the context is done before the start time is reached, the task is not run.
// Because the task executes inside the Runner, a panic in the task is
// recovered by the Runner instead of crashing the process.
// If the Runner cannot be created the error is logged and nothing is scheduled.
func (s *Scheduler) Go(ctx context.Context) {
	var duration time.Duration
	if s.StartTime != nil {
		duration = time.Until(*s.StartTime)
	}

	runner, err := NewRunner(
		nil,
		func() {
			// Wait inside the runner's goroutine so that cancellation can
			// still stop the pending call.
			timer := time.NewTimer(duration)
			defer timer.Stop()

			select {
			case <-ctx.Done():
				// Cancelled before the start time: skip the task.
			case <-timer.C:
				reflect.ValueOf(s.Task).Call(s.Parameters.ToReflectValues())
			}
		},
	)
	if err != nil {
		log.Printf("Error creating runner: %v", err)
		return
	}

	go runner.Run(ctx)
}
package core

import (
	"context"
	"fmt"
	"log"
	"reflect"
	"time"

	"github.com/siherrmann/queuer/helper"
)

// Ticker represents a recurring task runner.
type Ticker struct {
	interval time.Duration
	runner   *Runner
}

// NewTicker creates and returns a new Ticker instance.
// It initializes the ticker with a specified interval and a task to run.
// The task must be valid and compatible with the provided parameters.
// It returns a pointer to the new Ticker instance or an error if the interval, task or parameters are invalid.
func NewTicker(interval time.Duration, task interface{}, parameters ...interface{}) (*Ticker, error) {
	if interval <= 0 {
		return nil, fmt.Errorf("ticker interval must be greater than zero")
	}

	if err := helper.CheckValidTaskWithParameters(task, parameters...); err != nil {
		// Report the actual validation error. reflect.TypeOf is printed with %v
		// so that a nil task yields "<nil>" instead of a panic.
		return nil, fmt.Errorf("error checking task of type %v: %w", reflect.TypeOf(task), err)
	}

	runner, err := NewRunner(nil, task, parameters...)
	if err != nil {
		return nil, fmt.Errorf("error creating runner: %w", err)
	}

	return &Ticker{
		interval: interval,
		runner:   runner,
	}, nil
}

// Go starts the Ticker. It runs the task once immediately and then at the
// specified interval until the provided context is cancelled.
// Each execution runs in a separate goroutine to avoid blocking the ticker.
// Results of the task are discarded; they are drained from the runner so that
// later executions can deliver theirs.
// If an execution reports an error, the error is logged and Go returns.
func (t *Ticker) Go(ctx context.Context) {
	go t.runner.Run(ctx)

	ticker := time.NewTicker(t.interval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			go t.runner.Run(ctx)
		case <-t.runner.ResultsChannel:
			// The runner's result buffer holds a single entry. Without this
			// receive the second successful run would block forever on its
			// send and leak one goroutine per tick.
		case err := <-t.runner.ErrorChannel:
			if err != nil {
				// TODO also implement error channel?
				log.Printf("Error running task: %v", err)
				return
			}
		}
	}
}
package database

import (
	"context"
	"database/sql"
	"fmt"
	"log"
	"time"

	"github.com/google/uuid"
	"github.com/lib/pq"
	"github.com/siherrmann/queuer/helper"
	"github.com/siherrmann/queuer/model"
)

// JobDBHandlerFunctions defines the interface for Job database operations.
type JobDBHandlerFunctions interface {
	CheckTablesExistance() (bool, error)
	CreateTable() error
	DropTables() error
	InsertJob(job *model.Job) (*model.Job, error)
	InsertJobTx(tx *sql.Tx, job *model.Job) (*model.Job, error)
	BatchInsertJobs(jobs []*model.Job) error
	UpdateJobsInitial(worker *model.Worker) ([]*model.Job, error)
	UpdateJobFinal(job *model.Job) (*model.Job, error)
	DeleteJob(rid uuid.UUID) error
	SelectJob(rid uuid.UUID) (*model.Job, error)
	SelectAllJobs(lastID int, entries int) ([]*model.Job, error)
	SelectAllJobsByWorkerRID(workerRid uuid.UUID, lastID int, entries int) ([]*model.Job, error)
	SelectAllJobsBySearch(search string, lastID int, entries int) ([]*model.Job, error)
	// Job Archive
	AddRetentionArchive(retention time.Duration) error
	RemoveRetentionArchive() error
	SelectJobFromArchive(rid uuid.UUID) (*model.Job, error)
	SelectAllJobsFromArchive(lastID int, entries int) ([]*model.Job, error)
	SelectAllJobsFromArchiveBySearch(search string, lastID int, entries int) ([]*model.Job, error)
}

// JobDBHandler implements JobDBHandlerFunctions and holds the database connection.
type JobDBHandler struct {
	db *helper.Database
}

// NewJobDBHandler creates a new instance of JobDBHandler.
// It stores the database connection and makes sure the job tables exist.
// If withTableDrop is true, it will drop the existing job tables before creating new ones.
// It returns an error if the connection is nil or if dropping or creating the
// tables fails; the underlying error is wrapped and can be inspected with
// errors.Is / errors.As.
func NewJobDBHandler(dbConnection *helper.Database, withTableDrop bool) (*JobDBHandler, error) {
	if dbConnection == nil {
		return nil, fmt.Errorf("database connection is nil")
	}

	jobDbHandler := &JobDBHandler{
		db: dbConnection,
	}

	if withTableDrop {
		if err := jobDbHandler.DropTables(); err != nil {
			return nil, fmt.Errorf("error dropping job table: %w", err)
		}
	}

	if err := jobDbHandler.CreateTable(); err != nil {
		return nil, fmt.Errorf("error creating job table: %w", err)
	}

	return jobDbHandler, nil
}

// CheckTablesExistance checks if the 'job' and 'job_archive' tables exist in the database.
// It returns true if both tables exist, otherwise false.
// An error is returned only if one of the existence checks itself fails.
func (r JobDBHandler) CheckTablesExistance() (bool, error) {
	jobExists, err := r.db.CheckTableExistance("job")
	if err != nil {
		return false, fmt.Errorf("error checking job table existence: %w", err)
	}

	jobArchiveExists, err := r.db.CheckTableExistance("job_archive")
	if err != nil {
		return false, fmt.Errorf("error checking job archive table existence: %w", err)
	}

	return jobExists && jobArchiveExists, nil
}

// CreateTable creates the 'job' and 'job_archive' tables in the database.
// If the tables already exist, it does not create them again.
// It also creates a trigger for notifying events on the table and all necessary indexes.
func (r JobDBHandler) CreateTable() error { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() _, err := r.db.Instance.ExecContext( ctx, `CREATE TABLE IF NOT EXISTS job ( id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, rid UUID UNIQUE DEFAULT gen_random_uuid(), worker_id BIGINT DEFAULT 0, worker_rid UUID DEFAULT NULL, options JSONB DEFAULT '{}', task_name VARCHAR(100) DEFAULT '', parameters JSONB DEFAULT '[]', status VARCHAR(50) DEFAULT 'QUEUED', scheduled_at TIMESTAMP DEFAULT NULL, started_at TIMESTAMP DEFAULT NULL, schedule_count INT DEFAULT 0, attempts INT DEFAULT 0, results JSONB DEFAULT '[]', error TEXT DEFAULT '', created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); CREATE TABLE IF NOT EXISTS job_archive ( LIKE job INCLUDING DEFAULTS INCLUDING CONSTRAINTS, PRIMARY KEY (id, updated_at) ); SELECT create_hypertable('job_archive', by_range('updated_at'), if_not_exists => TRUE);`, ) if err != nil { log.Panicf("error creating job table: %#v", err) } _, err = r.db.Instance.ExecContext( ctx, `CREATE OR REPLACE TRIGGER job_notify_event BEFORE INSERT OR UPDATE ON job FOR EACH ROW EXECUTE PROCEDURE notify_event();`, ) if err != nil { log.Panicf("error creating notify trigger on job table: %#v", err) } _, err = r.db.Instance.ExecContext( ctx, `CREATE OR REPLACE TRIGGER job_archive_notify_event BEFORE INSERT ON job_archive FOR EACH ROW EXECUTE PROCEDURE notify_event();`, ) if err != nil { log.Panicf("error creating notify trigger on job_archive table: %#v", err) } _, err = r.db.Instance.Exec( `CREATE INDEX IF NOT EXISTS idx_next_interval ON job USING HASH ((options->'schedule'->>'next_interval'));`, ) if err != nil { log.Panicf("error creating index on next_interval: %#v", err) } err = r.db.CreateIndexes("job", "worker_id", "worker_rid", "status", "created_at", "updated_at") if err != nil { log.Panic(err) } r.db.Logger.Println("created table job") return nil } // DropTables drops the 
'job' and 'job_archive' tables from the database. func (r JobDBHandler) DropTables() error { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() query := `DROP TABLE IF EXISTS job` _, err := r.db.Instance.ExecContext(ctx, query) if err != nil { return fmt.Errorf("error dropping job table: %#v", err) } query = `DROP TABLE IF EXISTS job_archive` _, err = r.db.Instance.ExecContext(ctx, query) if err != nil { return fmt.Errorf("error dropping job table: %#v", err) } r.db.Logger.Printf("Dropped table job") return nil } // InsertJob inserts a new job record into the database. func (r JobDBHandler) InsertJob(job *model.Job) (*model.Job, error) { newJob := &model.Job{} row := r.db.Instance.QueryRow( `INSERT INTO job (options, task_name, parameters, status, scheduled_at, schedule_count) VALUES ($1, $2, $3, $4, $5, $6) RETURNING id, rid, worker_id, worker_rid, options, task_name, parameters, status, scheduled_at, schedule_count, attempts, created_at, updated_at;`, job.Options, job.TaskName, job.Parameters, job.Status, job.ScheduledAt, job.ScheduleCount, ) err := row.Scan( &newJob.ID, &newJob.RID, &newJob.WorkerID, &newJob.WorkerRID, &newJob.Options, &newJob.TaskName, &newJob.Parameters, &newJob.Status, &newJob.ScheduledAt, &newJob.ScheduleCount, &newJob.Attempts, &newJob.CreatedAt, &newJob.UpdatedAt, ) if err != nil { return nil, fmt.Errorf("error scanning new job: %w", err) } return newJob, nil } // InsertJobTx inserts a new job record into the database within a transaction. 
func (r JobDBHandler) InsertJobTx(tx *sql.Tx, job *model.Job) (*model.Job, error) { newJob := &model.Job{} row := tx.QueryRow( `INSERT INTO job (options, task_name, parameters, status, scheduled_at) VALUES ($1, $2, $3, $4, $5) RETURNING id, rid, worker_id, worker_rid, options, task_name, parameters, status, scheduled_at, schedule_count, attempts, created_at, updated_at;`, job.Options, job.TaskName, job.Parameters, job.Status, job.ScheduledAt, ) err := row.Scan( &newJob.ID, &newJob.RID, &newJob.WorkerID, &newJob.WorkerRID, &newJob.Options, &newJob.TaskName, &newJob.Parameters, &newJob.Status, &newJob.ScheduledAt, &newJob.ScheduleCount, &newJob.Attempts, &newJob.CreatedAt, &newJob.UpdatedAt, ) if err != nil { return nil, fmt.Errorf("error scanning new job: %w", err) } return newJob, nil } // BatchInsertJobs inserts multiple job records into the database in a single transaction. func (r JobDBHandler) BatchInsertJobs(jobs []*model.Job) error { if len(jobs) == 0 { return nil } tx, err := r.db.Instance.Begin() if err != nil { return fmt.Errorf("error starting transaction: %w", err) } stmt, err := tx.Prepare(pq.CopyIn("job", "options", "task_name", "parameters", "scheduled_at")) if err != nil { return fmt.Errorf("error preparing statement for batch insert: %w", err) } for _, job := range jobs { var err error optionsJSON := []byte("{}") parametersJSON := []byte("[]") if job.Options != nil { optionsJSON, err = job.Options.Marshal() if err != nil { return fmt.Errorf("error marshaling job options for batch insert: %w", err) } } if job.Parameters != nil { parametersJSON, err = job.Parameters.Marshal() if err != nil { return fmt.Errorf("error marshaling job parameters for batch insert: %v", err) } } _, err = stmt.Exec( string(optionsJSON), job.TaskName, string(parametersJSON), job.ScheduledAt, ) if err != nil { return fmt.Errorf("error executing batch insert for job %s: %w", job.RID, err) } } _, err = stmt.Exec() if err != nil { return fmt.Errorf("error executing final batch 
insert: %w", err) } err = stmt.Close() if err != nil { return fmt.Errorf("error closing prepared statement: %w", err) } err = tx.Commit() if err != nil { return fmt.Errorf("error committing transaction: %w", err) } return nil } // UpdateJobsInitial updates an existing queued non locked job record in the database. // // Checks if the job is in 'QUEUED' or 'FAILED' status and if the worker can handle the task. // The worker must have the task in its available tasks and the next interval must be available if set. // If the job is scheduled it must be scheduled within the next 10 minutes. // It updates the job to 'RUNNING' status, increments the schedule count and attempts, and sets the started_at timestamp. // It uses the `FOR UPDATE SKIP LOCKED` clause to avoid locking issues with concurrent updates. // // It returns the updated job records. func (r JobDBHandler) UpdateJobsInitial(worker *model.Worker) ([]*model.Job, error) { rows, err := r.db.Instance.Query( `WITH current_concurrency AS ( SELECT COUNT(*) AS count FROM job WHERE worker_id = $1 AND status = 'RUNNING' ), current_worker AS ( SELECT id, rid, available_tasks, available_next_interval, max_concurrency, COALESCE(cc.count, 0) AS current_concurrency FROM worker, current_concurrency AS cc WHERE id = $1 AND (max_concurrency > COALESCE(cc.count, 0)) FOR UPDATE ), job_ids AS ( SELECT j.id FROM current_worker AS cw, current_concurrency AS cc, LATERAL ( SELECT job.id FROM job WHERE job.task_name = ANY(cw.available_tasks::VARCHAR[]) AND ( options->'schedule'->>'next_interval' IS NULL OR options->'schedule'->>'next_interval' = '' OR options->'schedule'->>'next_interval' = ANY(cw.available_next_interval::VARCHAR[])) AND ( job.status = 'QUEUED' OR (job.status = 'SCHEDULED' AND job.scheduled_at <= (CURRENT_TIMESTAMP + '10 minutes'::INTERVAL)) ) ORDER BY job.created_at ASC LIMIT (cw.max_concurrency - COALESCE(cc.count, 0)) FOR UPDATE SKIP LOCKED ) AS j ) UPDATE job SET worker_id = cw.id, worker_rid = cw.rid, status = 
'RUNNING', started_at = CURRENT_TIMESTAMP, schedule_count = schedule_count + 1, attempts = attempts + 1, updated_at = CURRENT_TIMESTAMP FROM current_worker AS cw, job_ids WHERE job.id = ANY(SELECT id FROM job_ids) AND EXISTS (SELECT 1 FROM current_worker) RETURNING job.id, job.rid, job.worker_id, job.worker_rid, job.options, job.task_name, job.parameters, job.status, job.scheduled_at, job.started_at, job.schedule_count, job.attempts, job.created_at, job.updated_at;`, worker.ID, ) if err != nil { return nil, fmt.Errorf("error querying job for worker id %v: %w", worker.ID, err) } defer rows.Close() var jobs []*model.Job for rows.Next() { job := &model.Job{} err := rows.Scan( &job.ID, &job.RID, &job.WorkerID, &job.WorkerRID, &job.Options, &job.TaskName, &job.Parameters, &job.Status, &job.ScheduledAt, &job.StartedAt, &job.ScheduleCount, &job.Attempts, &job.CreatedAt, &job.UpdatedAt, ) if err != nil { return nil, fmt.Errorf("error updating initial job for worker id %v: %w", job.WorkerRID, err) } jobs = append(jobs, job) } return jobs, nil } // UpdateJobFinal updates an existing job record in the database to state 'FAILED' or 'SUCCEEDED'. // // It deletes the job from the 'job' table and inserts it into the 'job_archive' table. // The archived job will have the status set to the provided status, and it will include results and error information. // // It returns the archived job record. 
func (r JobDBHandler) UpdateJobFinal(job *model.Job) (*model.Job, error) { row := r.db.Instance.QueryRow( `WITH jobs_old AS ( DELETE FROM job WHERE id = $1 RETURNING id, rid, worker_id, worker_rid, options, task_name, parameters, scheduled_at, started_at, schedule_count, attempts, created_at, updated_at ) INSERT INTO job_archive ( id, rid, worker_id, worker_rid, options, task_name, parameters, status, scheduled_at, started_at, schedule_count, attempts, results, error, created_at, updated_at ) SELECT jobs_old.id, jobs_old.rid, jobs_old.worker_id, jobs_old.worker_rid, jobs_old.options, jobs_old.task_name, jobs_old.parameters, $2, jobs_old.scheduled_at, jobs_old.started_at, jobs_old.schedule_count, jobs_old.attempts, $3, $4, jobs_old.created_at, CURRENT_TIMESTAMP FROM jobs_old RETURNING id, rid, worker_id, worker_rid, options, task_name, parameters, status, scheduled_at, started_at, schedule_count, attempts, results, error, created_at, updated_at;`, job.ID, job.Status, job.Results, job.Error, ) archivedJob := &model.Job{} err := row.Scan( &archivedJob.ID, &archivedJob.RID, &archivedJob.WorkerID, &archivedJob.WorkerRID, &archivedJob.Options, &archivedJob.TaskName, &archivedJob.Parameters, &archivedJob.Status, &archivedJob.ScheduledAt, &archivedJob.StartedAt, &archivedJob.ScheduleCount, &archivedJob.Attempts, &archivedJob.Results, &archivedJob.Error, &archivedJob.CreatedAt, &archivedJob.UpdatedAt, ) if err != nil { return nil, fmt.Errorf("error updating final job for worker id %v: %w", job.WorkerRID, err) } return archivedJob, nil } // DeleteJob deletes a job record from the database based on its RID. func (r JobDBHandler) DeleteJob(rid uuid.UUID) error { _, err := r.db.Instance.Exec( `DELETE FROM job WHERE rid = $1`, rid, ) if err != nil { return fmt.Errorf("error deleting job with RID %s: %w", rid, err) } return nil } // SelectJob retrieves a single job record from the database based on its RID. 
func (r JobDBHandler) SelectJob(rid uuid.UUID) (*model.Job, error) { row := r.db.Instance.QueryRow( `SELECT id, rid, worker_id, worker_rid, options, task_name, parameters, status, scheduled_at, started_at, schedule_count, attempts, results, error, created_at, updated_at FROM job WHERE rid = $1`, rid, ) job := &model.Job{} err := row.Scan( &job.ID, &job.RID, &job.WorkerID, &job.WorkerRID, &job.Options, &job.TaskName, &job.Parameters, &job.Status, &job.ScheduledAt, &job.StartedAt, &job.ScheduleCount, &job.Attempts, &job.Results, &job.Error, &job.CreatedAt, &job.UpdatedAt, ) if err != nil { return nil, fmt.Errorf("error scanning job with RID %s: %w", rid, err) } return job, nil } // SelectAllJobs retrieves a paginated list of jobs for a all workers. // It returns jobs that were created before the specified lastID, or the newest jobs if lastID is 0. func (r JobDBHandler) SelectAllJobs(lastID int, entries int) ([]*model.Job, error) { rows, err := r.db.Instance.Query( `SELECT id, rid, worker_id, worker_rid, options, task_name, parameters, status, scheduled_at, started_at, schedule_count, attempts, results, error, created_at, updated_at FROM job WHERE (0 = $1 OR created_at < ( SELECT d.created_at FROM job AS d WHERE d.id = $1)) ORDER BY created_at DESC LIMIT $2;`, lastID, entries, ) if err != nil { return []*model.Job{}, fmt.Errorf("error querying all jobs: %w", err) } defer rows.Close() var jobs []*model.Job for rows.Next() { job := &model.Job{} err := rows.Scan( &job.ID, &job.RID, &job.WorkerID, &job.WorkerRID, &job.Options, &job.TaskName, &job.Parameters, &job.Status, &job.ScheduledAt, &job.StartedAt, &job.ScheduleCount, &job.Attempts, &job.Results, &job.Error, &job.CreatedAt, &job.UpdatedAt, ) if err != nil { return []*model.Job{}, fmt.Errorf("error scanning job row: %w", err) } jobs = append(jobs, job) } return jobs, nil } // SelectAllJobsByWorkerRID retrieves a paginated list of jobs for a specific worker, filtered by worker RID. 
// It returns jobs that were created before the specified lastID, or the newest jobs if lastID is 0. func (r JobDBHandler) SelectAllJobsByWorkerRID(workerRid uuid.UUID, lastID int, entries int) ([]*model.Job, error) { rows, err := r.db.Instance.Query( `SELECT id, rid, worker_id, worker_rid, options, task_name, parameters, status, scheduled_at, started_at, schedule_count, attempts, results, error, created_at, updated_at FROM job WHERE worker_rid = $1 AND (0 = $2 OR created_at < ( SELECT d.created_at FROM job AS d WHERE d.id = $2)) ORDER BY created_at DESC LIMIT $3;`, workerRid, lastID, entries, ) if err != nil { return []*model.Job{}, fmt.Errorf("error querying jobs by worker RID: %w", err) } defer rows.Close() var jobs []*model.Job for rows.Next() { job := &model.Job{} err := rows.Scan( &job.ID, &job.RID, &job.WorkerID, &job.WorkerRID, &job.Options, &job.TaskName, &job.Parameters, &job.Status, &job.ScheduledAt, &job.StartedAt, &job.ScheduleCount, &job.Attempts, &job.Results, &job.Error, &job.CreatedAt, &job.UpdatedAt, ) if err != nil { return []*model.Job{}, fmt.Errorf("error scanning job row for worker: %w", err) } jobs = append(jobs, job) } return jobs, nil } // SelectAllJobsBySearch retrieves a paginated list of jobs for a worker, filtered by search string. // // It searches across 'rid', 'worker_id', and 'status' fields. // The search is case-insensitive and uses ILIKE for partial matches. // // It returns jobs that were created before the specified lastID, or the newest jobs if lastID is 0. 
func (r JobDBHandler) SelectAllJobsBySearch(search string, lastID int, entries int) ([]*model.Job, error) { rows, err := r.db.Instance.Query(` SELECT id, rid, worker_id, worker_rid, options, task_name, parameters, status, scheduled_at, started_at, schedule_count, attempts, results, error, created_at, updated_at FROM job WHERE (rid::text ILIKE '%' || $1 || '%' OR worker_id::text ILIKE '%' || $1 || '%' OR task_name ILIKE '%' || $1 || '%' OR status ILIKE '%' || $1 || '%') AND (0 = $2 OR created_at < ( SELECT u.created_at FROM job AS u WHERE u.id = $2)) ORDER BY created_at DESC LIMIT $3`, search, lastID, entries, ) if err != nil { return []*model.Job{}, fmt.Errorf("error querying jobs by search: %w", err) } defer rows.Close() var jobs []*model.Job for rows.Next() { job := &model.Job{} err := rows.Scan( &job.ID, &job.RID, &job.WorkerID, &job.WorkerRID, &job.Options, &job.TaskName, &job.Parameters, &job.Status, &job.ScheduledAt, &job.StartedAt, &job.ScheduleCount, &job.Attempts, &job.Results, &job.Error, &job.CreatedAt, &job.UpdatedAt, ) if err != nil { return []*model.Job{}, fmt.Errorf("error scanning job row during search: %w", err) } jobs = append(jobs, job) } if err = rows.Err(); err != nil { return []*model.Job{}, fmt.Errorf("error after iterating rows during search: %w", err) } return jobs, nil } // Job Archive // AddRetentionArchive updates the retention archive settings for the job archive. func (r JobDBHandler) AddRetentionArchive(retention time.Duration) error { _, err := r.db.Instance.Exec( `SELECT add_retention_policy('job_archive', ($1 * INTERVAL '1 day'));`, int(retention.Hours()/24), ) if err != nil { return fmt.Errorf("error updating retention archive: %w", err) } return nil } // RemoveRetentionArchive removes the retention archive settings for the job archive. 
func (r JobDBHandler) RemoveRetentionArchive() error { _, err := r.db.Instance.Exec( `SELECT remove_retention_policy('job_archive');`, ) if err != nil { return fmt.Errorf("error removing retention archive: %w", err) } return nil } // SelectJobFromArchive retrieves a single archived job record from the database based on its RID. func (r JobDBHandler) SelectJobFromArchive(rid uuid.UUID) (*model.Job, error) { row := r.db.Instance.QueryRow( `SELECT id, rid, worker_id, worker_rid, options, task_name, parameters, status, scheduled_at, started_at, schedule_count, attempts, results, error, created_at, updated_at FROM job_archive WHERE rid = $1`, rid, ) job := &model.Job{} err := row.Scan( &job.ID, &job.RID, &job.WorkerID, &job.WorkerRID, &job.Options, &job.TaskName, &job.Parameters, &job.Status, &job.ScheduledAt, &job.StartedAt, &job.ScheduleCount, &job.Attempts, &job.Results, &job.Error, &job.CreatedAt, &job.UpdatedAt, ) if err != nil { return nil, fmt.Errorf("error scanning archived job with RID %s: %w", rid, err) } return job, nil } // SelectAllJobsFromArchive retrieves a paginated list of archived jobs. // It returns jobs that were created before the specified lastID, or the newest jobs if lastID is 0. 
func (r JobDBHandler) SelectAllJobsFromArchive(lastID int, entries int) ([]*model.Job, error) { rows, err := r.db.Instance.Query( `SELECT id, rid, worker_id, worker_rid, options, task_name, parameters, status, scheduled_at, started_at, schedule_count, attempts, results, error, created_at, updated_at FROM job_archive WHERE (0 = $1 OR created_at < ( SELECT d.created_at FROM job_archive AS d WHERE d.id = $1)) ORDER BY created_at DESC LIMIT $2;`, lastID, entries, ) if err != nil { return []*model.Job{}, fmt.Errorf("error querying all archived jobs: %w", err) } defer rows.Close() var jobs []*model.Job for rows.Next() { job := &model.Job{} err := rows.Scan( &job.ID, &job.RID, &job.WorkerID, &job.WorkerRID, &job.Options, &job.TaskName, &job.Parameters, &job.Status, &job.ScheduledAt, &job.StartedAt, &job.ScheduleCount, &job.Attempts, &job.Results, &job.Error, &job.CreatedAt, &job.UpdatedAt, ) if err != nil { return []*model.Job{}, fmt.Errorf("error scanning archived job row: %w", err) } jobs = append(jobs, job) } return jobs, nil } // SelectAllJobsFromArchiveBySearch retrieves a paginated list of archived jobs filtered by search string. // It searches across 'rid', 'worker_id', 'task_name', and 'status' fields. 
// It returns jobs that were created before the specified lastID, or the newest jobs if lastID func (r JobDBHandler) SelectAllJobsFromArchiveBySearch(search string, lastID int, entries int) ([]*model.Job, error) { rows, err := r.db.Instance.Query(` SELECT id, rid, worker_id, worker_rid, options, task_name, parameters, status, scheduled_at, started_at, schedule_count, attempts, results, error, created_at, updated_at FROM job_archive WHERE (rid::text ILIKE '%' || $1 || '%' OR worker_id::text ILIKE '%' || $1 || '%' OR task_name ILIKE '%' || $1 || '%' OR status ILIKE '%' || $1 || '%') AND (0 = $2 OR created_at < ( SELECT u.created_at FROM job_archive AS u WHERE u.id = $2)) ORDER BY created_at DESC LIMIT $3`, search, lastID, entries, ) if err != nil { return []*model.Job{}, fmt.Errorf("error querying archived jobs by search: %w", err) } defer rows.Close() var jobs []*model.Job for rows.Next() { job := &model.Job{} err := rows.Scan( &job.ID, &job.RID, &job.WorkerID, &job.WorkerRID, &job.Options, &job.TaskName, &job.Parameters, &job.Status, &job.ScheduledAt, &job.StartedAt, &job.ScheduleCount, &job.Attempts, &job.Results, &job.Error, &job.CreatedAt, &job.UpdatedAt, ) if err != nil { return []*model.Job{}, fmt.Errorf("error scanning archived job row during search: %w", err) } jobs = append(jobs, job) } if err = rows.Err(); err != nil { return []*model.Job{}, fmt.Errorf("error after iterating rows during search: %w", err) } return jobs, nil }
package database

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/lib/pq"
	"github.com/siherrmann/queuer/helper"
)

// QueuerListener couples a PostgreSQL notification listener with the name of
// the channel it was subscribed to.
type QueuerListener struct {
	Listener *pq.Listener
	Channel  string
}

// NewQueuerDBListener creates a new QueuerListener instance.
// It initializes a PostgreSQL listener for the specified channel using the provided database configuration.
// The underlying pq listener is created with a minimum reconnect interval of 10 seconds
// and a maximum reconnect interval of 1 minute.
// Errors reported through the listener's event callback are logged.
// It returns an error if dbConfig is nil or if subscribing to the channel fails;
// in the latter case the listener is closed again so that it is not leaked.
func NewQueuerDBListener(dbConfig *helper.DatabaseConfiguration, channel string) (*QueuerListener, error) {
	if dbConfig == nil {
		return nil, fmt.Errorf("database configuration is nil")
	}

	listener := pq.NewListener(dbConfig.DatabaseConnectionString(), 10*time.Second, time.Minute, func(ev pq.ListenerEventType, err error) {
		if err != nil {
			// Use the logger (timestamp, trailing newline) like the rest of this type.
			log.Printf("error creating postgres listener: %v", err)
		}
	})

	if err := listener.Listen(channel); err != nil {
		// The listener is unusable to the caller at this point; release it.
		if closeErr := listener.Close(); closeErr != nil {
			log.Printf("error closing postgres listener: %v", closeErr)
		}
		return nil, fmt.Errorf("error listening to channel %v: %w", channel, err)
	}
	log.Println("Added listener to channel: ", channel)

	return &QueuerListener{
		Listener: listener,
		Channel:  channel,
	}, nil
}

// Listen listens for events on the specified channel and processes them.
// It takes a context for cancellation, a cancel function to stop listening,
// and a notifyFunction that will be called with the event data when an event is received.
// When no notification arrives for 90 seconds, the listener pings the connection;
// if the ping fails, the error is logged, cancel is called, and Listen returns.
// The notifyFunction will be called in a separate goroutine to avoid blocking the listener.
// If the context is done, the listener stops listening and returns.
func (l *QueuerListener) Listen(ctx context.Context, cancel context.CancelFunc, notifyFunction func(data string)) { for { select { case <-ctx.Done(): return case n := <-l.Listener.Notify: if n != nil { go notifyFunction(n.Extra) } case <-time.After(90 * time.Second): // Checking connection all 90 seconds err := l.Listener.Ping() if err != nil { log.Printf("Error pinging listener: %v", err) cancel() return } } } }
package database

import (
	"context"
	"database/sql"
	"fmt"
	"log"
	"time"

	"github.com/siherrmann/queuer/helper"
	"github.com/siherrmann/queuer/model"
)

// MasterDBHandlerFunctions defines the interface for Master database operations.
type MasterDBHandlerFunctions interface {
	CheckTableExistance() (bool, error)
	CreateTable() error
	DropTable() error
	UpdateMaster(worker *model.Worker, settings *model.MasterSettings) (*model.Master, error)
	SelectMaster() (*model.Master, error)
}

// MasterDBHandler implements MasterDBHandlerFunctions and holds the database connection.
type MasterDBHandler struct {
	db *helper.Database
}

// NewMasterDBHandler creates a new instance of MasterDBHandler.
// If withTableDrop is true, the existing master table is dropped first.
// It then creates the master table if it does not exist.
// It returns an error if dbConnection is nil or if dropping or creating the table fails.
func NewMasterDBHandler(dbConnection *helper.Database, withTableDrop bool) (*MasterDBHandler, error) {
	if dbConnection == nil {
		return nil, fmt.Errorf("database connection is nil")
	}

	masterDbHandler := &MasterDBHandler{
		db: dbConnection,
	}

	if withTableDrop {
		err := masterDbHandler.DropTable()
		if err != nil {
			return nil, fmt.Errorf("error dropping master table: %w", err)
		}
	}

	err := masterDbHandler.CreateTable()
	if err != nil {
		// Wrap with %w so callers can inspect the cause with errors.Is / errors.As.
		return nil, fmt.Errorf("error creating master table: %w", err)
	}

	return masterDbHandler, nil
}

// CheckTableExistance checks if the 'master' table exists in the database.
// It returns true if the table exists, otherwise false.
func (r MasterDBHandler) CheckTableExistance() (bool, error) {
	masterTableExists, err := r.db.CheckTableExistance("master")
	if err != nil {
		return false, fmt.Errorf("error checking master table existence: %w", err)
	}
	return masterTableExists, nil
}

// CreateTable creates the 'master' table in the database if it does not already exist
// and inserts the default master row (id 1) unless that row is already present.
// It panics if either statement fails.
func (r MasterDBHandler) CreateTable() error { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() _, err := r.db.Instance.ExecContext( ctx, `CREATE TABLE IF NOT EXISTS master ( id INTEGER PRIMARY KEY DEFAULT 1, worker_id BIGINT DEFAULT 0, worker_rid UUID, settings JSONB DEFAULT '{}'::JSONB, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP );`, ) if err != nil { log.Panicf("error creating master table: %#v", err) } _, err = r.db.Instance.ExecContext( ctx, `INSERT INTO master DEFAULT VALUES ON CONFLICT (id) DO NOTHING;`, ) if err != nil { log.Panicf("error inserting initial master entry: %#v", err) } r.db.Logger.Println("created table master") return nil } // DropTables drops the 'mater' and 'mater_archive' tables from the database. func (r MasterDBHandler) DropTable() error { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() query := `DROP TABLE IF EXISTS master` _, err := r.db.Instance.ExecContext(ctx, query) if err != nil { return fmt.Errorf("error dropping master table: %#v", err) } r.db.Logger.Printf("Dropped table master") return nil } // UpdateMaster updates the master entry with the given worker's ID and settings. // It locks the row for update to ensure that only one worker can update the master at a time. // It returns the old master entry if it was successfully updated, or nil if no update was done. 
func (r MasterDBHandler) UpdateMaster(worker *model.Worker, settings *model.MasterSettings) (*model.Master, error) { row := r.db.Instance.QueryRow( `WITH current_master AS ( SELECT id, worker_id, worker_rid, settings, created_at, updated_at FROM master WHERE id = 1 AND ( updated_at < (CURRENT_TIMESTAMP - ($4 * INTERVAL '1 minute')) OR worker_id = $1 OR worker_id = 0 ) FOR UPDATE SKIP LOCKED ) UPDATE master SET worker_id = $1, worker_rid = $2, settings = $3::JSONB, updated_at = CURRENT_TIMESTAMP FROM current_master WHERE master.id = current_master.id RETURNING current_master.id, current_master.worker_id, current_master.worker_rid, current_master.settings, current_master.created_at, current_master.updated_at;`, worker.ID, worker.RID, settings, int(settings.MasterLockTimeout.Minutes()), ) master := &model.Master{} err := row.Scan( &master.ID, &master.WorkerID, &master.WorkerRID, &master.Settings, &master.CreatedAt, &master.UpdatedAt, ) if err != nil { if err == sql.ErrNoRows { return nil, nil } return nil, fmt.Errorf("error scanning master for worker id %v: %w", worker.ID, err) } return master, nil } // SelectMaster retrieves the current master entry from the database. func (r MasterDBHandler) SelectMaster() (*model.Master, error) { row := r.db.Instance.QueryRow( `SELECT id, worker_id, worker_rid, settings, created_at, updated_at FROM master WHERE id = 1;`, ) master := &model.Master{} err := row.Scan( &master.ID, &master.WorkerID, &master.WorkerRID, &master.Settings, &master.CreatedAt, &master.UpdatedAt, ) if err != nil { return nil, fmt.Errorf("error scanning master: %w", err) } return master, nil }
package database

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/google/uuid"
	"github.com/lib/pq"
	"github.com/siherrmann/queuer/helper"
	"github.com/siherrmann/queuer/model"
)

// WorkerDBHandlerFunctions defines the interface for Worker database operations.
type WorkerDBHandlerFunctions interface {
	CheckTableExistance() (bool, error)
	CreateTable() error
	DropTable() error
	InsertWorker(worker *model.Worker) (*model.Worker, error)
	UpdateWorker(worker *model.Worker) (*model.Worker, error)
	DeleteWorker(rid uuid.UUID) error
	SelectWorker(rid uuid.UUID) (*model.Worker, error)
	SelectAllWorkers(lastID int, entries int) ([]*model.Worker, error)
	SelectAllWorkersBySearch(search string, lastID int, entries int) ([]*model.Worker, error)
}

// WorkerDBHandler implements WorkerDBHandlerFunctions and holds the database connection.
type WorkerDBHandler struct {
	db *helper.Database
}

// NewWorkerDBHandler creates a new instance of WorkerDBHandler.
// If withTableDrop is true, it drops the existing worker table before creating a new one.
// It returns an error if dbConnection is nil or if dropping or creating the table fails.
func NewWorkerDBHandler(dbConnection *helper.Database, withTableDrop bool) (*WorkerDBHandler, error) {
	if dbConnection == nil {
		return nil, fmt.Errorf("database connection is nil")
	}

	workerDbHandler := &WorkerDBHandler{
		db: dbConnection,
	}

	if withTableDrop {
		err := workerDbHandler.DropTable()
		if err != nil {
			// Wrap with %w so callers can inspect the cause with errors.Is / errors.As.
			return nil, fmt.Errorf("error dropping worker table: %w", err)
		}
	}

	err := workerDbHandler.CreateTable()
	if err != nil {
		return nil, fmt.Errorf("error creating worker table: %w", err)
	}

	return workerDbHandler, nil
}

// CheckTableExistance checks if the 'worker' table exists in the database.
// It returns true if the table exists, otherwise false.
func (r WorkerDBHandler) CheckTableExistance() (bool, error) { exists := false exists, err := r.db.CheckTableExistance("worker") return exists, err } // CreateTable creates the 'worker' table in the database if it doesn't already exist. // It defines the structure of the table with appropriate columns and types. // If the table already exists, it will not create it again. // It also creates necessary indexes for efficient querying. func (r WorkerDBHandler) CreateTable() error { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() _, err := r.db.Instance.ExecContext( ctx, `CREATE TABLE IF NOT EXISTS worker ( id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, rid UUID UNIQUE DEFAULT gen_random_uuid(), name VARCHAR(100) DEFAULT '', options JSONB DEFAULT '{}', available_tasks VARCHAR[] DEFAULT ARRAY[]::VARCHAR[], available_next_interval VARCHAR[] DEFAULT ARRAY[]::VARCHAR[], current_concurrency INT DEFAULT 0, max_concurrency INT DEFAULT 1, status VARCHAR(50) DEFAULT 'RUNNING', created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP )`, ) if err != nil { log.Panicf("error creating worker table: %#v", err) } err = r.db.CreateIndexes("worker", "rid", "name", "status") if err != nil { r.db.Logger.Fatal(err) } r.db.Logger.Println("created table worker") return nil } // DropTable drops the 'worker' table from the database. // It will remove the table and all its data. // This operation is irreversible, so it should be used with caution. // It is used during testing or when resetting the database schema. // If the table does not exist, it will not return an error. 
func (r WorkerDBHandler) DropTable() error { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() query := `DROP TABLE IF EXISTS worker` _, err := r.db.Instance.ExecContext(ctx, query) if err != nil { return fmt.Errorf("error dropping worker table: %#v", err) } r.db.Logger.Printf("Dropped table worker") return nil } // InsertWorker inserts a new worker record with name, otpions and max concurrency into the database. // It returns the newly created worker with an automatically generated RID. // If the insertion fails, it returns an error. func (r WorkerDBHandler) InsertWorker(worker *model.Worker) (*model.Worker, error) { row := r.db.Instance.QueryRow( `INSERT INTO worker (name, options, max_concurrency) VALUES ($1, $2, $3) RETURNING id, rid, name, options, max_concurrency, status, created_at, updated_at`, worker.Name, worker.Options, worker.MaxConcurrency, ) newWorker := &model.Worker{} err := row.Scan( &newWorker.ID, &newWorker.RID, &newWorker.Name, &newWorker.Options, &newWorker.MaxConcurrency, &newWorker.Status, &newWorker.CreatedAt, &newWorker.UpdatedAt, ) if err != nil { return nil, fmt.Errorf("error scanning new worker: %w", err) } return newWorker, nil } // UpdateWorker updates an existing worker record in the database based on its RID. // It updates the worker's name, options, available tasks, next interval functions, max concurrency, and status. // It returns the updated worker record with an automatically updated updated_at timestamp. // If the update fails, it returns an error. 
func (r WorkerDBHandler) UpdateWorker(worker *model.Worker) (*model.Worker, error) { row := r.db.Instance.QueryRow( `UPDATE worker SET name = $1, options = $2, available_tasks = $3, available_next_interval = $4, max_concurrency = $5, status = $6, updated_at = CURRENT_TIMESTAMP WHERE rid = $7 RETURNING id, rid, name, options, available_tasks, available_next_interval, max_concurrency, status, created_at, updated_at;`, worker.Name, worker.Options, pq.Array(worker.AvailableTasks), pq.Array(worker.AvailableNextIntervalFuncs), worker.MaxConcurrency, worker.Status, worker.RID, ) updatedWorker := &model.Worker{} err := row.Scan( &updatedWorker.ID, &updatedWorker.RID, &updatedWorker.Name, &updatedWorker.Options, pq.Array(&updatedWorker.AvailableTasks), pq.Array(&updatedWorker.AvailableNextIntervalFuncs), &updatedWorker.MaxConcurrency, &updatedWorker.Status, &updatedWorker.CreatedAt, &updatedWorker.UpdatedAt, ) if err != nil { return nil, fmt.Errorf("error scanning updated worker: %w", err) } return updatedWorker, err } // DeleteWorker deletes a worker record from the database based on its RID. // It removes the worker from the database and returns an error if the deletion fails. func (r WorkerDBHandler) DeleteWorker(rid uuid.UUID) error { _, err := r.db.Instance.Exec( `DELETE FROM worker WHERE rid = $1`, rid, ) if err != nil { return fmt.Errorf("error deleting worker with RID %s: %w", rid.String(), err) } return nil } // SelectWorker retrieves a single worker record from the database based on its RID. // It returns the worker record. // If the worker is not found or an error occurs during the query, it returns an error. 
func (r WorkerDBHandler) SelectWorker(rid uuid.UUID) (*model.Worker, error) { worker := &model.Worker{} row := r.db.Instance.QueryRow( `SELECT id, rid, name, options, available_tasks, available_next_interval, max_concurrency, status, created_at, updated_at FROM worker WHERE rid = $1`, rid, ) err := row.Scan( &worker.ID, &worker.RID, &worker.Name, &worker.Options, pq.Array(&worker.AvailableTasks), pq.Array(&worker.AvailableNextIntervalFuncs), &worker.MaxConcurrency, &worker.Status, &worker.CreatedAt, &worker.UpdatedAt, ) if err != nil { return nil, fmt.Errorf("error scanning worker with RID %s: %w", rid.String(), err) } return worker, nil } // SelectAllWorkers retrieves a paginated list of all workers. // It returns a slice of worker records, ordered by creation date in descending order. // It returns workers that were created before the specified lastID, or the newest workers if lastID is 0. func (r WorkerDBHandler) SelectAllWorkers(lastID int, entries int) ([]*model.Worker, error) { var workers []*model.Worker rows, err := r.db.Instance.Query( `SELECT id, rid, name, options, available_tasks, available_next_interval, max_concurrency, status, created_at, updated_at FROM worker WHERE (0 = $1 OR created_at < ( SELECT d.created_at FROM worker AS d WHERE d.id = $1)) ORDER BY created_at DESC LIMIT $2`, lastID, entries, ) if err != nil { return []*model.Worker{}, fmt.Errorf("error querying all workers: %w", err) } defer rows.Close() for rows.Next() { worker := &model.Worker{} err := rows.Scan( &worker.ID, &worker.RID, &worker.Name, &worker.Options, pq.Array(&worker.AvailableTasks), pq.Array(&worker.AvailableNextIntervalFuncs), &worker.MaxConcurrency, &worker.Status, &worker.CreatedAt, &worker.UpdatedAt, ) if err != nil { return []*model.Worker{}, fmt.Errorf("error scanning worker row: %w", err) } workers = append(workers, worker) } if err = rows.Err(); err != nil { return []*model.Worker{}, fmt.Errorf("error iterating rows: %w", err) } return workers, nil } // 
SelectAllWorkersBySearch retrieves a paginated list of workers, filtered by search string. // It searches across 'queue_name', 'name', and 'status' fields. // The search is case-insensitive and uses ILIKE for partial matches. // It returns a slice of worker records, ordered by creation date in descending order. // It returns workers that were created before the specified lastID, or the newest workers if last func (r WorkerDBHandler) SelectAllWorkersBySearch(search string, lastID int, entries int) ([]*model.Worker, error) { var workers []*model.Worker rows, err := r.db.Instance.Query(` SELECT id, rid, name, options, available_tasks, available_next_interval, max_concurrency, status, created_at, updated_at FROM worker WHERE (name ILIKE '%' || $1 || '%' OR array_to_string(available_tasks, ',') ILIKE '%' || $1 || '%' OR status ILIKE '%' || $1 || '%') AND (0 = $2 OR created_at < ( SELECT u.created_at FROM worker AS u WHERE u.id = $2)) ORDER BY created_at DESC LIMIT $3`, search, lastID, entries, ) if err != nil { return []*model.Worker{}, fmt.Errorf("error querying workers by search: %w", err) } defer rows.Close() for rows.Next() { worker := &model.Worker{} err := rows.Scan( &worker.ID, &worker.RID, &worker.Name, &worker.Options, pq.Array(&worker.AvailableTasks), pq.Array(&worker.AvailableNextIntervalFuncs), &worker.MaxConcurrency, &worker.Status, &worker.CreatedAt, &worker.UpdatedAt, ) if err != nil { return []*model.Worker{}, fmt.Errorf("error scanning worker row during search: %w", err) } workers = append(workers, worker) } if err = rows.Err(); err != nil { return []*model.Worker{}, fmt.Errorf("error iterating rows: %w", err) } return workers, nil }
package helper

import (
	"context"
	"database/sql"
	"fmt"
	"log"
	"os"
	"strconv"
	"strings"
	"sync"
	"time"

	_ "github.com/joho/godotenv/autoload"
	"github.com/lib/pq"
)

// Database bundles a named SQL connection with the logger used for it.
type Database struct {
	Name     string
	Logger   *log.Logger
	Instance *sql.DB
}

// NewDatabase builds a Database whose logger writes to stdout with the prefix
// "Database <name>: ".
// With a nil dbConfig it returns a Database without a connection (Instance is nil).
// Otherwise it connects using dbConfig and installs the notify function; the
// process is terminated if no connection could be established, and it panics
// if the notify function cannot be added.
func NewDatabase(name string, dbConfig *DatabaseConfiguration) *Database {
	logger := log.New(os.Stdout, "Database "+name+": ", log.Ltime)

	if dbConfig == nil {
		return &Database{
			Name:     name,
			Logger:   logger,
			Instance: nil,
		}
	}

	connected := &Database{Name: name, Logger: logger}
	connected.ConnectToDatabase(dbConfig, logger)
	if connected.Instance == nil {
		logger.Fatal("failed to connect to database")
	}

	if notifyErr := connected.AddNotifyFunction(); notifyErr != nil {
		logger.Panicf("failed to add notify function: %v", notifyErr)
	}
	return connected
}

// NewDatabaseWithDB wraps an already opened *sql.DB in a Database, using the
// same logger setup as NewDatabase. No statements are executed on the connection.
func NewDatabaseWithDB(name string, dbConnnection *sql.DB) *Database {
	return &Database{
		Name:     name,
		Logger:   log.New(os.Stdout, "Database "+name+": ", log.Ltime),
		Instance: dbConnnection,
	}
}

// DatabaseConfiguration holds the connection parameters for a PostgreSQL database.
type DatabaseConfiguration struct {
	Host          string
	Port          string
	Database      string
	Username      string
	Password      string
	Schema        string
	WithTableDrop bool
}

// NewDatabaseConfiguration creates a new DatabaseConfiguration instance.
// It reads the database configuration from environment variables.
// It returns a pointer to the new DatabaseConfiguration instance or an error if any required environment variable is not set.
func NewDatabaseConfiguration() (*DatabaseConfiguration, error) { config := &DatabaseConfiguration{ Host: os.Getenv("QUEUER_DB_HOST"), Port: os.Getenv("QUEUER_DB_PORT"), Database: os.Getenv("QUEUER_DB_DATABASE"), Username: os.Getenv("QUEUER_DB_USERNAME"), Password: os.Getenv("QUEUER_DB_PASSWORD"), Schema: os.Getenv("QUEUER_DB_SCHEMA"), WithTableDrop: os.Getenv("QUEUER_DB_WITH_TABLE_DROP") == "true", } if len(strings.TrimSpace(config.Host)) == 0 || len(strings.TrimSpace(config.Port)) == 0 || len(strings.TrimSpace(config.Database)) == 0 || len(strings.TrimSpace(config.Username)) == 0 || len(strings.TrimSpace(config.Password)) == 0 || len(strings.TrimSpace(config.Schema)) == 0 { return nil, fmt.Errorf("QUEUER_DB_HOST, QUEUER_DB_PORT, QUEUER_DB_DATABASE, QUEUER_DB_USERNAME, QUEUER_DB_PASSWORD and QUEUER_DB_SCHEMA environment variables must be set") } return config, nil } func (d *DatabaseConfiguration) DatabaseConnectionString() string { return fmt.Sprintf("postgres://%s:%s@%s:%s/%s?sslmode=disable&search_path=%s", d.Username, d.Password, d.Host, d.Port, d.Database, d.Schema) } // Internal function for the service creation to connect to a database. // DatabaseConfiguration must contain uri, username and password. // It initializes the database connection and sets the Instance field of the Database struct. 
func (d *Database) ConnectToDatabase(dbConfig *DatabaseConfiguration, logger *log.Logger) { if len(strings.TrimSpace(dbConfig.Host)) == 0 || len(strings.TrimSpace(dbConfig.Port)) == 0 || len(strings.TrimSpace(dbConfig.Database)) == 0 || len(strings.TrimSpace(dbConfig.Username)) == 0 || len(strings.TrimSpace(dbConfig.Password)) == 0 || len(strings.TrimSpace(dbConfig.Schema)) == 0 { logger.Fatalln("database configuration must contain uri, username and password.") } var connectOnce sync.Once var db *sql.DB connectOnce.Do(func() { dsn, err := pq.ParseURL(dbConfig.DatabaseConnectionString()) if err != nil { logger.Fatalf("error parsing database connection string: %v", err) } base, err := pq.NewConnector(dsn) if err != nil { log.Panic(err) } db = sql.OpenDB(pq.ConnectorWithNoticeHandler(base, func(notice *pq.Error) { // log.Printf("Notice sent: %s", notice.Message) })) db.SetMaxOpenConns(0) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() _, err = db.ExecContext( ctx, "CREATE EXTENSION IF NOT EXISTS pg_trgm;", ) if err != nil { logger.Fatal(err) } pingErr := db.Ping() if pingErr != nil { logger.Fatal(pingErr) } logger.Println("Connected to db") }) d.Instance = db } // AddNotifyFunction adds a PostgreSQL function to the database that will be called on certain table operations. // It creates a function that raises a notification on the specified channel when a row is inserted, // updated, or deleted in the job or worker table. // The function uses the row_to_json function to convert the row data to JSON format. // It returns an error if the function creation fails. 
func (d *Database) AddNotifyFunction() error { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() // RAISE NOTICE 'Trigger called on table: %, operation: %', TG_TABLE_NAME, TG_OP; _, err := d.Instance.ExecContext( ctx, `CREATE OR REPLACE FUNCTION notify_event() RETURNS TRIGGER AS $$ DECLARE data JSON; channel TEXT; BEGIN IF (TG_TABLE_NAME = 'job') OR (TG_TABLE_NAME = 'worker') THEN channel := TG_TABLE_NAME; ELSE channel := 'job_archive'; END IF; IF (TG_OP = 'DELETE') THEN data = row_to_json(OLD); ELSE data = row_to_json(NEW); END IF; PERFORM pg_notify(channel, data::text); RETURN NEW; END; $$ LANGUAGE plpgsql;`, ) if err != nil { return fmt.Errorf("error creating notify function: %#v", err) } return nil } // CheckTableExistance checks if a table with the specified name exists in the database. // It queries the information_schema.tables to check for the existence of the table. // It returns true if the table exists, false otherwise, and an error if the query fails. func (d *Database) CheckTableExistance(tableName string) (bool, error) { exists := false tableNameQuoted := pq.QuoteLiteral(tableName) row := d.Instance.QueryRow( fmt.Sprintf(` SELECT EXISTS ( SELECT 1 FROM information_schema.tables WHERE table_name = %s ) AS table_existence`, tableNameQuoted, ), ) err := row.Scan( &exists, ) if err != nil { return false, err } return exists, nil } // CreateIndex creates an index on the specified column of the specified table. // It uses the PostgreSQL CREATE INDEX statement to create the index. // If the index already exists, it will not create a new one. // It returns an error if the index creation fails. 
func (d *Database) CreateIndex(tableName string, columnName string) error { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() tableNameQuoted := pq.QuoteIdentifier(tableName) indexQuoted := pq.QuoteIdentifier("idx_" + tableName + "_" + columnName) columnNameQuoted := pq.QuoteIdentifier(columnName) _, err := d.Instance.ExecContext( ctx, fmt.Sprintf("CREATE INDEX IF NOT EXISTS %s ON %s(%s)", indexQuoted, tableNameQuoted, columnNameQuoted), ) if err != nil { return fmt.Errorf("error creating %s index: %#v", indexQuoted, err) } return nil } // CreateIndexes creates indexes on the specified columns of the specified table. // It iterates over the column names and calls CreateIndex for each one. // It returns an error if any of the index creations fail. func (d *Database) CreateIndexes(tableName string, columnNames ...string) error { for _, columnName := range columnNames { err := d.CreateIndex(tableName, columnName) if err != nil { return err } } return nil } // CreateCombinedIndex creates a combined index on the specified columns of the specified table. // It uses the PostgreSQL CREATE INDEX statement to create the index. // If the index already exists, it will not create a new one. // It returns an error if the index creation fails. 
func (d *Database) CreateCombinedIndex(tableName string, columnName1 string, columnName2 string) error { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() tableNameQuoted := pq.QuoteIdentifier(tableName) indexQuoted := pq.QuoteIdentifier("idx_" + tableName + "_" + columnName1 + "_" + columnName2) columnName1Quoted := pq.QuoteIdentifier(columnName1) columnName2Quoted := pq.QuoteIdentifier(columnName2) _, err := d.Instance.ExecContext( ctx, fmt.Sprintf(`CREATE INDEX IF NOT EXISTS %s ON %s (%s, %s)`, indexQuoted, tableNameQuoted, columnName1Quoted, columnName2Quoted), ) if err != nil { return fmt.Errorf("error creating %s index: %#v", indexQuoted, err) } return nil } // CreateUniqueCombinedIndex creates a unique combined index on the specified columns of the specified table. // It uses the PostgreSQL CREATE UNIQUE INDEX statement to create the index. // If the index already exists, it will not create a new one. // It returns an error if the index creation fails. func (d *Database) CreateUniqueCombinedIndex(tableName string, columnName1 string, columnName2 string) error { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() tableNameQuoted := pq.QuoteIdentifier(tableName) indexQuoted := pq.QuoteIdentifier("idx_" + tableName + "_" + columnName1 + "_" + columnName2) columnName1Quoted := pq.QuoteIdentifier(columnName1) columnName2Quoted := pq.QuoteIdentifier(columnName2) _, err := d.Instance.ExecContext( ctx, fmt.Sprintf(`CREATE UNIQUE INDEX IF NOT EXISTS %s ON %s (%s, %s)`, indexQuoted, tableNameQuoted, columnName1Quoted, columnName2Quoted), ) if err != nil { return fmt.Errorf("error creating %s index: %#v", indexQuoted, err) } return nil } // DropIndex drops the index on the specified table and column. // It uses the PostgreSQL DROP INDEX statement to drop the index. // If the index does not exist, it will not return an error. // It returns an error if the index dropping fails. 
func (d *Database) DropIndex(tableName string, jsonMapKey string) error { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() indexQuoted := pq.QuoteIdentifier("idx_" + tableName + "_" + jsonMapKey) _, err := d.Instance.ExecContext( ctx, fmt.Sprintf(`DROP INDEX IF EXISTS %s;`, indexQuoted), ) if err != nil { return fmt.Errorf("error dropping %s index: %#v", indexQuoted, err) } return nil } // Health checks the health of the database connection by pinging the database. // It returns a map with keys indicating various health statistics. func (d *Database) Health() map[string]string { ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) defer cancel() stats := make(map[string]string) // Ping the database err := d.Instance.PingContext(ctx) if err != nil { stats["status"] = "down" stats["error"] = fmt.Sprintf("db down: %v", err) log.Panicf("db down: %v", err) // Log the error and terminate the program return stats } // Database is up, add more statistics stats["status"] = "up" stats["message"] = "It's healthy" // Get database stats (like open connections, in use, idle, etc.) dbStats := d.Instance.Stats() stats["open_connections"] = strconv.Itoa(dbStats.OpenConnections) stats["in_use"] = strconv.Itoa(dbStats.InUse) stats["idle"] = strconv.Itoa(dbStats.Idle) stats["wait_count"] = strconv.FormatInt(dbStats.WaitCount, 10) stats["wait_duration"] = dbStats.WaitDuration.String() stats["max_idle_closed"] = strconv.FormatInt(dbStats.MaxIdleClosed, 10) stats["max_lifetime_closed"] = strconv.FormatInt(dbStats.MaxLifetimeClosed, 10) // Evaluate stats to provide a health message if dbStats.OpenConnections > 40 { // Assuming 50 is the max for this example stats["message"] = "The database is experiencing heavy load." } if dbStats.WaitCount > 1000 { stats["message"] = "The database has a high number of wait events, indicating potential bottlenecks." 
} if dbStats.MaxIdleClosed > int64(dbStats.OpenConnections)/2 { stats["message"] = "Many idle connections are being closed, consider revising the connection pool settings." } if dbStats.MaxLifetimeClosed > int64(dbStats.OpenConnections)/2 { stats["message"] = "Many connections are being closed due to max lifetime, consider increasing max lifetime or revising the connection usage pattern." } return stats } // Close closes the database connection. // It logs a message indicating the disconnection from the specific database. // If the connection is successfully closed, it returns nil. // If an error occurs while closing the connection, it returns the error. func (d *Database) Close() error { log.Printf("Disconnected from database: %v", d.Instance) return d.Instance.Close() }
package helper import ( "context" "fmt" "net/url" "testing" "time" "github.com/testcontainers/testcontainers-go" "github.com/testcontainers/testcontainers-go/modules/postgres" "github.com/testcontainers/testcontainers-go/wait" ) const ( dbName = "database" dbUser = "user" dbPwd = "password" ) // MustStartPostgresContainer starts a PostgreSQL container for testing purposes. // It uses the timescale/timescaledb image with PostgreSQL 17. // It returns a function to terminate the container, the port on which the database is accessible, // and an error if the container could not be started. func MustStartPostgresContainer() (func(ctx context.Context, opts ...testcontainers.TerminateOption) error, string, error) { ctx := context.Background() pgContainer, err := postgres.Run( ctx, "timescale/timescaledb:latest-pg17", postgres.WithDatabase(dbName), postgres.WithUsername(dbUser), postgres.WithPassword(dbPwd), testcontainers.WithWaitStrategy( wait.ForLog("database system is ready to accept connections"). WithOccurrence(2).WithStartupTimeout(5*time.Second), ), ) if err != nil { return nil, "", fmt.Errorf("error starting postgres container: %w", err) } connStr, err := pgContainer.ConnectionString(ctx, "sslmode=disable") if err != nil { return nil, "", fmt.Errorf("error getting connection string: %w", err) } u, err := url.Parse(connStr) if err != nil { return nil, "", fmt.Errorf("error parsing connection string: %v", err) } return pgContainer.Terminate, u.Port(), err } // NewTestDatabase creates a new Database instance for testing purposes. // It initializes the database with the provided configuration and the name "test_db". // It returns a pointer to the new Database instance. func NewTestDatabase(config *DatabaseConfiguration) *Database { return NewDatabase( "test_db", config, ) } // SetTestDatabaseConfigEnvs sets the environment variables for the test database configuration. 
// It sets the host, port, database name, username, password, schema, // and table drop options for the test database. func SetTestDatabaseConfigEnvs(t *testing.T, port string) { t.Setenv("QUEUER_DB_HOST", "localhost") t.Setenv("QUEUER_DB_PORT", port) t.Setenv("QUEUER_DB_DATABASE", dbName) t.Setenv("QUEUER_DB_USERNAME", dbUser) t.Setenv("QUEUER_DB_PASSWORD", dbPwd) t.Setenv("QUEUER_DB_SCHEMA", "public") t.Setenv("QUEUER_DB_WITH_TABLE_DROP", "true") }
package helper

import (
	"fmt"
	"reflect"
	"runtime"
)

// CheckValidTask checks if the provided task is a valid function.
// It returns an error if the task is nil, not a function, or if its value is nil.
func CheckValidTask(task interface{}) error {
	if task == nil {
		return fmt.Errorf("task must not be nil")
	}

	taskValue := reflect.ValueOf(task)
	if taskValue.Kind() != reflect.Func {
		return fmt.Errorf("task must be a function, got %s", taskValue.Kind())
	}

	// A typed nil function (e.g. var f func()) passes the checks above
	// but cannot be called.
	if taskValue.IsNil() {
		return fmt.Errorf("task value must not be nil")
	}

	return nil
}

// CheckValidTaskWithParameters checks if the provided task and parameters are valid.
// It checks if the task is a valid function and if the parameters match the task's input types.
// The number of parameters must equal the number of declared inputs and
// each parameter's dynamic type must be assignable to the input at the
// same position. An untyped nil parameter has no dynamic type and is
// reported as an error.
func CheckValidTaskWithParameters(task interface{}, parameters ...interface{}) error {
	err := CheckValidTask(task)
	if err != nil {
		return err
	}

	taskType := reflect.TypeOf(task)
	if taskType.NumIn() != len(parameters) {
		return fmt.Errorf("task expects %d parameters, got %d", taskType.NumIn(), len(parameters))
	}

	for i, param := range parameters {
		paramType := reflect.TypeOf(param)
		// reflect.TypeOf returns nil for a nil interface value; calling a
		// method on that nil Type would panic, so report it instead.
		if paramType == nil {
			return fmt.Errorf("parameter %d of task must be of type %s, got nil", i, taskType.In(i).Kind())
		}
		if !paramType.AssignableTo(taskType.In(i)) {
			return fmt.Errorf("parameter %d of task must be of type %s, got %s", i, taskType.In(i).Kind(), paramType.Kind())
		}
	}

	return nil
}

// GetTaskNameFromFunction retrieves the name of the function from the provided task.
// It checks if the task is a valid function and returns the name
// reported by runtime.FuncForPC for it.
func GetTaskNameFromFunction(f interface{}) (string, error) {
	err := CheckValidTask(f)
	if err != nil {
		return "", err
	}

	return runtime.FuncForPC(reflect.ValueOf(f).Pointer()).Name(), nil
}

// GetTaskNameFromInterface retrieves the name of the task from the provided interface.
// It checks if the task is a string or a function and returns its name accordingly.
func GetTaskNameFromInterface(task interface{}) (string, error) { if taskNameString, ok := task.(string); ok { return taskNameString, nil } return GetTaskNameFromFunction(task) } // GetInputParametersFromTask retrieves the input parameters of the provided task. // It checks if the task is a valid function and returns its input parameter types. func GetInputParametersFromTask(task interface{}) ([]reflect.Type, error) { inputCount := reflect.TypeOf(task).NumIn() inputParameters := []reflect.Type{} for i := 0; i < inputCount; i++ { inputParameters = append(inputParameters, reflect.TypeOf(task).In(i)) } return inputParameters, nil } // GetOutputParametersFromTask retrieves the output parameters of the provided task. // It checks if the task is a valid function and returns its output parameter types. func GetOutputParametersFromTask(task interface{}) ([]reflect.Type, error) { outputCount := reflect.TypeOf(task).NumOut() outputParameters := []reflect.Type{} for i := 0; i < outputCount; i++ { outputParameters = append(outputParameters, reflect.TypeOf(task).Out(i)) } return outputParameters, nil }
package model import ( "database/sql/driver" "encoding/json" "errors" "fmt" "reflect" "strings" "time" "github.com/google/uuid" "github.com/siherrmann/queuer/helper" ) const ( // Job statuses before processing JobStatusQueued = "QUEUED" JobStatusScheduled = "SCHEDULED" // Running status is used when the job is being processed by a worker. JobStatusRunning = "RUNNING" // Job statuses after processing JobStatusFailed = "FAILED" JobStatusSucceeded = "SUCCEEDED" JobStatusCancelled = "CANCELLED" ) type Parameters []interface{} func (c Parameters) Value() (driver.Value, error) { return c.Marshal() } func (c *Parameters) Scan(value interface{}) error { return c.Unmarshal(value) } func (r Parameters) Marshal() ([]byte, error) { return json.Marshal(r) } func (r *Parameters) Unmarshal(value interface{}) error { if s, ok := value.(Parameters); ok { *r = Parameters(s) } else { b, ok := value.([]byte) if !ok { return errors.New("type assertion to []byte failed") } return json.Unmarshal(b, r) } return nil } func (r *Parameters) ToReflectValues() []reflect.Value { if r == nil { return []reflect.Value{} } reflectValues := []reflect.Value{} for _, p := range *r { reflectValues = append(reflectValues, reflect.ValueOf(p)) } return reflectValues } func (r *Parameters) ToInterfaceSlice() []interface{} { if r == nil { return []interface{}{} } interfaceSlice := make([]interface{}, len(*r)) copy(interfaceSlice, *r) return interfaceSlice } // Job represents an assigned task to a worker. // It contains all necessary information to execute the task, // including OnError and Schedule options, parameters, and status. // // ID, RID, CreatedAt, and UpdatedAt are set automatically on creation. // // Status, ScheduledAt, StartedAt, ScheduleCount, Attempts, // Results, Error, CreatedAt, and UpdatedAt are set automatically on update. 
type Job struct { ID int `json:"id"` RID uuid.UUID `json:"rid"` WorkerID int `json:"worker_id"` WorkerRID uuid.UUID `json:"worker_rid"` Options *Options `json:"options"` TaskName string `json:"task_name"` Parameters Parameters `json:"parameters"` Status string `json:"status"` ScheduledAt *time.Time `json:"scheduled_at"` StartedAt *time.Time `json:"started_at"` ScheduleCount int `json:"schedule_count"` Attempts int `json:"attempts"` Results Parameters `json:"result"` Error string `json:"error"` CreatedAt time.Time `json:"created_at"` UpdatedAt time.Time `json:"updated_at"` } // NewJob creates a new Job instance with the provided task, options, and parameters. // It validates the task name and options, and initializes the job status and scheduled time if applicable. // It returns a pointer to the new Job instance or an error if something is invalid. func NewJob(task interface{}, options *Options, parameters ...interface{}) (*Job, error) { taskName, err := helper.GetTaskNameFromInterface(task) if err != nil { return nil, fmt.Errorf("error getting task name: %v", err) } if len(taskName) == 0 || len(taskName) > 100 { return nil, fmt.Errorf("taskName must have a length between 1 and 100") } if options != nil && options.OnError != nil { err := options.OnError.IsValid() if err != nil { return nil, fmt.Errorf("invalid OnError options: %v", err) } } if options != nil && options.Schedule != nil { err := options.Schedule.IsValid() if err != nil { return nil, fmt.Errorf("invalid Schedule options: %v", err) } } status := JobStatusQueued var scheduledAt time.Time if options != nil && options.Schedule != nil { status = JobStatusScheduled scheduledAt = options.Schedule.Start.UTC() } return &Job{ TaskName: taskName, Status: status, ScheduledAt: &scheduledAt, Options: options, Parameters: parameters, }, nil } // JobFromNotification represents a job received from a notification. 
// It contains all fields from Job, but with DBTime // for time fields to handle database-specific time formats. type JobFromNotification struct { ID int `json:"id"` RID uuid.UUID `json:"rid"` WorkerID int `json:"worker_id"` WorkerRID uuid.UUID `json:"worker_rid"` Options *Options `json:"options"` TaskName string `json:"task_name"` Parameters Parameters `json:"parameters"` Status string `json:"status"` ScheduledAt DBTime `json:"scheduled_at"` StartedAt DBTime `json:"started_at"` Attempts int `json:"attempts"` Results Parameters `json:"result"` Error string `json:"error"` CreatedAt DBTime `json:"created_at"` UpdatedAt DBTime `json:"updated_at"` } // ToJob converts a JobFromNotification to a Job instance. func (jn *JobFromNotification) ToJob() *Job { return &Job{ ID: jn.ID, RID: jn.RID, WorkerID: jn.WorkerID, WorkerRID: jn.WorkerRID, Options: jn.Options, TaskName: jn.TaskName, Parameters: jn.Parameters, Status: jn.Status, ScheduledAt: &jn.ScheduledAt.Time, StartedAt: &jn.StartedAt.Time, Attempts: jn.Attempts, Results: jn.Results, Error: jn.Error, CreatedAt: jn.CreatedAt.Time, UpdatedAt: jn.UpdatedAt.Time, } } // DBTime is a custom time type for handling database-specific time formats. type DBTime struct { time.Time } const dbTimeLayoutWithoutZeroes = "2006-01-02T15:04:05." 
const dbTimeLayout = "2006-01-02T15:04:05.000000" func (ct DBTime) MarshalJSON() ([]byte, error) { if ct.Time.IsZero() { return []byte("null"), nil } return []byte(fmt.Sprintf("\"%s\"", ct.Time.Format(dbTimeLayout))), nil } func (ct *DBTime) UnmarshalJSON(b []byte) error { s := strings.Trim(string(b), "\"") // Handle db null value and zero time if s == "null" || s == "0001-01-01T00:00:00" { ct.Time = time.Time{} return nil } tSplit := strings.Split(s, ".") if len(tSplit) != 2 { return fmt.Errorf("invalid time format: %s", s) } var err error ct.Time, err = time.Parse(dbTimeLayoutWithoutZeroes+strings.Repeat("0", len(tSplit[1])), s) if err != nil { return fmt.Errorf("error parsing time: %s, error: %w", s, err) } return nil } func (ct *DBTime) IsSet() bool { return !ct.IsZero() }
package model import ( "database/sql/driver" "encoding/json" "errors" "time" "github.com/google/uuid" ) type Master struct { ID int `json:"id"` Settings MasterSettings `json:"settings"` WorkerID int `json:"worker_id"` WorkerRID uuid.UUID `json:"worker_rid"` CreatedAt time.Time `json:"created_at"` UpdatedAt time.Time `json:"updated_at"` } type MasterSettings struct { // MasterLockTimeout is the duration after which a master lock // is considered stale and a new worker can get master. MasterLockTimeout time.Duration `json:"master_lock_timeout"` // MasterPollInterval is the interval at which the master worker // updates the master entry to stay master. MasterPollInterval time.Duration `json:"master_poll_interval"` // RetentionArchive is the duration for which archived data is retained. RetentionArchive time.Duration `json:"retention_archive"` } func (c MasterSettings) Value() (driver.Value, error) { return c.Marshal() } func (c *MasterSettings) Scan(value interface{}) error { return c.Unmarshal(value) } func (r MasterSettings) Marshal() ([]byte, error) { return json.Marshal(r) } func (r *MasterSettings) Unmarshal(value interface{}) error { if o, ok := value.(MasterSettings); ok { *r = o } else { b, ok := value.([]byte) if !ok { return errors.New("type assertion to []byte failed") } return json.Unmarshal(b, r) } return nil }
package model import ( "database/sql/driver" "encoding/json" "errors" ) type Options struct { OnError *OnError `json:"on_error,omitempty"` Schedule *Schedule `json:"schedule,omitempty"` } func (c *Options) IsValid() error { // nil options are considered valid if c == nil { return nil } if c.OnError != nil { if err := c.OnError.IsValid(); err != nil { return err } } if c.Schedule != nil { if err := c.Schedule.IsValid(); err != nil { return err } } return nil } func (c Options) Value() (driver.Value, error) { return c.Marshal() } func (c *Options) Scan(value interface{}) error { return c.Unmarshal(value) } func (r Options) Marshal() ([]byte, error) { return json.Marshal(r) } func (r *Options) Unmarshal(value interface{}) error { if o, ok := value.(Options); ok { *r = o } else { b, ok := value.([]byte) if !ok { return errors.New("type assertion to []byte failed") } return json.Unmarshal(b, r) } return nil }
package model

import (
	"database/sql/driver"
	"encoding/json"
	"errors"
)

// Retry backoff strategies accepted in OnError.RetryBackoff.
const (
	RETRY_BACKOFF_NONE        = "none"
	RETRY_BACKOFF_LINEAR      = "linear"
	RETRY_BACKOFF_EXPONENTIAL = "exponential"
)

// OnError represents the options to handle errors during job execution.
// It includes timeout, maximum retries, retry delay, and retry backoff strategy.
// It is used to define how the system should behave when a job fails.
//
// Parameters:
//   - Timeout is the maximum time in seconds to wait for a job to complete before considering it failed.
//   - MaxRetries is the maximum number of retries to attempt if a job fails.
//   - RetryDelay is the delay in seconds before retrying a failed job.
//   - RetryBackoff is the strategy to use for retrying failed jobs. It can be one of the following:
//   - none: no retry, the job will fail immediately.
//   - linear: retry with a fixed delay.
//   - exponential: retry with an exponentially increasing delay.
//
// NOTE(review): an earlier version of this comment stated that an unset
// RetryBackoff defaults to "none", but IsValid rejects an empty
// RetryBackoff, so any defaulting has to happen before validation —
// confirm against the callers.
type OnError struct {
	Timeout      float64 `json:"timeout"`
	MaxRetries   int     `json:"max_retries"`
	RetryDelay   float64 `json:"retry_delay"`
	RetryBackoff string  `json:"retry_backoff"`
}

// IsValid checks if the OnError options are valid.
// Timeout, MaxRetries, and RetryDelay must be non-negative.
// RetryBackoff must be one of the predefined strategies: none, linear, or exponential.
// It returns an error if any of the conditions are not met.
func (c *OnError) IsValid() error { if c.Timeout < 0 { return errors.New("timeout cannot be negative") } if c.MaxRetries < 0 { return errors.New("max retries cannot be negative") } if c.RetryDelay < 0 { return errors.New("retry delay cannot be negative") } if c.RetryBackoff != RETRY_BACKOFF_NONE && c.RetryBackoff != RETRY_BACKOFF_LINEAR && c.RetryBackoff != RETRY_BACKOFF_EXPONENTIAL { return errors.New("retry backoff must be one of: none, linear, exponential") } return nil } func (c OnError) Value() (driver.Value, error) { return c.Marshal() } func (c *OnError) Scan(value interface{}) error { return c.Unmarshal(value) } func (r OnError) Marshal() ([]byte, error) { return json.Marshal(r) } func (r *OnError) Unmarshal(value interface{}) error { if o, ok := value.(OnError); ok { *r = o } else { b, ok := value.([]byte) if !ok { return errors.New("type assertion to []byte failed") } return json.Unmarshal(b, r) } return nil }
package model

import (
	"database/sql/driver"
	"encoding/json"
	"errors"
	"time"
)

// NextIntervalFunc defines a function type that calculates the next interval
// based on the start time and the current count of executions.
type NextIntervalFunc func(start time.Time, currentCount int) time.Time

// Schedule represents the scheduling options for a job.
// It includes the start time, maximum count of executions, interval between executions,
// and an optional next interval function to calculate the next execution time.
// It is used to define how often a job should be executed.
//
// Parameters:
//   - Start is the time when the job should start executing. It must not be the zero time.
//   - If MaxCount is 0, the job will run indefinitely.
//   - If MaxCount is greater than 0, the job will run MaxCount times.
//   - Interval is the duration between executions.
//   - NextInterval is a string; presumably the name of a registered
//     NextIntervalFunc — confirm against the scheduler.
//   - IsValid requires a positive Interval or a non-empty NextInterval
//     only when MaxCount is greater than 1.
//     NOTE(review): MaxCount 0 (run indefinitely) is accepted without
//     either — confirm that this is intended.
type Schedule struct {
	Start        time.Time     `json:"start"`
	MaxCount     int           `json:"max_count"`
	Interval     time.Duration `json:"interval"`
	NextInterval string        `json:"next_interval,omitempty"`
}

// IsValid checks if the Schedule options are valid.
// Start time must not be zero, MaxCount must be non-negative,
// Interval must be greater than zero if MaxCount is greater than 1,
// or NextInterval must be provided.
func (c *Schedule) IsValid() error { if c.Start.IsZero() { return errors.New("start time cannot be zero") } if c.MaxCount < 0 { return errors.New("maxCount must be greater than or equal to 0") } if c.MaxCount > 1 && c.Interval <= time.Duration(0) && c.NextInterval == "" { return errors.New("interval must be greater than zero or nextInterval must be provided") } return nil } func (c Schedule) Value() (driver.Value, error) { return c.Marshal() } func (c *Schedule) Scan(value interface{}) error { return c.Unmarshal(value) } func (r Schedule) Marshal() ([]byte, error) { return json.Marshal(r) } func (r *Schedule) Unmarshal(value interface{}) error { if o, ok := value.(Schedule); ok { *r = o } else { b, ok := value.([]byte) if !ok { return errors.New("type assertion to []byte failed") } return json.Unmarshal(b, r) } return nil }
package model import ( "fmt" "reflect" "github.com/siherrmann/queuer/helper" ) // Task represents a job task with its function, name, input parameters, and output parameters. // It is used to define a job that can be executed by the queuer system. // // Parameters: // - Task is the function that will be executed as a job. // - Name is the name of the task, which should be unique and descriptive. // - InputParameters is a slice of reflect.Type representing the types of input parameters for the task. type Task struct { Task interface{} Name string InputParameters []reflect.Type OutputParameters []reflect.Type } func NewTask(task interface{}) (*Task, error) { taskName, err := helper.GetTaskNameFromFunction(task) if err != nil { return nil, fmt.Errorf("error getting task name: %v", err) } return NewTaskWithName(task, taskName) } func NewTaskWithName(task interface{}, taskName string) (*Task, error) { if len(taskName) == 0 || len(taskName) > 100 { return nil, fmt.Errorf("taskName must have a length between 1 and 100") } err := helper.CheckValidTask(task) if err != nil { return nil, fmt.Errorf("task must be a function, got %s", reflect.TypeOf(task).Kind()) } inputParameters := []reflect.Type{} inputCount := reflect.TypeOf(task).NumIn() for i := 0; i < inputCount; i++ { inputParameters = append(inputParameters, reflect.TypeOf(task).In(i)) } outputParameters := []reflect.Type{} outputCount := reflect.TypeOf(task).NumOut() for i := 0; i < outputCount; i++ { outputParameters = append(outputParameters, reflect.TypeOf(task).Out(i)) } return &Task{ Task: task, Name: taskName, InputParameters: inputParameters, OutputParameters: outputParameters, }, nil }
package model

import (
	"fmt"
	"time"

	"github.com/google/uuid"
)

// Worker statuses.
const (
	WorkerStatusRunning = "RUNNING"
	WorkerStatusFailed  = "FAILED"
)

// Worker represents a worker that can execute tasks.
// It includes the worker's ID, name, options for error handling, maximum concurrency,
// available tasks, and status.
//
// ID, RID, Status, CreatedAt, and UpdatedAt are set automatically on creation.
//
// Parameters:
//   - Name is the name of the worker, which should be unique and descriptive.
//   - Options is an optional field that can be used to specify error handling options.
//     If the Job has options set, the Job options are used as primary options.
//   - MaxConcurrency is the maximum number of tasks that can be executed concurrently by the worker.
//   - AvailableTasks is a list of task names that the worker can execute.
//   - AvailableNextIntervalFuncs is a list of names of the next interval
//     functions that are available on the worker.
type Worker struct {
	ID                         int       `json:"id"`
	RID                        uuid.UUID `json:"rid"`
	Name                       string    `json:"name"`
	Options                    *OnError  `json:"options,omitempty"`
	MaxConcurrency             int       `json:"max_concurrency"`
	AvailableTasks             []string  `json:"available_tasks,omitempty"`
	AvailableNextIntervalFuncs []string  `json:"available_next_interval,omitempty"`
	Status                     string    `json:"status"`
	CreatedAt                  time.Time `json:"created_at"`
	UpdatedAt                  time.Time `json:"updated_at"`
}

// NewWorker creates a new Worker with the specified name and maximum concurrency.
// It validates the name and maximum concurrency, and initializes the worker status to running.
// It returns a pointer to the new Worker instance or an error if something is invalid.
func NewWorker(name string, maxConcurrency int) (*Worker, error) { if len(name) == 0 || len(name) > 100 { return nil, fmt.Errorf("name must have a length between 1 and 100") } if maxConcurrency < 1 || maxConcurrency > 100 { return nil, fmt.Errorf("maxConcurrency must be between 1 and 100") } return &Worker{ Name: name, MaxConcurrency: maxConcurrency, Status: WorkerStatusRunning, }, nil } // NewWorkerWithOptions creates a new Worker with the specified name, maximum concurrency, and error handling options. // It validates the name, maximum concurrency, and options, and initializes the worker status to running. // It returns a pointer to the new Worker instance or an error if something is invalid. func NewWorkerWithOptions(name string, maxConcurrency int, options *OnError) (*Worker, error) { if len(name) == 0 || len(name) > 100 { return nil, fmt.Errorf("name must have a length between 1 and 100") } if maxConcurrency < 1 || maxConcurrency > 1000 { return nil, fmt.Errorf("maxConcurrency must be between 1 and 100") } err := options.IsValid() if err != nil { return nil, fmt.Errorf("invalid options: %w", err) } return &Worker{ Name: name, Options: options, MaxConcurrency: maxConcurrency, Status: WorkerStatusRunning, }, nil }
package queuer

import (
	"context"
	"database/sql"
	"encoding/json"
	"fmt"
	"log"
	"os"
	"sync"
	"time"

	"github.com/siherrmann/queuer/core"
	"github.com/siherrmann/queuer/database"
	"github.com/siherrmann/queuer/helper"
	"github.com/siherrmann/queuer/model"
)

// Queuer represents the main queuing system.
// It manages job scheduling, execution, and error handling.
// It provides methods to start, stop, and manage jobs and workers.
// It also handles database connections and listeners for job events.
type Queuer struct {
	// DB is the underlying database handle of the queuer's connection.
	DB *sql.DB
	// JobPollInterval is initialized to one minute by NewQueuerWithDB.
	JobPollInterval  time.Duration
	RetentionArchive time.Duration
	// Context and cancel function handed to Start / StartWithoutWorker.
	ctx    context.Context
	cancel context.CancelFunc
	// Runners
	activeRunners sync.Map
	// Logger
	log *log.Logger
	// Worker is the worker entry inserted for this queuer on creation.
	worker *model.Worker
	// DBs
	dbConfig *helper.DatabaseConfiguration
	dbJob    database.JobDBHandlerFunctions
	dbWorker database.WorkerDBHandlerFunctions
	dbMaster database.MasterDBHandlerFunctions
	// Job DB listeners, created on start for the "job" and "job_archive" channels.
	jobDbListener        *database.QueuerListener
	jobArchiveDbListener *database.QueuerListener
	// Job listeners, one per job event kind (insert, update, delete).
	jobInsertListener *core.Listener[*model.Job]
	jobUpdateListener *core.Listener[*model.Job]
	jobDeleteListener *core.Listener[*model.Job]
	// Available functions, keyed by name.
	tasks             map[string]*model.Task
	nextIntervalFuncs map[string]model.NextIntervalFunc
}

// NewQueuer creates a new Queuer instance with the given name and max concurrency.
// It wraps NewQueuerWithDB to initialize the queuer without an external db config.
func NewQueuer(name string, maxConcurrency int, options ...*model.OnError) *Queuer {
	return NewQueuerWithDB(name, maxConcurrency, nil, options...)
}

// NewQueuerWithDB creates a new Queuer instance with the given name and max concurrency.
// It initializes the database connection and worker.
// If dbConfig is nil, the configuration is created with helper.NewDatabaseConfiguration.
// If options are provided, it creates a worker with the first of them.
// If any error occurs during initialization, it logs the error and panics.
// It returns a pointer to the newly created Queuer instance.
func NewQueuerWithDB(name string, maxConcurrency int, dbConfig *helper.DatabaseConfiguration, options ...*model.OnError) *Queuer { // Logger logger := log.New(os.Stdout, "Queuer: ", log.Ltime) // Database var err error var dbCon *helper.Database if dbConfig != nil { dbCon = helper.NewDatabase( "queuer", dbConfig, ) } else { var err error dbConfig, err = helper.NewDatabaseConfiguration() if err != nil { logger.Panicf("failed to create database configuration: %v", err) } dbCon = helper.NewDatabase( "queuer", dbConfig, ) } // DBs dbJob, err := database.NewJobDBHandler(dbCon, dbConfig.WithTableDrop) if err != nil { logger.Panicf("failed to create job db handler: %v", err) } dbWorker, err := database.NewWorkerDBHandler(dbCon, dbConfig.WithTableDrop) if err != nil { logger.Panicf("failed to create worker db handler: %v", err) } dbMaster, err := database.NewMasterDBHandler(dbCon, dbConfig.WithTableDrop) if err != nil { logger.Panicf("failed to create master db handler: %v", err) } // Inserting worker var newWorker *model.Worker if len(options) > 0 { newWorker, err = model.NewWorkerWithOptions(name, maxConcurrency, options[0]) if err != nil { logger.Panicf("error creating new worker with options: %v", err) } } else { newWorker, err = model.NewWorker(name, maxConcurrency) if err != nil { logger.Panicf("error creating new worker: %v", err) } } // Worker worker, err := dbWorker.InsertWorker(newWorker) if err != nil { logger.Panicf("error inserting worker: %v", err) } logger.Printf("Queuer %s created with worker RID: %v", newWorker.Name, worker.RID) return &Queuer{ log: logger, worker: worker, DB: dbCon.Instance, dbConfig: dbConfig, dbJob: dbJob, dbWorker: dbWorker, dbMaster: dbMaster, JobPollInterval: 1 * time.Minute, tasks: map[string]*model.Task{}, nextIntervalFuncs: map[string]model.NextIntervalFunc{}, } } // Start starts the queuer by initializing the job listeners and starting the job poll ticker. 
// It checks if the queuer is initialized properly, and if not, it logs a panic error and exits the program. // It runs the job processing in a separate goroutine and listens for job events. // // Detailed steps include: // 1. Create job and job archive database listeners. // 2. Create broadcasters for job insert, update, and delete events. // 3. Start the job listeners to listen for job events. // 4. Start the job poll ticker to periodically check for new jobs. // 5. Wait for the queuer to be ready or log a panic error if it fails to start within 5 seconds. func (q *Queuer) Start(ctx context.Context, cancel context.CancelFunc, masterSettings ...*model.MasterSettings) { if ctx == nil || ctx == context.TODO() || cancel == nil { q.log.Panicln("ctx and cancel must be set") } q.ctx = ctx q.cancel = cancel // DB listeners var err error q.jobDbListener, err = database.NewQueuerDBListener(q.dbConfig, "job") if err != nil { q.log.Panicf("failed to create job insert listener: %v", err) } q.jobArchiveDbListener, err = database.NewQueuerDBListener(q.dbConfig, "job_archive") if err != nil { q.log.Panicf("failed to create job update listener: %v", err) } // Broadcasters for job updates and deletes broadcasterJobInsert := core.NewBroadcaster[*model.Job]("job.INSERT") q.jobInsertListener, err = core.NewListener(broadcasterJobInsert) if err != nil { q.log.Panicf("failed to create job insert listener: %v", err) } broadcasterJobUpdate := core.NewBroadcaster[*model.Job]("job.UPDATE") q.jobUpdateListener, err = core.NewListener(broadcasterJobUpdate) if err != nil { q.log.Panicf("failed to create job update listener: %v", err) } broadcasterJobDelete := core.NewBroadcaster[*model.Job]("job.DELETE") q.jobDeleteListener, err = core.NewListener(broadcasterJobDelete) if err != nil { q.log.Panicf("failed to create job update listener: %v", err) } // Start pollers ready := make(chan struct{}) go func() { q.listen(ctx, cancel) err := q.pollJobTicker(ctx) if err != nil && ctx.Err() == nil { 
q.log.Printf("Error starting job poll ticker: %v", err) return } if len(masterSettings) > 0 && masterSettings[0] != nil { err = q.pollMasterTicker(ctx, masterSettings[0]) if err != nil && ctx.Err() == nil { q.log.Printf("Error starting master poll ticker: %v", err) return } } close(ready) <-ctx.Done() q.log.Println("Queuer stopped") }() select { case <-ready: q.log.Println("Queuer started") return case <-time.After(5 * time.Second): q.log.Panicln("Queuer failed to start within 5 seconds") } } // Start starts the queuer by initializing the job listeners and starting the job poll ticker. // It checks if the queuer is initialized properly, and if not, it logs a panic error and exits the program. // It runs the job processing in a separate goroutine and listens for job events. // // This version does not run the job processing, allowing the queuer to be started without a worker. // Is is useful if you want to run a queuer instance in a seperate service without a worker, // for example to handle listening to job events and providing a central frontend. 
func (q *Queuer) StartWithoutWorker(ctx context.Context, cancel context.CancelFunc, withoutListeners bool, masterSettings ...*model.MasterSettings) { q.ctx = ctx q.cancel = cancel // Job listeners var err error if !withoutListeners { q.jobDbListener, err = database.NewQueuerDBListener(q.dbConfig, "job") if err != nil { q.log.Panicf("failed to create job insert listener: %v", err) } q.jobArchiveDbListener, err = database.NewQueuerDBListener(q.dbConfig, "job_archive") if err != nil { q.log.Panicf("failed to create job update listener: %v", err) } } // Broadcasters for job updates and deletes broadcasterJobInsert := core.NewBroadcaster[*model.Job]("job.INSERT") q.jobInsertListener, err = core.NewListener(broadcasterJobInsert) if err != nil { q.log.Panicf("failed to create job insert listener: %v", err) } broadcasterJobUpdate := core.NewBroadcaster[*model.Job]("job.UPDATE") q.jobUpdateListener, err = core.NewListener(broadcasterJobUpdate) if err != nil { q.log.Panicf("failed to create job update listener: %v", err) } broadcasterJobDelete := core.NewBroadcaster[*model.Job]("job.DELETE") q.jobDeleteListener, err = core.NewListener(broadcasterJobDelete) if err != nil { q.log.Panicf("failed to create job update listener: %v", err) } // Start job listeners ready := make(chan struct{}) go func() { if !withoutListeners { q.listenWithoutRunning(ctx, cancel) } if len(masterSettings) > 0 && masterSettings[0] != nil { err = q.pollMasterTicker(ctx, masterSettings[0]) if err != nil && ctx.Err() == nil { q.log.Printf("Error starting master poll ticker: %v", err) return } } close(ready) <-ctx.Done() q.log.Println("Queuer stopped") }() select { case <-ready: q.log.Println("Queuer without worker started") return case <-time.After(5 * time.Second): q.log.Panicln("Queuer failed to start within 5 seconds") } } // Stop stops the queuer by closing the job listeners, cancelling all queued and running jobs, // and cancelling the context to stop the queuer. 
func (q *Queuer) Stop() error { // Close db listeners if q.jobDbListener != nil { err := q.jobDbListener.Listener.Close() if err != nil { return fmt.Errorf("error closing job insert listener: %v", err) } } if q.jobArchiveDbListener != nil { err := q.jobArchiveDbListener.Listener.Close() if err != nil { return fmt.Errorf("error closing job update listener: %v", err) } } // Cancel all queued and running jobs err := q.CancelAllJobsByWorker(q.worker.RID, 100) if err != nil { return fmt.Errorf("error cancelling all jobs by worker: %v", err) } // Cancel the context to stop the queuer if q.ctx != nil { q.cancel() } q.log.Println("Queuer stopped") return nil } // Internal // listen listens to job events and runs the initial job processing. func (q *Queuer) listen(ctx context.Context, cancel context.CancelFunc) { readyJob := make(chan struct{}) readyJobArchive := make(chan struct{}) go func() { close(readyJob) q.jobDbListener.Listen(ctx, cancel, func(data string) { job := &model.JobFromNotification{} err := json.Unmarshal([]byte(data), job) if err != nil { q.log.Printf("Error unmarshalling job data: %v", err) return } if job.Status == model.JobStatusQueued || job.Status == model.JobStatusScheduled { q.jobInsertListener.Notify(job.ToJob()) err = q.runJobInitial() if err != nil { q.log.Printf("Error running job: %v", err) return } } else { q.jobUpdateListener.Notify(job.ToJob()) } }) }() <-readyJob go func() { close(readyJobArchive) q.jobArchiveDbListener.Listen(ctx, cancel, func(data string) { job := &model.JobFromNotification{} err := json.Unmarshal([]byte(data), job) if err != nil { q.log.Printf("Error unmarshalling job data: %v", err) return } q.jobDeleteListener.Notify(job.ToJob()) }) }() <-readyJobArchive } func (q *Queuer) listenWithoutRunning(ctx context.Context, cancel context.CancelFunc) { readyJob := make(chan struct{}) readyJobArchive := make(chan struct{}) go func() { close(readyJob) q.jobDbListener.Listen(ctx, cancel, func(data string) { job := 
&model.JobFromNotification{} err := json.Unmarshal([]byte(data), job) if err != nil { q.log.Printf("Error unmarshalling job data: %v", err) return } if job.Status == model.JobStatusQueued || job.Status == model.JobStatusScheduled { q.jobInsertListener.Notify(job.ToJob()) } else { q.jobUpdateListener.Notify(job.ToJob()) } }) }() <-readyJob go func() { close(readyJobArchive) q.jobArchiveDbListener.Listen(ctx, cancel, func(data string) { job := &model.JobFromNotification{} err := json.Unmarshal([]byte(data), job) if err != nil { q.log.Printf("Error unmarshalling job data: %v", err) return } q.jobDeleteListener.Notify(job.ToJob()) }) }() <-readyJobArchive } func (q *Queuer) pollJobTicker(ctx context.Context) error { ticker, err := core.NewTicker( q.JobPollInterval, func() { q.log.Println("Polling jobs...") err := q.runJobInitial() if err != nil { q.log.Printf("Error running job: %v", err) } }, ) if err != nil { return fmt.Errorf("error creating ticker: %v", err) } q.log.Println("Starting job poll ticker...") go ticker.Go(ctx) return nil } func (q *Queuer) pollMasterTicker(ctx context.Context, masterSettings *model.MasterSettings) error { ctxInner, cancel := context.WithCancel(ctx) ticker, err := core.NewTicker( masterSettings.MasterPollInterval, func() { q.log.Println("Polling master...") master, err := q.dbMaster.UpdateMaster(q.worker, masterSettings) if err != nil { q.log.Printf("Error updating master: %v", err) } if master != nil { q.log.Printf("Worker %v is now the current master", q.worker.RID) err := q.masterTicker(ctx, master, masterSettings) if err != nil { q.log.Printf("Error starting master ticker: %v", err) } else { cancel() } } }, ) if err != nil { return fmt.Errorf("error creating ticker: %v", err) } q.log.Println("Starting master poll ticker...") go ticker.Go(ctxInner) return nil } func (q *Queuer) masterTicker(ctx context.Context, oldMaster *model.Master, masterSettings *model.MasterSettings) error { if oldMaster == nil { return fmt.Errorf("old master is 
nil") } if oldMaster.Settings.RetentionArchive == 0 { err := q.dbJob.AddRetentionArchive(masterSettings.RetentionArchive) if err != nil { return fmt.Errorf("error adding retention archive: %v", err) } } else if oldMaster.Settings.RetentionArchive != masterSettings.RetentionArchive { err := q.dbJob.RemoveRetentionArchive() if err != nil { return fmt.Errorf("error removing retention archive: %v", err) } err = q.dbJob.AddRetentionArchive(masterSettings.RetentionArchive) if err != nil { return fmt.Errorf("error adding retention archive: %v", err) } } ctxInner, cancel := context.WithCancel(ctx) ticker, err := core.NewTicker( masterSettings.MasterPollInterval, func() { _, err := q.dbMaster.UpdateMaster(q.worker, masterSettings) if err != nil { err := q.pollMasterTicker(ctx, masterSettings) if err != nil { q.log.Printf("Error restarting poll master ticker: %v", err) } cancel() } // Here we can add any additional logic that needs to run periodically while the worker is master. // This could include stale jobs, cleaning up the job database etc. }, ) if err != nil { return fmt.Errorf("error creating ticker: %v", err) } q.log.Println("Starting master poll ticker...") go ticker.Go(ctxInner) return nil }
package queuer

import (
	"context"
	"database/sql"
	"fmt"
	"log"
	"time"

	"github.com/google/uuid"
	"github.com/siherrmann/queuer/core"
	"github.com/siherrmann/queuer/model"
)

// AddJob adds a job to the queue with the given task and parameters.
// As a task you can either pass a function or a string with the task name
// (necessary if you want to use a task with a name set by you).
// It returns the created job or an error if something goes wrong.
func (q *Queuer) AddJob(task interface{}, parameters ...interface{}) (*model.Job, error) {
	options := q.mergeOptions(nil)
	job, err := q.addJob(task, options, parameters...)
	if err != nil {
		return nil, fmt.Errorf("error adding job: %w", err)
	}

	q.log.Printf("Job added with RID %v", job.RID)

	return job, nil
}

// AddJobTx adds a job to the queue with the given task and parameters within a transaction.
// As a task you can either pass a function or a string with the task name
// (necessary if you want to use a task with a name set by you).
// It returns the created job or an error if something goes wrong.
func (q *Queuer) AddJobTx(tx *sql.Tx, task interface{}, parameters ...interface{}) (*model.Job, error) {
	options := q.mergeOptions(nil)
	job, err := q.addJobTx(tx, task, options, parameters...)
	if err != nil {
		return nil, fmt.Errorf("error adding job: %w", err)
	}

	q.log.Printf("Job added with RID %v", job.RID)

	return job, nil
}

/*
AddJobWithOptions adds a job with the given task, options, and parameters.
As a task you can either pass a function or a string with the task name
(necessary if you want to use a task with a name set by you).

It returns the created job or an error if something goes wrong.
The options parameter allows you to specify additional options for the job,
such as scheduling, retry policies, and error handling.
If options are nil, the worker's default options will be used.

Example usage:

	func AddJobExample(queuer *Queuer, param1 string, param2 int) {
		options := &model.Options{
			OnError: &model.OnError{
				Timeout:      5,
				MaxRetries:   2, // Runs 3 times, first is not a retry
				RetryDelay:   1,
				RetryBackoff: model.RETRY_BACKOFF_NONE,
			},
			Schedule: &model.Schedule{
				Start:    time.Now().Add(10 * time.Second),
				Interval: 5 * time.Second,
				MaxCount: 3,
			},
		}
		job, err := queuer.AddJobWithOptions(options, myTaskFunction, param1, param2)
		if err != nil {
			log.Fatalf("Failed to add job: %v", err)
		}
	}
*/
func (q *Queuer) AddJobWithOptions(options *model.Options, task interface{}, parameters ...interface{}) (*model.Job, error) {
	// mergeOptions returns a new value when options is nil, so its result
	// must be kept or the worker defaults are silently dropped.
	options = q.mergeOptions(options)
	job, err := q.addJob(task, options, parameters...)
	if err != nil {
		return nil, fmt.Errorf("error adding job: %w", err)
	}

	q.log.Printf("Job with options added with RID %v", job.RID)

	return job, nil
}

// AddJobWithOptionsTx adds a job with the given task, options, and parameters within a transaction.
// As a task you can either pass a function or a string with the task name
// (necessary if you want to use a task with a name set by you).
// If options are nil, the worker's default options will be used.
// It returns the created job or an error if something goes wrong.
func (q *Queuer) AddJobWithOptionsTx(tx *sql.Tx, options *model.Options, task interface{}, parameters ...interface{}) (*model.Job, error) {
	// See AddJobWithOptions: the merged options must be used, not discarded.
	options = q.mergeOptions(options)
	job, err := q.addJobTx(tx, task, options, parameters...)
	if err != nil {
		return nil, fmt.Errorf("error adding job: %w", err)
	}

	q.log.Printf("Job with options added with RID %v", job.RID)

	return job, nil
}

// AddJobs adds a batch of jobs to the queue.
// It takes a slice of BatchJob, which contains the task, options, and parameters for each job.
// It returns an error if something goes wrong during the process.
func (q *Queuer) AddJobs(batchJobs []model.BatchJob) error {
	jobs := make([]*model.Job, 0, len(batchJobs))
	for _, batchJob := range batchJobs {
		options := q.mergeOptions(batchJob.Options)
		newJob, err := model.NewJob(batchJob.Task, options, batchJob.Parameters...)
		if err != nil {
			return fmt.Errorf("error creating job with job options: %w", err)
		}
		jobs = append(jobs, newJob)
	}

	if err := q.dbJob.BatchInsertJobs(jobs); err != nil {
		return fmt.Errorf("error inserting jobs: %w", err)
	}

	q.log.Printf("%v jobs added", len(jobs))

	return nil
}

// WaitForJobAdded waits for any job to be added and returns the job.
// It listens for job insert events and returns the first job that is added to the queue.
// It returns nil if the queuer is not running or its context is done first.
func (q *Queuer) WaitForJobAdded() *model.Job {
	if q.ctx == nil || q.jobInsertListener == nil {
		return nil
	}
	return q.waitForJobEvent(q.jobInsertListener.Listen, func(job *model.Job) bool {
		return true
	})
}

// WaitForJobFinished waits for a job to finish and returns the job.
// It listens for job delete events and returns the job when it is finished.
// It returns nil if the queuer is not running or its context is done first.
func (q *Queuer) WaitForJobFinished(jobRid uuid.UUID) *model.Job {
	if q.ctx == nil || q.jobDeleteListener == nil {
		return nil
	}
	return q.waitForJobEvent(q.jobDeleteListener.Listen, func(job *model.Job) bool {
		return job != nil && job.RID == jobRid
	})
}

// waitForJobEvent runs listen until the first job accepted by match arrives
// or the queuer context is done, and returns that job (nil on cancellation).
// The listener runs on a context derived from q.ctx that is cancelled when
// this function returns, so neither the listener nor later notifications are
// left blocked once the result has been delivered.
func (q *Queuer) waitForJobEvent(
	listen func(ctx context.Context, ready chan struct{}, notify func(*model.Job)),
	match func(job *model.Job) bool,
) *model.Job {
	ctx, cancel := context.WithCancel(q.ctx)
	defer cancel()

	found := make(chan *model.Job, 1)
	ready := make(chan struct{})
	go listen(ctx, ready, func(job *model.Job) {
		if !match(job) {
			return
		}
		select {
		case found <- job:
		case <-ctx.Done():
			// The waiter already returned; drop the notification.
		}
	})
	<-ready

	select {
	case job := <-found:
		return job
	case <-ctx.Done():
		return nil
	}
}

// CancelJob cancels a job with the given job RID.
// It retrieves the job from the database and cancels it.
// If the job is not found or already cancelled, it returns an error.
func (q *Queuer) CancelJob(jobRid uuid.UUID) (*model.Job, error) {
	job, err := q.dbJob.SelectJob(jobRid)
	if err != nil {
		return nil, fmt.Errorf("error selecting job with rid %v, but already cancelled: %w", jobRid, err)
	}

	if err := q.cancelJob(job); err != nil {
		return nil, fmt.Errorf("error cancelling job with rid %v: %w", jobRid, err)
	}

	return job, nil
}

// CancelAllJobsByWorker cancels all jobs assigned to a specific worker by its RID.
// It retrieves all jobs assigned to the worker and cancels each one. // It returns an error if something goes wrong during the process. func (q *Queuer) CancelAllJobsByWorker(workerRid uuid.UUID, entries int) error { jobs, err := q.dbJob.SelectAllJobsByWorkerRID(workerRid, 0, entries) if err != nil { return fmt.Errorf("error selecting jobs by worker RID %v: %v", workerRid, err) } for _, job := range jobs { err := q.cancelJob(job) if err != nil { return fmt.Errorf("error cancelling job with rid %v: %v", job.RID, err) } } return nil } // ReaddJobFromArchive readds a job from the archive back to the queue. func (q *Queuer) ReaddJobFromArchive(jobRid uuid.UUID) (*model.Job, error) { job, err := q.dbJob.SelectJobFromArchive(jobRid) if err != nil { return nil, fmt.Errorf("error selecting job from archive with rid %v: %v", jobRid, err) } // Readd the job to the queue newJob, err := q.AddJobWithOptions(job.Options, job.TaskName, job.Parameters...) if err != nil { return nil, fmt.Errorf("error readding job: %v", err) } q.log.Printf("Job readded with RID %v", newJob.RID) return newJob, nil } // GetJob retrieves a job by its RID. func (q *Queuer) GetJob(jobRid uuid.UUID) (*model.Job, error) { job, err := q.dbJob.SelectJob(jobRid) if err != nil { return nil, fmt.Errorf("error selecting job with rid %v: %v", jobRid, err) } return job, nil } // GetJobs retrieves all jobs in the queue. func (q *Queuer) GetJobs(lastId int, entries int) ([]*model.Job, error) { jobs, err := q.dbJob.SelectAllJobs(lastId, entries) if err != nil { return nil, fmt.Errorf("error selecting all jobs: %v", err) } return jobs, nil } // GetJobsEnded retrieves all jobs that have ended (succeeded, cancelled or failed). 
func (q *Queuer) GetJobsEnded(lastId int, entries int) ([]*model.Job, error) { jobs, err := q.dbJob.SelectAllJobsFromArchive(lastId, entries) if err != nil { return nil, fmt.Errorf("error selecting ended jobs: %v", err) } return jobs, nil } // GetJobsByWorkerRID retrieves jobs assigned to a specific worker by its RID. func (q *Queuer) GetJobsByWorkerRID(workerRid uuid.UUID, lastId int, entries int) ([]*model.Job, error) { jobs, err := q.dbJob.SelectAllJobsByWorkerRID(workerRid, lastId, entries) if err != nil { return nil, fmt.Errorf("error selecting jobs by worker RID %v: %v", workerRid, err) } return jobs, nil } // Internal // mergeOptions merges the worker options with optional job options. func (q *Queuer) mergeOptions(options *model.Options) *model.Options { if options != nil && options.OnError == nil { options.OnError = q.worker.Options } else if options == nil && q.worker.Options != nil { options = &model.Options{OnError: q.worker.Options} } return options } // addJob adds a job to the queue with all necessary parameters. func (q *Queuer) addJob(task interface{}, options *model.Options, parameters ...interface{}) (*model.Job, error) { newJob, err := model.NewJob(task, options, parameters...) if err != nil { return nil, fmt.Errorf("error creating job: %v", err) } job, err := q.dbJob.InsertJob(newJob) if err != nil { return nil, fmt.Errorf("error inserting job: %v", err) } return job, nil } // addJobTx adds a job to the queue with all necessary parameters. func (q *Queuer) addJobTx(tx *sql.Tx, task interface{}, options *model.Options, parameters ...interface{}) (*model.Job, error) { newJob, err := model.NewJob(task, options, parameters...) if err != nil { return nil, fmt.Errorf("error creating job: %v", err) } job, err := q.dbJob.InsertJobTx(tx, newJob) if err != nil { return nil, fmt.Errorf("error inserting job: %v", err) } return job, nil } // runJobInitial is called to run the next job in the queue. 
func (q *Queuer) runJobInitial() error { // Update job status to running with worker. jobs, err := q.dbJob.UpdateJobsInitial(q.worker) if err != nil { return fmt.Errorf("error updating job status to running: %v", err) } else if len(jobs) == 0 { log.Println("No jobs to run at the moment") return nil } for _, job := range jobs { if job.Options != nil && job.Options.Schedule != nil && job.Options.Schedule.Start.After(time.Now()) { scheduler, err := core.NewScheduler( &job.Options.Schedule.Start, q.runJob, job, ) if err != nil { return fmt.Errorf("error creating scheduler: %v", err) } q.log.Printf("Scheduling job with RID %v to run at %v", job.RID, job.Options.Schedule.Start) go scheduler.Go(q.ctx) } else { go q.runJob(job) } } return nil } // waitForJob executes the job and returns the results or an error. func (q *Queuer) waitForJob(job *model.Job) ([]interface{}, error) { // Run job and update job status to completed with results // TODO At this point the task should be available in the queuer, // but we should probably still check if the task is available? task := q.tasks[job.TaskName] runner, err := core.NewRunnerFromJob(task, job) if err != nil { return []interface{}{}, fmt.Errorf("error creating runner: %v", err) } var results []interface{} q.activeRunners.Store(job.RID, runner) go runner.Run(q.ctx) select { case err = <-runner.ErrorChannel: break case results = <-runner.ResultsChannel: break case <-q.ctx.Done(): runner.Cancel() break } q.activeRunners.Delete(job.RID) return results, err } // retryJob retries the job with the given job error. 
func (q *Queuer) retryJob(job *model.Job, jobErr error) { if job.Options == nil || job.Options.OnError.MaxRetries <= 0 { q.failJob(job, jobErr) return } var err error var results []interface{} retryer, err := core.NewRetryer( func() error { q.log.Printf("Trying/retrying job with RID %v", job.RID) results, err = q.waitForJob(job) if err != nil { return fmt.Errorf("error retrying job: %v", err) } return nil }, job.Options.OnError, ) if err != nil { jobErr = fmt.Errorf("error creating retryer: %v, original error: %v", err, jobErr) } err = retryer.Retry() if err != nil { q.failJob(job, fmt.Errorf("error retrying job: %v, original error: %v", err, jobErr)) } else { q.succeedJob(job, results) } } // runJob retries the job. func (q *Queuer) runJob(job *model.Job) { q.log.Printf("Running scheduled job with RID %v", job.RID) results, err := q.waitForJob(job) if err != nil { q.retryJob(job, err) } else { q.succeedJob(job, results) } } func (q *Queuer) cancelJob(job *model.Job) error { switch job.Status { case model.JobStatusRunning: jobRunner, found := q.activeRunners.Load(job.RID) if !found { return fmt.Errorf("job with rid %v not found or not running", job.RID) } runner := jobRunner.(*core.Runner) runner.Cancel(func() { job.Status = model.JobStatusCancelled _, err := q.dbJob.UpdateJobFinal(job) if err != nil { q.log.Printf("Error updating job status to cancelled: %v", err) } q.log.Printf("Job cancelled with RID %v", job.RID) }) case model.JobStatusScheduled, model.JobStatusQueued: job.Status = model.JobStatusCancelled _, err := q.dbJob.UpdateJobFinal(job) if err != nil { q.log.Printf("Error updating job status to cancelled: %v", err) } q.log.Printf("Job cancelled with RID %v", job.RID) } return nil } // succeedJob updates the job status to succeeded and runs the next job if available. 
func (q *Queuer) succeedJob(job *model.Job, results []interface{}) { job.Status = model.JobStatusSucceeded job.Results = results q.endJob(job) } func (q *Queuer) failJob(job *model.Job, jobErr error) { job.Status = model.JobStatusFailed job.Error = jobErr.Error() q.endJob(job) } func (q *Queuer) endJob(job *model.Job) { endedJob, err := q.dbJob.UpdateJobFinal(job) if err != nil { // TODO probably add retry for updating job to failed q.log.Printf("Error updating finished job with status %v: %v", job.Status, err) } else { q.log.Printf("Job ended with status %v and RID %v", endedJob.Status, endedJob.RID) // Readd scheduled jobs to the queue if endedJob.Options != nil && endedJob.Options.Schedule != nil && endedJob.ScheduleCount < endedJob.Options.Schedule.MaxCount { var newScheduledAt time.Time if len(endedJob.Options.Schedule.NextInterval) > 0 { // This worker should only have the current job if the NextIntervalFunc is available. nextIntervalFunc, ok := q.nextIntervalFuncs[endedJob.Options.Schedule.NextInterval] if !ok { q.log.Printf("NextIntervalFunc %v not found for job with RID %v", endedJob.Options.Schedule.NextInterval, endedJob.RID) return } newScheduledAt = nextIntervalFunc(*endedJob.ScheduledAt, endedJob.ScheduleCount) } else { newScheduledAt = endedJob.ScheduledAt.Add(time.Duration(endedJob.ScheduleCount) * endedJob.Options.Schedule.Interval) } endedJob.ScheduledAt = &newScheduledAt endedJob.Status = model.JobStatusScheduled job, err := q.dbJob.InsertJob(endedJob) if err != nil { q.log.Printf("Error readding scheduled job with RID %v to the queue: %v", endedJob.RID, err) } q.log.Printf("Job with RID %v added for next iteration to the queue", job.RID) } } // Try to run the next job in the queue err = q.runJobInitial() if err != nil { q.log.Printf("Error running next job: %v", err) } }
package queuer

import (
	"fmt"

	"github.com/siherrmann/queuer/model"
)

// ListenForJobUpdate listens for job update events and calls notifyFunction for every updated job.
// The listener runs in its own goroutine until the queuer context is done;
// this method returns as soon as the listener has signalled that it is ready.
// It returns an error if the queuer is not initialized or running, or if notifyFunction is nil.
func (q *Queuer) ListenForJobUpdate(notifyFunction func(data *model.Job)) error {
	if q == nil || q.ctx == nil {
		return fmt.Errorf("cannot listen with uninitialized or not running Queuer")
	}
	// Listen returns without signalling readiness when it gets a nil
	// function, which would block the wait below forever.
	if notifyFunction == nil {
		return fmt.Errorf("notifyFunction cannot be nil")
	}

	ready := make(chan struct{})
	go q.jobUpdateListener.Listen(q.ctx, ready, notifyFunction)
	<-ready

	return nil
}

// ListenForJobDelete listens for job delete events and calls notifyFunction for every deleted job.
// The listener runs in its own goroutine until the queuer context is done;
// this method returns as soon as the listener has signalled that it is ready.
// It returns an error if the queuer is not initialized or running, or if notifyFunction is nil.
func (q *Queuer) ListenForJobDelete(notifyFunction func(data *model.Job)) error {
	if q == nil || q.ctx == nil {
		return fmt.Errorf("cannot listen with uninitialized or not running Queuer")
	}
	// Listen returns without signalling readiness when it gets a nil
	// function, which would block the wait below forever.
	if notifyFunction == nil {
		return fmt.Errorf("notifyFunction cannot be nil")
	}

	ready := make(chan struct{})
	go q.jobDeleteListener.Listen(q.ctx, ready, notifyFunction)
	<-ready

	return nil
}
package queuer

import (
	"slices"

	"github.com/siherrmann/queuer/helper"
	"github.com/siherrmann/queuer/model"
)

// AddNextIntervalFunc registers nif under the name derived from the function
// itself (via helper.GetTaskNameFromInterface), appends that name to the
// worker's AvailableNextIntervalFuncs and stores the worker in the database.
// It is meant to be called before the Queuer is started.
//
// It panics if nif is nil, if no name can be derived, if the name is already
// registered or if the worker update fails.
// It returns the worker as returned by the database update.
func (q *Queuer) AddNextIntervalFunc(nif model.NextIntervalFunc) *model.Worker {
	if nif == nil {
		q.log.Panicf("error adding next interval: NextIntervalFunc cannot be nil")
	}

	name, nameErr := helper.GetTaskNameFromInterface(nif)
	if nameErr != nil {
		q.log.Panicf("error getting function name: %v", nameErr)
	}

	registered := q.worker.AvailableNextIntervalFuncs
	if slices.Contains(registered, name) {
		q.log.Panicf("NextIntervalFunc with name %v already exists", name)
	}

	q.nextIntervalFuncs[name] = nif
	q.worker.AvailableNextIntervalFuncs = append(registered, name)

	updated, updateErr := q.dbWorker.UpdateWorker(q.worker)
	if updateErr != nil {
		q.log.Panicf("error updating worker: %v", updateErr)
	}

	q.log.Printf("NextInterval function added with name %v", name)

	return updated
}

// AddNextIntervalFuncWithName adds a NextIntervalFunc to
// the worker's available next interval functions with a specific name.
// It takes a NextIntervalFunc and a name, checks if the function is nil
// or already exists in the worker's available next interval functions,
// and adds it to the worker's AvailableNextIntervalFuncs.
//
// This function is useful when you want to add a NextIntervalFunc
// with a specific name that you control, rather than deriving it from the function itself.
// It ensures that the function is not nil and that the name does not already exist // in the worker's available next interval functions. // // If the function is nil or already exists, it panics. // It returns the updated worker after adding the function with the specified name. func (q *Queuer) AddNextIntervalFuncWithName(nif model.NextIntervalFunc, name string) *model.Worker { if nif == nil { q.log.Panicf("NextIntervalFunc cannot be nil") } if slices.Contains(q.worker.AvailableNextIntervalFuncs, name) { q.log.Panicf("NextIntervalFunc with name %v already exists", name) } q.nextIntervalFuncs[name] = nif q.worker.AvailableNextIntervalFuncs = append(q.worker.AvailableNextIntervalFuncs, name) worker, err := q.dbWorker.UpdateWorker(q.worker) if err != nil { q.log.Panicf("error updating worker: %v", err) } q.log.Printf("NextInterval function added with name %v", name) return worker }
package queuer

import (
	"slices"

	"github.com/siherrmann/queuer/model"
)

// AddTask registers a task with the queuer.
// The task is built with model.NewTask, which also determines its name;
// the task is stored in the queuer's task map, its name is appended to the
// worker's available tasks and the worker is updated in the database.
//
// It panics if the task cannot be created, if a task with the same name is
// already registered or if the worker update fails.
// It returns the newly created task.
func (q *Queuer) AddTask(task interface{}) *model.Task {
	created, createErr := model.NewTask(task)
	if createErr != nil {
		q.log.Panicf("error creating new task: %v", createErr)
	}

	available := q.worker.AvailableTasks
	if slices.Contains(available, created.Name) {
		q.log.Panicf("Task with name %v already exists", created.Name)
	}

	q.tasks[created.Name] = created
	q.worker.AvailableTasks = append(available, created.Name)

	// Update worker in DB
	if _, updateErr := q.dbWorker.UpdateWorker(q.worker); updateErr != nil {
		q.log.Panicf("error updating worker: %v", updateErr)
	}

	q.log.Printf("Task added with name %v", created.Name)

	return created
}

// AddTaskWithName adds a new task with a specific name to the queuer.
// It creates a new task with the provided task interface and name, adds it to the worker's available tasks,
// and updates the worker in the database.
//
// If task creation fails, it logs a panic error and exits the program.
// It returns the newly created task.
func (q *Queuer) AddTaskWithName(task interface{}, name string) *model.Task { newTask, err := model.NewTaskWithName(task, name) if err != nil { q.log.Panicf("error creating new task: %v", err) } if slices.Contains(q.worker.AvailableTasks, name) { q.log.Panicf("Task with name %v already exists", name) } q.tasks[newTask.Name] = newTask q.worker.AvailableTasks = append(q.worker.AvailableTasks, newTask.Name) // Update worker in DB _, err = q.dbWorker.UpdateWorker(q.worker) if err != nil { q.log.Panicf("error updating worker: %v", err) } q.log.Printf("Task added with name %v", newTask.Name) return newTask }
package queuer

import (
	"fmt"

	"github.com/google/uuid"
	"github.com/siherrmann/queuer/model"
)

// GetWorker looks up a single worker by its RID.
// The database error is returned unchanged if the lookup fails.
func (q *Queuer) GetWorker(workerRid uuid.UUID) (*model.Worker, error) {
	found, err := q.dbWorker.SelectWorker(workerRid)
	if err != nil {
		return nil, err
	}

	return found, nil
}

// GetWorkers returns up to entries workers, paging from lastId.
// lastId must not be negative and entries must be greater than zero;
// otherwise an error is returned without querying the database.
func (q *Queuer) GetWorkers(lastId int, entries int) ([]*model.Worker, error) {
	switch {
	case lastId < 0:
		return nil, fmt.Errorf("lastId cannot be negative, got %d", lastId)
	case entries <= 0:
		return nil, fmt.Errorf("entries must be greater than zero, got %d", entries)
	}

	list, err := q.dbWorker.SelectAllWorkers(lastId, entries)
	if err != nil {
		return nil, err
	}

	return list, nil
}