1
0
pool/pool.go

257 lines
4.1 KiB
Go
Raw Normal View History

2026-03-03 19:50:09 +01:00
package pool
2026-03-05 12:34:35 +01:00
import (
"code.squareroundforest.org/arpio/times"
"time"
)
2026-03-03 19:50:09 +01:00
// state is the mutable part of a pool. A single value of this type is passed
// around through the pool's state channel, so the goroutine that currently
// holds the value is the only one that can read or modify it.
type state[R any] struct {
	// items holds the idle resources. New items are appended at the end and
	// get takes them from the end as well.
	items []R

	// stats holds the counters maintained by the pool operations.
	stats Stats

	// forcedCheckPending is true while a background idle check is scheduled
	// and has not run yet.
	forcedCheckPending bool
}
// pool keeps idle resources of type R and hands them out on request.
type pool[R any] struct {
	// state carries the current state value in a channel with a buffer of
	// one. Receiving from it grants exclusive access to the state, sending
	// the value back releases it.
	state chan state[R]

	// alloc creates a new resource when no idle one is available. It may be
	// nil, in which case get reports ErrEmpty on an empty pool.
	alloc func() (R, error)

	// free is called for every resource that is dropped from the pool. It
	// may be nil.
	free func(r R)

	// options holds the configuration the pool was created with, including
	// the defaults filled in by makePool.
	options Options
}
func makePool[R any](alloc func() (R, error), free func(r R), o Options) pool[R] {
if o.Algo == nil {
o.Algo = Adaptive()
}
2026-03-05 12:34:35 +01:00
if o.Clock == nil {
o.Clock = times.Sys()
}
if sc, ok := o.Algo.(interface{ setClock(times.Clock) }); ok {
sc.setClock(o.Clock)
}
2026-03-03 19:50:09 +01:00
s := make(chan state[R], 1)
s <- state[R]{}
return pool[R]{
state: s,
alloc: alloc,
free: free,
options: o,
}
}
// stats returns a copy of the current counters of the pool. It takes the
// state from the state channel for the time of the read and puts it back
// before returning.
func (p pool[R]) stats() Stats {
	current := <-p.state
	snapshot := current.stats
	p.state <- current
	return snapshot
}
// sendEvent publishes an event of type e together with the stats s on the
// Events channel of the options.
//
// Nothing is sent when no Events channel is configured, or when none of the
// bits of e are enabled in the EventMask option. The send never blocks: when
// the channel cannot take the event right away, the event is dropped.
func (p pool[R]) sendEvent(e EventType, s Stats) {
	out := p.options.Events
	if out == nil || p.options.EventMask&e == 0 {
		return
	}

	select {
	case out <- Event{Type: e, Stats: s}:
	default:
		// the receiver is not keeping up, the event is discarded
	}
}
func (p pool[R]) get() (R, error) {
s := <-p.state
defer func() {
p.state <- s
}()
var (
r R
event EventType
err error
)
event |= GetOperation
s.stats.Get++
switch {
case len(s.items) == 0 && p.alloc == nil:
s.stats.Alloc++
event |= AllocateOperation
event |= AllocateError
2026-03-05 12:34:35 +01:00
err = ErrEmpty
2026-03-03 19:50:09 +01:00
case len(s.items) == 0:
s.stats.Alloc++
event |= AllocateOperation
r, err = p.alloc()
if err != nil {
event |= AllocateError
} else {
s.stats.Active++
}
default:
r, s.items = s.items[len(s.items)-1], s.items[:len(s.items)-1]
if len(s.items) == 0 {
s.items = nil
}
s.stats.Active++
s.stats.Idle = len(s.items)
}
p.sendEvent(event, s.stats)
return r, err
}
// put takes the resource r back into the pool.
//
// The algorithm from the options is asked for the target number of idle items
// and for an optional delay of a follow-up check. It sees the stats with the
// returned item already counted as idle. When the target is above the current
// number of idle items, r is stored; otherwise r, together with any idle items
// above the target, is handed over to freeItems. When the returned delay is
// positive, a background check is scheduled with forcedCheck.
func (p pool[R]) put(r R) {
	st := <-p.state
	defer func() {
		p.state <- st
	}()

	var ev EventType
	ev |= PutOperation
	st.stats.Put++

	// provisional value for the algorithm, corrected below:
	st.stats.Idle++

	// items allocated outside of the pool may be put in, too, so the active
	// count is not allowed to drop below zero:
	if st.stats.Active > 0 {
		st.stats.Active--
	}

	target, recheck := p.options.Algo.Target(st.stats)
	if len(st.items) < target {
		st.items = append(st.items, r)
	} else {
		ev |= FreeOperation
		st = p.freeItems(st, target, r)
	}

	st.stats.Idle = len(st.items)
	p.sendEvent(ev, st.stats)
	if recheck <= 0 {
		return
	}

	st = p.forcedCheck(st, recheck)
}
2026-03-14 19:03:18 +01:00
func (p pool[R]) load(i []R) {
s := <-p.state
defer func() {
p.state <- s
}()
s.items = append(s.items, i...)
s.stats.Idle = len(s.items)
2026-03-14 21:33:59 +01:00
s.stats.Load += len(i)
2026-03-14 19:03:18 +01:00
p.options.Algo.Load(len(i))
2026-03-14 21:33:59 +01:00
event := LoadOperation
p.sendEvent(event, s.stats)
2026-03-14 19:03:18 +01:00
}
2026-03-03 19:50:09 +01:00
func (p pool[R]) forcedCheck(s state[R], timeout time.Duration) state[R] {
if s.forcedCheckPending {
return s
}
s.forcedCheckPending = true
go func(to time.Duration) {
2026-03-14 19:03:18 +01:00
c := p.options.Clock.After(to)
p.options.TestBus.Signal("background-job-waiting")
<-c
2026-03-03 19:50:09 +01:00
p.freeIdle()
}(timeout)
return s
}
// freeItems reduces the idle items in s to the target count and releases the
// dropped ones together with the additional items.
//
// Nothing happens when the idle items and the additional items together fit
// into the target. Otherwise all the additional items are released, and the
// idle items above the target are removed from the front of the slice. Every
// released item is added to the Free counter, and passed to the free callback
// when one is set: first the additional items, then the removed idle items.
//
// A negative target is handled as zero. The slice backing the additional
// items is never written to. The caller is expected to hold the state and to
// store the returned value; the Idle counter is left for the caller to update.
func (p pool[R]) freeItems(s state[R], target int, additional ...R) state[R] {
	// an algorithm may return a negative target, and using it as it is would
	// address items beyond the length of the slice:
	if target < 0 {
		target = 0
	}

	if len(s.items)+len(additional) <= target {
		return s
	}

	excess := len(s.items) - target
	if excess < 0 {
		excess = 0
	}

	s.stats.Free += len(additional) + excess

	// the items to release are collected in a separate slice, because
	// appending to additional could write into the array of the caller:
	var release []R
	if p.free != nil {
		release = make([]R, 0, len(additional)+excess)
		release = append(release, additional...)
		release = append(release, s.items[:excess]...)
	}

	if excess > 0 {
		// clear the dropped slots, so that the backing array does not keep
		// the released items reachable:
		var zero R
		for i := range s.items[:excess] {
			s.items[i] = zero
		}

		s.items = s.items[excess:]
		if len(s.items) == 0 {
			s.items = nil
		}
	}

	for _, r := range release {
		p.free(r)
	}

	return s
}
func (p pool[R]) freeIdle() {
s := <-p.state
defer func() {
p.state <- s
}()
s.forcedCheckPending = false
t, f := p.options.Algo.Target(s.stats)
prev := s.stats.Free
s = p.freeItems(s, t)
2026-03-04 15:03:29 +01:00
s.stats.Idle = len(s.items)
2026-03-03 19:50:09 +01:00
if s.stats.Free > prev {
p.sendEvent(FreeOperation, s.stats)
}
if f > 0 {
s = p.forcedCheck(s, f)
}
2026-03-05 12:34:35 +01:00
p.options.TestBus.Signal("free-idle-done")
2026-03-03 19:50:09 +01:00
}
// freePool releases all the idle items by calling freeItems with a target of
// zero, and sets the Idle counter to zero. A free event is sent only when the
// Free counter has grown, meaning that there was something to release.
func (p pool[R]) freePool() {
	st := <-p.state
	defer func() {
		p.state <- st
	}()

	freedBefore := st.stats.Free
	st.stats.Idle = 0
	st = p.freeItems(st, 0)
	if freedBefore < st.stats.Free {
		p.sendEvent(FreeOperation, st.stats)
	}
}