// Package workgroup provides a mechanism for synchronizing goroutines,
// propagating errors, and context cancellation signals.
// It is designed for managing collections of goroutines working on
// subtasks of a common task. This package is a fork of the
// errgroup.Group library available in `x/sync`, but with modified
// behavior in how it handles goroutine errors and cancellation.
//
// This package offers two different failure modes:
//
// - Collect - All goroutines are allowed to complete, and all errors
// encountered across different goroutines are collected. Wait()
// returns a joined error that combines all errors from the
// individual goroutines.
//
// - FailFast - The first error encountered immediately cancels the
// context of all remaining goroutines and causes Wait() to return
// that error.
//
// `workgroup.Group` also provides options to set a retry policy for
// individual goroutines within the group. A zero-value `Group` will
// collect all errors and return them as a single error.
package workgroup
import (
"context"
"errors"
"sync"
"github.com/avast/retry-go"
)
// FailureMode selects how a Group reacts when one of its goroutines
// returns an error. The zero value is Collect.
type FailureMode int
const (
// Collect lets every goroutine run to completion and combines all of
// their errors with errors.Join into the single error returned from
// `Wait()`. It is the zero value of FailureMode.
Collect FailureMode = iota
// FailFast records only the first error reported by a goroutine and
// cancels the workgroup context at that moment, so the remaining
// goroutines are signaled to stop. `Wait()` returns that first error.
FailFast
)
// Option is a function that configures a workgroup. New applies the
// options it is given in order, after the Group's defaults are set.
type Option func(*Group)
// WithLimit returns an Option that caps the number of goroutines the
// workgroup executes concurrently at n.
//
// A negative n removes any limit (the same convention as
// errgroup.Group.SetLimit) instead of panicking while allocating the
// semaphore. A limit of zero yields a semaphore with no capacity, so Go
// will block and never start a goroutine; callers that want unlimited
// concurrency should omit this option or pass a negative value.
func WithLimit(n int) Option {
	return func(g *Group) {
		if n < 0 {
			// make(chan struct{}, n) panics for negative sizes; a nil
			// semaphore is how the Group represents "unlimited".
			g.sem = nil
			return
		}
		g.sem = make(chan struct{}, n)
	}
}
// WithRetry returns an Option that adds opts to the workgroup's retry
// policy. The options are appended to whatever retry options the Group
// already holds, and the resulting list is what Go hands to retry.Do
// for every function it runs.
func WithRetry(opts ...retry.Option) Option {
	appendPolicy := func(g *Group) {
		g.retryOptions = append(g.retryOptions, opts...)
	}
	return appendPolicy
}
// A Group is a collection of goroutines working on subtasks that are part of
// the same overall task.
//
// A zero-value `workgroup.Group` is valid and has the following default behavior:
// - No limit on the number of concurrently executing goroutines.
// - Does not cancel on error (uses `Collect` failure mode).
// - Does not retry on error.
//
// A Group holds sync primitives and must not be copied after first use.
type Group struct {
// cancel cancels the context derived in New. It is nil in a zero-value
// Group, which Cancel checks for before calling it.
cancel func()
// err is the result reported by Wait: the first error in FailFast mode,
// or the errors.Join of every goroutine error in Collect mode.
err error
// errOnce ensures that, in FailFast mode, only the first error is stored
// and triggers cancellation.
errOnce sync.Once
// errLock guards err while goroutines report their failures.
errLock sync.Mutex
// wg counts the goroutines started by Go so that Wait can block on them.
wg sync.WaitGroup
// sem is a counting semaphore bounding concurrency; nil means no limit.
// It is set by WithLimit.
sem chan struct{}
// failureMode selects between Collect and FailFast error handling.
failureMode FailureMode
// retryOptions are the options passed to retry.Do for each function.
// New seeds it with single-attempt defaults; it is nil in a zero-value
// Group.
retryOptions []retry.Option
}
// New builds a Group that handles errors according to mode and returns
// it together with a cancellable context derived from ctx.
//
// The returned context is canceled by Cancel, by Wait once every
// goroutine has finished, and, in FailFast mode, as soon as the first
// goroutine reports an error.
//
// Before opts are applied the Group is given its default retry policy:
// a single attempt, only the last error reported, bound to the derived
// context. Options such as WithRetry append to these defaults.
func New(ctx context.Context, mode FailureMode, opts ...Option) (context.Context, *Group) {
	derived, stop := context.WithCancel(ctx)

	defaults := []retry.Option{
		retry.Attempts(1),
		retry.LastErrorOnly(true),
		retry.Context(derived),
	}

	g := new(Group)
	g.cancel = stop
	g.failureMode = mode
	g.retryOptions = defaults

	// Apply caller-supplied options in the order they were given.
	for i := range opts {
		opts[i](g)
	}
	return derived, g
}
// Go launches a new goroutine within the workgroup to execute fn. The
// function may be retried according to the workgroup's retry policy.
//
// Go blocks until the new goroutine can be added without exceeding the
// configured concurrency limit.
//
// If fn (after any retries) returns an error, it is recorded for Wait:
//   - in FailFast mode only the first error is kept and the workgroup
//     context is canceled;
//   - in Collect mode the error is joined with those already recorded.
//
// ctx is currently not consulted by this method; cancellation of retries
// is governed by the retry options held by the Group (New installs the
// group's derived context there).
func (g *Group) Go(ctx context.Context, fn func() error) {
	g.add()
	go func() {
		defer g.done()

		// Always start from the single-attempt defaults so that a Group
		// not created through New (e.g. a zero-value Group, whose
		// retryOptions is nil) does not fall back to the retry library's
		// own retrying defaults. Options are applied in order, so the
		// Group's configured options still take precedence.
		policy := make([]retry.Option, 0, len(g.retryOptions)+2)
		policy = append(policy, retry.Attempts(1), retry.LastErrorOnly(true))
		policy = append(policy, g.retryOptions...)

		err := retry.Do(fn, policy...)
		if err == nil {
			return
		}

		// Released before the deferred g.done() runs, so the error is
		// recorded before Wait can observe the goroutine as finished.
		g.errLock.Lock()
		defer g.errLock.Unlock()

		if g.failureMode == FailFast {
			// Keep only the first error and signal every other goroutine
			// to stop by canceling the workgroup context.
			g.errOnce.Do(func() {
				g.err = err
				g.Cancel()
			})
			return
		}

		// Collect mode: aggregate errors from all goroutines.
		g.err = errors.Join(g.err, err)
	}()
}
// Wait blocks until every goroutine started with Go has finished, then
// cancels the workgroup context and reports the outcome.
//
// The result is nil when no goroutine failed. Otherwise it is the
// recorded error: the first one in FailFast mode, or the joined set of
// all errors in Collect mode.
func (g *Group) Wait() error {
	g.wg.Wait()

	// All work is done; cancel the context so it does not outlive the
	// group.
	g.Cancel()

	result := g.err
	return result
}
// Cancel cancels the workgroup context, signaling the goroutines that
// observe it to stop. It is a no-op on a Group that has no cancel
// function, such as a zero-value Group.
func (g *Group) Cancel() {
	stop := g.cancel
	if stop == nil {
		return
	}
	stop()
}
// add reserves a slot for one more goroutine. When a concurrency limit
// is configured it first acquires a semaphore slot, blocking while the
// semaphore is full, and then registers the goroutine with the
// WaitGroup.
func (g *Group) add() {
	if slots := g.sem; slots != nil {
		slots <- struct{}{}
	}
	g.wg.Add(1)
}
// done is the counterpart of add: it releases the semaphore slot held
// by a finished goroutine, if a concurrency limit is configured, and
// then marks the goroutine as complete on the WaitGroup.
func (g *Group) done() {
	if slots := g.sem; slots != nil {
		<-slots
	}
	g.wg.Done()
}