1
0

testing and prewarm functionality

This commit is contained in:
Arpad Ryszka 2026-03-14 19:03:18 +01:00
parent 8495fcf619
commit 4dea0194c4
10 changed files with 1697 additions and 53 deletions

View File

@ -2,6 +2,7 @@ package pool
import ( import (
"code.squareroundforest.org/arpio/times" "code.squareroundforest.org/arpio/times"
"math"
"time" "time"
) )
@ -16,8 +17,8 @@ type adaptive struct {
activeTime time.Time activeTime time.Time
nsTO time.Duration nsTO time.Duration
idle bool idle bool
average int average float64
deviation int deviation float64
} }
func makeAdaptiveAlgo() *adaptive { func makeAdaptiveAlgo() *adaptive {
@ -28,40 +29,24 @@ func (a *adaptive) setClock(c times.Clock) {
a.clock = c a.clock = c
} }
func abs(v int) int { func movingAverage(prev, currv float64) float64 {
if v >= 0 { return prev + (currv-prev)/math.E
return v
} }
return 0 - v func movingAbsoluteDeviation(prev, currv, currav float64) float64 {
return prev + (math.Abs(currv-currav)-prev)/math.E
} }
func divE(v int) int { func targetCapacity(av, dev float64) float64 {
return (3 * v) >> 3 // 1 / 2.72 => 3 / 8 return av + dev*math.E
}
func mulE(v int) int {
return (11 * v) >> 2 // 2.72 => 11 / 4
}
func movingAverage(prev, currv int) int {
return prev + divE(currv-prev)
}
func movingAbsoluteDeviation(prev, currv, currav int) int {
return prev + divE(abs(currv-currav)-prev)
}
func targetCapacity(av, dev int) int {
return av + mulE(dev)
} }
func (a *adaptive) target(s Stats) int { func (a *adaptive) target(s Stats) int {
av := movingAverage(a.average, s.Active) av := movingAverage(a.average, float64(s.Active))
dev := movingAbsoluteDeviation(a.deviation, s.Active, av) dev := movingAbsoluteDeviation(a.deviation, float64(s.Active), av)
a.average = av a.average = av
a.deviation = dev a.deviation = dev
return targetCapacity(av, dev) return int(targetCapacity(av, dev))
} }
func (a *adaptive) nightshift(s Stats) time.Duration { func (a *adaptive) nightshift(s Stats) time.Duration {
@ -81,7 +66,7 @@ func (a *adaptive) nightshift(s Stats) time.Duration {
} }
a.nsTO = now.Sub(a.activeTime) a.nsTO = now.Sub(a.activeTime)
a.nsTO = (3 * a.nsTO) >> 3 a.nsTO = (3 * a.nsTO) / 8
if a.nsTO < minNightshiftTime { if a.nsTO < minNightshiftTime {
a.nsTO = minNightshiftTime a.nsTO = minNightshiftTime
} }
@ -98,3 +83,9 @@ func (a *adaptive) Target(s Stats) (int, time.Duration) {
ns := a.nightshift(s) ns := a.nightshift(s)
return t, ns return t, ns
} }
func (a *adaptive) Load(n int) {
// we lie to the algorithm when adding the additional idle count to the average active count. This way
// we can adjust the calculated target capacity:
a.average += float64(n)
}

View File

@ -1,6 +1,332 @@
package pool_test package pool_test
import "testing" import (
"code.squareroundforest.org/arpio/pool"
"code.squareroundforest.org/arpio/syncbus"
"code.squareroundforest.org/arpio/times"
"testing"
"time"
)
func TestAdaptive(t *testing.T) { func TestAdaptive(t *testing.T) {
t.Run("basic", func(t *testing.T) {
t.Run("no concurrency", func(t *testing.T) {
testBasicSet(t, scenarioOptions{})
})
t.Run("low concurrency", func(t *testing.T) {
testBasicSet(t, scenarioOptions{concurrency: 8})
})
t.Run("high concurrency", func(t *testing.T) {
testBasicSet(t, scenarioOptions{concurrency: 256})
})
})
t.Run("cyclic", func(t *testing.T) {
t.Run("no concurrency", func(t *testing.T) {
testCyclicSet(t, scenarioOptions{})
})
t.Run("low concurrency", func(t *testing.T) {
testCyclicSet(t, scenarioOptions{concurrency: 8})
})
t.Run("high concurrency", func(t *testing.T) {
testCyclicSet(t, scenarioOptions{concurrency: 256})
})
})
t.Run("nightshift", func(t *testing.T) {
const (
initial = 15
variation = 10
variationCycles = 3
variationStepTime = 10 * time.Millisecond
dropStepTime = time.Millisecond
waitCycles = 36
waitTime = time.Second
)
bus := syncbus.New(time.Second)
clock := times.Test()
o := pool.Options{
Clock: clock,
TestBus: bus,
}
alloc := func() ([]byte, error) { return make([]byte, 1<<9), nil }
p := pool.Make(alloc, nil, o)
var active [][]byte
get := func() {
b, err := p.Get()
if err != nil {
t.Fatal(err)
}
active = append(active, b)
}
put := func() {
if len(active) == 0 {
t.Fatal("put called from empty active")
}
var b []byte
b, active = active[0], active[1:]
p.Put(b)
}
for i := 0; i < initial; i++ {
get()
}
for i := 0; i <= variationCycles; i++ {
for j := 0; j < variation; j++ {
put()
clock.Pass(variationStepTime)
}
for j := 0; j < variation; j++ {
get()
clock.Pass(variationStepTime)
}
}
for len(active) > 0 {
put()
clock.Pass(dropStepTime)
}
for i := 0; i < waitCycles; i++ {
if err := bus.Wait("background-job-waiting"); err != nil {
t.Fatal(err)
}
bus.ResetSignals("background-job-waiting")
clock.Pass(waitTime)
if err := bus.Wait("free-idle-done"); err != nil {
t.Fatal(err)
}
bus.ResetSignals("free-idle-done")
}
if p.Stats().Idle != 0 {
t.Fatal(p.Stats())
}
})
t.Run("external put", func(t *testing.T) {
t.Run("initial", func(t *testing.T) {
const (
initialCount = 15
steadyUseCycles = 8
)
alloc := func() ([]byte, error) { return make([]byte, 1<<9), nil }
p := pool.Make(alloc, nil, pool.Options{})
var active [][]byte
get := func() {
b, err := p.Get()
if err != nil {
t.Fatal(err)
}
active = append(active, b)
}
put := func() {
if len(active) == 0 {
t.Fatal("put called from empty active")
}
var b []byte
b, active = active[0], active[1:]
p.Put(b)
}
for i := 0; i < initialCount; i++ {
p.Put(make([]byte, 1<<9))
}
for i := 0; i < steadyUseCycles; i++ {
get()
put()
}
s := p.Stats()
e := pool.Stats{Idle: 0, Active: 0, Get: 8, Put: 23, Alloc: 8, Free: 23}
if s != e {
t.Fatal(s)
}
})
t.Run("expect higher load", func(t *testing.T) {
const (
initialCount = 15
steadyUseCycles = 8
adjustCount = 15
highLoadCycles = 8
)
alloc := func() ([]byte, error) { return make([]byte, 1<<9), nil }
p := pool.Make(alloc, nil, pool.Options{})
var active [][]byte
get := func() {
b, err := p.Get()
if err != nil {
t.Fatal(err)
}
active = append(active, b)
}
put := func() {
if len(active) == 0 {
t.Fatal("put called from empty active")
}
var b []byte
b, active = active[0], active[1:]
p.Put(b)
}
for i := 0; i < initialCount; i++ {
get()
}
for i := 0; i < steadyUseCycles; i++ {
get()
put()
}
for i := 0; i < adjustCount; i++ {
p.Put(make([]byte, 1<<9))
}
for i := 0; i < highLoadCycles; i++ {
get()
get()
put()
}
s := p.Stats()
e := pool.Stats{Idle: 1, Active: 8, Get: 39, Put: 31, Alloc: 20, Free: 11}
if s != e {
t.Fatal(s)
}
})
})
t.Run("load", func(t *testing.T) {
t.Run("prewarm", func(t *testing.T) {
const (
initialCount = 15
steadyUseCycles = 8
)
alloc := func() ([]byte, error) { return make([]byte, 1<<9), nil }
p := pool.Make(alloc, nil, pool.Options{})
var active [][]byte
get := func() {
b, err := p.Get()
if err != nil {
t.Fatal(err)
}
active = append(active, b)
}
put := func() {
if len(active) == 0 {
t.Fatal("put called from empty active")
}
var b []byte
b, active = active[0], active[1:]
p.Put(b)
}
l := make([][]byte, initialCount)
for i := 0; i < initialCount; i++ {
l[i] = make([]byte, 1<<9)
}
p.Load(l)
for i := 0; i < steadyUseCycles; i++ {
get()
put()
}
s := p.Stats()
e := pool.Stats{Idle: 3, Active: 0, Get: 8, Put: 8, Alloc: 0, Free: 12}
if s != e {
t.Fatal(s)
}
})
t.Run("expect higher load", func(t *testing.T) {
const (
initialCount = 15
steadyUseCycles = 8
adjustCount = 15
highLoadCycles = 8
)
alloc := func() ([]byte, error) { return make([]byte, 1<<9), nil }
p := pool.Make(alloc, nil, pool.Options{})
var active [][]byte
get := func() {
b, err := p.Get()
if err != nil {
t.Fatal(err)
}
active = append(active, b)
}
put := func() {
if len(active) == 0 {
t.Fatal("put called from empty active")
}
var b []byte
b, active = active[0], active[1:]
p.Put(b)
}
for i := 0; i < initialCount; i++ {
get()
}
for i := 0; i < steadyUseCycles; i++ {
get()
put()
}
l := make([][]byte, adjustCount)
for i := 0; i < adjustCount; i++ {
l[i] = make([]byte, 1<<9)
}
p.Load(l)
for i := 0; i < highLoadCycles; i++ {
get()
get()
put()
}
s := p.Stats()
e := pool.Stats{Idle: 8, Active: 23, Get: 39, Put: 16, Alloc: 16, Free: 0}
if s != e {
t.Fatal(s)
}
})
})
} }

4
go.mod
View File

@ -4,7 +4,5 @@ go 1.25.6
require ( require (
code.squareroundforest.org/arpio/syncbus v0.0.0-20260222175441-f7da66ad4045 code.squareroundforest.org/arpio/syncbus v0.0.0-20260222175441-f7da66ad4045
code.squareroundforest.org/arpio/times v0.0.0-20260304202452-0bdc043a8aa6 code.squareroundforest.org/arpio/times v0.0.0-20260305173954-03e9105ce6bb
) )
replace code.squareroundforest.org/arpio/times => ../times

2
go.sum
View File

@ -1,2 +1,4 @@
code.squareroundforest.org/arpio/syncbus v0.0.0-20260222175441-f7da66ad4045 h1:eSg4fnu8x6/7B6aem2ibxHX8SxFs9Mo2n2etWg4eGFY= code.squareroundforest.org/arpio/syncbus v0.0.0-20260222175441-f7da66ad4045 h1:eSg4fnu8x6/7B6aem2ibxHX8SxFs9Mo2n2etWg4eGFY=
code.squareroundforest.org/arpio/syncbus v0.0.0-20260222175441-f7da66ad4045/go.mod h1:xZqPFR30EESkog+JzR40zDKVoBc7zmrV1X+Wo0v86p4= code.squareroundforest.org/arpio/syncbus v0.0.0-20260222175441-f7da66ad4045/go.mod h1:xZqPFR30EESkog+JzR40zDKVoBc7zmrV1X+Wo0v86p4=
code.squareroundforest.org/arpio/times v0.0.0-20260305173954-03e9105ce6bb h1:1zN3YFn1vBRh+ePthL6aW5ohjQEIDxhwv0r3yPUDYhw=
code.squareroundforest.org/arpio/times v0.0.0-20260305173954-03e9105ce6bb/go.mod h1:ca765bCK+zdje0bPWmOO/8psbaU4qkBrvE+8he883Pg=

23
lib.go
View File

@ -50,7 +50,15 @@ type Algo interface {
// a single pool instance only calls it from a single goroutine at a time // a single pool instance only calls it from a single goroutine at a time
// items need to be allocated always by calling Get // items need to be allocated always by calling Get
// second return argument for requested next check // second return argument for requested next check
Target(Stats) (int, time.Duration) // the Target function of a single Algo implementation is not called concurrently
// called on all put, regardless of nextCheck
// not all nextChecks result in a call if a previously request nextCheck is still pending
Target(Stats) (target int, nextCheck time.Duration)
// called when Pool.Load
// can be used to adjust internal state
// can be noop if not required
Load(int)
} }
type Options struct { type Options struct {
@ -116,6 +124,10 @@ func (s Stats) String() string {
} }
// zero-config // zero-config
// potential caveats:
// - a caveaat depending on the expectations, since no absolute time input is used, identifies frequent
// spikes from zero and slow grow and shrink from and to zero cycles are considered the same and the pool cleans
// up idle items accordingly. In short: _|_|_|_ = __/\__/\__/\__
func Adaptive() Algo { func Adaptive() Algo {
return makeAdaptiveAlgo() return makeAdaptiveAlgo()
} }
@ -137,7 +149,7 @@ func Timeout(to time.Duration) Algo {
return makeMaxTimeout(0, to) return makeMaxTimeout(0, to)
} }
// the user code can decide not to put back items to the pool // the user code can decide not to put back items to the pool, however, the primary purpose is testing
func NoShrink() Algo { func NoShrink() Algo {
return makeMaxTimeout(0, 0) return makeMaxTimeout(0, 0)
} }
@ -154,10 +166,17 @@ func (p Pool[R]) Get() (R, error) {
return p.pool.get() return p.pool.get()
} }
// it is allowed to put items that were not received by get, but the selected algorithm may produce unexpected
// behavior. In most cases, it is recommended to use Load instead, and using Put to put back only those items
// that were received via Get.
func (p Pool[R]) Put(i R) { func (p Pool[R]) Put(i R) {
p.pool.put(i) p.pool.put(i)
} }
func (p Pool[R]) Load(i []R) {
p.pool.load(i)
}
func (p Pool[R]) Stats() Stats { func (p Pool[R]) Stats() Stats {
return p.pool.stats() return p.pool.stats()
} }

View File

@ -85,3 +85,14 @@ func (a *maxTimeout) Target(s Stats) (int, time.Duration) {
return t, a.items[0].Add(a.to).Sub(now) return t, a.items[0].Add(a.to).Sub(now)
} }
func (a *maxTimeout) Load(n int) {
if a.to <= 0 {
return
}
now := a.clock.Now()
for i := 0; i < n; i++ {
a.items = append(a.items, now)
}
}

464
maxto_test.go Normal file
View File

@ -0,0 +1,464 @@
package pool_test
import (
"code.squareroundforest.org/arpio/pool"
"code.squareroundforest.org/arpio/times"
"testing"
"time"
)
func TestMaxTO(t *testing.T) {
t.Run("noshrink", func(t *testing.T) {
base := scenarioOptions{algo: pool.NoShrink()}
t.Run("basic", func(t *testing.T) {
t.Run("no concurrency", func(t *testing.T) {
o := base
testBasicSet(t, o)
})
t.Run("low concurrency", func(t *testing.T) {
o := base
o.concurrency = 8
testBasicSet(t, o)
})
t.Run("high concurrency", func(t *testing.T) {
o := base
o.concurrency = 256
testBasicSet(t, o)
})
})
t.Run("cyclic", func(t *testing.T) {
t.Run("no concurrency", func(t *testing.T) {
o := base
testCyclicSet(t, o)
})
t.Run("low concurrency", func(t *testing.T) {
o := base
o.concurrency = 8
testCyclicSet(t, o)
})
t.Run("high concurrency", func(t *testing.T) {
o := base
o.concurrency = 256
testCyclicSet(t, o)
})
})
})
t.Run("max", func(t *testing.T) {
base := scenarioOptions{algo: pool.Max(60)}
t.Run("basic", func(t *testing.T) {
t.Run("no concurrency", func(t *testing.T) {
o := base
testBasicSet(t, o)
})
t.Run("low concurrency", func(t *testing.T) {
o := base
o.concurrency = 8
testBasicSet(t, o)
})
t.Run("high concurrency", func(t *testing.T) {
o := base
o.concurrency = 256
o.exclude = []string{
"steady_step_up_small",
"steady_step_up_large",
"slow_rise_from_zero_small",
"slow_rise_from_zero_large",
}
testBasicSet(t, o)
})
})
t.Run("cyclic", func(t *testing.T) {
t.Run("no concurrency", func(t *testing.T) {
o := base
testCyclicSet(t, o)
})
t.Run("low concurrency", func(t *testing.T) {
o := base
o.concurrency = 8
testCyclicSet(t, o)
})
t.Run("high concurrency", func(t *testing.T) {
o := base
o.concurrency = 256
testCyclicSet(t, o)
})
})
})
t.Run("to", func(t *testing.T) {
base := scenarioOptions{
algo: pool.Timeout(300 * time.Millisecond),
minDelay: 10 * time.Millisecond,
maxDelay: 100 * time.Millisecond,
}
t.Run("basic", func(t *testing.T) {
t.Run("no concurrency", func(t *testing.T) {
o := base
o.exclude = []string{
"steady_step_up_large",
"slow_rise_from_zero_small",
"slow_rise_from_zero_large",
}
testBasicSet(t, o)
})
t.Run("low concurrency", func(t *testing.T) {
o := base
o.concurrency = 8
testBasicSet(t, o)
})
t.Run("high concurrency", func(t *testing.T) {
o := base
o.concurrency = 256
testBasicSet(t, o)
})
})
t.Run("cyclic", func(t *testing.T) {
t.Run("no concurrency", func(t *testing.T) {
o := base
o.exclude = []string{"sinus_large"}
testCyclicSet(t, o)
})
t.Run("low concurrency", func(t *testing.T) {
o := base
o.concurrency = 8
testCyclicSet(t, o)
})
t.Run("high concurrency", func(t *testing.T) {
o := base
o.concurrency = 256
testCyclicSet(t, o)
})
})
})
t.Run("maxto", func(t *testing.T) {
base := scenarioOptions{
algo: pool.MaxTimeout(60, 300*time.Millisecond),
minDelay: 10 * time.Millisecond,
maxDelay: 100 * time.Millisecond,
}
t.Run("basic", func(t *testing.T) {
t.Run("no concurrency", func(t *testing.T) {
o := base
o.exclude = []string{
"steady_step_up_large",
"slow_rise_from_zero_small",
"slow_rise_from_zero_large",
}
testBasicSet(t, o)
})
t.Run("low concurrency", func(t *testing.T) {
o := base
o.concurrency = 8
testBasicSet(t, o)
})
t.Run("high concurrency", func(t *testing.T) {
o := base
o.concurrency = 256
o.exclude = []string{
"steady_step_up_large",
"steady_step_up_small",
}
testBasicSet(t, o)
})
})
t.Run("cyclic", func(t *testing.T) {
t.Run("no concurrency", func(t *testing.T) {
o := base
o.exclude = []string{"sinus_large"}
testCyclicSet(t, o)
})
t.Run("low concurrency", func(t *testing.T) {
o := base
o.concurrency = 8
testCyclicSet(t, o)
})
t.Run("high concurrency", func(t *testing.T) {
o := base
o.concurrency = 256
testCyclicSet(t, o)
})
})
})
t.Run("external put", func(t *testing.T) {
t.Run("initial", func(t *testing.T) {
const (
initialCount = 15
steadyUseCycles = 8
stepDuration = 30 * time.Millisecond
)
clock := times.Test()
o := pool.Options{
Clock: clock,
Algo: pool.MaxTimeout(15, 300*time.Millisecond),
}
alloc := func() ([]byte, error) { return make([]byte, 1<<9), nil }
p := pool.Make(alloc, nil, o)
var active [][]byte
get := func() {
b, err := p.Get()
if err != nil {
t.Fatal(err)
}
active = append(active, b)
}
put := func() {
if len(active) == 0 {
t.Fatal("put called from empty active")
}
var b []byte
b, active = active[0], active[1:]
p.Put(b)
}
for i := 0; i < initialCount; i++ {
p.Put(make([]byte, 1<<9))
}
for i := 0; i < steadyUseCycles; i++ {
get()
clock.Pass(stepDuration)
put()
clock.Pass(stepDuration)
}
s := p.Stats()
e := pool.Stats{Idle: 1, Active: 0, Get: 8, Put: 23, Alloc: 0, Free: 14}
if s != e {
t.Fatal(s)
}
})
t.Run("expect higher load", func(t *testing.T) {
const (
initialCount = 15
steadyUseCycles = 8
adjustCount = 15
highLoadCycles = 8
stepDuration = 30 * time.Millisecond
)
clock := times.Test()
o := pool.Options{
Clock: clock,
Algo: pool.MaxTimeout(15, 300*time.Millisecond),
}
alloc := func() ([]byte, error) { return make([]byte, 1<<9), nil }
p := pool.Make(alloc, nil, o)
var active [][]byte
get := func() {
b, err := p.Get()
if err != nil {
t.Fatal(err)
}
active = append(active, b)
}
put := func() {
if len(active) == 0 {
t.Fatal("put called from empty active")
}
var b []byte
b, active = active[0], active[1:]
p.Put(b)
}
for i := 0; i < initialCount; i++ {
get()
}
for i := 0; i < steadyUseCycles; i++ {
put()
clock.Pass(stepDuration)
get()
clock.Pass(stepDuration)
}
for i := 0; i < adjustCount; i++ {
p.Put(make([]byte, 1<<9))
}
for i := 0; i < highLoadCycles; i++ {
get()
clock.Pass(stepDuration)
get()
clock.Pass(stepDuration)
put()
clock.Pass(stepDuration)
}
s := p.Stats()
e := pool.Stats{Idle: 1, Active: 8, Get: 39, Put: 31, Alloc: 19, Free: 10}
if s != e {
t.Fatal(s)
}
})
})
t.Run("load", func(t *testing.T) {
t.Run("prewarm", func(t *testing.T) {
const (
initialCount = 15
steadyUseCycles = 8
stepDuration = 30 * time.Millisecond
)
clock := times.Test()
o := pool.Options{
Clock: clock,
Algo: pool.MaxTimeout(15, 300*time.Millisecond),
}
alloc := func() ([]byte, error) { return make([]byte, 1<<9), nil }
p := pool.Make(alloc, nil, o)
var active [][]byte
get := func() {
b, err := p.Get()
if err != nil {
t.Fatal(err)
}
active = append(active, b)
}
put := func() {
if len(active) == 0 {
t.Fatal("put called from empty active")
}
var b []byte
b, active = active[0], active[1:]
p.Put(b)
}
l := make([][]byte, initialCount)
for i := 0; i < initialCount; i++ {
l[i] = make([]byte, 1<<9)
}
p.Load(l)
for i := 0; i < steadyUseCycles; i++ {
get()
clock.Pass(stepDuration)
put()
clock.Pass(stepDuration)
}
s := p.Stats()
e := pool.Stats{Idle: 1, Active: 0, Get: 8, Put: 8, Alloc: 0, Free: 14}
if s != e {
t.Fatal(s)
}
})
t.Run("expect higher load", func(t *testing.T) {
const (
initialCount = 15
steadyUseCycles = 8
adjustCount = 15
highLoadCycles = 8
stepDuration = 30 * time.Millisecond
)
clock := times.Test()
o := pool.Options{
Clock: clock,
Algo: pool.MaxTimeout(15, 300*time.Millisecond),
}
alloc := func() ([]byte, error) { return make([]byte, 1<<9), nil }
p := pool.Make(alloc, nil, o)
var active [][]byte
get := func() {
b, err := p.Get()
if err != nil {
t.Fatal(err)
}
active = append(active, b)
}
put := func() {
if len(active) == 0 {
t.Fatal("put called from empty active")
}
var b []byte
b, active = active[0], active[1:]
p.Put(b)
}
for i := 0; i < initialCount; i++ {
get()
}
for i := 0; i < steadyUseCycles; i++ {
get()
clock.Pass(stepDuration)
put()
clock.Pass(stepDuration)
}
l := make([][]byte, adjustCount)
for i := 0; i < adjustCount; i++ {
l[i] = make([]byte, 1<<9)
}
p.Load(l)
for i := 0; i < highLoadCycles; i++ {
get()
clock.Pass(stepDuration)
get()
clock.Pass(stepDuration)
put()
clock.Pass(stepDuration)
}
s := p.Stats()
e := pool.Stats{Idle: 1, Active: 23, Get: 39, Put: 16, Alloc: 20, Free: 11}
if s != e {
t.Fatal(s)
}
})
})
}

16
pool.go
View File

@ -146,6 +146,17 @@ func (p pool[R]) put(r R) {
} }
} }
func (p pool[R]) load(i []R) {
s := <-p.state
defer func() {
p.state <- s
}()
s.items = append(s.items, i...)
s.stats.Idle = len(s.items)
p.options.Algo.Load(len(i))
}
func (p pool[R]) forcedCheck(s state[R], timeout time.Duration) state[R] { func (p pool[R]) forcedCheck(s state[R], timeout time.Duration) state[R] {
if s.forcedCheckPending { if s.forcedCheckPending {
return s return s
@ -153,8 +164,9 @@ func (p pool[R]) forcedCheck(s state[R], timeout time.Duration) state[R] {
s.forcedCheckPending = true s.forcedCheckPending = true
go func(to time.Duration) { go func(to time.Duration) {
p.options.TestBus.Signal("background-job-running") c := p.options.Clock.After(to)
<-p.options.Clock.After(to) p.options.TestBus.Signal("background-job-waiting")
<-c
p.freeIdle() p.freeIdle()
}(timeout) }(timeout)

View File

@ -152,6 +152,20 @@ func TestPool(t *testing.T) {
} }
}) })
t.Run("load items", func(t *testing.T) {
p := pool.Make[[]byte](nil, nil, pool.Options{Algo: pool.NoShrink()})
l := make([][]byte, 9)
for i := 0; i < len(l); i++ {
l[i] = make([]byte, 1<<9)
}
p.Load(l)
s := p.Stats()
if s.Idle != 9 {
t.Fatal(s)
}
})
t.Run("release on put no free", func(t *testing.T) { t.Run("release on put no free", func(t *testing.T) {
p := pool.Make[[]byte](nil, nil, pool.Options{Algo: pool.Max(2)}) p := pool.Make[[]byte](nil, nil, pool.Options{Algo: pool.Max(2)})
p.Put(make([]byte, 1<<9)) p.Put(make([]byte, 1<<9))
@ -236,22 +250,21 @@ func TestPool(t *testing.T) {
p := pool.Make[[]byte](nil, nil, o) p := pool.Make[[]byte](nil, nil, o)
p.Put(make([]byte, 1<<9)) p.Put(make([]byte, 1<<9))
if err := b.Wait("background-job-running"); err != nil { if err := b.Wait("background-job-waiting"); err != nil {
t.Fatal(err) t.Fatal(err)
} }
c.Pass(2 * time.Millisecond) c.Pass(2 * time.Millisecond)
p.Put(make([]byte, 1<<9)) p.Put(make([]byte, 1<<9))
if err := b.Wait("background-job-running"); err != nil { c.Pass(2 * time.Millisecond)
if err := b.Wait("free-idle-done"); err != nil {
t.Fatal(err) t.Fatal(err)
} }
c.Pass(2 * time.Millisecond)
b.Wait("free-idle-done")
s := p.Stats() s := p.Stats()
e := pool.Stats{Put: 2, Idle: 1, Free: 1} e := pool.Stats{Put: 2, Idle: 1, Free: 1}
if s == e { if s != e {
return t.Fatal(s)
} }
}) })
@ -268,26 +281,21 @@ func TestPool(t *testing.T) {
p := pool.Make[[]byte](nil, f, o) p := pool.Make[[]byte](nil, f, o)
p.Put(make([]byte, 1<<9)) p.Put(make([]byte, 1<<9))
if err := b.Wait("background-job-running"); err != nil { if err := b.Wait("background-job-waiting"); err != nil {
t.Fatal(err) t.Fatal(err)
} }
c.Pass(2 * time.Millisecond) c.Pass(2 * time.Millisecond)
p.Put(make([]byte, 1<<9)) p.Put(make([]byte, 1<<9))
if err := b.Wait("background-job-running"); err != nil { c.Pass(2 * time.Millisecond)
if err := b.Wait("free-idle-done"); err != nil {
t.Fatal(err) t.Fatal(err)
} }
c.Pass(2 * time.Millisecond)
b.Wait("free-idle-done")
s := p.Stats() s := p.Stats()
e := pool.Stats{Put: 2, Idle: 1, Free: 1} e := pool.Stats{Put: 2, Idle: 1, Free: 1}
if s == e { if s != e || freeCount != 1 {
if freeCount != 1 { t.Fatal(s, freeCount)
t.Fatal(freeCount)
}
return
} }
}) })
@ -310,7 +318,7 @@ func TestPool(t *testing.T) {
} }
s := p.Stats() s := p.Stats()
e := pool.Stats{Alloc: 9, Get: 9, Put: 6, Active: 3, Idle: 3, Free: 3} e := pool.Stats{Alloc: 9, Get: 9, Put: 6, Active: 3, Idle: 6, Free: 0}
if s != e { if s != e {
t.Fatal(s) t.Fatal(s)
} }

813
scenario_test.go Normal file
View File

@ -0,0 +1,813 @@
package pool_test
import (
"code.squareroundforest.org/arpio/pool"
"code.squareroundforest.org/arpio/times"
"math"
"math/rand"
"strings"
"testing"
"time"
)
type scenarioOptions struct {
algo pool.Algo
initial int
ops int
deviation int
changeRate int
minDelay time.Duration
maxDelay time.Duration
concurrency int
plot bool
exclude []string
}
type (
resource[T any] chan T
active resource[[][]byte]
stats resource[[]pool.Stats]
scenarioStep func(scenarioOptions, *rand.Rand, int, int, func(), func())
verifyScenario func(*testing.T, scenarioOptions, []pool.Stats)
)
func initResource[T any]() resource[T] {
var zero T
r := make(resource[T], 1)
r <- zero
return r
}
func (r resource[T]) apply(f func(T) T) {
v := <-r
defer func() {
r <- v
}()
v = f(v)
}
func (a active) count() int {
var c int
resource[[][]byte](a).apply(func(a [][]byte) [][]byte {
c = len(a)
return a
})
return c
}
func (a active) shift() []byte {
var b []byte
resource[[][]byte](a).apply(func(a [][]byte) [][]byte {
if len(a) == 0 {
return a
}
b, a = a[len(a)-1], a[:len(a)-1]
return a
})
return b
}
func (a active) push(b []byte) {
resource[[][]byte](a).apply(func(a [][]byte) [][]byte {
return append(a, b)
})
}
func (s stats) get() []pool.Stats {
var a []pool.Stats
resource[[]pool.Stats](s).apply(func(r []pool.Stats) []pool.Stats {
a = r
return r
})
return a
}
func (s stats) push(a pool.Stats) {
resource[[]pool.Stats](s).apply(func(r []pool.Stats) []pool.Stats {
return append(r, a)
})
}
func testScenario(t *testing.T, o scenarioOptions, step scenarioStep, verify verifyScenario) {
for _, n := range o.exclude {
if strings.HasSuffix(t.Name(), n) {
t.Skip()
}
}
var (
testClock times.TestClock
clock times.Clock
)
if o.concurrency <= 0 {
o.concurrency = 1
}
o.initial *= o.concurrency
if o.minDelay > 0 {
if o.maxDelay < o.minDelay {
o.maxDelay = o.minDelay
}
testClock = times.Test()
clock = testClock
}
alloc := func() ([]byte, error) { return make([]byte, 1<<9), nil }
po := pool.Options{
Algo: o.algo,
Clock: clock,
}
p := pool.Make(alloc, nil, po)
active := active(initResource[[][]byte]())
stats := stats(initResource[[]pool.Stats]())
get := func() {
b, err := p.Get()
if err != nil {
t.Fatal(err)
}
active.push(b)
}
put := func() {
b := active.shift()
if len(b) == 0 {
return
}
p.Put(b)
}
for i := 0; i < o.initial; i++ {
get()
}
stats.push(p.Stats())
rnd := rand.New(rand.NewSource(0))
c := make(chan struct{}, o.concurrency)
iter := func(i int, localClock times.TestClock) {
if o.minDelay > 0 {
d := o.minDelay
if o.maxDelay > o.minDelay {
diff := o.maxDelay - o.minDelay
rdiff := rand.Intn(int(diff))
d += time.Duration(rdiff)
}
localClock.Pass(d)
testClock.Jump(localClock.Now())
}
step(o, rnd, i, active.count(), get, put)
stats.push(p.Stats())
}
for i := 0; i < o.concurrency; i++ {
go func() {
var localClock times.TestClock
if o.minDelay > 0 {
localClock = times.Test()
}
for i := 0; i < o.ops; i++ {
iter(i, localClock)
}
c <- struct{}{}
}()
}
for i := 0; i < o.concurrency; i++ {
<-c
}
if o.plot {
for i, s := range stats.get() {
t.Log(i, s)
}
}
if verify != nil {
verify(t, o, stats.get())
}
}
func testCyclicScenario(t *testing.T, o scenarioOptions, step scenarioStep, verify verifyScenario) {
cyclicStep := func(o scenarioOptions, rnd *rand.Rand, i, active int, get, put func()) {
o.ops = o.ops / 4
i = i % o.ops
step(o, rnd, i, active, get, put)
}
testScenario(t, o, cyclicStep, verify)
}
func steadyStep(o scenarioOptions, rnd *rand.Rand, i, active int, get, put func(), target int) {
switch {
case active > target:
put()
case active < target-o.deviation:
get()
default:
if rnd.Intn(2) == 1 {
get()
} else {
put()
}
}
}
func steady(o scenarioOptions, rnd *rand.Rand, i, active int, get, put func()) {
steadyStep(o, rnd, i, active, get, put, o.initial)
}
func jumpStep(o scenarioOptions, rnd *rand.Rand, i, active int, get, put func()) {
get()
}
func dropStep(o scenarioOptions, rnd *rand.Rand, i, active int, get, put func()) {
put()
}
func steadyStepUp(o scenarioOptions, rnd *rand.Rand, i, active int, get, put func()) {
switch {
case i < o.ops/3:
steadyStep(o, rnd, i, active, get, put, o.initial)
case i >= o.ops/3 && i < 2*o.ops/3:
jumpStep(o, rnd, i, active, get, put)
default:
steadyStep(o, rnd, i, active, get, put, o.initial/3)
}
}
func steadyStepDown(o scenarioOptions, rnd *rand.Rand, i, active int, get, put func()) {
switch {
case i < o.ops/3:
steadyStep(o, rnd, i, active, get, put, o.initial)
case i >= o.ops/3 && i < 2*o.ops/3:
dropStep(o, rnd, i, active, get, put)
default:
steadyStep(o, rnd, i, active, get, put, o.initial/3)
}
}
func steadyStepDownToZero(o scenarioOptions, rnd *rand.Rand, i, active int, get, put func()) {
switch {
case i < o.ops/3:
steadyStep(o, rnd, i, active, get, put, o.initial)
case i >= o.ops/3 && i < 2*o.ops/3:
dropStep(o, rnd, i, active, get, put)
default:
steadyStep(o, rnd, i, active, get, put, 0)
}
}
func change(o scenarioOptions, rnd *rand.Rand, i, active int, get, put func()) {
cr := o.changeRate
inc := get
dec := put
if cr < 0 {
cr = 0 - cr
inc, dec = put, get
}
v := rnd.Intn(cr + 2)
if v == 0 {
dec()
return
}
inc()
}
func changeAndJump(o scenarioOptions, rnd *rand.Rand, i, active int, get, put func()) {
var f scenarioStep
switch {
case 3*i < 2*o.ops:
f = change
default:
f = jumpStep
}
f(o, rnd, i, active, get, put)
}
func cyclicSpikes(o scenarioOptions, rnd *rand.Rand, i, active int, get, put func()) {
var f scenarioStep
switch {
case 3*i < o.ops:
o.initial = active
f = steady
case 3*i < 2*o.ops:
f = jumpStep
default:
f = dropStep
}
f(o, rnd, i, active, get, put)
}
func cyclicDrops(o scenarioOptions, rnd *rand.Rand, i, active int, get, put func()) {
var f scenarioStep
switch {
case 3*i < o.ops:
o.initial = active
f = steady
case 3*i < 2*o.ops:
f = dropStep
default:
f = jumpStep
}
f(o, rnd, i, active, get, put)
}
func sigmaSteps(o scenarioOptions, rnd *rand.Rand, i, active int, get, put func()) {
var f scenarioStep
switch {
case 6*i < o.ops:
f = jumpStep
case 2*i < o.ops:
o.initial = active
f = steady
case 3*i < 2*o.ops:
f = dropStep
default:
o.initial = active
f = steady
}
f(o, rnd, i, active, get, put)
}
func chainSaw(o scenarioOptions, rnd *rand.Rand, i, active int, get, put func()) {
var f scenarioStep
switch {
case 3*i < o.ops:
f = change
case 2*i < o.ops:
f = dropStep
case 6*i < 5*o.ops:
f = change
default:
f = dropStep
}
f(o, rnd, i, active, get, put)
}
func inverseChainSaw(o scenarioOptions, rnd *rand.Rand, i, active int, get, put func()) {
var f scenarioStep
switch {
case 6*i < o.ops:
f = jumpStep
case 2*i < o.ops:
f = change
case 3*i < 2*o.ops:
f = jumpStep
default:
f = change
}
f(o, rnd, i, active, get, put)
}
func sinusStep(o scenarioOptions, rnd *rand.Rand, i, active int, get, put func()) {
cycle := float64(o.ops)
amp := cycle / 4
x := float64(i)
sin := amp * math.Sin(x*2*math.Pi/cycle)
target := o.initial + int(sin)
if target < 0 {
target = 0
}
delta := target - active
absDelta := delta
if absDelta < 0 {
absDelta = 0 - absDelta
}
absR := rnd.Intn(o.deviation + absDelta)
r := absR - o.deviation
var grow bool
switch {
case delta >= 0 && r >= 0:
grow = true
case delta <= 0 && r < 0:
grow = true
}
f := put
if grow {
f = get
}
f()
}
func noopVerify(*testing.T, scenarioOptions, []pool.Stats) {}
func verifySteady(t *testing.T, o scenarioOptions, s []pool.Stats) {
for i, stats := range s {
if 2*stats.Idle > 3*o.deviation*o.concurrency {
t.Fatal(i, stats)
}
}
}
func verifyAllocRate(t *testing.T, o scenarioOptions, s []pool.Stats) {
if len(s) == 0 {
t.Fatal("no stats")
}
last := s[len(s)-1]
if (last.Alloc-o.initial)*3 > last.Get*2 {
t.Fatal("too many allocations", last)
}
}
func verifyAllocRateLax(t *testing.T, o scenarioOptions, s []pool.Stats) {
if len(s) == 0 {
t.Fatal("no stats")
}
last := s[len(s)-1]
if (last.Alloc-o.initial)*10 > last.Get*9 {
t.Fatal("too many allocations", last)
}
}
func verifyDealloc(t *testing.T, o scenarioOptions, s []pool.Stats) {
if len(s) == 0 {
t.Fatal("no stats")
}
last := s[len(s)-1]
if last.Free > o.deviation*o.concurrency {
t.Fatal("too many deallocations", last)
}
}
func testSteadyUsage(t *testing.T, o scenarioOptions) {
testScenario(t, o, steady, verifySteady)
}
func testSteadyStepUp(t *testing.T, o scenarioOptions) {
testScenario(t, o, steadyStepUp, verifyAllocRate)
}
func testSteadyStepDown(t *testing.T, o scenarioOptions) {
testScenario(t, o, steadyStepDown, verifyAllocRate)
}
func testSteadyStepDownToZero(t *testing.T, o scenarioOptions) {
testScenario(t, o, steadyStepDownToZero, verifyAllocRate)
}
func testChange(t *testing.T, o scenarioOptions) {
verify := verifyDealloc
if o.changeRate < 0 {
verify = verifyAllocRate
}
testScenario(t, o, change, verify)
}
func testChangeAndJump(t *testing.T, o scenarioOptions) {
testScenario(t, o, changeAndJump, verifyAllocRate)
}
func testCyclicSpikes(t *testing.T, o scenarioOptions) {
verify := noopVerify
if o.initial > 0 {
verify = verifyAllocRate
}
testCyclicScenario(t, o, cyclicSpikes, verify)
}
func testCyclicDrops(t *testing.T, o scenarioOptions) {
testCyclicScenario(t, o, cyclicDrops, verifyAllocRate)
}
func testCyclicSigmaSteps(t *testing.T, o scenarioOptions) {
testCyclicScenario(t, o, sigmaSteps, verifyAllocRate)
}
func testCyclicChainSaw(t *testing.T, o scenarioOptions) {
testCyclicScenario(t, o, chainSaw, verifyAllocRate)
}
func testCyclicInverseChainSaw(t *testing.T, o scenarioOptions) {
testCyclicScenario(t, o, inverseChainSaw, verifyAllocRateLax)
}
func testCyclicSinus(t *testing.T, o scenarioOptions) {
o.initial += o.ops / 16 // a single cycle is o.ops / 4, the amp in sinusStep is cycle ops / 4
testCyclicScenario(t, o, sinusStep, verifyAllocRate)
}
func testBasicSet(t *testing.T, base scenarioOptions) {
t.Run("steady small", func(t *testing.T) {
o := base
o.initial = 8
o.ops = 60
o.deviation = 2
testSteadyUsage(t, o)
})
t.Run("steady large", func(t *testing.T) {
o := base
o.initial = 60
o.ops = 1200
o.deviation = 10
testSteadyUsage(t, o)
})
t.Run("steady step up small", func(t *testing.T) {
o := base
o.initial = 8
o.ops = 60
o.deviation = 2
testSteadyStepUp(t, o)
})
t.Run("steady step up large", func(t *testing.T) {
o := base
o.initial = 60
o.ops = 1200
o.deviation = 10
testSteadyStepUp(t, o)
})
t.Run("steady step down small", func(t *testing.T) {
o := base
o.initial = 20
o.ops = 60
o.deviation = 2
testSteadyStepDown(t, o)
})
t.Run("steady step down large", func(t *testing.T) {
o := base
o.initial = 450
o.ops = 1200
o.deviation = 10
testSteadyStepDown(t, o)
})
t.Run("steady step down small to zero", func(t *testing.T) {
o := base
o.initial = 20
o.ops = 60
o.deviation = 2
testSteadyStepDownToZero(t, o)
})
t.Run("steady step down large to zero", func(t *testing.T) {
o := base
o.initial = 450
o.ops = 1200
o.deviation = 10
testSteadyStepDownToZero(t, o)
})
t.Run("slow rise from zero small", func(t *testing.T) {
o := base
o.ops = 60
o.changeRate = 1
o.deviation = 3
testChange(t, o)
})
t.Run("slow rise from zero large", func(t *testing.T) {
o := base
o.ops = 1200
o.changeRate = 1
o.deviation = 10
testChange(t, o)
})
t.Run("slow decrease to zero small", func(t *testing.T) {
o := base
o.initial = 15
o.ops = 60
o.changeRate = -1
o.deviation = 3
testChange(t, o)
})
t.Run("slow decrease to zero large", func(t *testing.T) {
o := base
o.initial = 300
o.ops = 1200
o.changeRate = -1
o.deviation = 10
testChange(t, o)
})
t.Run("slow decrease and jump small", func(t *testing.T) {
o := base
o.initial = 15
o.ops = 60
o.changeRate = -2
o.deviation = 3
testChangeAndJump(t, o)
})
t.Run("slow decrease and jump large", func(t *testing.T) {
o := base
o.initial = 300
o.ops = 1200
o.changeRate = -2
o.deviation = 10
testChangeAndJump(t, o)
})
}
func testCyclicSet(t *testing.T, base scenarioOptions) {
t.Run("cyclic spikes from zero small", func(t *testing.T) {
o := base
o.ops = 300
o.deviation = 3
testCyclicSpikes(t, o)
})
t.Run("cyclic spikes from zero large", func(t *testing.T) {
o := base
o.ops = 6000
o.deviation = 10
testCyclicSpikes(t, o)
})
t.Run("steady and cyclic spikes small", func(t *testing.T) {
o := base
o.initial = 30
o.ops = 300
o.deviation = 3
testCyclicSpikes(t, o)
})
t.Run("steady and cyclic spikes large", func(t *testing.T) {
o := base
o.initial = 300
o.ops = 6000
o.deviation = 10
testCyclicSpikes(t, o)
})
t.Run("steady and cyclic drops to zero small", func(t *testing.T) {
o := base
o.initial = 30
o.ops = 300
o.deviation = 3
testCyclicDrops(t, o)
})
t.Run("steady and cyclic drops to zero large", func(t *testing.T) {
o := base
o.initial = 420
o.ops = 6000
o.deviation = 10
testCyclicDrops(t, o)
})
t.Run("sigma steps from zero small", func(t *testing.T) {
o := base
o.ops = 300
o.deviation = 3
testCyclicSigmaSteps(t, o)
})
t.Run("sigma steps from zero large", func(t *testing.T) {
o := base
o.ops = 6000
o.deviation = 10
testCyclicSigmaSteps(t, o)
})
t.Run("sigma steps small", func(t *testing.T) {
o := base
o.initial = 60
o.ops = 300
o.deviation = 3
testCyclicSigmaSteps(t, o)
})
t.Run("sigma steps large", func(t *testing.T) {
o := base
o.initial = 600
o.ops = 6000
o.deviation = 10
testCyclicSigmaSteps(t, o)
})
t.Run("chain saw from zero small", func(t *testing.T) {
o := base
o.ops = 300
o.deviation = 3
o.changeRate = 2
testCyclicChainSaw(t, o)
})
t.Run("chain saw from zero large", func(t *testing.T) {
o := base
o.ops = 6000
o.deviation = 3
o.changeRate = 2
testCyclicChainSaw(t, o)
})
t.Run("chain saw small", func(t *testing.T) {
o := base
o.initial = 60
o.ops = 300
o.deviation = 3
o.changeRate = 2
testCyclicChainSaw(t, o)
})
t.Run("chain saw large", func(t *testing.T) {
o := base
o.initial = 600
o.ops = 6000
o.deviation = 3
o.changeRate = 2
testCyclicChainSaw(t, o)
})
t.Run("inverse chain saw from zero small", func(t *testing.T) {
o := base
o.ops = 300
o.deviation = 3
o.changeRate = -3
testCyclicInverseChainSaw(t, o)
})
t.Run("inverse chain saw from zero large", func(t *testing.T) {
o := base
o.ops = 6000
o.deviation = 3
o.changeRate = -3
testCyclicInverseChainSaw(t, o)
})
t.Run("inverse chain saw small", func(t *testing.T) {
o := base
o.initial = 60
o.ops = 300
o.deviation = 3
o.changeRate = -3
testCyclicInverseChainSaw(t, o)
})
t.Run("inverse chain saw large", func(t *testing.T) {
o := base
o.initial = 600
o.ops = 6000
o.deviation = 3
o.changeRate = -3
testCyclicInverseChainSaw(t, o)
})
t.Run("sinus to zero small", func(t *testing.T) {
o := base
o.ops = 300
o.deviation = 3
testCyclicSinus(t, o)
})
t.Run("sinus to zero large", func(t *testing.T) {
o := base
o.ops = 6000
o.deviation = 10
testCyclicSinus(t, o)
})
t.Run("sinus small", func(t *testing.T) {
o := base
o.initial = 15
o.ops = 300
o.deviation = 3
testCyclicSinus(t, o)
})
t.Run("sinus large", func(t *testing.T) {
o := base
o.initial = 60
o.ops = 6000
o.deviation = 10
testCyclicSinus(t, o)
})
}