package scheduler
import "github.com/horockey/go-scheduler/internal/model"
// Get standart header for event ID.
func EventHeaderID() string {
return model.HeaderID
}
// Get standart header for event creation time.
func EventHeaderCreatedAt() string {
return model.HeaderCreatedAt
}
package model
import (
"strings"
"time"
"github.com/google/uuid"
)
type Event[T any] struct {
Payload T
tags map[string]struct{}
headers map[string]string
}
const (
HeaderID string = "ID"
HeaderCreatedAt string = "CREATED_AT"
)
func NewEvent[T any](payload T) *Event[T] {
return &Event[T]{
Payload: payload,
tags: map[string]struct{}{},
headers: map[string]string{
HeaderID: uuid.NewString(),
HeaderCreatedAt: time.Now().Format(time.RFC3339),
},
}
}
// Add a tag to event.
// Tag will be canonicalized before adding.
func (e *Event[T]) Tag(t string) {
e.tags[strings.ToUpper(t)] = struct{}{}
}
// Get all event's tags
func (e *Event[T]) Tags() []string {
res := make([]string, 0, len(e.tags))
for tag := range e.tags {
res = append(res, tag)
}
return res
}
// Add header to event.
// Header's key will be canonicalized before adding.
func (e *Event[T]) Header(key, value string) {
e.headers[strings.ToUpper(key)] = value
}
// Get all event's headers.
func (e *Event[T]) Headers() map[string]string {
res := map[string]string{}
for k, v := range e.headers {
res[k] = v
}
return res
}
package scheduler
import (
"fmt"
"time"
"github.com/horockey/go-scheduler/internal/model"
"github.com/horockey/go-toolbox/options"
)
// Add a tag to event.
// Tag will be canonicalized before adding.
func Tag[T any](t string) options.Option[model.Node[T]] {
return func(target *model.Node[T]) error {
target.Event.Tag(t)
return nil
}
}
// Add header to event.
// Header's key will be canonicalized before adding.
func Header[T any](k, v string) options.Option[model.Node[T]] {
return func(target *model.Node[T]) error {
target.Event.Header(k, v)
return nil
}
}
// Emit event after given duration.
// Duration must be positive.
func After[T any](dur time.Duration) options.Option[model.Node[T]] {
return func(target *model.Node[T]) error {
if dur <= 0 {
return fmt.Errorf("duration must be positive: %d", dur)
}
target.At = time.Now().Add(dur)
return nil
}
}
// Emit event at given time.
func At[T any](t time.Time) options.Option[model.Node[T]] {
return func(target *model.Node[T]) error {
target.At = t
return nil
}
}
// Continue eminitg event every given duration.
func Every[T any](dur time.Duration) options.Option[model.Node[T]] {
return func(target *model.Node[T]) error {
if dur <= 0 {
return fmt.Errorf("duration must be positive: %d", dur)
}
target.Every = dur
return nil
}
}
package scheduler
import (
"context"
"fmt"
"sort"
"strings"
"sync"
"time"
"github.com/google/uuid"
"github.com/horockey/go-scheduler/internal/model"
"github.com/horockey/go-toolbox/options"
"golang.org/x/exp/slices"
)
var (
ErrNotRunning = fmt.Errorf("scheduller is not running")
ErrEventNotFound = fmt.Errorf("event for given filter not found")
ErrUnexpectedEmptyList = fmt.Errorf("unexpected empty shedule event list")
ErrEventWithNoIDHeader = fmt.Errorf("got event with no ID header. It will be generated")
)
type Scheduler[T any] struct {
mu sync.RWMutex
isRunning bool
nodes []*model.Node[T]
headChanged chan struct{}
emitEvent chan *model.Event[T]
timeCh <-chan time.Time
errorCB func(error)
}
// Create new scheduler with given opts.
func NewScheduler[T any](opts ...options.Option[Scheduler[T]]) (*Scheduler[T], error) {
s := &Scheduler[T]{
nodes: []*model.Node[T]{},
headChanged: make(chan struct{}, 100),
emitEvent: make(chan *model.Event[T], 100),
timeCh: make(<-chan time.Time),
errorCB: func(err error) {},
}
if err := options.ApplyOptions(s, opts...); err != nil {
return nil, fmt.Errorf("applying opts: %w", err)
}
return s, nil
}
// Start scheduler.
// To stop it, given context should be canceled.
func (s *Scheduler[T]) Start(ctx context.Context) error {
updTimeChan := func() {
s.mu.Lock()
defer s.mu.Unlock()
if len(s.nodes) == 0 {
s.timeCh = make(<-chan time.Time)
return
}
dur := time.Until(s.nodes[0].At)
if dur < 0 {
dur = 0
}
s.timeCh = time.After(dur)
}
emitEvent := func() {
s.mu.Lock()
defer s.mu.Unlock()
if len(s.nodes) == 0 {
s.errorCB(ErrUnexpectedEmptyList)
return
}
n := s.nodes[0]
s.emitEvent <- n.Event
s.removeNode(0)
if n.Every > 0 {
s.approveIdHeader(n)
n.At = time.Now().Add(n.Every)
s.addNode(n)
}
}
s.setRunning(true)
for {
select {
case <-s.timeCh:
emitEvent()
case <-s.headChanged:
updTimeChan()
case <-ctx.Done():
s.setRunning(false)
close(s.headChanged)
close(s.emitEvent)
return fmt.Errorf("context err: %w", ctx.Err())
}
}
}
// Schedule new event.
// Scheduler must be started to call this method properly.
func (s *Scheduler[T]) Schedule(payload T, opts ...options.Option[model.Node[T]]) (*model.Event[T], error) {
e := model.NewEvent[T](payload)
n := &model.Node[T]{
Event: e,
At: time.Now(),
}
if err := options.ApplyOptions(n, opts...); err != nil {
return nil, fmt.Errorf("applying opts: %w", err)
}
s.mu.Lock()
defer s.mu.Unlock()
if !s.isRunning {
return nil, ErrNotRunning
}
s.addNode(n)
return e, nil
}
// Unschedule scheduled event by id.
// Scheduler must be started to call this method properly.
func (s *Scheduler[T]) Unschedule(id string) error {
s.mu.Lock()
defer s.mu.Unlock()
for idx, node := range s.nodes {
eventId := s.approveIdHeader(node)
if eventId != id {
continue
}
s.removeNode(idx)
return nil
}
return ErrEventNotFound
}
// Unschedule all scheduled events with given tag.
// Scheduler must be started to call this method properly.
func (s *Scheduler[T]) UnscheduleByTag(tag string) error {
s.mu.Lock()
defer s.mu.Unlock()
tag = strings.ToUpper(tag)
idxToRemove := []int{}
for idx, node := range s.nodes {
if !slices.Contains(node.Event.Tags(), tag) {
continue
}
idxToRemove = append(idxToRemove, idx)
}
if len(idxToRemove) == 0 {
return ErrEventNotFound
}
for removed, idx := range idxToRemove {
s.removeNode(idx - removed)
}
return nil
}
// Unschedule all scheduled events with given header key-value pair.
// Scheduler must be started to call this method properly.
func (s *Scheduler[T]) UnscheduleByHeader(key, value string) error {
s.mu.Lock()
defer s.mu.Unlock()
key = strings.ToUpper(key)
idxToRemove := []int{}
for idx, node := range s.nodes {
if val, ok := node.Event.Headers()[key]; !ok || val != value {
continue
}
idxToRemove = append(idxToRemove, idx)
}
if len(idxToRemove) == 0 {
return ErrEventNotFound
}
for removed, idx := range idxToRemove {
s.removeNode(idx - removed)
}
return nil
}
// Get channel, that emits scheduled events.
func (s *Scheduler[T]) EmitChan() <-chan *model.Event[T] {
return s.emitEvent
}
func (s *Scheduler[T]) addNode(node *model.Node[T]) {
s.nodes = append(s.nodes, node)
oldHead := s.nodes[0]
sort.Slice(s.nodes, func(i, j int) bool {
return s.nodes[i].At.Before(s.nodes[j].At)
})
newHead := s.nodes[0]
if oldHead != newHead || len(s.nodes) == 1 {
s.headChanged <- struct{}{}
}
}
func (s *Scheduler[T]) removeNode(idx int) {
switch len(s.nodes) {
case 1:
s.nodes = []*model.Node[T]{}
default:
s.nodes = append(s.nodes[:idx], s.nodes[idx+1:]...)
}
if idx == 0 {
s.headChanged <- struct{}{}
}
}
func (s *Scheduler[T]) approveIdHeader(node *model.Node[T]) string {
id, ok := node.Event.Headers()[model.HeaderID]
if !ok {
s.errorCB(ErrEventWithNoIDHeader)
id = uuid.NewString()
node.Event.Header(model.HeaderID, id)
}
return id
}
func (s *Scheduler[T]) setRunning(v bool) {
s.mu.Lock()
defer s.mu.Unlock()
s.isRunning = v
}
package scheduler
import (
"fmt"
"github.com/horockey/go-scheduler/internal/model"
"github.com/horockey/go-toolbox/options"
)
// Set custom out chan.
// Strongly recommended to be left default!
// Change it only if you know what you are doing!
func OutChan[T any](ch chan *model.Event[T]) options.Option[Scheduler[T]] {
return func(target *Scheduler[T]) error {
if ch == nil {
return fmt.Errorf("got nil channel")
}
close(target.emitEvent)
target.emitEvent = ch
return nil
}
}
// Add custom error handler.
// By default it does noothing.
func ErrorCB[T any](cb func(error)) options.Option[Scheduler[T]] {
return func(target *Scheduler[T]) error {
if cb == nil {
return fmt.Errorf("got nil callback")
}
target.errorCB = cb
return nil
}
}