// Copyright 2025 Ahmad Sameh(asmsh)
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sema
import (
"runtime"
"sync/atomic"
)
// Group guards concurrent access to a resource by providing methods to
// control concurrency, observe usage counters, and wait for in-flight
// operations to complete.
//
// The zero value is a ready to use [Group], with no concurrency limit,
// comparable to a [sync.WaitGroup].
//
// Group is best suited for concurrent tasks of equal weight or cost.
//
// A Group must not be copied after first use.
type Group struct {
	// waitChan is created lazily in Wait, only if it hasn't already,
	// and there are some active calls.
	// it's an unbuffered channel and is closed once the Group zeros.
	// it may also hold the nilChan sentinel, which marks the previous
	// channel as already closed (see notifyWait).
	waitChan atomic.Value // chan struct{}
	// blockChan is created lazily in [NewGroup] or [Group.SetSize],
	// only if size > 0.
	// it's an unbuffered channel that's never closed; FreeN sends on
	// it to wake one blocked Reserve/ReserveN call at a time.
	blockChan atomic.Value // chan struct{}
	// counter packs both usage counters into one atomically updated
	// word: high 32 bits are pending count, low 32 bits are active
	// count (interpreted as a signed int32 by counterParts).
	counter atomic.Uint64
	// size is the maximum number of N that can be reserved.
	// 0 means the Group has no concurrency limit.
	size atomic.Uint32
}
// NewGroup returns a [Group] whose concurrency limit is set to size.
// A size of zero or less results in a [Group] with no limit.
func NewGroup(size int) *Group {
	newGroup := new(Group)
	newGroup.setSize(size)
	return newGroup
}
// setSize records the concurrency limit and, when it's positive,
// creates the internal block channel.
func (g *Group) setSize(size int) {
	// any negative size means "no limit", same as 0.
	if size < 0 {
		size = 0
	}
	if size == 0 {
		// unlimited: leave both size and blockChan unset.
		return
	}
	// reject sizes that don't round-trip through uint32.
	s := uint32(size)
	if int(s) != size {
		panic("sema.Group: incorrect group size")
	}
	// record the limit, then publish the unbuffered block channel.
	g.size.Store(s)
	g.blockChan.Store(make(chan struct{}))
}
// counterParts splits the packed counter word into its two halves:
// the pending count (high 32 bits, unsigned) and the active count
// (low 32 bits, interpreted as a signed value).
func counterParts(counter uint64) (pending uint32, active int32) {
	pending = uint32(counter >> 32)
	active = int32(uint32(counter))
	return pending, active
}
// counterUpdate tries to apply the given deltas to the packed counter
// via a single compare-and-swap against oldCounter.
//
// pendingDelta is applied to the high 32 bits (pending), activeDelta to
// the low 32 bits (active). If applying activeDelta crosses the int32
// boundary of the active half, the carry/borrow is propagated into the
// pending half to keep the packed word coherent.
//
// It returns the counter value that was (or would have been) stored,
// and ok reports whether the CAS succeeded. On ok == false the caller
// must reload the counter and retry.
func (g *Group) counterUpdate(
	oldCounter uint64,
	pendingDelta int,
	activeDelta int,
) (newCounter uint64, ok bool) {
	oldPending, oldActive := counterParts(oldCounter)
	// converting a negative delta to uint32 wraps, so this addition is
	// effectively a modular add/subtract on the pending half.
	newPending := oldPending + uint32(pendingDelta)
	newActive := oldActive + int32(activeDelta)
	// Check for overflow/underflow from low
	// (signed comparison: fires when the active half wraps past the
	// int32 boundary in the direction of the delta).
	if activeDelta > 0 && newActive < oldActive {
		newPending += 1 // Carry
	} else if activeDelta < 0 && newActive > oldActive {
		newPending -= 1 // Borrow
	}
	newCounter = uint64(newPending)<<32 | uint64(uint32(newActive))
	if g.counter.CompareAndSwap(oldCounter, newCounter) {
		return newCounter, true
	}
	return newCounter, false
}
// SetSize sets the [Group.Size] to the passed value.
//
// It must be called at most once, before any other method, and only on
// the zero value of a [Group].
// It panics if it's called on a non-zero [Group], or if the size was
// already set, either through this method or through [NewGroup].
//
// A zero or negative size leaves the [Group] unlimited, so no
// [Group.Reserve] or [Group.ReserveN] call will ever block.
//
// Calling it with 0 on a zero [Group] has no effect.
func (g *Group) SetSize(size int) {
	// refuse to change the limit after counting has started.
	if g.counter.Load() != 0 {
		panic("sema.Group: concurrent Reserve calls while initializing group")
	}
	// refuse to initialize twice.
	if g.blockChan.Load() != nil {
		panic("sema.Group: group already initialized")
	}
	g.setSize(size)
	// detect Reserve calls that raced with the initialization above.
	if g.counter.Load() != 0 {
		panic("sema.Group: concurrent Reserve calls while initializing group")
	}
}
// Size returns the current limit of this [Group]: the maximum number
// of N resources allowed to be active at the same time.
// A result of 0 means the [Group] has no limit.
func (g *Group) Size() int {
	limit := g.size.Load()
	return int(limit)
}
// ActiveCount returns the total N successfully reserved via
// [Group.Reserve] or [Group.TryReserve] calls: the amount currently
// used from this [Group]'s size, which can never exceed the size.
func (g *Group) ActiveCount() int {
	_, activeN := counterParts(g.counter.Load())
	return int(activeN)
}
// PendingCount returns the total N whose [Group.Reserve] calls are
// currently blocked, waiting for matching [Group.Free] calls to
// unblock them.
func (g *Group) PendingCount() int {
	pendingN, _ := counterParts(g.counter.Load())
	return int(pendingN)
}
// Reserve increments [Group.ActiveCount] by 1, blocking if needed until
// room is made available by [Group.Free] or [Group.FreeN] calls.
//
// It returns immediately when [Group.PendingCount] is 0 and there's
// room for 1 in [Group.ActiveCount] against [Group.Size]; otherwise it
// adds 1 to [Group.PendingCount] while blocked, and wakes up in random
// order with other blocked [Group.Reserve] and [Group.ReserveN] calls
// once room appears.
//
// It never blocks when [Group.Size] is 0, and both counters are always
// up to date before it returns.
//
// It's equivalent to ReserveN(nil, 1).
func (g *Group) Reserve() {
	g.ReserveN(nil, 1)
}
// ReserveN increments [Group.ActiveCount] by n, blocking if needed, as
// long as n is within [Group.Size], until room is made available by
// [Group.Free] or [Group.FreeN] calls. It reports true on success, or
// false if the call was aborted via the provided doneChan.
//
// It returns immediately when [Group.PendingCount] is 0 and there's
// room for n in [Group.ActiveCount] against [Group.Size]; otherwise it
// adds n to [Group.PendingCount] while blocked, until there's room, or
// until the non-nil doneChan becomes receive-ready.
//
// Once room is made available, it wakes up in random order with other
// blocked [Group.Reserve] and [Group.ReserveN] calls.
//
// It always returns immediately, without blocking, when [Group.Size]
// is 0.
//
// Both counters are always up to date before it returns; in particular,
// after a false return, the provided n is no longer included in
// [Group.PendingCount], and was never moved to [Group.ActiveCount].
//
// It panics if n is less than or equal to 0.
//
// Note: The doneChan becomes receive-ready when it's closed or sent to.
func (g *Group) ReserveN(doneChan <-chan struct{}, n int) (reserved bool) {
	if n <= 0 {
		// zero/negative n is a misuse: 0 isn't a resource, and
		// releasing is done through [Group.FreeN], not a negative
		// reserve.
		panic("sema.Group: invalid group reserve N value")
	}
	// bail out early, without touching any counters, when the provided
	// doneChan is already receive-ready.
	if doneChan != nil {
		select {
		case <-doneChan:
			return false
		default:
		}
	}
	// an unlimited Group (size 0) never blocks Reserve calls.
	limit := g.size.Load()
	if limit == 0 {
		g.counter.Add(uint64(n))
		return true
	}
	// a request larger than the limit can never be satisfied; wait on
	// the doneChan (when provided) and report failure.
	if n > int(limit) {
		if doneChan != nil {
			<-doneChan
		}
		return false
	}
	// fast path: succeed right away when there's room within the limit.
	if g.tryReserve(limit, n, false) {
		return true
	}
	// slow path: block until enough matching FreeN calls arrive.
	return g.reserveNSlow(limit, doneChan, n)
}
// reserveNSlow blocks until the pending reservation of reserveN can be
// promoted to active, or until doneChan becomes receive-ready, in which
// case the pending amount is rolled back and false is returned.
//
// The caller (ReserveN, via tryReserve) has already added reserveN to
// the pending count before calling this method.
//
// Note: when doneChan is nil, its select case blocks forever, so only
// the blockChan case can ever fire.
func (g *Group) reserveNSlow(size uint32, doneChan <-chan struct{}, reserveN int) bool {
	// at this point, the blockChan shouldn't be nil.
	// because we enter this method only if the size is not 0,
	// and the blockChan will always be set if the size is not 0,
	// unless the [Group.SetSize] method has been called concurrently
	// with the [Group.ReserveN] method, and a context switch to
	// this method happened before the [Group.SetSize] method has set
	// the blockChan value, but right after setting the size value.
	blockChan := g.blockChan.Load()
	if blockChan == nil {
		panic("sema.Group: concurrent Reserve calls while initializing group")
	}
	blockChanVal := blockChan.(chan struct{})
	// wait for suitably freed tickets, or keep looping.
	for {
		select {
		case <-blockChanVal:
			// woken by a FreeN (or a forwarding ReserveN) call; try to
			// complete the reservation.
			reloop, ok := g.reserveNSuccessWait(size, doneChan, reserveN, blockChanVal)
			if ok {
				return true
			}
			if !reloop {
				return false
			}
		case <-doneChan:
			// or abort on demand.
			g.reserveNAbortWait(blockChanVal, reserveN)
			return false
		}
	}
}
// reserveNSuccessWait runs after this blocked call received a wake-up
// token from blockChan. It tries to move reserveN from the pending
// count to the active count.
//
// Return values:
//   - ok == true: the reservation succeeded (pending -> active).
//   - reloop == true: not enough room yet; the caller should go back
//     to waiting on blockChan.
//   - both false: doneChan became receive-ready and the pending amount
//     was rolled back via reserveNAbortWait.
func (g *Group) reserveNSuccessWait(
	size uint32,
	doneChan <-chan struct{},
	reserveN int,
	blockChan chan struct{},
) (reloop, ok bool) {
	// execute in a loop, because counterUpdate might lose the CAS.
	for {
		counter := g.counter.Load()
		pending, active := counterParts(counter)
		// diffN is the room left after this reservation would succeed.
		diffN := int(size) - int(active) - reserveN
		// if we got what we need, update the counter and return true.
		if diffN >= 0 {
			// only move forward if the doneChan wasn't closed.
			select {
			case <-doneChan:
				g.reserveNAbortWait(blockChan, reserveN)
				return false, false
			default:
			}
			_, ok = g.counterUpdate(counter, -reserveN, reserveN)
			if !ok {
				// the counter got changed, re-loop and try again.
				continue
			}
			return false, true
		}
		// this call still needs more N to succeed...
		// if this is the only blocked call, then return and wait for
		// the next free call.
		if int(pending) == reserveN {
			return true, false
		}
		// if there are still potentially active calls, then return and wait
		// for the next free call.
		if int(pending)+int(active) > int(size) {
			return true, false
		}
		// notify another blocked call to check its goal...
		// note: if the code below didn't notify any reserve calls, it means
		// it unblocked another call, which means the other call already
		// updated the counter, so we need to re-loop and check again.
		// otherwise, this call might be the last one, and we might end up
		// waiting for a FreeN call that will never happen.
		// only proceed if there are other pending calls.
		if int(pending)-reserveN <= 0 {
			continue
		}
		// attempt to wake up a Reserve call, or unblock another that's trying the same.
		select {
		case blockChan <- struct{}{}:
			// woke up 1 blocked Reserve call...
			return true, false
		case <-blockChan:
			// unblock another call.
			continue
		}
	}
}
// reserveNAbortWait rolls back an aborted blocked reservation: it
// removes reserveN from the pending count, then lets other blocked
// Reserve calls, and any Wait calls, observe the updated counter.
func (g *Group) reserveNAbortWait(blockChan chan struct{}, reserveN int) {
	// retry the CAS until the pending count drops by reserveN.
	for {
		if _, done := g.counterUpdate(g.counter.Load(), -reserveN, 0); done {
			break
		}
	}
	// pass the update along to blocked Reserve calls, then Wait calls.
	latest := g.notifyFree(blockChan)
	g.notifyWait(counterParts(latest))
}
// TryReserveN tries to increment the [Group.ActiveCount] by n without
// blocking, reporting whether it succeeded.
// It fails when [Group.PendingCount] isn't 0, or when there's not
// enough room left in [Group.ActiveCount] against the [Group.Size].
//
// It always succeeds when the [Group.Size] is 0.
//
// It panics if n is less than or equal to 0.
func (g *Group) TryReserveN(n int) bool {
	if n <= 0 {
		// zero/negative n is a misuse: 0 isn't a resource, and
		// releasing is done through [Group.FreeN], not a negative
		// reserve.
		panic("sema.Group: invalid group reserve N value")
	}
	limit := g.size.Load()
	// an unlimited Group always allows the reservation.
	if limit == 0 {
		g.counter.Add(uint64(n))
		return true
	}
	// a request larger than the limit can never succeed.
	if n > int(limit) {
		return false
	}
	return g.tryReserve(limit, n, true)
}
// tryReserve attempts a non-blocking reservation of reserveN against
// the given size. When the group is full or already has waiters: a try
// call (tryCall) fails immediately without touching the counter, while
// a blocking call registers reserveN as pending and reports false so
// the caller can continue to the slow path.
func (g *Group) tryReserve(size uint32, reserveN int, tryCall bool) bool {
	for {
		counter := g.counter.Load()
		pending, active := counterParts(counter)
		// there's room only when no call is waiting and the active
		// count plus this request still fits within the size.
		hasRoom := pending == 0 && int(active)+reserveN <= int(size)
		if !hasRoom && tryCall {
			// try calls never wait and never update the counter here.
			return false
		}
		if hasRoom {
			// claim the room as active.
			if _, done := g.counterUpdate(counter, 0, reserveN); done {
				return true
			}
		} else {
			// blocking call: record the request as pending.
			if _, done := g.counterUpdate(counter, reserveN, 0); done {
				return false
			}
		}
		// lost the CAS race; reload the counter and try again.
	}
}
// Free decrements the [Group.ActiveCount] by 1, making the room
// available for other reserve calls, and attempting to wake up a single
// blocked [Group.Reserve] or [Group.ReserveN] call, in random order,
// if any is blocked.
//
// When the [Group.ActiveCount] reaches zero by this call, it wakes up
// any calls blocked on [Group.Wait] and closes the [Group.WaitChan].
//
// It panics if the [Group.ActiveCount] goes below zero by this call.
//
// It's equivalent to FreeN(1).
func (g *Group) Free() {
	g.FreeN(1)
}
// FreeN decrements the [Group.ActiveCount] by n, making it available for other
// reserve calls, and attempting to wake up a single blocked [Group.Reserve]
// or [Group.ReserveN] call, in random order, if there's any blocked.
//
// If the [Group.ActiveCount] reaches zero by this call, it will wake up any
// calls blocked on [Group.Wait], and closes the [Group.WaitChan].
//
// If the [Group.ActiveCount] goes below zero by this call, it panics.
//
// It panics if n is less than or equal to 0.
func (g *Group) FreeN(n int) {
	if n <= 0 {
		// n can't be 0 or negative: 0 isn't a valid resource, and a
		// negative amount isn't a meaningful free either.
		panic("sema.Group: invalid group free N value")
	}
	// check if we should wake up any blocked [Group.ReserveN] calls.
	// note: if the blockChan is nil, then pending must be 0, as
	// pending is used to track blocked calls, and blockChan is
	// only set when the Group can have blocked calls (size != 0).
	blockChan := g.blockChan.Load()
	// update the counter, and read its values.
	var pending uint32
	var active int32
	if blockChan == nil {
		// unlimited Group: a plain 64-bit subtraction is enough here,
		// since pending is 0; if active would go below zero, the low
		// half wraps negative and the misuse panic below catches it.
		counter := g.counter.Add(uint64(-n))
		active = int32(counter)
	} else {
		// retry the CAS-based update until it succeeds.
		for ok := false; !ok; {
			counter := g.counter.Load()
			counter, ok = g.counterUpdate(counter, 0, -n)
			_, active = counterParts(counter)
		}
		// notify any blocked ReserveN calls of the counter update,
		// and make sure the counter values are updated.
		counter := g.notifyFree(blockChan.(chan struct{}))
		pending, _ = counterParts(counter)
	}
	// attempt to wake up any blocked [Group.Wait] calls.
	g.notifyWait(pending, active)
	// handle any misuse, assuming valid usage so far.
	if active < 0 {
		panic("sema.Group: negative group counter")
	}
}
// notifyFree attempts to hand one wake-up token to a blocked Reserve
// call after the counter has been updated.
//
// The non-blocking send can fail while pending is still non-zero if the
// would-be receiver is in the middle of aborting; in that case we yield
// and re-read the counter, until either a send succeeds or pending
// drops to 0.
//
// It returns the most recently observed counter value, so the caller
// can derive fresh pending/active values for notifyWait.
func (g *Group) notifyFree(blockChan chan struct{}) (counter uint64) {
	counter = g.counter.Load()
	pending, _ := counterParts(counter)
	// this will avoid Free missing an opportunity to wake up a Reserve.
	for int(pending) > 0 {
		// attempt to wakeup a Reserve call, or update pending until it's 0.
		select {
		case blockChan <- struct{}{}:
			// woke up 1 blocked Reserve call...
			counter = g.counter.Load()
			return counter
		default:
			// the Reserve call aborted, so re-sync.
			runtime.Gosched()
			counter = g.counter.Load()
			pending, _ = counterParts(counter)
		}
	}
	return counter
}
// notifyWait closes the current waitChan, waking all Wait/WaitChan
// callers, once both pending and active have reached zero.
//
// The waitChan value is swapped to the nilChan sentinel before closing,
// so exactly one caller closes each channel, and later Wait calls can
// tell that the previous channel was already closed.
func (g *Group) notifyWait(pending uint32, active int32) {
	// return while there are still blocked or active calls, as the
	// Group hasn't reached zero yet, so no Wait call should wake up.
	if pending > 0 || active > 0 {
		return
	}
	// waitChan will be nil only if no Wait calls have been made.
	waitChan := g.waitChan.Load()
	if waitChan == nil || waitChan == nilChan {
		return
	}
	if !g.waitChan.CompareAndSwap(waitChan, nilChan) {
		// if it didn't succeed, a newer waitChan value has been set
		// after this (older) one already got closed, so there's
		// nothing left to do here.
		return
	}
	close(waitChan.(chan struct{}))
}
// Wait blocks until the [Group] reaches zero, where a zero [Group]
// means both [Group.ActiveCount] and [Group.PendingCount] are zero.
// It waits only for [Group.Reserve] and [Group.ReserveN] calls made
// before this call, or made while the [Group] is non-zero.
func (g *Group) Wait() {
	<-g.initWaitChan()
}
// WaitChan returns a channel that gets closed once the [Group] reaches
// zero, where a zero [Group] means both [Group.ActiveCount] and
// [Group.PendingCount] are zero.
// It accounts only for [Group.Reserve] and [Group.ReserveN] calls made
// before this call, or made while the [Group] is non-zero.
func (g *Group) WaitChan() <-chan struct{} {
	return g.initWaitChan()
}
// closedChan is a shared, always-closed channel, returned by the wait
// methods when the Group is already at zero.
var closedChan = make(chan struct{})

// nilChan is a typed nil channel, used as a sentinel value inside
// waitChan to mark that the previously stored channel has been closed.
var nilChan chan struct{}

func init() {
	close(closedChan)
}
// initWaitChan returns the channel a Wait or WaitChan call should block
// on: the shared closedChan when the Group is already at zero, the live
// waitChan when one exists, or a freshly installed channel otherwise
// (via the slow path).
func (g *Group) initWaitChan() chan struct{} {
	waitChan := g.waitChan.Load()
	pending, active := counterParts(g.counter.Load())
	// pending is unsigned and can never be negative, so test it with
	// plain equality; active is signed and may be compared with <= 0.
	if pending == 0 && active <= 0 {
		return closedChan
	}
	if waitChan != nil && waitChan != nilChan {
		return waitChan.(chan struct{})
	}
	return g.initWaitChanSlow(waitChan)
}
// initWaitChanSlow installs a fresh wait channel when none is live.
// It's called with waitChan holding either nil or the nilChan sentinel,
// and resolves races with concurrent Free and Wait calls.
func (g *Group) initWaitChanSlow(waitChan any) chan struct{} {
	// we entered this method with waitChan either nil or nilChan,
	// and it can only go to nilChan or a valid chan value.
	newWaitChan := make(chan struct{})
	if g.waitChan.CompareAndSwap(waitChan, newWaitChan) {
		// we need to be sure that the swapped waitChan will be closed by
		// a Free call, which happens only if the counter is still not 0.
		// pending is unsigned and can never be negative, so test it
		// with plain equality.
		pending, active := counterParts(g.counter.Load())
		if pending == 0 && active <= 0 {
			return closedChan
		}
		return newWaitChan
	}
	// the waitChan value got changed...
	// if it's not a nilChan, then it must be a valid chan value.
	// 1- if it's nilChan, then it must have been set by a concurrent Free
	// call.
	// meaning that, all Reserve calls that have been done before this Wait,
	// their Free calls have been executed, so we unblock these Wait calls,
	// even if there's now new Reserve calls.
	// 2- if it's a valid chan value, then it must have been set by another
	// competing Wait call.
	// note: from point no. 1 above, it means that all Reserve calls must
	// be executed before their respective Wait calls.
	// note: comparing only against nilChan, because it can't be nil
	// again at this point.
	loadedWaitChan := g.waitChan.Load()
	if loadedWaitChan == nilChan {
		return closedChan
	}
	return loadedWaitChan.(chan struct{})
}