package bottleneck
import (
"time"
)
// Equalizer typed Bottleneck similar to ticker queue.
// Provides a bottle neck which passes one and no more than one request
// each {time.Second/RPS} interval.
// RPS 1000 means pass 1 each 1 millisecond.
type Equalizer struct {
	lastCheckout int64 // UnixNano timestamp of the most recent BreakThrough pass
	diffDuration int64 // minimum nanoseconds between two consecutive passes (1e9 / rps)
	burst        int   // extra queue capacity added on top of rps by MaxRate
}
// NewEqualizer returns equalized implementation of Bottleneck.
// rps must be positive and burst non-negative; otherwise a nil
// bottleneck and the matching sentinel error are returned.
func NewEqualizer(rps, burst int) (*Equalizer, error) {
	switch {
	case rps <= 0:
		return nil, ErrRPSNegativeOrZero
	case burst < 0:
		return nil, ErrBurstNegative
	}
	// Seed the checkout one second in the past so the very first
	// BreakThrough call passes without sleeping.
	return &Equalizer{
		lastCheckout: time.Now().Add(-time.Second).UnixNano(),
		diffDuration: time.Second.Nanoseconds() / int64(rps),
		burst:        burst,
	}, nil
}
// BreakThrough passes through bottle neck. Waits and pass if it busy.
// It sleeps until diffDuration nanoseconds have elapsed since the
// previous checkout, then records the new checkout time.
func (bottleneck *Equalizer) BreakThrough() {
	earliest := bottleneck.lastCheckout + bottleneck.diffDuration
	// time.Sleep returns immediately for non-positive durations, so no
	// explicit "already allowed" check is required here.
	time.Sleep(time.Duration(earliest - time.Now().UnixNano()))
	bottleneck.lastCheckout = time.Now().UnixNano()
}
// MaxRate returns max rate for both simultaneously processed and in queue requests.
// The per-second rate is reconstructed from diffDuration; because
// diffDuration itself was produced by integer division, the result can
// differ from the original rps for values that do not divide 1e9 evenly
// — NOTE(review): confirm this rounding is acceptable for extreme rps.
func (bottleneck *Equalizer) MaxRate() int {
	perSecond := time.Second.Nanoseconds() / bottleneck.diffDuration
	return int(perSecond) + bottleneck.burst
}
package bottleneck
// LeakyBucket typed Bottleneck implements leaky bucket algorithm.
// It is a Regular bottleneck constructed with burst fixed to zero, so
// requests drain at a steady rate with no burst allowance.
type LeakyBucket struct {
	Regular // embedded: BreakThrough and MaxRate come from Regular
}
// NewLeakyBucket returns leaky bucket algo implementation of Bottleneck.
// size is the per-second drain rate; burst is fixed to zero, which is
// what makes the bucket "leaky" rather than bursty.
func NewLeakyBucket(size int) (*LeakyBucket, error) {
	regular, err := NewRegular(size, 0)
	if err != nil {
		return nil, err
	}
	return &LeakyBucket{Regular: *regular}, nil
}
package bottleneck
import (
"time"
)
// Regular typed Bottleneck similar to attractions queue.
// Provides uniform distribution of requests to stabilize CPU load.
type Regular struct {
	data  []int64 // ring of UnixNano checkout times, one slot per unit of rps
	pos   int     // index of the oldest slot; advanced by BreakThrough
	burst int     // extra queue capacity added on top of len(data) by MaxRate
}
// NewRegular returns regular implementation of bottle neck.
// rps must be positive and burst non-negative; otherwise a nil
// bottleneck and the matching sentinel error are returned.
func NewRegular(rps, burst int) (*Regular, error) {
	switch {
	case rps <= 0:
		return nil, ErrRPSNegativeOrZero
	case burst < 0:
		return nil, ErrBurstNegative
	}
	// Pre-fill every slot with a timestamp one second in the past so the
	// first rps calls to BreakThrough pass without sleeping.
	past := time.Now().Add(-time.Second).UnixNano()
	slots := make([]int64, rps)
	for i := range slots {
		slots[i] = past
	}
	return &Regular{
		data:  slots,
		pos:   0,
		burst: burst,
	}, nil
}
// BreakThrough passes through bottle neck. Waits and pass if it busy.
// A pass is allowed once a full second has elapsed since the checkout
// recorded in the current ring slot; the slot is then refreshed and the
// cursor advances, giving each of the rps slots one pass per second.
func (bottleneck *Regular) BreakThrough() {
	slot := bottleneck.pos
	wakeAt := bottleneck.data[slot] + int64(time.Second)
	// Negative durations make time.Sleep return immediately.
	time.Sleep(time.Duration(wakeAt - time.Now().UnixNano()))
	bottleneck.data[slot] = time.Now().UnixNano()
	bottleneck.pos = (slot + 1) % len(bottleneck.data)
}
// MaxRate returns max rate for both simultaneously processed and in queue requests.
// That is the ring size (one slot per unit of rps) plus the configured burst.
func (bottleneck *Regular) MaxRate() int {
	return bottleneck.burst + len(bottleneck.data)
}
package bottleneck
// TokenBucket typed Bottleneck similar to ticker queue.
// Provides bottle neck which passes one and no more than one request
// each {time.Second/RPS} interval.
// RPS 1000 means pass 1 each 1 millisecond.
type TokenBucket struct {
	Equalizer // embedded: BreakThrough and MaxRate come from Equalizer
}
// NewTokenBucket returns token bucket algo implementation of Bottleneck.
// Validation of rps and burst is delegated to NewEqualizer.
func NewTokenBucket(rps, burst int) (*TokenBucket, error) {
	equalizer, err := NewEqualizer(rps, burst)
	if err != nil {
		return nil, err
	}
	return &TokenBucket{Equalizer: *equalizer}, nil
}
package bottleneck
import (
"time"
)
// Valve typed Bottleneck similar to heart valve.
// Holds all together and pass them at one time.
type Valve struct {
	lastCheckout int64 // UnixNano start of the current one-second window
	currentRate  int   // passes already granted inside the current window
	rps          int   // capacity of one window
	burst        int   // extra queue capacity added on top of rps by MaxRate
}
// NewValve returns valve implementation of Bottleneck.
// rps must be positive and burst non-negative; otherwise a nil
// bottleneck and the matching sentinel error are returned.
func NewValve(rps, burst int) (*Valve, error) {
	switch {
	case rps <= 0:
		return nil, ErrRPSNegativeOrZero
	case burst < 0:
		return nil, ErrBurstNegative
	}
	// Unlike Equalizer/Regular, the window starts now: the first rps
	// calls pass via the currentRate counter, not via timestamps.
	return &Valve{
		lastCheckout: time.Now().UnixNano(),
		currentRate:  0,
		rps:          rps,
		burst:        burst,
	}, nil
}
// BreakThrough passes through bottle neck. Waits and pass if it busy.
// Up to rps requests pass instantly within one window; the next request
// waits until a full second has elapsed since the window opened, then
// starts a fresh window counting itself as its first pass.
func (bottleneck *Valve) BreakThrough() {
	// Fast path: the current window still has free slots.
	if bottleneck.currentRate < bottleneck.rps {
		bottleneck.currentRate++
		return
	}
	// Window exhausted: wait out whatever remains of the second.
	windowEnd := bottleneck.lastCheckout + int64(time.Second)
	if remaining := windowEnd - time.Now().UnixNano(); remaining > 0 {
		time.Sleep(time.Duration(remaining))
	}
	bottleneck.currentRate = 1
	bottleneck.lastCheckout = time.Now().UnixNano()
}
// MaxRate returns max rate for both simultaneously processed and in queue requests.
// That is the window capacity plus the configured burst.
func (bottleneck *Valve) MaxRate() int {
	return bottleneck.burst + bottleneck.rps
}
// Package ratelimiter provides an easy to use rate limiter with context.Context support,
// that can be used to limit the rate of arbitrary things mainly in your http middleware.
// See https://en.wikipedia.org/wiki/Rate_limiting .
package ratelimiter
import (
"context"
"runtime"
"sync/atomic"
)
type (
	// Bottleneck represents gate keeper for rate limiter.
	// BreakThrough blocks until the next request may pass; MaxRate
	// reports the total limit for simultaneously processed plus queued
	// requests. See bottleneck subpackage for examples.
	Bottleneck interface {
		BreakThrough()
		MaxRate() int
	}
	// RateLimiter implement rate limiter with wait option.
	RateLimiter struct {
		notify  chan struct{} // unbuffered; Take sends, the pacing goroutine started by New receives
		curRate int64         // number of in-flight Take calls, updated atomically
		maxRate int64         // snapshot of bottleneck.MaxRate() taken in New
	}
)
// Take returns true until rate and burst reached, false overwise.
// Requests over rate (rate < i <= rate+burst) holds in queue before next spot released.
// On context cancel or deadline expires request leaves the queue fast and returns false.
func (ratelimiter *RateLimiter) Take(ctx context.Context) bool {
	// Count this call as in-flight for its whole duration; the deferred
	// decrement undoes it on every return path.
	inFlight := atomic.AddInt64(&ratelimiter.curRate, 1)
	defer atomic.AddInt64(&ratelimiter.curRate, -1)
	if inFlight > ratelimiter.maxRate {
		return false
	}
	select {
	// panic by writes to closed channel impossible, because
	// channel closed by runtime.SetFinalizer which can be invoked only if `ratelimiter` link lost
	case ratelimiter.notify <- struct{}{}:
		return true
	case <-ctx.Done():
		return false
	}
}
// New returns an initialized RateLimiter with provided Bottleneck.
// It panics if bottleneck is nil.
//
// A background goroutine paces passes through the bottleneck and hands
// out one slot per pass to Take via the notify channel. The goroutine
// deliberately captures only the channel (never the RateLimiter), so the
// returned value can become unreachable; the finalizer then closes the
// channel, which the goroutine observes and exits.
func New(bottleneck Bottleneck) *RateLimiter {
	if bottleneck == nil {
		panic("ratelimiter: bottleneck argument cannot be nil")
	}
	notify := make(chan struct{})
	ratelimiter := &RateLimiter{
		curRate: int64(0),
		maxRate: int64(bottleneck.MaxRate()),
		notify:  notify,
	}
	// Close notify only when the RateLimiter is unreachable.
	// NOTE(review): finalizers run only at GC time, so the pacing
	// goroutine may linger until a collection occurs — confirm this is
	// acceptable for long-lived processes.
	runtime.SetFinalizer(ratelimiter, func(_ *RateLimiter) { close(notify) })
	go func() {
		for {
			// Wait until the bottleneck permits the next pass, then
			// release exactly one pending Take (or learn we are closed).
			bottleneck.BreakThrough()
			if _, ok := <-notify; !ok {
				return // channel closed by the finalizer: shut down
			}
		}
	}()
	return ratelimiter
}