package flow
import "sync"
// Task is one stage of a pipeline: it consumes values from in, processes
// them, and sends its results to out.
//
// A Task must close out before it returns, including when it returns an
// error, so that downstream tasks and goroutines reading from out terminate
// cleanly. A non-nil return value signals that processing has failed.
type Task[In, Out any] func(in <-chan In, out chan<- Out) error
// Concurrent returns a Task that runs n instances of t in parallel, all
// reading from the same in channel and delivering to the same out channel.
//
// Every instance is handed a private output channel whose values are
// forwarded to out. This lets each instance close "its" output, as the Task
// contract requires, without out being closed more than once. out is closed
// after all instances have closed their private channel. With n > 1 the
// instances compete for input, so the order of results on out is not
// guaranteed.
//
// Values of n below 1 are treated as 1, so the returned Task always consumes
// its input instead of panicking (negative n) or closing out without reading
// anything (n == 0).
//
// The returned Task reports the first non-nil error in order of completion,
// or nil if every instance succeeds. A failing instance does not stop the
// others.
func Concurrent[In, Out any](n int, t Task[In, Out]) Task[In, Out] {
	if n < 1 {
		n = 1
	}
	return func(in <-chan In, out chan<- Out) error {
		var wg sync.WaitGroup
		// Buffered to n so a worker can always report its result without
		// blocking, even though results are only read after wg.Wait.
		errs := make(chan error, n)

		wg.Add(n)
		for i := 0; i < n; i++ {
			ch := make(chan Out)
			go func() {
				errs <- t(in, ch)
			}()
			go func() {
				defer wg.Done()
				for v := range ch {
					out <- v
				}
			}()
		}

		// The forwarders finish once every worker has closed its private
		// channel; only then is it safe to close the shared out channel.
		wg.Wait()
		close(out)

		var firstErr error
		for i := 0; i < n; i++ {
			if err := <-errs; err != nil && firstErr == nil {
				firstErr = err
			}
		}
		return firstErr
	}
}
// Pipe connects two tasks: everything a sends to its output becomes the
// input of b. The returned Task reads from in via a and writes to out via b.
//
// b runs in its own goroutine while a runs in the calling goroutine; the
// returned Task only returns after both have finished. If b returns before
// a has closed the intermediate channel (typically because b failed), the
// remaining output of a is read and discarded so that a never blocks on a
// consumer that has already given up. a still processes the rest of its
// input in that case.
//
// If a returns an error, that error is returned; otherwise the result of b
// is returned.
func Pipe[A, B, C any](a Task[A, B], b Task[B, C]) Task[A, C] {
	return func(in <-chan A, out chan<- C) error {
		ch := make(chan B)
		var (
			wg   sync.WaitGroup
			bErr error
		)
		wg.Add(1)
		go func() {
			defer wg.Done()
			bErr = b(ch, out)
			// Drain whatever a still produces. When b consumed everything
			// this loop ends immediately because ch is already closed.
			for range ch {
			}
		}()
		aErr := a(in, ch)
		// Waiting also makes the write to bErr visible to this goroutine.
		wg.Wait()
		if aErr != nil {
			return aErr
		}
		return bErr
	}
}
// Chain composes any number of same-type tasks into a single Task by piping
// each task's output into the next one, in the order given.
//
// With a single task, that task is returned unchanged. With no tasks at all,
// Chain returns a pass-through Task that forwards every value from in to out
// unchanged and closes out when in is closed.
//
// Errors are propagated as by Pipe.
func Chain[T any](tasks ...Task[T, T]) Task[T, T] {
	if len(tasks) == 0 {
		// The empty composition is the identity; indexing tasks[0] would panic.
		return func(in <-chan T, out chan<- T) error {
			defer close(out)
			for v := range in {
				out <- v
			}
			return nil
		}
	}
	t := tasks[0]
	for _, next := range tasks[1:] {
		t = Pipe(t, next)
	}
	return t
}
// FromValues is the variadic convenience form of FromSlice: it runs t over
// the given input values and returns everything t produced, together with
// the error returned by t, if any.
func FromValues[In, Out any](t Task[In, Out], input ...In) ([]Out, error) {
	results, err := FromSlice[In, Out](t, input)
	return results, err
}
// FromSlice runs t over the elements of input, in order, and returns the
// collected output together with the error returned by t, if any.
//
// The input is fed to t from a separate goroutine over an unbuffered channel
// that is closed after the last element. If t returns before it has consumed
// the whole input (for example because it failed), the feeding goroutine is
// released instead of staying blocked on a send forever.
func FromSlice[In, Out any](t Task[In, Out], input []In) ([]Out, error) {
	ch := make(chan In)
	done := make(chan struct{})
	// Runs after FromChannel has returned, i.e. once t is finished.
	defer close(done)
	go func() {
		defer close(ch)
		for _, v := range input {
			select {
			case ch <- v:
			case <-done:
				// The task is gone; nobody will ever receive the rest.
				return
			}
		}
	}()
	return FromChannel(t, ch)
}
// FromChannel runs t in the calling goroutine with in as its input and
// gathers everything t sends to its output into a slice.
//
// It returns once t has returned and its output channel has been closed and
// fully read. The slice holds the values in the order they were received; the
// error is whatever t returned.
func FromChannel[In, Out any](t Task[In, Out], in <-chan In) ([]Out, error) {
	var (
		collected []Out
		collector sync.WaitGroup
	)
	sink := make(chan Out)

	collector.Add(1)
	go func() {
		defer collector.Done()
		for {
			v, open := <-sink
			if !open {
				return
			}
			collected = append(collected, v)
		}
	}()

	taskErr := t(in, sink)
	// Wait for the collector so that collected is complete and safe to read.
	collector.Wait()
	return collected, taskErr
}
package flow
// Logger is the minimal logging interface used by the logging helpers in
// this package to emit structured log messages.
type Logger interface {
	// Info emits msg together with args. LogEveryN calls it with "count"
	// followed by the item count, then the caller-supplied args.
	Info(msg string, args ...any)
}
// LogEveryN returns a pass-through Task that forwards every item unchanged
// and logs msg each time the number of items seen so far is a multiple of n.
//
// Each periodic message is emitted via logger.Info with "count" and the
// current item count, followed by args. The message is logged before the
// item that triggered it is forwarded. When the input channel is closed, one
// final message is emitted with "count", the total, "closed", true, followed
// by args.
//
// If n is 0, no periodic messages are emitted (only the final one); this
// avoids a division-by-zero panic in the modulo check.
//
// The Task always returns nil.
func LogEveryN[T any](n int, logger Logger, msg string, args ...any) Task[T, T] {
	return func(in <-chan T, out chan<- T) error {
		defer close(out)
		var count int
		for v := range in {
			count++
			// Guard n == 0: count%0 would panic at run time.
			if n != 0 && count%n == 0 {
				// A fresh slice is built per call so args is never modified.
				logger.Info(msg, append([]any{"count", count}, args...)...)
			}
			out <- v
		}
		logger.Info(msg, append([]any{"count", count, "closed", true}, args...)...)
		return nil
	}
}
// Filter returns a Task that passes on only those items for which predicate
// reports true, preserving their order.
//
// If predicate returns a non-nil error, the Task stops immediately, does not
// forward the offending item, and returns that error. The output channel is
// closed whenever the Task returns.
func Filter[T any](predicate func(T) (bool, error)) Task[T, T] {
	return func(in <-chan T, out chan<- T) error {
		defer close(out)
		for {
			item, open := <-in
			if !open {
				return nil
			}
			keep, err := predicate(item)
			switch {
			case err != nil:
				return err
			case keep:
				out <- item
			}
		}
	}
}
// ForEach returns a Task that applies fn to every incoming item and sends
// the result to the output channel, preserving order.
//
// If fn returns a non-nil error, the Task stops immediately, sends nothing
// for that item, and returns the error. The output channel is closed whenever
// the Task returns.
func ForEach[In, Out any](fn func(in In) (Out, error)) Task[In, Out] {
	return func(in <-chan In, out chan<- Out) error {
		defer close(out)
		for {
			item, open := <-in
			if !open {
				return nil
			}
			mapped, err := fn(item)
			if err != nil {
				return err
			}
			out <- mapped
		}
	}
}
// Append returns a pass-through Task that records every item it sees by
// appending it to the slice s points to, and then forwards the item
// unchanged. Items are appended before they are forwarded.
//
// The Task closes its output when the input is closed and always returns nil.
func Append[T any](s *[]T) Task[T, T] {
	return func(in <-chan T, out chan<- T) error {
		defer close(out)
		for {
			item, open := <-in
			if !open {
				break
			}
			grown := append(*s, item)
			*s = grown
			out <- item
		}
		return nil
	}
}
// Tee returns a pass-through Task that duplicates its input: every item is
// first sent to tee and then forwarded to the output channel.
//
// The Task closes only its own output channel; tee is never closed by it.
// Both sends are plain channel sends, so the Task waits on tee before it
// forwards the item. The Task always returns nil.
func Tee[T any](tee chan<- T) Task[T, T] {
	return func(in <-chan T, out chan<- T) error {
		defer close(out)
		for {
			item, open := <-in
			if !open {
				break
			}
			tee <- item
			out <- item
		}
		return nil
	}
}