package core import ( "fmt" "queuer/model" "time" ) type Retryer struct { function func() error sleep time.Duration options *model.OnError } func NewRetryer(function func() error, options *model.OnError) (*Retryer, error) { if options == nil || options.MaxRetries <= 0 || options.RetryDelay < 0 { return nil, fmt.Errorf("no valid retry options provided") } return &Retryer{ function: function, sleep: time.Duration(options.RetryDelay) * time.Second, options: options, }, nil } func (r *Retryer) Retry() (err error) { for i := 0; i < r.options.MaxRetries; i++ { err = r.function() if err != nil { time.Sleep(r.sleep) if r.options.RetryBackoff == model.RETRY_BACKOFF_NONE { continue } else if r.options.RetryBackoff == model.RETRY_BACKOFF_LINEAR { r.sleep += time.Duration(r.options.RetryDelay) * time.Second continue } else if r.options.RetryBackoff == model.RETRY_BACKOFF_EXPONENTIAL { r.sleep *= 2 continue } } break } return err }
package core import ( "context" "fmt" "math" "queuer/helper" "queuer/model" "reflect" "time" ) type Runner struct { cancel context.CancelFunc Options *model.Options Task interface{} Parameters model.Parameters // Result channel to return results ResultsChannel chan []interface{} ErrorChannel chan error } func NewRunner(options *model.Options, task interface{}, parameters ...interface{}) (*Runner, error) { taskInputParameters, err := helper.GetInputParametersFromTask(task) if err != nil { return nil, fmt.Errorf("error getting input parameters of task: %v", err) } else if len(taskInputParameters) != len(parameters) { return nil, fmt.Errorf("task expects %d parameters, got %d", len(taskInputParameters), len(parameters)) } for i, param := range parameters { // Convert json float to int if the parameter is int if taskInputParameters[i].Kind() == reflect.Int && reflect.TypeOf(param).Kind() == reflect.Float64 { parameters[i] = int(param.(float64)) } else if taskInputParameters[i].Kind() != reflect.TypeOf(param).Kind() { return nil, fmt.Errorf("parameter %d of task must be of type %s, got %s", i, taskInputParameters[i].Kind(), reflect.TypeOf(param).Kind()) } } err = helper.CheckValidTaskWithParameters(task, parameters...) if err != nil { return nil, fmt.Errorf("error checking task: %v", err) } return &Runner{ Task: task, Parameters: model.Parameters(parameters), Options: options, ResultsChannel: make(chan []interface{}, 1), ErrorChannel: make(chan error, 1), }, nil } func NewRunnerFromJob(task *model.Task, job *model.Job) (*Runner, error) { if task == nil || job == nil { return nil, fmt.Errorf("task and job cannot be nil") } parameters := make([]interface{}, len(job.Parameters)) copy(parameters, job.Parameters) runner, err := NewRunner(job.Options, task.Task, job.Parameters...) if err != nil { return nil, fmt.Errorf("error creating runner from job: %v", err) } return runner, nil } // Run executes the task with the provided parameters. 
// It will return results on ResultsChannel and errors on ErrorChannel. // If the task panics, it will send the panic value to ErrorChannel. // The main intended use of this function is to run the task in a separate goroutine func (r *Runner) Run(ctx context.Context) { if r.Options != nil && r.Options.OnError != nil && r.Options.OnError.Timeout > 0 { ctx, r.cancel = context.WithTimeout( ctx, time.Duration(math.Round(r.Options.OnError.Timeout*1000))*time.Millisecond, ) } else { ctx, r.cancel = context.WithCancel(ctx) } resultsChannel := make(chan []interface{}, 1) errorChannel := make(chan error, 1) panicChan := make(chan interface{}) go func() { defer func() { if p := recover(); p != nil { panicChan <- p } }() taskFunc := reflect.ValueOf(r.Task) results := taskFunc.Call(r.Parameters.ToReflectValues()) resultValues := []interface{}{} for _, result := range results { resultValues = append(resultValues, result.Interface()) } outputParameters, err := helper.GetOutputParametersFromTask(r.Task) if err != nil { errorChannel <- fmt.Errorf("error getting output parameters of task: %v", err) return } var ok bool if len(resultValues) > 0 { if err, ok = resultValues[len(resultValues)-1].(error); ok || (outputParameters[1].String() == "error" && resultValues[len(resultValues)-1] == nil) { resultValues = resultValues[:len(resultValues)-1] } } if err != nil { errorChannel <- fmt.Errorf("runner failed with error: %v", err) } else { resultsChannel <- resultValues } }() for { select { case p := <-panicChan: r.ErrorChannel <- fmt.Errorf("panic running task: %v", p) return case err := <-errorChannel: r.ErrorChannel <- fmt.Errorf("error running task: %v", err) return case results := <-resultsChannel: r.ResultsChannel <- results return case <-ctx.Done(): r.ErrorChannel <- fmt.Errorf("error running task: %v", ctx.Err()) return } } } func (r *Runner) Cancel(onCancel ...func()) { if len(onCancel) > 0 { for _, cancelFunc := range onCancel { if cancelFunc != nil { cancelFunc() } } } // 
Cancel the context if it exists if r.cancel != nil { r.cancel() } }
package core import ( "context" "fmt" "log" "queuer/helper" "queuer/model" "reflect" "time" ) type Scheduler struct { Task interface{} Parameters model.Parameters StartTime *time.Time } func NewScheduler(startTime *time.Time, task interface{}, parameters ...interface{}) (*Scheduler, error) { err := helper.CheckValidTaskWithParameters(task, parameters...) if err != nil { return nil, fmt.Errorf("error checking task: %v", err) } return &Scheduler{ Task: task, Parameters: model.Parameters(parameters), StartTime: startTime, }, nil } func (s *Scheduler) Go(ctx context.Context) { var duration time.Duration if s.StartTime != nil { duration = time.Until(*s.StartTime) } runner, err := NewRunner( nil, func() { time.AfterFunc(duration, func() { reflect.ValueOf(s.Task).Call(s.Parameters.ToReflectValues()) }) }, ) if err != nil { log.Printf("Error creating runner: %v", err) return } go runner.Run(ctx) }
package core import ( "context" "fmt" "queuer/helper" "queuer/model" "reflect" "time" ) // Ticker represents a recurring task runner. type Ticker struct { interval time.Duration task interface{} parameters model.Parameters } // NewTicker creates and returns a new Ticker instance. func NewTicker(interval time.Duration, task interface{}, parameters ...interface{}) (*Ticker, error) { if interval <= 0 { return nil, fmt.Errorf("ticker interval must be greater than zero") } err := helper.CheckValidTaskWithParameters(task, parameters...) if err != nil { return nil, fmt.Errorf("error checking task: %s", reflect.TypeOf(task).Kind()) } return &Ticker{ interval: interval, task: task, parameters: model.Parameters(parameters), }, nil } // Go starts the Ticker. It runs the task at the specified interval // until the provided context is cancelled. func (t *Ticker) Go(ctx context.Context) error { runner, err := NewRunner(nil, t.task, t.parameters...) if err != nil { return fmt.Errorf("error creating runner: %v", err) } go runner.Run(ctx) ticker := time.NewTicker(t.interval) defer ticker.Stop() for { select { case <-ctx.Done(): return fmt.Errorf("context cancelled: %w", ctx.Err()) case <-ticker.C: go runner.Run(ctx) case err := <-runner.ErrorChannel: if err != nil { return fmt.Errorf("error running task: %w", err) } } } }
package database import ( "context" "fmt" "log" "queuer/helper" "queuer/model" "time" "github.com/google/uuid" "github.com/lib/pq" ) // JobDBHandlerFunctions defines the interface for Job database operations. type JobDBHandlerFunctions interface { CheckTableExistance() (bool, error) CreateTable() error DropTable() error InsertJob(job *model.Job) (*model.Job, error) BatchInsertJobs(jobs []*model.Job) error UpdateJobsInitial(worker *model.Worker) ([]*model.Job, error) UpdateJobFinal(job *model.Job) (*model.Job, error) DeleteJob(rid uuid.UUID) error SelectJob(rid uuid.UUID) (*model.Job, error) SelectAllJobs(lastID int, entries int) ([]*model.Job, error) SelectAllJobsByWorkerRID(workerRid uuid.UUID, lastID int, entries int) ([]*model.Job, error) SelectAllJobsBySearch(search string, lastID int, entries int) ([]*model.Job, error) // Job Archive SelectJobFromArchive(rid uuid.UUID) (*model.Job, error) SelectAllJobsFromArchive(lastID int, entries int) ([]*model.Job, error) SelectAllJobsFromArchiveBySearch(search string, lastID int, entries int) ([]*model.Job, error) } // JobDBHandler implements JobDBHandlerFunctions and holds the database connection. type JobDBHandler struct { db *helper.Database } // NewJobDBHandler creates a new instance of JobDBHandler. func NewJobDBHandler(dbConnection *helper.Database) (*JobDBHandler, error) { jobDbHandler := &JobDBHandler{ db: dbConnection, } // TODO Remove table drop err := jobDbHandler.DropTable() if err != nil { return nil, fmt.Errorf("error dropping job table: %#v", err) } err = jobDbHandler.CreateTable() if err != nil { return nil, fmt.Errorf("error creating job table: %#v", err) } return jobDbHandler, nil } // CheckTableExistance checks if the 'job' table exists in the database. func (r JobDBHandler) CheckTableExistance() (bool, error) { exists := false exists, err := r.db.CheckTableExistance("job") return exists, err } // CreateTable creates the 'job' table in the database if it doesn't already exist. 
// It also creates a trigger for notifying events on the table and all necessary indexes. func (r JobDBHandler) CreateTable() error { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() _, err := r.db.Instance.ExecContext( ctx, `CREATE TABLE IF NOT EXISTS job ( id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, rid UUID UNIQUE DEFAULT gen_random_uuid(), worker_id BIGINT DEFAULT 0, worker_rid UUID DEFAULT NULL, options JSONB DEFAULT '{}', task_name VARCHAR(100) DEFAULT '', parameters JSONB DEFAULT '[]', status VARCHAR(50) DEFAULT 'QUEUED', scheduled_at TIMESTAMP DEFAULT NULL, started_at TIMESTAMP DEFAULT NULL, attempts INT DEFAULT 0, results JSONB DEFAULT '[]', error TEXT DEFAULT '', created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); CREATE TABLE IF NOT EXISTS job_archive ( LIKE job INCLUDING DEFAULTS INCLUDING CONSTRAINTS, PRIMARY KEY (id, updated_at) ); SELECT create_hypertable('job_archive', by_range('updated_at'), if_not_exists => TRUE);`, ) if err != nil { log.Panicf("error creating job table: %#v", err) } _, err = r.db.Instance.ExecContext( ctx, `CREATE OR REPLACE TRIGGER job_notify_event AFTER INSERT OR UPDATE OR DELETE ON job FOR EACH ROW EXECUTE PROCEDURE notify_event();`, ) if err != nil { log.Panicf("error creating notify trigger on job table: %#v", err) } err = r.db.CreateIndexes("job", "worker_id", "worker_rid", "status", "created_at", "updated_at") // Indexes on common search/filter fields if err != nil { log.Fatal(err) } r.db.Logger.Println("created table job") return nil } // DropTable drops the 'job' table from the database. 
func (r JobDBHandler) DropTable() error { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() query := `DROP TABLE IF EXISTS job` _, err := r.db.Instance.ExecContext(ctx, query) if err != nil { return fmt.Errorf("error dropping job table: %#v", err) } query = `DROP TABLE IF EXISTS job_archive` _, err = r.db.Instance.ExecContext(ctx, query) if err != nil { return fmt.Errorf("error dropping job table: %#v", err) } r.db.Logger.Printf("dropped table job") return nil } // InsertJob inserts a new job record into the database. func (r JobDBHandler) InsertJob(job *model.Job) (*model.Job, error) { newJob := &model.Job{} row := r.db.Instance.QueryRow( `INSERT INTO job (options, task_name, parameters, status, scheduled_at) VALUES ($1, $2, $3, $4, $5) RETURNING id, rid, worker_id, worker_rid, options, task_name, parameters, status, scheduled_at, attempts, created_at, updated_at;`, job.Options, job.TaskName, job.Parameters, job.Status, job.ScheduledAt, ) err := row.Scan( &newJob.ID, &newJob.RID, &newJob.WorkerID, &newJob.WorkerRID, &newJob.Options, &newJob.TaskName, &newJob.Parameters, &newJob.Status, &newJob.ScheduledAt, &newJob.Attempts, &newJob.CreatedAt, &newJob.UpdatedAt, ) if err != nil { return nil, fmt.Errorf("error scanning new job: %w", err) } return newJob, nil } func (r JobDBHandler) BatchInsertJobs(jobs []*model.Job) error { if len(jobs) == 0 { return nil } tx, err := r.db.Instance.Begin() if err != nil { return fmt.Errorf("error starting transaction: %w", err) } stmt, err := tx.Prepare(pq.CopyIn("job", "options", "task_name", "parameters", "scheduled_at")) if err != nil { return fmt.Errorf("error preparing statement for batch insert: %w", err) } for _, job := range jobs { var err error optionsJSON := []byte("{}") parametersJSON := []byte("[]") if job.Options != nil { optionsJSON, err = job.Options.Marshal() if err != nil { return fmt.Errorf("error marshaling job options for batch insert: %w", err) } } if job.Parameters != nil { 
parametersJSON, err = job.Parameters.Marshal() if err != nil { return fmt.Errorf("error marshaling job parameters for batch insert: %v", err) } } _, err = stmt.Exec( string(optionsJSON), job.TaskName, string(parametersJSON), job.ScheduledAt, ) if err != nil { return fmt.Errorf("error executing batch insert for job %s: %w", job.RID, err) } } _, err = stmt.Exec() if err != nil { return fmt.Errorf("error executing final batch insert: %w", err) } err = stmt.Close() if err != nil { return fmt.Errorf("error closing prepared statement: %w", err) } err = tx.Commit() if err != nil { return fmt.Errorf("error committing transaction: %w", err) } return nil } // UpdateJobsInitial updates an existing queued non locked job record in the database. // Checks if the job is in 'QUEUED' or 'FAILED' status and if the worker can handle the task. func (r JobDBHandler) UpdateJobsInitial(worker *model.Worker) ([]*model.Job, error) { rows, err := r.db.Instance.Query( `WITH current_concurrency AS ( SELECT COUNT(*) AS count FROM job WHERE worker_id = $1 AND status = 'RUNNING' ), current_worker AS ( SELECT id, rid, available_tasks, max_concurrency, COALESCE(cc.count, 0) AS current_concurrency FROM worker, current_concurrency AS cc WHERE id = $1 AND (max_concurrency > COALESCE(cc.count, 0)) FOR UPDATE ), job_ids AS ( SELECT j.id FROM current_worker AS cw, current_concurrency AS cc, LATERAL ( SELECT job.id FROM job WHERE job.task_name = ANY(cw.available_tasks::VARCHAR[]) AND ( job.status = 'QUEUED' OR (job.status = 'SCHEDULED' AND CURRENT_TIMESTAMP >= (job.scheduled_at - '10 minutes'::INTERVAL)) ) ORDER BY job.created_at ASC LIMIT (cw.max_concurrency - COALESCE(cc.count, 0)) FOR UPDATE SKIP LOCKED ) AS j ) UPDATE job SET worker_id = cw.id, worker_rid = cw.rid, status = 'RUNNING', started_at = CURRENT_TIMESTAMP, attempts = attempts + 1, updated_at = CURRENT_TIMESTAMP FROM current_worker AS cw, job_ids WHERE job.id = ANY(SELECT id FROM job_ids) AND EXISTS (SELECT 1 FROM current_worker) RETURNING 
job.id, job.rid, job.worker_id, job.worker_rid, job.options, job.task_name, job.parameters, job.status, job.scheduled_at, job.started_at, job.attempts, job.created_at, job.updated_at;`, worker.ID, ) if err != nil { return nil, fmt.Errorf("error querying job for worker id %v: %w", worker.ID, err) } defer rows.Close() var jobs []*model.Job for rows.Next() { job := &model.Job{} err := rows.Scan( &job.ID, &job.RID, &job.WorkerID, &job.WorkerRID, &job.Options, &job.TaskName, &job.Parameters, &job.Status, &job.ScheduledAt, &job.StartedAt, &job.Attempts, &job.CreatedAt, &job.UpdatedAt, ) if err != nil { return nil, fmt.Errorf("error updating initial job for worker id %v: %w", job.WorkerRID, err) } jobs = append(jobs, job) } return jobs, nil } // UpdateJobFinal updates an existing job record in the database to state 'FAILED' or 'SUCCEEDED'. func (r JobDBHandler) UpdateJobFinal(job *model.Job) (*model.Job, error) { row := r.db.Instance.QueryRow( `WITH jobs_old AS ( DELETE FROM job WHERE id = $1 RETURNING id, rid, worker_id, worker_rid, options, task_name, parameters, scheduled_at, started_at, attempts, created_at, updated_at ) INSERT INTO job_archive ( id, rid, worker_id, worker_rid, options, task_name, parameters, status, scheduled_at, started_at, attempts, results, error, created_at, updated_at ) SELECT jobs_old.id, jobs_old.rid, jobs_old.worker_id, jobs_old.worker_rid, jobs_old.options, jobs_old.task_name, jobs_old.parameters, $2, jobs_old.scheduled_at, jobs_old.started_at, jobs_old.attempts, $3, $4, jobs_old.created_at, jobs_old.updated_at FROM jobs_old RETURNING id, rid, worker_id, worker_rid, options, task_name, parameters, status, scheduled_at, started_at, attempts, results, error, created_at, updated_at;`, job.ID, job.Status, job.Results, job.Error, ) archivedJob := &model.Job{} err := row.Scan( &archivedJob.ID, &archivedJob.RID, &archivedJob.WorkerID, &archivedJob.WorkerRID, &archivedJob.Options, &archivedJob.TaskName, &archivedJob.Parameters, &archivedJob.Status, 
&archivedJob.ScheduledAt, &archivedJob.StartedAt, &archivedJob.Attempts, &archivedJob.Results, &archivedJob.Error, &archivedJob.CreatedAt, &archivedJob.UpdatedAt, ) if err != nil { return nil, fmt.Errorf("error updating final job for worker id %v: %w", job.WorkerRID, err) } return archivedJob, nil } // DeleteJob deletes a job record from the database based on its RID. func (r JobDBHandler) DeleteJob(rid uuid.UUID) error { _, err := r.db.Instance.Exec( `DELETE FROM job WHERE rid = $1`, rid, ) if err != nil { return fmt.Errorf("error deleting job with RID %s: %w", rid, err) } return nil } // SelectJob retrieves a single job record from the database based on its RID. func (r JobDBHandler) SelectJob(rid uuid.UUID) (*model.Job, error) { row := r.db.Instance.QueryRow( `SELECT id, rid, worker_id, worker_rid, options, task_name, parameters, status, scheduled_at, started_at, attempts, results, error, created_at, updated_at FROM job WHERE rid = $1`, rid, ) job := &model.Job{} err := row.Scan( &job.ID, &job.RID, &job.WorkerID, &job.WorkerRID, &job.Options, &job.TaskName, &job.Parameters, &job.Status, &job.ScheduledAt, &job.StartedAt, &job.Attempts, &job.Results, &job.Error, &job.CreatedAt, &job.UpdatedAt, ) if err != nil { return nil, fmt.Errorf("error scanning job with RID %s: %w", rid, err) } return job, nil } // SelectAllJobs retrieves a paginated list of jobs for a specific worker. 
func (r JobDBHandler) SelectAllJobs(lastID int, entries int) ([]*model.Job, error) { rows, err := r.db.Instance.Query( `SELECT id, rid, worker_id, worker_rid, options, task_name, parameters, status, scheduled_at, started_at, attempts, results, error, created_at, updated_at FROM job WHERE (0 = $1 OR created_at < ( SELECT d.created_at FROM job AS d WHERE d.id = $1)) ORDER BY created_at DESC LIMIT $2;`, lastID, entries, ) if err != nil { return []*model.Job{}, fmt.Errorf("error querying all jobs: %w", err) } defer rows.Close() var jobs []*model.Job for rows.Next() { job := &model.Job{} err := rows.Scan( &job.ID, &job.RID, &job.WorkerID, &job.WorkerRID, &job.Options, &job.TaskName, &job.Parameters, &job.Status, &job.ScheduledAt, &job.StartedAt, &job.Attempts, &job.Results, &job.Error, &job.CreatedAt, &job.UpdatedAt, ) if err != nil { return []*model.Job{}, fmt.Errorf("error scanning job row: %w", err) } jobs = append(jobs, job) } return jobs, nil } // SelectAllJobsByWorkerRID retrieves a paginated list of jobs for a specific worker, filtered by worker RID. 
func (r JobDBHandler) SelectAllJobsByWorkerRID(workerRid uuid.UUID, lastID int, entries int) ([]*model.Job, error) { rows, err := r.db.Instance.Query( `SELECT id, rid, worker_id, worker_rid, options, task_name, parameters, status, scheduled_at, started_at, attempts, results, error, created_at, updated_at FROM job WHERE worker_rid = $1 AND (0 = $2 OR created_at < ( SELECT d.created_at FROM job AS d WHERE d.id = $2)) ORDER BY created_at DESC LIMIT $3;`, workerRid, lastID, entries, ) if err != nil { return []*model.Job{}, fmt.Errorf("error querying jobs by worker RID: %w", err) } defer rows.Close() var jobs []*model.Job for rows.Next() { job := &model.Job{} err := rows.Scan( &job.ID, &job.RID, &job.WorkerID, &job.WorkerRID, &job.Options, &job.TaskName, &job.Parameters, &job.Status, &job.ScheduledAt, &job.StartedAt, &job.Attempts, &job.Results, &job.Error, &job.CreatedAt, &job.UpdatedAt, ) if err != nil { return []*model.Job{}, fmt.Errorf("error scanning job row for worker: %w", err) } jobs = append(jobs, job) } return jobs, nil } // SelectAllJobsBySearch retrieves a paginated list of jobs for a worker, filtered by search string. // It searches across 'rid', 'worker_id', and 'status' fields. 
func (r JobDBHandler) SelectAllJobsBySearch(search string, lastID int, entries int) ([]*model.Job, error) { rows, err := r.db.Instance.Query(` SELECT id, rid, worker_id, worker_rid, options, task_name, parameters, status, scheduled_at, started_at, attempts, results, error, created_at, updated_at FROM job WHERE (rid::text ILIKE '%' || $1 || '%' OR worker_id::text ILIKE '%' || $1 || '%' OR task_name ILIKE '%' || $1 || '%' OR status ILIKE '%' || $1 || '%') AND (0 = $2 OR created_at < ( SELECT u.created_at FROM job AS u WHERE u.id = $2)) ORDER BY created_at DESC LIMIT $3`, search, lastID, entries, ) if err != nil { return []*model.Job{}, fmt.Errorf("error querying jobs by search: %w", err) } defer rows.Close() var jobs []*model.Job for rows.Next() { job := &model.Job{} err := rows.Scan( &job.ID, &job.RID, &job.WorkerID, &job.WorkerRID, &job.Options, &job.TaskName, &job.Parameters, &job.Status, &job.ScheduledAt, &job.StartedAt, &job.Attempts, &job.Results, &job.Error, &job.CreatedAt, &job.UpdatedAt, ) if err != nil { return []*model.Job{}, fmt.Errorf("error scanning job row during search: %w", err) } jobs = append(jobs, job) } if err = rows.Err(); err != nil { return []*model.Job{}, fmt.Errorf("error after iterating rows during search: %w", err) } return jobs, nil } // Job Archive func (r JobDBHandler) SelectJobFromArchive(rid uuid.UUID) (*model.Job, error) { row := r.db.Instance.QueryRow( `SELECT id, rid, worker_id, worker_rid, options, task_name, parameters, status, scheduled_at, started_at, attempts, results, error, created_at, updated_at FROM job_archive WHERE rid = $1`, rid, ) job := &model.Job{} err := row.Scan( &job.ID, &job.RID, &job.WorkerID, &job.WorkerRID, &job.Options, &job.TaskName, &job.Parameters, &job.Status, &job.ScheduledAt, &job.StartedAt, &job.Attempts, &job.Results, &job.Error, &job.CreatedAt, &job.UpdatedAt, ) if err != nil { return nil, fmt.Errorf("error scanning archived job with RID %s: %w", rid, err) } return job, nil } func (r JobDBHandler) 
SelectAllJobsFromArchive(lastID int, entries int) ([]*model.Job, error) { rows, err := r.db.Instance.Query( `SELECT id, rid, worker_id, worker_rid, options, task_name, parameters, status, scheduled_at, started_at, attempts, results, error, created_at, updated_at FROM job_archive WHERE (0 = $1 OR created_at < ( SELECT d.created_at FROM job_archive AS d WHERE d.id = $1)) ORDER BY created_at DESC LIMIT $2;`, lastID, entries, ) if err != nil { return []*model.Job{}, fmt.Errorf("error querying all archived jobs: %w", err) } defer rows.Close() var jobs []*model.Job for rows.Next() { job := &model.Job{} err := rows.Scan( &job.ID, &job.RID, &job.WorkerID, &job.WorkerRID, &job.Options, &job.TaskName, &job.Parameters, &job.Status, &job.ScheduledAt, &job.StartedAt, &job.Attempts, &job.Results, &job.Error, &job.CreatedAt, &job.UpdatedAt, ) if err != nil { return []*model.Job{}, fmt.Errorf("error scanning archived job row: %w", err) } jobs = append(jobs, job) } return jobs, nil } func (r JobDBHandler) SelectAllJobsFromArchiveBySearch(search string, lastID int, entries int) ([]*model.Job, error) { rows, err := r.db.Instance.Query(` SELECT id, rid, worker_id, worker_rid, options, task_name, parameters, status, scheduled_at, started_at, attempts, results, error, created_at, updated_at FROM job_archive WHERE (rid::text ILIKE '%' || $1 || '%' OR worker_id::text ILIKE '%' || $1 || '%' OR task_name ILIKE '%' || $1 || '%' OR status ILIKE '%' || $1 || '%') AND (0 = $2 OR created_at < ( SELECT u.created_at FROM job_archive AS u WHERE u.id = $2)) ORDER BY created_at DESC LIMIT $3`, search, lastID, entries, ) if err != nil { return []*model.Job{}, fmt.Errorf("error querying archived jobs by search: %w", err) } defer rows.Close() var jobs []*model.Job for rows.Next() { job := &model.Job{} err := rows.Scan( &job.ID, &job.RID, &job.WorkerID, &job.WorkerRID, &job.Options, &job.TaskName, &job.Parameters, &job.Status, &job.ScheduledAt, &job.StartedAt, &job.Attempts, &job.Results, &job.Error, 
&job.CreatedAt, &job.UpdatedAt, ) if err != nil { return []*model.Job{}, fmt.Errorf("error scanning archived job row during search: %w", err) } jobs = append(jobs, job) } if err = rows.Err(); err != nil { return []*model.Job{}, fmt.Errorf("error after iterating rows during search: %w", err) } return jobs, nil }
package database import ( "context" "fmt" "log" "queuer/helper" "time" "github.com/lib/pq" ) type QueuerListener struct { Listener *pq.Listener Channel string } // NewQueuerListener creates a new QueuerListener instance. func NewQueuerListener(dbConfig *helper.DatabaseConfiguration, channel string) (*QueuerListener, error) { listener := pq.NewListener(dbConfig.DatabaseConnectionString(), 10*time.Second, time.Minute, func(ev pq.ListenerEventType, err error) { if err != nil { fmt.Printf("error creating postgres listener: %v", err) } }) if err := listener.Listen(channel); err != nil { return nil, fmt.Errorf("error listening to channel %v: %v", channel, err) } log.Println("Added listener to channel: ", channel) return &QueuerListener{ Listener: listener, Channel: channel, }, nil } // ListenToEvents listens for events on the specified channel and processes them. func (l *QueuerListener) ListenToEvents(ctx context.Context, cancel context.CancelFunc, notifyFunction func(data string)) { for { select { case <-ctx.Done(): return case n := <-l.Listener.Notify: if n != nil { go notifyFunction(n.Extra) } case <-time.After(90 * time.Second): // Checking connection all 90 seconds err := l.Listener.Ping() if err != nil { log.Printf("error pinging listener: %v", err) cancel() return } } } }
package database import ( "context" "fmt" "log" "queuer/helper" "queuer/model" "time" "github.com/google/uuid" "github.com/lib/pq" ) // WorkerDBHandlerFunctions defines the interface for Worker database operations. type WorkerDBHandlerFunctions interface { CheckTableExistance() (bool, error) CreateTable() error DropTable() error InsertWorker(worker *model.Worker) (*model.Worker, error) UpdateWorker(worker *model.Worker) (*model.Worker, error) DeleteWorker(rid uuid.UUID) error SelectWorker(rid uuid.UUID) (*model.Worker, error) SelectAllWorkers(lastID int, entries int) ([]*model.Worker, error) SelectAllWorkersBySearch(search string, lastID int, entries int) ([]*model.Worker, error) } // WorkerDBHandler implements WorkerDBHandlerFunctions and holds the database connection. type WorkerDBHandler struct { db *helper.Database } // NewWorkerDBHandler creates a new instance of WorkerDBHandler. func NewWorkerDBHandler(dbConnection *helper.Database) (*WorkerDBHandler, error) { workerDbHandler := &WorkerDBHandler{ db: dbConnection, } // TODO Remove table drop err := workerDbHandler.DropTable() if err != nil { return nil, fmt.Errorf("error dropping worker table: %#v", err) } err = workerDbHandler.CreateTable() if err != nil { return nil, fmt.Errorf("error creating worker table: %#v", err) } return workerDbHandler, nil } // CheckTableExistance checks if the 'worker' table exists in the database. func (r WorkerDBHandler) CheckTableExistance() (bool, error) { exists := false exists, err := r.db.CheckTableExistance("worker") return exists, err } // CreateTable creates the 'worker' table in the database if it doesn't already exist. 
func (r WorkerDBHandler) CreateTable() error { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() _, err := r.db.Instance.ExecContext( ctx, `CREATE TABLE IF NOT EXISTS worker ( id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, rid UUID UNIQUE DEFAULT gen_random_uuid(), name VARCHAR(100) DEFAULT '', options JSONB DEFAULT '{}', available_tasks VARCHAR[] DEFAULT ARRAY[]::VARCHAR[], available_next_interval VARCHAR[] DEFAULT ARRAY[]::VARCHAR[], current_concurrency INT DEFAULT 0, max_concurrency INT DEFAULT 1, status VARCHAR(50) DEFAULT 'RUNNING', created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP )`, ) if err != nil { log.Panicf("error creating worker table: %#v", err) } err = r.db.CreateIndexes("worker", "rid", "name", "status") if err != nil { r.db.Logger.Fatal(err) } r.db.Logger.Println("created table worker") return nil } // DropTable drops the 'worker' table from the database. func (r WorkerDBHandler) DropTable() error { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() query := `DROP TABLE IF EXISTS worker` _, err := r.db.Instance.ExecContext(ctx, query) if err != nil { return fmt.Errorf("error dropping worker table: %#v", err) } r.db.Logger.Printf("dropped table worker") return nil } // InsertWorker inserts a new worker record into the database. 
func (r WorkerDBHandler) InsertWorker(worker *model.Worker) (*model.Worker, error) { row := r.db.Instance.QueryRow( `INSERT INTO worker (name, options, max_concurrency) VALUES ($1, $2, $3) RETURNING id, rid, name, options, max_concurrency, status, created_at, updated_at`, worker.Name, worker.Options, worker.MaxConcurrency, ) newWorker := &model.Worker{} err := row.Scan( &newWorker.ID, &newWorker.RID, &newWorker.Name, &newWorker.Options, &newWorker.MaxConcurrency, &newWorker.Status, &newWorker.CreatedAt, &newWorker.UpdatedAt, ) if err != nil { return nil, fmt.Errorf("error scanning new worker: %w", err) } return newWorker, nil } // UpdateWorker updates an existing worker record in the database based on its RID. func (r WorkerDBHandler) UpdateWorker(worker *model.Worker) (*model.Worker, error) { row := r.db.Instance.QueryRow( `UPDATE worker SET name = $1, options = $2, available_tasks = $3, available_next_interval = $4, max_concurrency = $5, status = $6, updated_at = CURRENT_TIMESTAMP WHERE rid = $7 RETURNING id, rid, name, options, available_tasks, available_next_interval, max_concurrency, status, created_at, updated_at;`, worker.Name, worker.Options, pq.Array(worker.AvailableTasks), pq.Array(worker.AvailableNextIntervalFuncs), worker.MaxConcurrency, worker.Status, worker.RID, ) updatedWorker := &model.Worker{} err := row.Scan( &updatedWorker.ID, &updatedWorker.RID, &updatedWorker.Name, &updatedWorker.Options, pq.Array(&updatedWorker.AvailableTasks), pq.Array(&updatedWorker.AvailableNextIntervalFuncs), &updatedWorker.MaxConcurrency, &updatedWorker.Status, &updatedWorker.CreatedAt, &updatedWorker.UpdatedAt, ) if err != nil { return nil, fmt.Errorf("error scanning updated worker: %w", err) } return updatedWorker, err } // DeleteWorker deletes a worker record from the database based on its RID. 
func (r WorkerDBHandler) DeleteWorker(rid uuid.UUID) error { _, err := r.db.Instance.Exec( `DELETE FROM worker WHERE rid = $1`, rid, ) if err != nil { return fmt.Errorf("error deleting worker with RID %s: %w", rid.String(), err) } return nil } // SelectWorker retrieves a single worker record from the database based on its RID. func (r WorkerDBHandler) SelectWorker(rid uuid.UUID) (*model.Worker, error) { worker := &model.Worker{} row := r.db.Instance.QueryRow( `SELECT id, rid, name, options, available_tasks, available_next_interval, max_concurrency, status, created_at, updated_at FROM worker WHERE rid = $1`, rid, ) err := row.Scan( &worker.ID, &worker.RID, &worker.Name, &worker.Options, pq.Array(&worker.AvailableTasks), pq.Array(&worker.AvailableNextIntervalFuncs), &worker.MaxConcurrency, &worker.Status, &worker.CreatedAt, &worker.UpdatedAt, ) if err != nil { return nil, fmt.Errorf("error scanning worker with RID %s: %w", rid.String(), err) } return worker, nil } // SelectAllWorkers retrieves a paginated list of all workers. 
func (r WorkerDBHandler) SelectAllWorkers(lastID int, entries int) ([]*model.Worker, error) { var workers []*model.Worker rows, err := r.db.Instance.Query( `SELECT id, rid, name, options, available_tasks, available_next_interval, max_concurrency, status, created_at, updated_at FROM worker WHERE (0 = $1 OR created_at < ( SELECT d.created_at FROM worker AS d WHERE d.id = $1)) ORDER BY created_at DESC LIMIT $2`, lastID, entries, ) if err != nil { return []*model.Worker{}, fmt.Errorf("error querying all workers: %w", err) } defer rows.Close() for rows.Next() { worker := &model.Worker{} err := rows.Scan( &worker.ID, &worker.RID, &worker.Name, &worker.Options, pq.Array(&worker.AvailableTasks), pq.Array(&worker.AvailableNextIntervalFuncs), &worker.MaxConcurrency, &worker.Status, &worker.CreatedAt, &worker.UpdatedAt, ) if err != nil { return []*model.Worker{}, fmt.Errorf("error scanning worker row: %w", err) } workers = append(workers, worker) } if err = rows.Err(); err != nil { return []*model.Worker{}, fmt.Errorf("error iterating rows: %w", err) } return workers, nil } // SelectAllWorkersBySearch retrieves a paginated list of workers, filtered by search string. // It searches across 'queue_name', 'name', and 'status' fields. 
func (r WorkerDBHandler) SelectAllWorkersBySearch(search string, lastID int, entries int) ([]*model.Worker, error) { var workers []*model.Worker rows, err := r.db.Instance.Query(` SELECT id, rid, name, options, available_tasks, available_next_interval, max_concurrency, status, created_at, updated_at FROM worker WHERE (name ILIKE '%' || $1 || '%' OR array_to_string(available_tasks, ',') ILIKE '%' || $1 || '%' OR status ILIKE '%' || $1 || '%') AND (0 = $2 OR created_at < ( SELECT u.created_at FROM worker AS u WHERE u.id = $2)) ORDER BY created_at DESC LIMIT $3`, search, lastID, entries, ) if err != nil { return []*model.Worker{}, fmt.Errorf("error querying workers by search: %w", err) } defer rows.Close() for rows.Next() { worker := &model.Worker{} err := rows.Scan( &worker.ID, &worker.RID, &worker.Name, &worker.Options, pq.Array(&worker.AvailableTasks), pq.Array(&worker.AvailableNextIntervalFuncs), &worker.MaxConcurrency, &worker.Status, &worker.CreatedAt, &worker.UpdatedAt, ) if err != nil { return []*model.Worker{}, fmt.Errorf("error scanning worker row during search: %w", err) } workers = append(workers, worker) } if err = rows.Err(); err != nil { return []*model.Worker{}, fmt.Errorf("error iterating rows: %w", err) } return workers, nil }
package helper

import (
	"context"
	"database/sql"
	"fmt"
	"log"
	"os"
	"strconv"
	"strings"
	"sync"
	"time"

	_ "github.com/joho/godotenv/autoload"
	"github.com/lib/pq"
)

// Database represents a service that interacts with a database.
type Database struct {
	Name     string
	Logger   *log.Logger
	Instance *sql.DB // nil until a successful ConnectToDatabase
}

// NewDatabase creates a Database named name. When dbConfig is non-nil it
// connects immediately and installs the notify_event trigger function; any
// failure terminates the process (Fatal/Panic on the logger). When dbConfig
// is nil, it returns a Database with a nil Instance — the caller is expected
// to connect later.
func NewDatabase(name string, dbConfig *DatabaseConfiguration) *Database {
	logger := log.New(os.Stdout, "Database "+name+": ", log.Ltime)
	if dbConfig != nil {
		db := &Database{Name: name, Logger: logger}
		db.ConnectToDatabase(dbConfig, logger)
		// ConnectToDatabase fatals on failure, so this is a defensive check.
		if db.Instance == nil {
			logger.Fatal("failed to connect to database")
		}
		err := db.AddNotifyFunction()
		if err != nil {
			logger.Panicf("failed to add notify function: %v", err)
		}
		return db
	} else {
		return &Database{name, logger, nil}
	}
}

// DatabaseConfiguration holds the connection settings for a Postgres server.
type DatabaseConfiguration struct {
	Host     string
	Port     string
	Database string
	Username string
	Password string
	Schema   string
}

// NewDatabaseConfiguration builds a configuration from the QUEUER_DB_*
// environment variables. All six variables must be set and non-blank.
func NewDatabaseConfiguration() (*DatabaseConfiguration, error) {
	config := &DatabaseConfiguration{
		Host:     os.Getenv("QUEUER_DB_HOST"),
		Port:     os.Getenv("QUEUER_DB_PORT"),
		Database: os.Getenv("QUEUER_DB_DATABASE"),
		Username: os.Getenv("QUEUER_DB_USERNAME"),
		Password: os.Getenv("QUEUER_DB_PASSWORD"),
		Schema:   os.Getenv("QUEUER_DB_SCHEMA"),
	}
	if len(strings.TrimSpace(config.Host)) == 0 ||
		len(strings.TrimSpace(config.Port)) == 0 ||
		len(strings.TrimSpace(config.Database)) == 0 ||
		len(strings.TrimSpace(config.Username)) == 0 ||
		len(strings.TrimSpace(config.Password)) == 0 ||
		len(strings.TrimSpace(config.Schema)) == 0 {
		return nil, fmt.Errorf("QUEUER_DB_HOST, QUEUER_DB_PORT, QUEUER_DB_DATABASE, QUEUER_DB_USERNAME, QUEUER_DB_PASSWORD and QUEUER_DB_SCHEMA environment variables must be set")
	}
	return config, nil
}

// DatabaseConnectionString renders the configuration as a postgres:// URL
// with sslmode=disable and the configured schema as search_path.
// NOTE(review): credentials are not URL-escaped — a password containing
// reserved characters (@, /, :) would break the URL; verify against usage.
func (d *DatabaseConfiguration) DatabaseConnectionString() string {
	return fmt.Sprintf("postgres://%s:%s@%s:%s/%s?sslmode=disable&search_path=%s", d.Username, d.Password, d.Host, d.Port, d.Database, d.Schema)
}

// Internal function for the service creation to connect to a database.
// DatabaseConfiguration must contain uri, username and password. func (d *Database) ConnectToDatabase(dbConfig *DatabaseConfiguration, logger *log.Logger) { if len(strings.TrimSpace(dbConfig.Host)) == 0 || len(strings.TrimSpace(dbConfig.Port)) == 0 || len(strings.TrimSpace(dbConfig.Database)) == 0 || len(strings.TrimSpace(dbConfig.Username)) == 0 || len(strings.TrimSpace(dbConfig.Password)) == 0 || len(strings.TrimSpace(dbConfig.Schema)) == 0 { logger.Fatalln("database configuration must contain uri, username and password.") } var connectOnce sync.Once var db *sql.DB var err error logger.Printf("initializing the database with ip address: %v", dbConfig.Host) connectOnce.Do(func() { db, err = sql.Open("postgres", dbConfig.DatabaseConnectionString()) if err != nil { logger.Panicf("error establishing connection to db: %v. Trying again.", err.Error()) } db.SetMaxOpenConns(10) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() _, err = db.ExecContext( ctx, "CREATE EXTENSION IF NOT EXISTS pg_trgm;", ) if err != nil { logger.Fatal(err) } pingErr := db.Ping() if pingErr != nil { logger.Fatal(pingErr) } logger.Println("connected to db") }) d.Instance = db } func (d *Database) AddNotifyFunction() error { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() _, err := d.Instance.ExecContext( ctx, `CREATE OR REPLACE FUNCTION notify_event() RETURNS TRIGGER AS $$ DECLARE data JSON; BEGIN PERFORM pg_notify(TG_TABLE_NAME || '.' 
|| TG_OP, data::text); RETURN NEW; END; $$ LANGUAGE plpgsql;`, ) // IF (TG_OP = 'DELETE') THEN // data = row_to_json(OLD); // ELSE // data = row_to_json(NEW); // END IF; if err != nil { return fmt.Errorf("error creating notify function: %#v", err) } return nil } func (d *Database) CheckTableExistance(tableName string) (bool, error) { exists := false tableNameQuoted := pq.QuoteLiteral(tableName) row := d.Instance.QueryRow( fmt.Sprintf(` SELECT EXISTS ( SELECT 1 FROM information_schema.tables WHERE table_name = %s ) AS table_existence`, tableNameQuoted, ), ) err := row.Scan( &exists, ) if err != nil { return false, err } return exists, nil } func (d *Database) CreateIndex(tableName string, columnName string) error { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() tableNameQuoted := pq.QuoteIdentifier(tableName) indexQuoted := pq.QuoteIdentifier("idx_" + tableName + "_" + columnName) columnNameQuoted := pq.QuoteIdentifier(columnName) _, err := d.Instance.ExecContext( ctx, fmt.Sprintf("CREATE INDEX IF NOT EXISTS %s ON %s(%s)", indexQuoted, tableNameQuoted, columnNameQuoted), ) if err != nil { return fmt.Errorf("error creating %s index: %#v", indexQuoted, err) } return nil } func (d *Database) CreateIndexes(tableName string, columnNames ...string) error { for _, columnName := range columnNames { err := d.CreateIndex(tableName, columnName) if err != nil { return err } } return nil } func (d *Database) CreateCombinedIndex(tableName string, columnName1 string, columnName2 string) error { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() tableNameQuoted := pq.QuoteIdentifier(tableName) indexQuoted := pq.QuoteIdentifier("idx_" + tableName + "_" + columnName1 + "_" + columnName2) columnName1Quoted := pq.QuoteIdentifier(columnName1) columnName2Quoted := pq.QuoteIdentifier(columnName2) _, err := d.Instance.ExecContext( ctx, fmt.Sprintf(`CREATE INDEX IF NOT EXISTS %s ON %s (%s, %s)`, indexQuoted, 
tableNameQuoted, columnName1Quoted, columnName2Quoted), ) if err != nil { return fmt.Errorf("error creating %s index: %#v", indexQuoted, err) } return nil } func (d *Database) CreateUniqueCombinedIndex(tableName string, columnName1 string, columnName2 string) error { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() tableNameQuoted := pq.QuoteIdentifier(tableName) indexQuoted := pq.QuoteIdentifier("idx_" + tableName + "_" + columnName1 + "_" + columnName2) columnName1Quoted := pq.QuoteIdentifier(columnName1) columnName2Quoted := pq.QuoteIdentifier(columnName2) _, err := d.Instance.ExecContext( ctx, fmt.Sprintf(`CREATE UNIQUE INDEX IF NOT EXISTS %s ON %s (%s, %s)`, indexQuoted, tableNameQuoted, columnName1Quoted, columnName2Quoted), ) if err != nil { return fmt.Errorf("error creating %s index: %#v", indexQuoted, err) } return nil } func (d *Database) CreateJsonIndexNumber(tableName string, jsonMapKey string) error { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() tableNameQuoted := pq.QuoteIdentifier(tableName) indexQuoted := pq.QuoteIdentifier("idx_" + tableName + "_" + jsonMapKey) _, err := d.Instance.ExecContext( ctx, `CREATE INDEX IF NOT EXISTS `+indexQuoted+` ON `+tableNameQuoted+`USING BTREE ((details->>'`+jsonMapKey+`'));`, ) if err != nil { return fmt.Errorf("error creating %s json index number: %#v", indexQuoted, err) } return nil } func (d *Database) CreateJsonIndexText(tableName string, jsonMapKey string) error { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() tableNameQuoted := pq.QuoteIdentifier(tableName) indexQuoted := pq.QuoteIdentifier("idx_" + tableName + "_" + jsonMapKey) _, err := d.Instance.ExecContext( ctx, `CREATE INDEX IF NOT EXISTS `+indexQuoted+` ON `+tableNameQuoted+` USING GIN ((details->>'`+jsonMapKey+`') gin_trgm_ops);`, ) if err != nil { return fmt.Errorf("error creating %s json index text: %#v", indexQuoted, err) } 
return nil } func (d *Database) CreateJsonIndexArray(tableName string, jsonMapKey string) error { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() tableNameQuoted := pq.QuoteIdentifier(tableName) indexQuoted := pq.QuoteIdentifier("idx_" + tableName + "_" + jsonMapKey) _, err := d.Instance.ExecContext( ctx, `CREATE INDEX `+indexQuoted+` ON `+tableNameQuoted+` USING GIN ((details->'`+jsonMapKey+`') jsonb_path_ops);`, ) if err != nil { return fmt.Errorf("error creating %s json index array: %#v", indexQuoted, err) } // select where array contains // `SELECT * from $1 where details->$2 @> '["abc", "keh"]'::jsonb;`, // tableName, // jsonMapKey, // array return nil } func (d *Database) DropIndex(tableName string, jsonMapKey string) error { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() indexQuoted := pq.QuoteIdentifier("idx_" + tableName + "_" + jsonMapKey) _, err := d.Instance.ExecContext( ctx, fmt.Sprintf(`DROP INDEX IF EXISTS %s;`, indexQuoted), ) if err != nil { return fmt.Errorf("error dropping %s index: %#v", indexQuoted, err) } return nil } // Health checks the health of the database connection by pinging the database. // It returns a map with keys indicating various health statistics. func (d *Database) Health() map[string]string { ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) defer cancel() stats := make(map[string]string) // Ping the database err := d.Instance.PingContext(ctx) if err != nil { stats["status"] = "down" stats["error"] = fmt.Sprintf("db down: %v", err) log.Panicf("db down: %v", err) // Log the error and terminate the program return stats } // Database is up, add more statistics stats["status"] = "up" stats["message"] = "It's healthy" // Get database stats (like open connections, in use, idle, etc.) 
dbStats := d.Instance.Stats() stats["open_connections"] = strconv.Itoa(dbStats.OpenConnections) stats["in_use"] = strconv.Itoa(dbStats.InUse) stats["idle"] = strconv.Itoa(dbStats.Idle) stats["wait_count"] = strconv.FormatInt(dbStats.WaitCount, 10) stats["wait_duration"] = dbStats.WaitDuration.String() stats["max_idle_closed"] = strconv.FormatInt(dbStats.MaxIdleClosed, 10) stats["max_lifetime_closed"] = strconv.FormatInt(dbStats.MaxLifetimeClosed, 10) // Evaluate stats to provide a health message if dbStats.OpenConnections > 40 { // Assuming 50 is the max for this example stats["message"] = "The database is experiencing heavy load." } if dbStats.WaitCount > 1000 { stats["message"] = "The database has a high number of wait events, indicating potential bottlenecks." } if dbStats.MaxIdleClosed > int64(dbStats.OpenConnections)/2 { stats["message"] = "Many idle connections are being closed, consider revising the connection pool settings." } if dbStats.MaxLifetimeClosed > int64(dbStats.OpenConnections)/2 { stats["message"] = "Many connections are being closed due to max lifetime, consider increasing max lifetime or revising the connection usage pattern." } return stats } // Close closes the database connection. // It logs a message indicating the disconnection from the specific database. // If the connection is successfully closed, it returns nil. // If an error occurs while closing the connection, it returns the error. func (d *Database) Close() error { log.Printf("Disconnected from database: %v", d.Instance) return d.Instance.Close() }
package helper

import (
	"context"
	"fmt"
	"net/url"
	"time"

	"github.com/testcontainers/testcontainers-go"
	"github.com/testcontainers/testcontainers-go/modules/postgres"
	"github.com/testcontainers/testcontainers-go/wait"
)

// Fixed credentials for the throwaway test database.
const (
	dbName = "database"
	dbUser = "user"
	dbPwd  = "password"
)

// MustStartPostgresContainer starts a PostgreSQL container for testing purposes.
// It returns the container's terminate function, the host port mapped to
// Postgres, and an error. Despite the Must prefix it reports failures via the
// error return rather than panicking.
func MustStartPostgresContainer() (func(ctx context.Context, opts ...testcontainers.TerminateOption) error, string, error) {
	ctx := context.Background()

	pgContainer, err := postgres.Run(
		ctx,
		"timescale/timescaledb:latest-pg17",
		postgres.WithDatabase(dbName),
		postgres.WithUsername(dbUser),
		postgres.WithPassword(dbPwd),
		testcontainers.WithWaitStrategy(
			// Postgres logs this line twice during startup (initdb restart),
			// hence the occurrence count of 2.
			wait.ForLog("database system is ready to accept connections").
				WithOccurrence(2).WithStartupTimeout(5*time.Second),
		),
	)
	if err != nil {
		return nil, "", fmt.Errorf("error starting postgres container: %w", err)
	}

	connStr, err := pgContainer.ConnectionString(ctx, "sslmode=disable")
	if err != nil {
		return nil, "", fmt.Errorf("error getting connection string: %w", err)
	}
	// Parse the connection string only to extract the randomly mapped port.
	u, err := url.Parse(connStr)
	if err != nil {
		return nil, "", fmt.Errorf("error parsing connection string: %v", err)
	}

	// err is nil at this point.
	return pgContainer.Terminate, u.Port(), err
}

// NewTestDatabase creates a Database named "test_db" from the given config.
func NewTestDatabase(config *DatabaseConfiguration) *Database {
	return NewDatabase(
		"test_db",
		config,
	)
}

// NewTestDatabaseConfig builds a configuration pointing at the test container
// listening on the given local port, using the fixed test credentials above.
func NewTestDatabaseConfig(port string) *DatabaseConfiguration {
	return &DatabaseConfiguration{
		Host:     "localhost",
		Port:     port,
		Database: dbName,
		Username: dbUser,
		Password: dbPwd,
		Schema:   "public",
	}
}
package helper

import (
	"fmt"
	"reflect"
	"runtime"
)

// CheckValidTask verifies that task is a non-nil function value.
func CheckValidTask(task interface{}) error {
	if task == nil {
		return fmt.Errorf("task must not be nil")
	}
	if reflect.ValueOf(task).Kind() != reflect.Func {
		return fmt.Errorf("task must be a function, got %s", reflect.TypeOf(task).Kind())
	}
	if reflect.ValueOf(task).IsNil() {
		// A typed-nil function value, e.g. var f func(); CheckValidTask(f).
		return fmt.Errorf("task value must not be nil")
	}
	return nil
}

// CheckValidTaskWithParameters verifies that task is a valid function and
// that parameters match its arity and parameter types exactly.
func CheckValidTaskWithParameters(task interface{}, parameters ...interface{}) error {
	err := CheckValidTask(task)
	if err != nil {
		return err
	}
	taskType := reflect.TypeOf(task)
	if taskType.NumIn() != len(parameters) {
		return fmt.Errorf("task expects %d parameters, got %d", taskType.NumIn(), len(parameters))
	}
	for i, param := range parameters {
		// Robustness fix: reflect.TypeOf(nil) is nil, so calling
		// AssignableTo on it would panic. Report nil parameters explicitly.
		if param == nil {
			return fmt.Errorf("parameter %d of task must be of type %s, got nil", i, taskType.In(i).Kind())
		}
		if !reflect.TypeOf(param).AssignableTo(taskType.In(i)) {
			return fmt.Errorf("parameter %d of task must be of type %s, got %s", i, taskType.In(i).Kind(), reflect.TypeOf(param).Kind())
		}
	}
	return nil
}

// GetTaskNameFromFunction returns the runtime package-qualified name of the
// given function (e.g. "queuer/tasks.SendMail").
func GetTaskNameFromFunction(f interface{}) (string, error) {
	err := CheckValidTask(f)
	if err != nil {
		return "", err
	}
	return runtime.FuncForPC(reflect.ValueOf(f).Pointer()).Name(), nil
}

// GetTaskNameFromInterface returns task itself when it is already a name
// string, otherwise derives the name from the function value.
func GetTaskNameFromInterface(task interface{}) (string, error) {
	if taskNameString, ok := task.(string); ok {
		return taskNameString, nil
	}
	return GetTaskNameFromFunction(task)
}

// GetInputParametersFromTask returns the reflected types of the task
// function's input parameters, in declaration order.
func GetInputParametersFromTask(task interface{}) ([]reflect.Type, error) {
	// Robustness fix: previously a nil or non-function task panicked in
	// reflect (NumIn on a non-func), despite the error-returning signature.
	if err := CheckValidTask(task); err != nil {
		return nil, err
	}
	taskType := reflect.TypeOf(task)
	inputParameters := make([]reflect.Type, 0, taskType.NumIn())
	for i := 0; i < taskType.NumIn(); i++ {
		inputParameters = append(inputParameters, taskType.In(i))
	}
	return inputParameters, nil
}

// GetOutputParametersFromTask returns the reflected types of the task
// function's return values, in declaration order.
func GetOutputParametersFromTask(task interface{}) ([]reflect.Type, error) {
	// Robustness fix: same panic guard as GetInputParametersFromTask.
	if err := CheckValidTask(task); err != nil {
		return nil, err
	}
	taskType := reflect.TypeOf(task)
	outputParameters := make([]reflect.Type, 0, taskType.NumOut())
	for i := 0; i < taskType.NumOut(); i++ {
		outputParameters = append(outputParameters, taskType.Out(i))
	}
	return outputParameters, nil
}
package model

import (
	"database/sql/driver"
	"encoding/json"
	"errors"
	"fmt"
	"queuer/helper"
	"reflect"
	"strings"
	"time"

	"github.com/google/uuid"
)

const (
	// Job statuses before processing
	JobStatusQueued    = "QUEUED"
	JobStatusScheduled = "SCHEDULED"
	// Running status is used when the job is being processed by a worker.
	JobStatusRunning = "RUNNING"
	// Job statuses after processing
	JobStatusFailed    = "FAILED"
	JobStatusSucceeded = "SUCCEEDED"
	JobStatusCancelled = "CANCELLED"
)

// Parameters is the list of task arguments (or results) as persisted in the
// database; it serializes to and from a JSON array.
type Parameters []interface{}

// Value implements driver.Valuer by JSON-encoding the parameters.
func (c Parameters) Value() (driver.Value, error) {
	return c.Marshal()
}

// Scan implements sql.Scanner; see Unmarshal for accepted source types.
func (c *Parameters) Scan(value interface{}) error {
	return c.Unmarshal(value)
}

// Marshal returns the JSON encoding of the parameters.
func (r Parameters) Marshal() ([]byte, error) {
	return json.Marshal(r)
}

// Unmarshal accepts either another Parameters value (assigned directly) or a
// JSON []byte payload; any other type is rejected.
func (r *Parameters) Unmarshal(value interface{}) error {
	if s, ok := value.(Parameters); ok {
		*r = Parameters(s)
	} else {
		b, ok := value.([]byte)
		if !ok {
			return errors.New("type assertion to []byte failed")
		}
		return json.Unmarshal(b, r)
	}
	return nil
}

// ToReflectValues converts each parameter to a reflect.Value, suitable for
// invoking a task function via reflection. A nil receiver yields an empty slice.
func (r *Parameters) ToReflectValues() []reflect.Value {
	if r == nil {
		return []reflect.Value{}
	}
	reflectValues := []reflect.Value{}
	for _, p := range *r {
		reflectValues = append(reflectValues, reflect.ValueOf(p))
	}
	return reflectValues
}

// ToInterfaceSlice returns a shallow copy of the parameters as a plain
// []interface{}. A nil receiver yields an empty slice.
func (r *Parameters) ToInterfaceSlice() []interface{} {
	if r == nil {
		return []interface{}{}
	}
	interfaceSlice := make([]interface{}, len(*r))
	copy(interfaceSlice, *r)
	return interfaceSlice
}

// Job is a single unit of queued work: the task to run, its parameters and
// options, plus bookkeeping fields maintained by the queue (status, attempts,
// results, timestamps).
type Job struct {
	ID         int        `json:"id"`
	RID        uuid.UUID  `json:"rid"`
	WorkerID   int        `json:"worker_id"`
	WorkerRID  uuid.UUID  `json:"worker_rid"`
	Options    *Options   `json:"options"`
	TaskName   string     `json:"task_name"`
	Parameters Parameters `json:"parameters"`
	Status     string     `json:"status"`
	ScheduledAt *time.Time `json:"scheduled_at"`
	StartedAt   *time.Time `json:"started_at"`
	Attempts    int        `json:"attempts"`
	Results     Parameters `json:"result"`
	Error       string     `json:"error"`
	CreatedAt   time.Time  `json:"created_at"`
	UpdatedAt   time.Time  `json:"updated_at"`
}

// NewJob builds a Job for the given task (a function or a task-name string)
// after validating the task name and any OnError/Schedule options. Jobs with
// a schedule start in SCHEDULED status at the schedule's UTC start time;
// otherwise they start QUEUED.
func NewJob(task interface{}, options *Options, parameters ...interface{}) (*Job, error) {
	taskName, err := helper.GetTaskNameFromInterface(task)
	if err != nil {
		return nil, fmt.Errorf("error getting task name: %v", err)
	}
	if len(taskName) == 0 || len(taskName) > 100 {
		return nil, fmt.Errorf("taskName must have a length between 1 and 100")
	}
	if options != nil && options.OnError != nil {
		err := options.OnError.IsValid()
		if err != nil {
			return nil, fmt.Errorf("invalid OnError options: %v", err)
		}
	}
	if options != nil && options.Schedule != nil {
		err := options.Schedule.IsValid()
		if err != nil {
			return nil, fmt.Errorf("invalid Schedule options: %v", err)
		}
	}

	status := JobStatusQueued
	var scheduledAt time.Time
	if options != nil && options.Schedule != nil {
		status = JobStatusScheduled
		scheduledAt = options.Schedule.Start.UTC()
	}

	// NOTE(review): ScheduledAt is always a non-nil pointer — for unscheduled
	// jobs it points at the zero time rather than being nil; confirm the DB
	// layer expects this.
	return &Job{
		TaskName:    taskName,
		Status:      status,
		ScheduledAt: &scheduledAt,
		Options:     options,
		Parameters:  parameters,
	}, nil
}

// JobFromNotification mirrors Job for decoding pg_notify payloads, using
// DBTime for the Postgres timestamp format.
type JobFromNotification struct {
	ID         int        `json:"id"`
	RID        uuid.UUID  `json:"rid"`
	WorkerID   int        `json:"worker_id"`
	WorkerRID  uuid.UUID  `json:"worker_rid"`
	TaskName   string     `json:"task_name"`
	Parameters Parameters `json:"parameters"`
	Status     string     `json:"status"`
	Attempts   int        `json:"attempts"`
	Result     Parameters `json:"result"`
	CreatedAt  DBTime     `json:"created_at"`
	UpdatedAt  DBTime     `json:"updated_at"`
}

// ToJob converts the notification payload to a Job.
// NOTE(review): Options, ScheduledAt and Result are not carried over.
func (jn *JobFromNotification) ToJob() *Job {
	return &Job{
		ID:         jn.ID,
		RID:        jn.RID,
		WorkerID:   jn.WorkerID,
		WorkerRID:  jn.WorkerRID,
		TaskName:   jn.TaskName,
		Parameters: jn.Parameters,
		Status:     jn.Status,
		Attempts:   jn.Attempts,
		CreatedAt:  jn.CreatedAt.Time,
		UpdatedAt:  jn.UpdatedAt.Time,
	}
}

// DBTime wraps time.Time with JSON (un)marshalling for the timestamp format
// Postgres emits in notification payloads.
type DBTime struct {
	time.Time
}

// Base layout without fractional digits; the parser appends as many zeros as
// the payload's fractional part has digits.
const dbTimeLayoutWithoutZeroes = "2006-01-02T15:04:05."
const dbTimeLayout = "2006-01-02T15:04:05.000000" func (ct *DBTime) UnmarshalJSON(b []byte) error { s := strings.Trim(string(b), "\"") if s == "null" { ct.Time = time.Time{} return nil } tSplit := strings.Split(s, ".") if len(tSplit) != 2 { return fmt.Errorf("invalid time format: %s", s) } var err error ct.Time, err = time.Parse(dbTimeLayoutWithoutZeroes+strings.Repeat("0", len(tSplit[1])), s) if err != nil { return fmt.Errorf("error parsing time: %s, error: %w", s, err) } return nil } func (ct *DBTime) MarshalJSON() ([]byte, error) { if ct.Time.IsZero() { return []byte("null"), nil } return []byte(fmt.Sprintf("\"%s\"", ct.Time.Format(dbTimeLayout))), nil } func (ct *DBTime) IsSet() bool { return !ct.IsZero() }
package model

import (
	"database/sql/driver"
	"encoding/json"
	"errors"
)

// Options groups the optional per-job settings: error/retry behaviour and
// scheduling.
type Options struct {
	OnError  *OnError
	Schedule *Schedule
}

// IsValid validates each populated sub-option. A nil receiver and unset
// sub-options are considered valid.
func (o *Options) IsValid() error {
	// nil options are considered valid
	if o == nil {
		return nil
	}
	if o.OnError != nil {
		if err := o.OnError.IsValid(); err != nil {
			return err
		}
	}
	if o.Schedule != nil {
		if err := o.Schedule.IsValid(); err != nil {
			return err
		}
	}
	return nil
}

// Value implements driver.Valuer by JSON-encoding the options.
func (o Options) Value() (driver.Value, error) {
	return o.Marshal()
}

// Scan implements sql.Scanner; see Unmarshal for accepted source types.
func (o *Options) Scan(value interface{}) error {
	return o.Unmarshal(value)
}

// Marshal returns the JSON encoding of the options.
func (o Options) Marshal() ([]byte, error) {
	return json.Marshal(o)
}

// Unmarshal accepts either another Options value (assigned directly) or a
// JSON []byte payload; any other type is rejected.
func (o *Options) Unmarshal(value interface{}) error {
	switch v := value.(type) {
	case Options:
		*o = v
		return nil
	case []byte:
		return json.Unmarshal(v, o)
	default:
		return errors.New("type assertion to []byte failed")
	}
}
package model

import (
	"database/sql/driver"
	"encoding/json"
	"errors"
)

// Supported retry backoff strategies.
const (
	RETRY_BACKOFF_NONE        = "none"
	RETRY_BACKOFF_LINEAR      = "linear"
	RETRY_BACKOFF_EXPONENTIAL = "exponential"
)

// OnError configures how a failed job is retried: an execution timeout, the
// maximum number of attempts, the delay between attempts, and the backoff
// strategy applied to that delay.
type OnError struct {
	Timeout      float64 `json:"timeout"`
	MaxRetries   int     `json:"max_retries"`
	RetryDelay   float64 `json:"retry_delay"`
	RetryBackoff string  `json:"retry_backoff"`
}

// IsValid checks that all numeric fields are non-negative and that the
// backoff is one of the supported strategies.
func (o *OnError) IsValid() error {
	switch {
	case o.Timeout < 0:
		return errors.New("timeout cannot be negative")
	case o.MaxRetries < 0:
		return errors.New("max retries cannot be negative")
	case o.RetryDelay < 0:
		return errors.New("retry delay cannot be negative")
	}
	if o.RetryBackoff != RETRY_BACKOFF_NONE &&
		o.RetryBackoff != RETRY_BACKOFF_LINEAR &&
		o.RetryBackoff != RETRY_BACKOFF_EXPONENTIAL {
		return errors.New("retry backoff must be one of: none, linear, exponential")
	}
	return nil
}

// Value implements driver.Valuer by JSON-encoding the settings.
func (o OnError) Value() (driver.Value, error) {
	return o.Marshal()
}

// Scan implements sql.Scanner; see Unmarshal for accepted source types.
func (o *OnError) Scan(value interface{}) error {
	return o.Unmarshal(value)
}

// Marshal returns the JSON encoding of the settings.
func (o OnError) Marshal() ([]byte, error) {
	return json.Marshal(o)
}

// Unmarshal accepts either another OnError value (assigned directly) or a
// JSON []byte payload; any other type is rejected.
func (o *OnError) Unmarshal(value interface{}) error {
	switch v := value.(type) {
	case OnError:
		*o = v
		return nil
	case []byte:
		return json.Unmarshal(v, o)
	default:
		return errors.New("type assertion to []byte failed")
	}
}
package model

import (
	"database/sql/driver"
	"encoding/json"
	"errors"
	"time"
)

// NextIntervalFunc defines a function type that calculates the next interval
// based on the start time and the current count of executions.
type NextIntervalFunc func(start time.Time, currentCount int) time.Time

// Schedule describes when and how often a job runs: a start time, a fixed
// interval or the name of a registered NextIntervalFunc, and an optional
// maximum execution count.
type Schedule struct {
	Start        time.Time     `json:"start"`
	Interval     time.Duration `json:"interval"`
	MaxCount     int           `json:"max_count"`
	NextInterval string        `json:"next_interval,omitempty"`
}

// IsValid requires a non-zero start time, a non-negative max count, and
// either a positive interval or a named next-interval function.
func (s *Schedule) IsValid() error {
	if s.Start.IsZero() {
		return errors.New("start time cannot be zero")
	}
	if s.MaxCount < 0 {
		return errors.New("maxCount must be greater than or equal to 0")
	}
	if s.Interval <= time.Duration(0) && s.NextInterval == "" {
		return errors.New("interval must be greater than zero or nextInterval must be provided")
	}
	return nil
}

// Value implements driver.Valuer by JSON-encoding the schedule.
func (s Schedule) Value() (driver.Value, error) {
	return s.Marshal()
}

// Scan implements sql.Scanner; see Unmarshal for accepted source types.
func (s *Schedule) Scan(value interface{}) error {
	return s.Unmarshal(value)
}

// Marshal returns the JSON encoding of the schedule.
func (s Schedule) Marshal() ([]byte, error) {
	return json.Marshal(s)
}

// Unmarshal accepts either another Schedule value (assigned directly) or a
// JSON []byte payload; any other type is rejected.
func (s *Schedule) Unmarshal(value interface{}) error {
	switch v := value.(type) {
	case Schedule:
		*s = v
		return nil
	case []byte:
		return json.Unmarshal(v, s)
	default:
		return errors.New("type assertion to []byte failed")
	}
}
package model

import (
	"fmt"
	"queuer/helper"
	"reflect"
)

// Task bundles a registered task function with its name and the reflected
// input/output parameter types, so callers can validate job parameters
// without re-reflecting on every invocation.
type Task struct {
	Task             interface{}
	Name             string
	InputParameters  []reflect.Type
	OutputParameters []reflect.Type
}

// NewTask creates a Task, deriving its name from the function's runtime name.
func NewTask(task interface{}) (*Task, error) {
	taskName, err := helper.GetTaskNameFromFunction(task)
	if err != nil {
		return nil, fmt.Errorf("error getting task name: %v", err)
	}
	return NewTaskWithName(task, taskName)
}

// NewTaskWithName creates a Task with an explicit name (1-100 characters).
// task must be a non-nil function value.
func NewTaskWithName(task interface{}, taskName string) (*Task, error) {
	if len(taskName) == 0 || len(taskName) > 100 {
		return nil, fmt.Errorf("taskName must have a length between 1 and 100")
	}
	err := helper.CheckValidTask(task)
	if err != nil {
		// BUG FIX: the previous error message called
		// reflect.TypeOf(task).Kind(), which panics when task is nil
		// (reflect.TypeOf(nil) returns a nil Type). CheckValidTask's error
		// already describes the failure, so wrap it instead.
		return nil, fmt.Errorf("invalid task: %w", err)
	}

	// Cache the reflected parameter types once, up front.
	taskType := reflect.TypeOf(task)
	inputParameters := make([]reflect.Type, 0, taskType.NumIn())
	for i := 0; i < taskType.NumIn(); i++ {
		inputParameters = append(inputParameters, taskType.In(i))
	}
	outputParameters := make([]reflect.Type, 0, taskType.NumOut())
	for i := 0; i < taskType.NumOut(); i++ {
		outputParameters = append(outputParameters, taskType.Out(i))
	}

	return &Task{
		Task:             task,
		Name:             taskName,
		InputParameters:  inputParameters,
		OutputParameters: outputParameters,
	}, nil
}
package model

import (
	"fmt"
	"time"

	"github.com/google/uuid"
)

// Worker lifecycle statuses.
const (
	WorkerStatusRunning = "RUNNING"
	WorkerStatusFailed  = "FAILED"
)

// Worker represents a registered queue worker: its identity, optional
// error-handling defaults, concurrency limit, the tasks and next-interval
// functions it can execute, and bookkeeping timestamps.
type Worker struct {
	ID                         int        `json:"id"`
	RID                        uuid.UUID  `json:"rid"`
	Name                       string     `json:"name"`
	Options                    *OnError   `json:"options,omitempty"`
	MaxConcurrency             int        `json:"max_concurrency"`
	AvailableTasks             []string   `json:"available_tasks,omitempty"`
	AvailableNextIntervalFuncs []string   `json:"available_next_interval,omitempty"`
	Status                     string     `json:"status"`
	CreatedAt                  time.Time  `json:"created_at"`
	UpdatedAt                  time.Time  `json:"updated_at"`
}

// NewWorker creates a RUNNING worker with the given name (1-100 characters)
// and maximum concurrency (1-100).
func NewWorker(name string, maxConcurrency int) (*Worker, error) {
	if len(name) == 0 || len(name) > 100 {
		return nil, fmt.Errorf("name must have a length between 1 and 100")
	}
	if maxConcurrency < 1 || maxConcurrency > 100 {
		return nil, fmt.Errorf("maxConcurrency must be between 1 and 100")
	}
	return &Worker{
		Name:           name,
		MaxConcurrency: maxConcurrency,
		Status:         WorkerStatusRunning,
	}, nil
}

// NewWorkerWithOptions creates a RUNNING worker with default error-handling
// options. The same name and concurrency bounds as NewWorker apply, and
// options must be non-nil and valid.
func NewWorkerWithOptions(name string, maxConcurrency int, options *OnError) (*Worker, error) {
	if len(name) == 0 || len(name) > 100 {
		return nil, fmt.Errorf("name must have a length between 1 and 100")
	}
	// BUG FIX: the bound was maxConcurrency > 1000, contradicting both the
	// error message below and NewWorker, which enforce an upper bound of 100.
	if maxConcurrency < 1 || maxConcurrency > 100 {
		return nil, fmt.Errorf("maxConcurrency must be between 1 and 100")
	}
	// Robustness fix: OnError.IsValid dereferences its receiver, so a nil
	// options previously caused a panic here instead of an error.
	if options == nil {
		return nil, fmt.Errorf("options must not be nil")
	}
	err := options.IsValid()
	if err != nil {
		return nil, fmt.Errorf("invalid options: %w", err)
	}
	return &Worker{
		Name:           name,
		Options:        options,
		MaxConcurrency: maxConcurrency,
		Status:         WorkerStatusRunning,
	}, nil
}
package queuer

import (
	"context"
	"fmt"
	"log"
	"os"
	"queuer/core"
	"queuer/database"
	"queuer/helper"
	"queuer/model"
	"sync"
	"time"
)

// Queuer coordinates job intake and execution for one worker: it listens to
// database notifications, polls for work, and tracks running jobs.
type Queuer struct {
	// Context
	ctx    context.Context
	cancel context.CancelFunc
	// Runners currently executing jobs.
	activeRunners sync.Map
	// Worker registered in the database; nil for a queuer created with
	// NewQueuerWithoutWorker.
	worker *model.Worker
	// DBs
	dbJob    database.JobDBHandlerFunctions
	dbWorker database.WorkerDBHandlerFunctions
	// Job listeners for Postgres NOTIFY events on the job table.
	jobInsertListener *database.QueuerListener
	jobUpdateListener *database.QueuerListener
	jobDeleteListener *database.QueuerListener
	// JobPollInterval is the fallback polling cadence in case notifications
	// are missed.
	JobPollInterval time.Duration
	// Available functions registered on this queuer, keyed by name.
	tasks             map[string]*model.Task
	nextIntervalFuncs map[string]model.NextIntervalFunc
	// Logger
	log *log.Logger
}

// NewQueuer creates a new Queuer instance with the given name and max concurrency.
// It initializes the database connection, job listeners and worker.
// If options are provided, it creates a worker with those options.
// If any error occurs during initialization, it logs a panic error and exits the program.
// It returns a pointer to the newly created Queuer instance.
func NewQueuer(name string, maxConcurrency int, options ...*model.OnError) *Queuer {
	// Logger
	logger := log.New(os.Stdout, "Queuer: ", log.Ltime)

	// Database configuration comes from the QUEUER_DB_* environment variables.
	dbConfig, err := helper.NewDatabaseConfiguration()
	if err != nil {
		logger.Panicf("failed to create database configuration: %v", err)
	}
	dbConnection := helper.NewDatabase(
		"queuer",
		dbConfig,
	)

	// DBs
	var dbJob database.JobDBHandlerFunctions
	var dbWorker database.WorkerDBHandlerFunctions
	dbJob, err = database.NewJobDBHandler(dbConnection)
	if err != nil {
		logger.Panicf("failed to create job db handler: %v", err)
	}
	dbWorker, err = database.NewWorkerDBHandler(dbConnection)
	if err != nil {
		logger.Panicf("failed to create worker db handler: %v", err)
	}

	// Job listeners — one Postgres LISTEN channel per operation on the job
	// table (channels named by the notify_event trigger function).
	jobInsertListener, err := database.NewQueuerListener(dbConfig, "job.INSERT")
	if err != nil {
		logger.Panicf("failed to create job insert listener: %v", err)
	}
	jobUpdateListener, err := database.NewQueuerListener(dbConfig, "job.UPDATE")
	if err != nil {
		logger.Panicf("failed to create job update listener: %v", err)
	}
	jobDeleteListener, err := database.NewQueuerListener(dbConfig, "job.DELETE")
	if err != nil {
		logger.Panicf("failed to create job delete listener: %v", err)
	}

	// Inserting worker — with optional default error-handling options when
	// the caller supplied any (only the first is used).
	var newWorker *model.Worker
	if len(options) > 0 {
		newWorker, err = model.NewWorkerWithOptions(name, maxConcurrency, options[0])
		if err != nil {
			logger.Panicf("error creating new worker with options: %v", err)
		}
	} else {
		newWorker, err = model.NewWorker(name, maxConcurrency)
		if err != nil {
			logger.Panicf("error creating new worker: %v", err)
		}
	}
	worker, err := dbWorker.InsertWorker(newWorker)
	if err != nil {
		logger.Panicf("error inserting worker: %v", err)
	}

	logger.Printf("Queuer %s created with worker RID %s", worker.Name, worker.RID.String())
	return &Queuer{
		worker:            worker,
		dbJob:             dbJob,
		dbWorker:          dbWorker,
		jobInsertListener: jobInsertListener,
		jobUpdateListener: jobUpdateListener,
		jobDeleteListener: jobDeleteListener,
		JobPollInterval:   1 * time.Minute,
		tasks:             map[string]*model.Task{},
		nextIntervalFuncs: map[string]model.NextIntervalFunc{},
		log:               logger,
	}
}

// NewQueuerWithoutWorker creates a new Queuer instance without a worker.
// This is useful for scenarios where the queuer needs to be initialized without a worker,
// such as when a separate service is responsible for job status endpoints without processing jobs.
// It initializes the database connection and job listeners.
// If any error occurs during initialization, it logs a panic error and exits the program.
// It returns a pointer to the newly created Queuer instance.
// NOTE(review): the returned queuer has nil listeners and a nil worker, so
// Start will panic on it — it is only suitable for read/status use.
func NewQueuerWithoutWorker() *Queuer {
	// Logger
	logger := log.New(os.Stdout, "Queuer: ", log.Ltime)

	// Database
	dbConfig, err := helper.NewDatabaseConfiguration()
	if err != nil {
		logger.Panicf("failed to create database configuration: %v", err)
	}
	dbConnection := helper.NewDatabase(
		"queuer",
		dbConfig,
	)

	// DBs
	var dbJob database.JobDBHandlerFunctions
	var dbWorker database.WorkerDBHandlerFunctions
	dbJob, err = database.NewJobDBHandler(dbConnection)
	if err != nil {
		logger.Panicf("failed to create job db handler: %v", err)
	}
	dbWorker, err = database.NewWorkerDBHandler(dbConnection)
	if err != nil {
		logger.Panicf("failed to create worker db handler: %v", err)
	}

	// Job listeners
	// jobInsertListener, err := database.NewQueuerListener(dbConfig, "job.INSERT")
	// if err != nil {
	// 	logger.Panicf("failed to create job insert listener: %v", err)
	// }
	// jobUpdateListener, err := database.NewQueuerListener(dbConfig, "job.UPDATE")
	// if err != nil {
	// 	logger.Panicf("failed to create job update listener: %v", err)
	// }
	// jobDeleteListener, err := database.NewQueuerListener(dbConfig, "job.DELETE")
	// if err != nil {
	// 	logger.Panicf("failed to create job delete listener: %v", err)
	// }

	logger.Println("Queuer without worker created")
	return &Queuer{
		dbJob:    dbJob,
		dbWorker: dbWorker,
		// jobInsertListener: jobInsertListener,
		// jobUpdateListener: jobUpdateListener,
		// jobDeleteListener: jobDeleteListener,
		JobPollInterval:   1 * time.Minute,
		tasks:             map[string]*model.Task{},
		nextIntervalFuncs: map[string]model.NextIntervalFunc{},
		log:               logger,
	}
}

// Start starts the queuer by initializing the job listeners and starting the job poll ticker.
// It checks if the queuer is initialized properly, and if not, it logs a panic error and exits the program.
// It runs the job processing in a separate goroutine and listens for job events.
func (q *Queuer) Start(ctx context.Context, cancel context.CancelFunc) {
	// Queuers created with NewQueuerWithoutWorker have nil listeners and
	// cannot be started.
	if q.dbJob == nil ||
		q.dbWorker == nil ||
		q.jobInsertListener == nil ||
		q.jobUpdateListener == nil ||
		q.jobDeleteListener == nil {
		q.log.Panicln("worker is not initialized properly")
	}

	q.ctx = ctx
	q.cancel = cancel
	go func() {
		// Derived context so the listeners stop when this goroutine exits.
		ctx, cancel := context.WithCancel(q.ctx)
		defer cancel()
		go q.listen(ctx, cancel)

		err := q.pollJobTicker(ctx)
		if err != nil && ctx.Err() == nil {
			q.log.Printf("error starting job poll ticker: %v", err)
			return
		}

		q.log.Println("Queuer started")
		// Block until the context is cancelled (via Stop or the caller).
		<-ctx.Done()
		q.log.Println("Queuer stopped")
	}()
}

// Stop stops the queuer by closing the job listeners, cancelling all queued and running jobs,
// and cancelling the context to stop the queuer.
func (q *Queuer) Stop() error { if q.jobInsertListener != nil { err := q.jobInsertListener.Listener.Close() if err != nil { return fmt.Errorf("error closing job insert listener: %v", err) } } if q.jobUpdateListener != nil { err := q.jobUpdateListener.Listener.Close() if err != nil { return fmt.Errorf("error closing job update listener: %v", err) } } if q.jobDeleteListener != nil { err := q.jobDeleteListener.Listener.Close() if err != nil { return fmt.Errorf("error closing job delete listener: %v", err) } } // Cancel all queued and running jobs err := q.CancelAllJobsByWorker(q.worker.RID, 100) if err != nil { return fmt.Errorf("error cancelling all jobs by worker: %v", err) } // Cancel the context to stop the queuer if q.ctx != nil { q.cancel() } q.log.Println("Queuer stopped") return nil } // Internal // listen listens to job events and runs the initial job processing. func (q *Queuer) listen(ctx context.Context, cancel context.CancelFunc) { go q.jobInsertListener.ListenToEvents(ctx, cancel, func(data string) { err := q.runJobInitial() if err != nil { q.log.Printf("error running job: %v", err) } }) // go q.jobUpdateListener.ListenToEvents(ctx, cancel) // go q.jobDeleteListener.ListenToEvents(ctx, cancel) } func (q *Queuer) pollJobTicker(ctx context.Context) error { ticker, err := core.NewTicker( q.JobPollInterval, func() { q.log.Println("Polling jobs...") err := q.runJobInitial() if err != nil { q.log.Printf("error running job: %v", err) } }, ) if err != nil { return fmt.Errorf("error creating ticker: %v", err) } q.log.Println("Starting job poll ticker...") err = ticker.Go(ctx) if err != nil { return fmt.Errorf("error starting ticker: %v", err) } return nil }
package queuer import ( "fmt" "log" "queuer/core" "queuer/model" "time" "github.com/google/uuid" ) // AddJob adds a job to the queue with the given task and parameters. // As a task you can either pass a function or a string with the task name // (necessary if you want to use a task with a name set by you). func (q *Queuer) AddJob(task interface{}, parameters ...interface{}) (*model.Job, error) { options := q.mergeOptions(nil) job, err := q.addJob(task, options, parameters...) if err != nil { return nil, fmt.Errorf("error adding job: %v", err) } q.log.Printf("Job added with RID %v", job.RID) return job, nil } // AddJobWithOptions adds a job with the given task, options, and parameters. // As a task you can either pass a function or a string with the task name // (necessary if you want to use a task with a name set by you). func (q *Queuer) AddJobWithOptions(options *model.Options, task interface{}, parameters ...interface{}) (*model.Job, error) { q.mergeOptions(options) job, err := q.addJob(task, options, parameters...) if err != nil { return nil, fmt.Errorf("error adding job: %v", err) } q.log.Printf("Job with options added with RID %v", job.RID) return job, nil } // AddJobs adds a batch of jobs to the queue. func (q *Queuer) AddJobs(batchJobs []model.BatchJob) error { var jobs []*model.Job for _, batchJob := range batchJobs { options := q.mergeOptions(batchJob.Options) newJob, err := model.NewJob(batchJob.Task, options, batchJob.Parameters...) if err != nil { return fmt.Errorf("error creating job with job options: %v", err) } jobs = append(jobs, newJob) } err := q.dbJob.BatchInsertJobs(jobs) if err != nil { return fmt.Errorf("error inserting jobs: %v", err) } q.log.Printf("%v jobs added", len(jobs)) return nil } // CancelJob cancels a job with the given job RID. 
func (q *Queuer) CancelJob(jobRid uuid.UUID) (*model.Job, error) { job, err := q.dbJob.SelectJob(jobRid) if err != nil { return nil, fmt.Errorf("error selecting job with rid %v, but already cancelled: %v", jobRid, err) } err = q.cancelJob(job) if err != nil { return nil, fmt.Errorf("error cancelling job with rid %v: %v", jobRid, err) } q.log.Printf("Job cancelled with RID %v", job.RID) return job, nil } func (q *Queuer) CancelAllJobsByWorker(workerRid uuid.UUID, entries int) error { jobs, err := q.dbJob.SelectAllJobsByWorkerRID(workerRid, 0, entries) if err != nil { return fmt.Errorf("error selecting jobs by worker RID %v: %v", workerRid, err) } for _, job := range jobs { err := q.cancelJob(job) if err != nil { return fmt.Errorf("error cancelling job with rid %v: %v", job.RID, err) } q.log.Printf("Job cancelled with RID %v", job.RID) } return nil } // ReaddJobFromArchive readds a job from the archive back to the queue. func (q *Queuer) ReaddJobFromArchive(jobRid uuid.UUID) (*model.Job, error) { job, err := q.dbJob.SelectJobFromArchive(jobRid) if err != nil { return nil, fmt.Errorf("error selecting job from archive with rid %v: %v", jobRid, err) } // Readd the job to the queue newJob, err := q.AddJobWithOptions(job.Options, job.TaskName, job.Parameters...) if err != nil { return nil, fmt.Errorf("error readding job: %v", err) } q.log.Printf("Job readded with RID %v", newJob.RID) return newJob, nil } // GetJob retrieves a job by its RID. func (q *Queuer) GetJob(jobRid uuid.UUID) (*model.Job, error) { job, err := q.dbJob.SelectJob(jobRid) if err != nil { return nil, fmt.Errorf("error selecting job with rid %v: %v", jobRid, err) } return job, nil } // GetJobs retrieves all jobs in the queue. 
func (q *Queuer) GetJobs(lastId int, entries int) ([]*model.Job, error) { jobs, err := q.dbJob.SelectAllJobs(lastId, entries) if err != nil { return nil, fmt.Errorf("error selecting all jobs: %v", err) } return jobs, nil } // GetJobsByWorkerRID retrieves jobs assigned to a specific worker by its RID. func (q *Queuer) GetJobsByWorkerRID(workerRid uuid.UUID, lastId int, entries int) ([]*model.Job, error) { jobs, err := q.dbJob.SelectAllJobsByWorkerRID(workerRid, lastId, entries) if err != nil { return nil, fmt.Errorf("error selecting jobs by worker RID %v: %v", workerRid, err) } return jobs, nil } // Internal // mergeOptions merges the worker options with optional job options. func (q *Queuer) mergeOptions(options *model.Options) *model.Options { if options != nil && options.OnError == nil { options.OnError = q.worker.Options } else if options == nil && q.worker.Options != nil { options = &model.Options{OnError: q.worker.Options} } return options } // addJob adds a job to the queue with all necessary parameters. func (q *Queuer) addJob(task interface{}, options *model.Options, parameters ...interface{}) (*model.Job, error) { newJob, err := model.NewJob(task, options, parameters...) if err != nil { return nil, fmt.Errorf("error creating job: %v", err) } job, err := q.dbJob.InsertJob(newJob) if err != nil { return nil, fmt.Errorf("error inserting job: %v", err) } return job, nil } // runJobInitial is called to run the next job in the queue. func (q *Queuer) runJobInitial() error { // Update job status to running with worker. 
jobs, err := q.dbJob.UpdateJobsInitial(q.worker) if err != nil { return fmt.Errorf("error updating job status to running: %v", err) } else if len(jobs) == 0 { return nil } for _, job := range jobs { if job.Options != nil && job.Options.Schedule != nil && job.Options.Schedule.Start.After(time.Now()) { scheduler, err := core.NewScheduler( &job.Options.Schedule.Start, q.scheduleJob, job, ) if err != nil { return fmt.Errorf("error creating scheduler: %v", err) } log.Printf("Scheduling job with RID %v to run at %v", job.RID, job.Options.Schedule.Start) go scheduler.Go(q.ctx) } else { go func() { q.log.Printf("Running job with RID %v", job.RID) resultValues, err := q.waitForJob(job) if err != nil { q.retryJob(job, err) } else { q.succeedJob(job, resultValues) } }() } } return nil } // waitForJob executes the job and returns the results or an error. func (q *Queuer) waitForJob(job *model.Job) ([]interface{}, error) { // Run job and update job status to completed with results // TODO At this point the task should be available in the queuer, // but we should probably still check if the task is available? task := q.tasks[job.TaskName] runner, err := core.NewRunnerFromJob(task, job) if err != nil { return []interface{}{}, fmt.Errorf("error creating runner: %v", err) } var results []interface{} q.activeRunners.Store(job.RID, runner) go runner.Run(q.ctx) select { case err = <-runner.ErrorChannel: break case results = <-runner.ResultsChannel: break case <-q.ctx.Done(): runner.Cancel() break } q.activeRunners.Delete(job.RID) return results, err } // retryJob retries the job with the given job error. 
func (q *Queuer) retryJob(job *model.Job, jobErr error) { if job.Options == nil || job.Options.OnError.MaxRetries <= 0 { q.failJob(job, jobErr) return } var err error var results []interface{} retryer, err := core.NewRetryer( func() error { q.log.Printf("Trying/retrying job with RID %v", job.RID) results, err = q.waitForJob(job) if err != nil { return fmt.Errorf("error retrying job: %v", err) } return nil }, job.Options.OnError, ) if err != nil { jobErr = fmt.Errorf("error creating retryer: %v, original error: %v", err, jobErr) } err = retryer.Retry() if err != nil { q.failJob(job, fmt.Errorf("error retrying job: %v, original error: %v", err, jobErr)) } else { q.succeedJob(job, results) } } // scheduleJob retries the job. func (q *Queuer) scheduleJob(job *model.Job) { q.log.Printf("Running scheduled job with RID %v", job.RID) resultValues, err := q.waitForJob(job) if err != nil { q.retryJob(job, err) } else { q.succeedJob(job, resultValues) } } func (q *Queuer) cancelJob(job *model.Job) error { switch job.Status { case model.JobStatusRunning: jobRunner, found := q.activeRunners.Load(job.RID) if !found { return fmt.Errorf("job with rid %v not found or not running", job.RID) } runner := jobRunner.(*core.Runner) runner.Cancel(func() { job.Status = model.JobStatusCancelled _, err := q.dbJob.UpdateJobFinal(job) if err != nil { q.log.Printf("error updating job status to cancelled: %v", err) } }) case model.JobStatusScheduled, model.JobStatusQueued: job.Status = model.JobStatusCancelled _, err := q.dbJob.UpdateJobFinal(job) if err != nil { q.log.Printf("error updating job status to cancelled: %v", err) } } return nil } // succeedJob updates the job status to succeeded and runs the next job if available. 
func (q *Queuer) succeedJob(job *model.Job, results []interface{}) { job.Status = model.JobStatusSucceeded job.Results = results _, err := q.dbJob.UpdateJobFinal(job) if err != nil { // TODO probably add retry for updating job to succeeded q.log.Printf("error updating job status to succeeded: %v", err) } q.log.Printf("Job succeeded with RID %v", job.RID) // Try running next job if available err = q.runJobInitial() if err != nil { q.log.Printf("error running next job: %v", err) } } func (q *Queuer) failJob(job *model.Job, jobErr error) { job.Status = model.JobStatusFailed job.Error = jobErr.Error() _, err := q.dbJob.UpdateJobFinal(job) if err != nil { // TODO probably add retry for updating job to failed q.log.Printf("error updating job status to failed: %v", err) } q.log.Printf("Job failed with RID %v", job.RID) // Try running next job if available err = q.runJobInitial() if err != nil { q.log.Printf("error running next job: %v", err) } }
package queuer import ( "queuer/helper" "queuer/model" "slices" ) func (q *Queuer) AddNextIntervalFunc(nif model.NextIntervalFunc) *model.Worker { if nif == nil { q.log.Panicf("error adding next interval: NextIntervalFunc cannot be nil") } nifName, err := helper.GetTaskNameFromInterface(nif) if err != nil { q.log.Panicf("error getting function name: %v", err) } if slices.Contains(q.worker.AvailableNextIntervalFuncs, nifName) { q.log.Panicf("NextIntervalFunc with name %v already exists", nifName) } q.nextIntervalFuncs[nifName] = nif q.worker.AvailableNextIntervalFuncs = append(q.worker.AvailableNextIntervalFuncs, nifName) worker, err := q.dbWorker.UpdateWorker(q.worker) if err != nil { q.log.Panicf("error updating worker: %v", err) } q.log.Printf("NextInterval function added with name %v", nifName) return worker } func (q *Queuer) AddNextIntervalFuncWithName(nif model.NextIntervalFunc, name string) *model.Worker { if nif == nil { q.log.Panicf("NextIntervalFunc cannot be nil") } if slices.Contains(q.worker.AvailableNextIntervalFuncs, name) { q.log.Panicf("NextIntervalFunc with name %v already exists", name) } q.nextIntervalFuncs[name] = nif q.worker.AvailableNextIntervalFuncs = append(q.worker.AvailableNextIntervalFuncs, name) worker, err := q.dbWorker.UpdateWorker(q.worker) if err != nil { q.log.Panicf("error updating worker: %v", err) } q.log.Printf("NextInterval function added with name %v", name) return worker }
package queuer import ( "log" "queuer/model" ) // AddTask adds a new task to the queuer. // It creates a new task with the provided task interface, adds it to the worker's available tasks, // and updates the worker in the database. // The task name is automatically generated based on the task's function name (eg. main.TestTask). // If the task creation fails, it logs a panic error and exits the program. // It returns the newly created task. func (q *Queuer) AddTask(task interface{}) *model.Task { newTask, err := model.NewTask(task) if err != nil { log.Panicf("error creating new task: %v", err) } q.tasks[newTask.Name] = newTask q.worker.AvailableTasks = append(q.worker.AvailableTasks, newTask.Name) // Update worker in DB _, err = q.dbWorker.UpdateWorker(q.worker) if err != nil { log.Panicf("error updating worker: %v", err) } q.log.Printf("Task added with name %v", newTask.Name) return newTask } // AddTaskWithName adds a new task with a specific name to the queuer. // It creates a new task with the provided task interface and name, adds it to the worker's available tasks, // and updates the worker in the database. // If task creation fails, it logs a panic error and exits the program. // It returns the newly created task. func (q *Queuer) AddTaskWithName(task interface{}, name string) *model.Task { newTask, err := model.NewTaskWithName(task, name) if err != nil { log.Panicf("error creating new task: %v", err) } q.tasks[newTask.Name] = newTask q.worker.AvailableTasks = append(q.worker.AvailableTasks, newTask.Name) // Update worker in DB _, err = q.dbWorker.UpdateWorker(q.worker) if err != nil { log.Panicf("error updating worker: %v", err) } q.log.Printf("Task added with name %v", newTask.Name) return newTask }
package queuer import ( "fmt" "queuer/model" "github.com/google/uuid" ) // GetWorker retrieves a worker by its RID (Resource Identifier). // It returns the worker if found, or an error if not. func (q *Queuer) GetWorker(workerRid uuid.UUID) (*model.Worker, error) { worker, err := q.dbWorker.SelectWorker(workerRid) if err != nil { return nil, err } return worker, nil } // GetWorkers retrieves a list of workers starting from the lastId and returning the specified number of entries. // It returns a slice of workers and an error if any occurs. func (q *Queuer) GetWorkers(lastId int, entries int) ([]*model.Worker, error) { if lastId < 0 { return nil, fmt.Errorf("lastId cannot be negative, got %d", lastId) } if entries <= 0 { return nil, fmt.Errorf("entries must be greater than zero, got %d", entries) } workers, err := q.dbWorker.SelectAllWorkers(lastId, entries) if err != nil { return nil, err } return workers, nil }