1
0
This commit is contained in:
Arpad Ryszka 2026-03-05 12:34:35 +01:00
parent 9ee50d312b
commit 8495fcf619
9 changed files with 543 additions and 25 deletions

View File

@ -1,6 +1,9 @@
package pool package pool
import "time" import (
"code.squareroundforest.org/arpio/times"
"time"
)
const ( const (
// arbitrary values to be most likely out of sync with anything else: // arbitrary values to be most likely out of sync with anything else:
@ -9,6 +12,7 @@ const (
) )
type adaptive struct { type adaptive struct {
clock times.Clock
activeTime time.Time activeTime time.Time
nsTO time.Duration nsTO time.Duration
idle bool idle bool
@ -20,6 +24,10 @@ func makeAdaptiveAlgo() *adaptive {
return &adaptive{idle: true} return &adaptive{idle: true}
} }
func (a *adaptive) setClock(c times.Clock) {
a.clock = c
}
func abs(v int) int { func abs(v int) int {
if v >= 0 { if v >= 0 {
return v return v
@ -65,7 +73,7 @@ func (a *adaptive) nightshift(s Stats) time.Duration {
return 0 return 0
} }
now := time.Now() now := a.clock.Now()
a.idle = !a.idle a.idle = !a.idle
if !a.idle { if !a.idle {
a.activeTime = now a.activeTime = now

6
adaptive_test.go Normal file
View File

@ -0,0 +1,6 @@
package pool_test
import "testing"
func TestAdaptive(t *testing.T) {
}

View File

@ -1 +1,125 @@
package pool_test package pool_test
import (
"code.squareroundforest.org/arpio/pool"
"errors"
"testing"
)
func TestEvent(t *testing.T) {
t.Run("get put allocate free", func(t *testing.T) {
alloc := func() ([]byte, error) { return make([]byte, 1<<9), nil }
e := make(chan pool.Event, 7)
o := pool.Options{
Algo: pool.Max(2),
Events: e,
EventMask: pool.AllEvents,
}
p := pool.Make(alloc, nil, o)
var b [][]byte
for i := 0; i < 3; i++ {
bi, err := p.Get()
if err != nil {
t.Fatal(err)
}
b = append(b, bi)
}
for _, bi := range b {
p.Put(bi)
}
for i := 0; i < 3; i++ {
ev := <-e
if ev.Type != pool.GetOperation|pool.AllocateOperation {
t.Fatal(ev)
}
}
for i := 0; i < 2; i++ {
ev := <-e
if ev.Type != pool.PutOperation {
t.Fatal(ev)
}
}
ev := <-e
if ev.Type != pool.PutOperation|pool.FreeOperation {
t.Fatal(ev)
}
})
t.Run("allocate error", func(t *testing.T) {
alloc := func() ([]byte, error) { return nil, errTest }
e := make(chan pool.Event, 1)
o := pool.Options{
Algo: pool.NoShrink(),
Events: e,
EventMask: pool.AllEvents,
}
p := pool.Make(alloc, nil, o)
_, err := p.Get()
if !errors.Is(err, errTest) {
t.Fatal(err)
}
ev := <-e
if ev.Type != pool.GetOperation|pool.AllocateOperation|pool.AllocateError {
t.Fatal(ev)
}
})
t.Run("drop", func(t *testing.T) {
alloc := func() ([]byte, error) { return make([]byte, 1<<9), nil }
e := make(chan pool.Event)
o := pool.Options{
Algo: pool.NoShrink(),
Events: e,
EventMask: pool.AllEvents,
}
p := pool.Make(alloc, nil, o)
_, err := p.Get()
if err != nil {
t.Fatal(err)
}
select {
case <-e:
t.Fatal("unexpected event")
default:
}
})
t.Run("mask", func(t *testing.T) {
alloc := func() ([]byte, error) { return make([]byte, 1<<9), nil }
e := make(chan pool.Event, 1)
o := pool.Options{
Algo: pool.NoShrink(),
Events: e,
EventMask: pool.GetOperation,
}
p := pool.Make(alloc, nil, o)
b, err := p.Get()
if err != nil {
t.Fatal(err)
}
p.Put(b)
ev := <-e
if ev.Type != pool.AllocateOperation|pool.GetOperation {
t.Fatal(ev)
}
select {
case <-e:
t.Fatal("unexpected event")
default:
}
})
}

7
go.mod
View File

@ -1,3 +1,10 @@
module code.squareroundforest.org/arpio/pool module code.squareroundforest.org/arpio/pool
go 1.25.6 go 1.25.6
require (
code.squareroundforest.org/arpio/syncbus v0.0.0-20260222175441-f7da66ad4045
code.squareroundforest.org/arpio/times v0.0.0-20260304202452-0bdc043a8aa6
)
replace code.squareroundforest.org/arpio/times => ../times

2
go.sum Normal file
View File

@ -0,0 +1,2 @@
code.squareroundforest.org/arpio/syncbus v0.0.0-20260222175441-f7da66ad4045 h1:eSg4fnu8x6/7B6aem2ibxHX8SxFs9Mo2n2etWg4eGFY=
code.squareroundforest.org/arpio/syncbus v0.0.0-20260222175441-f7da66ad4045/go.mod h1:xZqPFR30EESkog+JzR40zDKVoBc7zmrV1X+Wo0v86p4=

55
lib.go
View File

@ -6,7 +6,11 @@
package pool package pool
import ( import (
"code.squareroundforest.org/arpio/syncbus"
"code.squareroundforest.org/arpio/times"
"errors" "errors"
"fmt"
"strings"
"time" "time"
) )
@ -28,6 +32,8 @@ const (
AllocateOperation AllocateOperation
FreeOperation FreeOperation
AllocateError AllocateError
AllEvents = GetOperation | PutOperation | AllocateOperation | FreeOperation | AllocateError
) )
type Event struct { type Event struct {
@ -54,13 +60,60 @@ type Options struct {
EventMask EventType EventMask EventType
Algo Algo Algo Algo
Clock times.Clock
TestBus *syncbus.SyncBus
} }
type Pool[R any] struct { type Pool[R any] struct {
pool pool[R] pool pool[R]
} }
var ErrEmptyPool = errors.New("empty pool") var ErrEmpty = errors.New("empty pool")
func (et EventType) String() string {
var s []string
if et&GetOperation != 0 {
s = append(s, "get")
}
if et&PutOperation != 0 {
s = append(s, "put")
}
if et&AllocateOperation != 0 {
s = append(s, "allocate")
}
if et&FreeOperation != 0 {
s = append(s, "free")
}
if et&AllocateError != 0 {
s = append(s, "allocerr")
}
if len(s) == 0 {
return "none"
}
return strings.Join(s, "|")
}
func (ev Event) String() string {
return fmt.Sprintf("%v; %v", ev.Type, ev.Stats)
}
func (s Stats) String() string {
return fmt.Sprintf(
"idle: %d, active: %d, get: %d, put: %d, alloc: %d, free: %d",
s.Idle,
s.Active,
s.Get,
s.Put,
s.Alloc,
s.Free,
)
}
// zero-config // zero-config
func Adaptive() Algo { func Adaptive() Algo {

View File

@ -1,8 +1,12 @@
package pool package pool
import "time" import (
"code.squareroundforest.org/arpio/times"
"time"
)
type maxTimeout struct { type maxTimeout struct {
clock times.Clock
max int max int
to time.Duration to time.Duration
items []time.Time items []time.Time
@ -16,6 +20,10 @@ func makeMaxTimeout(max int, to time.Duration) *maxTimeout {
} }
} }
func (a *maxTimeout) setClock(c times.Clock) {
a.clock = c
}
func (a *maxTimeout) Target(s Stats) (int, time.Duration) { func (a *maxTimeout) Target(s Stats) (int, time.Duration) {
if a.max <= 0 && a.to <= 0 { if a.max <= 0 && a.to <= 0 {
return s.Idle, 0 return s.Idle, 0
@ -42,7 +50,7 @@ func (a *maxTimeout) Target(s Stats) (int, time.Duration) {
} }
} }
now := time.Now() now := a.clock.Now()
for len(a.items) < s.Idle { for len(a.items) < s.Idle {
a.items = append(a.items, now) a.items = append(a.items, now)
} }

20
pool.go
View File

@ -1,6 +1,9 @@
package pool package pool
import "time" import (
"code.squareroundforest.org/arpio/times"
"time"
)
type state[R any] struct { type state[R any] struct {
items []R items []R
@ -20,6 +23,14 @@ func makePool[R any](alloc func() (R, error), free func(r R), o Options) pool[R]
o.Algo = Adaptive() o.Algo = Adaptive()
} }
if o.Clock == nil {
o.Clock = times.Sys()
}
if sc, ok := o.Algo.(interface{ setClock(times.Clock) }); ok {
sc.setClock(o.Clock)
}
s := make(chan state[R], 1) s := make(chan state[R], 1)
s <- state[R]{} s <- state[R]{}
return pool[R]{ return pool[R]{
@ -78,7 +89,7 @@ func (p pool[R]) get() (R, error) {
s.stats.Alloc++ s.stats.Alloc++
event |= AllocateOperation event |= AllocateOperation
event |= AllocateError event |= AllocateError
err = ErrEmptyPool err = ErrEmpty
case len(s.items) == 0: case len(s.items) == 0:
s.stats.Alloc++ s.stats.Alloc++
event |= AllocateOperation event |= AllocateOperation
@ -142,7 +153,8 @@ func (p pool[R]) forcedCheck(s state[R], timeout time.Duration) state[R] {
s.forcedCheckPending = true s.forcedCheckPending = true
go func(to time.Duration) { go func(to time.Duration) {
<-time.After(to) p.options.TestBus.Signal("background-job-running")
<-p.options.Clock.After(to)
p.freeIdle() p.freeIdle()
}(timeout) }(timeout)
@ -210,6 +222,8 @@ func (p pool[R]) freeIdle() {
if f > 0 { if f > 0 {
s = p.forcedCheck(s, f) s = p.forcedCheck(s, f)
} }
p.options.TestBus.Signal("free-idle-done")
} }
func (p pool[R]) freePool() { func (p pool[R]) freePool() {

View File

@ -1,22 +1,318 @@
package pool_test package pool_test
import "testing" import (
"code.squareroundforest.org/arpio/pool"
"code.squareroundforest.org/arpio/syncbus"
"code.squareroundforest.org/arpio/times"
"errors"
"testing"
"time"
)
var errTest = errors.New("test error")
func TestPool(t *testing.T) { func TestPool(t *testing.T) {
// initial stats t.Run("initial stats", func(t *testing.T) {
// get empty alloc := func() ([]byte, error) { return make([]byte, 1<<9), nil }
// get pooled own p := pool.Make(alloc, nil, pool.Options{Algo: pool.NoShrink()})
// get pooled foreign s := p.Stats()
// get own no alloc e := pool.Stats{}
// get foreign no alloc if s != e {
// put own t.Fatal(s)
// put foreign empty }
// put foreign not empty })
// release on put no free
// release on put with free t.Run("get when empty", func(t *testing.T) {
// release all no free alloc := func() ([]byte, error) { return make([]byte, 1<<9), nil }
// release all with free p := pool.Make(alloc, nil, pool.Options{Algo: pool.NoShrink()})
// release on timeout no free b, err := p.Get()
// release on timeout with free if err != nil {
// use default algo t.Fatal(err)
}
if len(b) != 1<<9 {
t.Fatal(len(b))
}
s := p.Stats()
e := pool.Stats{Alloc: 1, Get: 1, Active: 1}
if s != e {
t.Fatal(s)
}
})
t.Run("get pooled own", func(t *testing.T) {
alloc := func() ([]byte, error) { return make([]byte, 1<<9), nil }
p := pool.Make(alloc, nil, pool.Options{Algo: pool.NoShrink()})
b, err := p.Get()
if err != nil {
t.Fatal(err)
}
if len(b) != 1<<9 {
t.Fatal(len(b))
}
p.Put(b)
b, err = p.Get()
if err != nil {
t.Fatal(err)
}
if len(b) != 1<<9 {
t.Fatal(len(b))
}
s := p.Stats()
e := pool.Stats{Alloc: 1, Get: 2, Put: 1, Active: 1}
if s != e {
t.Fatal(s)
}
})
t.Run("get pooled foreign", func(t *testing.T) {
alloc := func() ([]byte, error) { return make([]byte, 1<<9), nil }
p := pool.Make(alloc, nil, pool.Options{Algo: pool.NoShrink()})
p.Put(make([]byte, 1<<6))
b, err := p.Get()
if err != nil {
t.Fatal(err)
}
if len(b) != 1<<6 {
t.Fatal(len(b))
}
s := p.Stats()
e := pool.Stats{Get: 1, Put: 1, Active: 1}
if s != e {
t.Fatal(s)
}
})
t.Run("get own alloc not available", func(t *testing.T) {
p := pool.Make[[]byte](nil, nil, pool.Options{Algo: pool.NoShrink()})
_, err := p.Get()
if !errors.Is(err, pool.ErrEmpty) {
t.Fatal(err)
}
s := p.Stats()
e := pool.Stats{Alloc: 1, Get: 1}
if s != e {
t.Fatal(s)
}
})
t.Run("get foreign no alloc", func(t *testing.T) {
p := pool.Make[[]byte](nil, nil, pool.Options{Algo: pool.NoShrink()})
p.Put(make([]byte, 1<<6))
b, err := p.Get()
if err != nil {
t.Fatal(err)
}
if len(b) != 1<<6 {
t.Fatal(len(b))
}
s := p.Stats()
e := pool.Stats{Get: 1, Put: 1, Active: 1}
if s != e {
t.Fatal(s)
}
})
t.Run("allocation error", func(t *testing.T) {
alloc := func() ([]byte, error) { return nil, errTest }
p := pool.Make(alloc, nil, pool.Options{Algo: pool.NoShrink()})
_, err := p.Get()
if !errors.Is(err, errTest) {
t.Fatal(err)
}
})
t.Run("put foreign not empty", func(t *testing.T) {
p := pool.Make[[]byte](nil, nil, pool.Options{Algo: pool.NoShrink()})
p.Put(make([]byte, 1<<6))
p.Put(make([]byte, 1<<6))
b, err := p.Get()
if err != nil {
t.Fatal(err)
}
if len(b) != 1<<6 {
t.Fatal(len(b))
}
s := p.Stats()
e := pool.Stats{Get: 1, Put: 2, Active: 1, Idle: 1}
if s != e {
t.Fatal(s)
}
})
t.Run("release on put no free", func(t *testing.T) {
p := pool.Make[[]byte](nil, nil, pool.Options{Algo: pool.Max(2)})
p.Put(make([]byte, 1<<9))
p.Put(make([]byte, 1<<9))
p.Put(make([]byte, 1<<9))
s := p.Stats()
e := pool.Stats{Put: 3, Idle: 2, Free: 1}
if s != e {
t.Fatal(s)
}
})
t.Run("release on put with free", func(t *testing.T) {
var freeCount int
f := func([]byte) { freeCount++ }
p := pool.Make(nil, f, pool.Options{Algo: pool.Max(2)})
p.Put(make([]byte, 1<<9))
p.Put(make([]byte, 1<<9))
p.Put(make([]byte, 1<<9))
if freeCount != 1 {
t.Fatal(freeCount)
}
s := p.Stats()
e := pool.Stats{Put: 3, Idle: 2, Free: 1}
if s != e {
t.Fatal(s)
}
})
t.Run("release all no free", func(t *testing.T) {
p := pool.Make[[]byte](nil, nil, pool.Options{Algo: pool.NoShrink()})
p.Put(make([]byte, 1<<9))
p.Put(make([]byte, 1<<9))
p.Put(make([]byte, 1<<9))
p.Free()
s := p.Stats()
e := pool.Stats{Put: 3, Idle: 0, Free: 3}
if s != e {
t.Fatal(s)
}
})
t.Run("release all with free", func(t *testing.T) {
var freeCount int
f := func([]byte) { freeCount++ }
p := pool.Make[[]byte](nil, f, pool.Options{Algo: pool.NoShrink()})
p.Put(make([]byte, 1<<9))
p.Put(make([]byte, 1<<9))
p.Put(make([]byte, 1<<9))
p.Free()
if freeCount != 3 {
t.Fatal(freeCount)
}
s := p.Stats()
e := pool.Stats{Put: 3, Idle: 0, Free: 3}
if s != e {
t.Fatal(s)
}
})
t.Run("release all when empty", func(t *testing.T) {
p := pool.Make[[]byte](nil, nil, pool.Options{Algo: pool.NoShrink()})
p.Free()
s := p.Stats()
var e pool.Stats
if s != e {
t.Fatal(s)
}
})
t.Run("release on timeout no free", func(t *testing.T) {
c := times.Test()
b := syncbus.New(time.Second)
o := pool.Options{
Algo: pool.Timeout(3 * time.Millisecond),
Clock: c,
TestBus: b,
}
p := pool.Make[[]byte](nil, nil, o)
p.Put(make([]byte, 1<<9))
if err := b.Wait("background-job-running"); err != nil {
t.Fatal(err)
}
c.Pass(2 * time.Millisecond)
p.Put(make([]byte, 1<<9))
if err := b.Wait("background-job-running"); err != nil {
t.Fatal(err)
}
c.Pass(2 * time.Millisecond)
b.Wait("free-idle-done")
s := p.Stats()
e := pool.Stats{Put: 2, Idle: 1, Free: 1}
if s == e {
return
}
})
t.Run("release on timeout with free", func(t *testing.T) {
var freeCount int
f := func([]byte) { freeCount++ }
c := times.Test()
b := syncbus.New(time.Second)
o := pool.Options{
Algo: pool.Timeout(3 * time.Millisecond),
Clock: c,
TestBus: b,
}
p := pool.Make[[]byte](nil, f, o)
p.Put(make([]byte, 1<<9))
if err := b.Wait("background-job-running"); err != nil {
t.Fatal(err)
}
c.Pass(2 * time.Millisecond)
p.Put(make([]byte, 1<<9))
if err := b.Wait("background-job-running"); err != nil {
t.Fatal(err)
}
c.Pass(2 * time.Millisecond)
b.Wait("free-idle-done")
s := p.Stats()
e := pool.Stats{Put: 2, Idle: 1, Free: 1}
if s == e {
if freeCount != 1 {
t.Fatal(freeCount)
}
return
}
})
t.Run("use default algo", func(t *testing.T) {
alloc := func() ([]byte, error) { return make([]byte, 1<<9), nil }
p := pool.Make(alloc, nil, pool.Options{})
var bs [][]byte
for i := 0; i < 9; i++ {
b, err := p.Get()
if err != nil {
t.Fatal(err)
}
bs = append(bs, b)
}
for _, b := range bs[:2*len(bs)/3] {
p.Put(b)
}
s := p.Stats()
e := pool.Stats{Alloc: 9, Get: 9, Put: 6, Active: 3, Idle: 3, Free: 3}
if s != e {
t.Fatal(s)
}
})
} }