package pipelines
import (
"context"
"sync"
)
// GenerateStream takes a function that generates data and returns a channel of type T
func GenerateStream[T any](ctx context.Context, fn func(context.Context) T) <-chan T {
stream := make(chan T)
go func() {
defer close(stream)
for {
select {
case <-ctx.Done():
return
case stream <- fn(ctx):
}
}
}()
return stream
}
// StreamSlice takes a slice of type []T and returns a channel of type T
func StreamSlice[T any](ctx context.Context, data []T) <-chan T {
stream := make(chan T)
go func() {
defer close(stream)
for i := range data {
select {
case <-ctx.Done():
return
case stream <- data[i]:
}
}
}()
return stream
}
// StreamMap takes a map of type map[T]H and returns a channel of type H
func StreamMap[T comparable, H comparable](ctx context.Context, data map[T]H) <-chan H {
stream := make(chan H)
go func() {
defer close(stream)
for i := range data {
select {
case <-ctx.Done():
return
case stream <- data[i]:
}
}
}()
return stream
}
// FanOut controls concurrent processing of data from the input channel
func FanOut[T any, H any](ctx context.Context, inputStream <-chan T, fn func(context.Context, T) H, numFan int) []<-chan H {
process := func() <-chan H {
stream := make(chan H)
go func() {
defer close(stream)
for value := range inputStream {
select {
case <-ctx.Done():
return
default:
// process data with supplied function
stream <- fn(ctx, value)
}
}
}()
return stream
}
fanOutChannels := make([]<-chan H, numFan)
for i := 0; i < numFan; i++ {
fanOutChannels[i] = process()
}
return fanOutChannels
}
// FanIn takes any number of readonly channels and returns a fanned in channel
func FanIn[T any](ctx context.Context, channels ...<-chan T) <-chan T {
var wg sync.WaitGroup
fannedInStream := make(chan T)
transfer := func(c <-chan T) {
defer wg.Done()
for msg := range c {
select {
case <-ctx.Done():
return
case fannedInStream <- msg:
}
}
}
for i := range channels {
wg.Add(1)
go transfer(channels[i])
}
go func() {
wg.Wait()
close(fannedInStream)
}()
return fannedInStream
}