From 99e8b6f6b6fc9125db25581807a0f8e55bcec054 Mon Sep 17 00:00:00 2001 From: Arpad Ryszka Date: Tue, 3 Mar 2026 19:50:09 +0100 Subject: [PATCH] initial implementation --- .gitignore | 1 + Makefile | 31 +++++++ algo.go | 163 ++++++++++++++++++++++++++++++++++++ algo_test.go | 44 ++++++++++ event_test.go | 1 + go.mod | 3 + lib.go | 114 +++++++++++++++++++++++++ license | 21 +++++ pool.go | 226 ++++++++++++++++++++++++++++++++++++++++++++++++++ pool_test.go | 22 +++++ readme.md | 0 11 files changed, 626 insertions(+) create mode 100644 .gitignore create mode 100644 Makefile create mode 100644 algo.go create mode 100644 algo_test.go create mode 100644 event_test.go create mode 100644 go.mod create mode 100644 lib.go create mode 100644 license create mode 100644 pool.go create mode 100644 pool_test.go create mode 100644 readme.md diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ebf0f2e --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +.cover diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..afb28e9 --- /dev/null +++ b/Makefile @@ -0,0 +1,31 @@ +sources = $(shell find . -name "*.go") + +default: build + +build: $(sources) + go build + +fmt: $(sources) + go fmt + +check: $(sources) + go test -count 1 + +race: $(source) + go test -count 1 -race + +.cover: $(sources) + go test -count 1 -coverprofile .cover + +cover: .cover + go tool cover -func .cover + +showcover: .cover + go tool cover -html .cover + +bench: $(sources) + go test -bench Benchmark -run ^$ + +clean: + go clean + rm .cover diff --git a/algo.go b/algo.go new file mode 100644 index 0000000..d702072 --- /dev/null +++ b/algo.go @@ -0,0 +1,163 @@ +package pool + +import "time" + +const ( + // arbitrary values to be most likely out of sync with anything else: + minNightshiftTime = 729 * time.Millisecond // ~1sec + maxNightshiftTime = 59049 * time.Second // ~2/3day +) + +type adaptive struct { + lastTurnedActive time.Time + nightshiftTime time.Duration + idle bool + average int + deviation int +} + +type maxTimeout struct { + max int + to time.Duration + items []time.Time +} + +func makeAdaptiveAlgo() *adaptive { + return &adaptive{idle: true} +} + +func abs(v int) int { + if v >= 0 { + return v + } + + return 0 - v +} + +func divE(v int) int { + return (3 * v) >> 3 // 1 / 2.72 => 3 / 8 +} + +func mulE(v int) int { + return (11 * v) >> 2 // 2.72 => 11 / 4 +} + +func movingAverage(prev, currv int) int { + return prev + divE(currv-prev) +} + +func movingAbsoluteDeviation(prev, currv, currav int) int { + return prev + divE(abs(currv-currav)-prev) +} + +func targetCapacity(av, dev int) int { + return av + mulE(dev) +} + +func (a *adaptive) target(s Stats) int { + av := movingAverage(a.average, s.Active) + dev := movingAbsoluteDeviation(a.deviation, s.Active, av) + a.average = av + a.deviation = dev + return targetCapacity(av, dev) +} + +func (a *adaptive) nightshift(s Stats) time.Duration { + if a.idle && s.Active == 0 { + return a.nightshiftTime + } + + if !a.idle && s.Active > 0 { + return 0 + } + + now := time.Now() + a.idle = !a.idle + if !a.idle { + a.lastTurnedActive = now + return 0 + } + + a.nightshiftTime = now.Sub(a.lastTurnedActive) + a.nightshiftTime = (3 * a.nightshiftTime) >> 3 + if a.nightshiftTime < minNightshiftTime { + a.nightshiftTime = minNightshiftTime + } + + if a.nightshiftTime > maxNightshiftTime { + a.nightshiftTime = maxNightshiftTime + } + + return a.nightshiftTime +} + +func (a *adaptive) Target(s Stats) (int, time.Duration) { + t := a.target(s) + ns := a.nightshift(s) + return t, ns +} + +func makeMaxTimeout(max int, to time.Duration) *maxTimeout { + return &maxTimeout{ + max: max, + to: to, + } +} + +func (a *maxTimeout) Target(s Stats) (int, time.Duration) { + if a.max <= 0 && a.to <= 0 { + return s.Idle, 0 + } + + if a.max > 0 && a.to <= 0 { + t := s.Idle + if t > a.max { + t = a.max + } + + return t, 0 + } + + var zero time.Time + if len(a.items) > s.Idle { + for i := s.Idle; i < len(a.items); i++ { + a.items[i] = zero + } + + a.items = a.items[:s.Idle] + if len(a.items) == 0 { + a.items = nil + } + } + + now := time.Now() + for len(a.items) < s.Idle { + a.items = append(a.items, now) + } + + var drop int + for drop < len(a.items) && a.items[drop].Add(a.to).Before(now) { + a.items[drop] = zero + drop++ + } + + if drop > 0 { + for i := 0; i < drop; i++ { + a.items[i] = zero + } + + a.items = a.items[drop:] + } + + if len(a.items) == 0 { + a.items = nil + return 0, 0 + } + + t := len(a.items) + if a.max > 0 && t > a.max { + t = a.max + } + + return t, a.items[0].Add(a.to).Sub(now) +} diff --git a/algo_test.go b/algo_test.go new file mode 100644 index 0000000..0c4dad6 --- /dev/null +++ b/algo_test.go @@ -0,0 +1,44 @@ +package pool_test + +import ( + "code.squareroundforest.org/arpio/pool" + "time" +) + +type maxAlgo int + +type timeoutAlgo struct { + to time.Duration + items []time.Time +} + +func (a maxAlgo) Target(s pool.Stats) (int, time.Duration) { + if s.Idle <= int(a) { + return s.Idle, 0 + } + + return int(a), 0 +} + +func (a *timeoutAlgo) Target(s pool.Stats) (int, time.Duration) { + now := time.Now() + for len(a.items) < s.Idle { + a.items = append(a.items, now) + } + + t := s.Idle + for len(a.items) > 0 && a.items[0].Add(a.to).Before(now) { + t-- + a.items = a.items[1:] + } + + if t < 0 { + t = 0 + } + + if len(a.items) == 0 { + return t, 0 + } + + return t, a.items[0].Add(a.to).Sub(now) +} diff --git a/event_test.go b/event_test.go new file mode 100644 index 0000000..2c4370a --- /dev/null +++ b/event_test.go @@ -0,0 +1 @@ +package pool_test diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..51d8013 --- /dev/null +++ b/go.mod @@ -0,0 +1,3 @@ +module code.squareroundforest.org/arpio/pool + +go 1.25.6 diff --git a/lib.go b/lib.go new file mode 100644 index 0000000..df0b57f --- /dev/null +++ b/lib.go @@ -0,0 +1,114 @@ +// pool with support for: +// - finalizing items +// - monitoring: events and/or stats +// - adaptive shrinking algorithm +// - custom shrinking algorithms +package pool + +import ( + "errors" + "time" +) + +type Stats struct { + Idle int + Active int + Get int + Put int + Alloc int + Free int +} + +type EventType int + +const ( + None EventType = 0 + GetOperation EventType = 1 << iota + PutOperation + AllocateOperation + FreeOperation + AllocateError +) + +type Event struct { + Type EventType + Stats Stats +} + +type Algo interface { + + // always called + // desired idle items + // implementations should consider the cost of freeing the stored resources + // must support being called from a goroutine other than it was created in + // a single pool instance only calls it from a single goroutine at a time + // items need to be allocated always by calling Get + // second return argument for requested next check + Target(Stats) (int, time.Duration) +} + +type Options struct { + + // events can be dropped if the consumer is blocked + Events chan<- Event + + EventMask EventType + Algo Algo +} + +type Pool[R any] struct { + pool pool[R] +} + +var ErrEmptyPool = errors.New("empty pool") + +// zero-config +func Adaptive() Algo { + return makeAdaptiveAlgo() +} + +// enfoces a max pool size and a timeout for the items +// when adding items to the pool via Put that were not fetched via Get, there can discrepancies occur in which +// items get timed out, but the general pool limitations get still consistently enforced eventually +func MaxTimeout(max int, to time.Duration) Algo { + return makeMaxTimeout(max, to) +} + +// like MaxTimeout but without enforcing timeouts +func Max(max int) Algo { + return makeMaxTimeout(max, 0) +} + +// like MaxTimeout but without enforcing max pool size +func Timeout(to time.Duration) Algo { + return makeMaxTimeout(0, to) +} + +// the user code can decide not to put back items to the pool +func NoShrink() Algo { + return makeMaxTimeout(0, 0) +} + +// alloc and free need to support calls from goroutines other than they were created in +// a single pool instance only calls them from a single goroutine at a time +// free happens synchronously, user code may execute it in the background, in which case it is the user code's +// responsibility to ensure that free is fully carried out before the application exits, if that's necessary +func Make[R any](alloc func() (R, error), free func(R), o Options) Pool[R] { + return Pool[R]{pool: makePool(alloc, free, o)} +} + +func (p Pool[R]) Get() (R, error) { + return p.pool.get() +} + +func (p Pool[R]) Put(i R) { + p.pool.put(i) +} + +func (p Pool[R]) Stats() Stats { + return p.pool.stats() +} + +func (p Pool[R]) Free() { + p.pool.freePool() +} diff --git a/license b/license new file mode 100644 index 0000000..00e6290 --- /dev/null +++ b/license @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2018 Arpad Ryszka + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/pool.go b/pool.go new file mode 100644 index 0000000..dbfa433 --- /dev/null +++ b/pool.go @@ -0,0 +1,226 @@ +package pool + +import "time" + +type state[R any] struct { + items []R + stats Stats + forcedCheckPending bool +} + +type pool[R any] struct { + state chan state[R] + alloc func() (R, error) + free func(r R) + options Options +} + +func makePool[R any](alloc func() (R, error), free func(r R), o Options) pool[R] { + if o.Algo == nil { + o.Algo = Adaptive() + } + + s := make(chan state[R], 1) + s <- state[R]{} + return pool[R]{ + state: s, + alloc: alloc, + free: free, + options: o, + } +} + +func (p pool[R]) stats() Stats { + s := <-p.state + defer func() { + p.state <- s + }() + + return s.stats +} + +func (p pool[R]) sendEvent(e EventType, s Stats) { + if p.options.Events == nil { + return + } + + if p.options.EventMask&e == 0 { + return + } + + ev := Event{ + Type: e, + Stats: s, + } + + select { + case p.options.Events <- ev: + default: + } +} + +func (p pool[R]) get() (R, error) { + s := <-p.state + defer func() { + p.state <- s + }() + + var ( + r R + event EventType + err error + ) + + event |= GetOperation + s.stats.Get++ + switch { + case len(s.items) == 0 && p.alloc == nil: + s.stats.Alloc++ + event |= AllocateOperation + event |= AllocateError + err = ErrEmptyPool + case len(s.items) == 0: + s.stats.Alloc++ + event |= AllocateOperation + r, err = p.alloc() + if err != nil { + event |= AllocateError + } else { + s.stats.Active++ + } + default: + r, s.items = s.items[len(s.items)-1], s.items[:len(s.items)-1] + if len(s.items) == 0 { + s.items = nil + } + + s.stats.Active++ + s.stats.Idle = len(s.items) + } + + p.sendEvent(event, s.stats) + return r, err +} + +func (p pool[R]) put(r R) { + s := <-p.state + defer func() { + p.state <- s + }() + + var event EventType + event |= PutOperation + s.stats.Put++ + s.stats.Idle++ + + // one may put in items that were allocated outside of the pool: + if s.stats.Active > 0 { + s.stats.Active-- + } + + t, f := p.options.Algo.Target(s.stats) + switch { + case t > len(s.items): + s.items = append(s.items, r) + default: + event |= FreeOperation + s = p.freeItems(s, t, r) + } + + // fix provisioned idle count: + s.stats.Idle = len(s.items) + p.sendEvent(event, s.stats) + if f > 0 { + s = p.forcedCheck(s, f) + } +} + +func (p pool[R]) forcedCheck(s state[R], timeout time.Duration) state[R] { + if s.forcedCheckPending { + return s + } + + s.forcedCheckPending = true + go func(to time.Duration) { + <-time.After(to) + p.freeIdle() + }(timeout) + + return s +} + +func (p pool[R]) freeItems(s state[R], target int, additional ...R) state[R] { + if len(s.items)+len(additional) <= target { + return s + } + + s.stats.Free += len(additional) + if p.free == nil && len(s.items) <= target { + return s + } + + var f []R + if p.free != nil { + f = additional + if len(s.items) > target { + f = append(f, s.items[:len(s.items)-target]...) + } + } + + if len(s.items) > target { + s.stats.Free += len(s.items) - target + + var zero R + for i := 0; i < len(s.items)-target; i++ { + s.items[i] = zero + } + + s.items = s.items[len(s.items)-target:] + if len(s.items) == 0 { + s.items = nil + } + } + + if p.free == nil { + return s + } + + for _, fi := range f { + p.free(fi) + } + + return s +} + +func (p pool[R]) freeIdle() { + s := <-p.state + defer func() { + p.state <- s + }() + + s.forcedCheckPending = false + t, f := p.options.Algo.Target(s.stats) + prev := s.stats.Free + s = p.freeItems(s, t) + if s.stats.Free > prev { + p.sendEvent(FreeOperation, s.stats) + } + + if f > 0 { + s = p.forcedCheck(s, f) + } +} + +func (p pool[R]) freePool() { + s := <-p.state + defer func() { + p.state <- s + }() + + s.stats.Idle = 0 + prev := s.stats.Free + s = p.freeItems(s, 0) + if s.stats.Free > prev { + p.sendEvent(FreeOperation, s.stats) + } +} diff --git a/pool_test.go b/pool_test.go new file mode 100644 index 0000000..8671a87 --- /dev/null +++ b/pool_test.go @@ -0,0 +1,22 @@ +package pool_test + +import "testing" + +func TestPool(t *testing.T) { + // initial stats + // get empty + // get pooled own + // get pooled foreign + // get own no alloc + // get foreign no alloc + // put own + // put foreign empty + // put foreign not empty + // release on put no free + // release on put with free + // release all no free + // release all with free + // release on timeout no free + // release on timeout with free + // use default algo +} diff --git a/readme.md b/readme.md new file mode 100644 index 0000000..e69de29