From 4dea0194c475a784c27e5870a9963daa1f591115 Mon Sep 17 00:00:00 2001 From: Arpad Ryszka Date: Sat, 14 Mar 2026 19:03:18 +0100 Subject: [PATCH] testing and prewarm functionality --- adapative.go | 47 ++- adaptive_test.go | 328 ++++++++++++++++++- go.mod | 4 +- go.sum | 2 + lib.go | 23 +- maxto.go | 11 + maxto_test.go | 464 +++++++++++++++++++++++++++ pool.go | 16 +- pool_test.go | 42 ++- scenario_test.go | 813 +++++++++++++++++++++++++++++++++++++++++++++++ 10 files changed, 1697 insertions(+), 53 deletions(-) create mode 100644 maxto_test.go create mode 100644 scenario_test.go diff --git a/adapative.go b/adapative.go index a38aedc..f968999 100644 --- a/adapative.go +++ b/adapative.go @@ -2,6 +2,7 @@ package pool import ( "code.squareroundforest.org/arpio/times" + "math" "time" ) @@ -16,8 +17,8 @@ type adaptive struct { activeTime time.Time nsTO time.Duration idle bool - average int - deviation int + average float64 + deviation float64 } func makeAdaptiveAlgo() *adaptive { @@ -28,40 +29,24 @@ func (a *adaptive) setClock(c times.Clock) { a.clock = c } -func abs(v int) int { - if v >= 0 { - return v - } - - return 0 - v +func movingAverage(prev, currv float64) float64 { + return prev + (currv-prev)/math.E } -func divE(v int) int { - return (3 * v) >> 3 // 1 / 2.72 => 3 / 8 +func movingAbsoluteDeviation(prev, currv, currav float64) float64 { + return prev + (math.Abs(currv-currav)-prev)/math.E } -func mulE(v int) int { - return (11 * v) >> 2 // 2.72 => 11 / 4 -} - -func movingAverage(prev, currv int) int { - return prev + divE(currv-prev) -} - -func movingAbsoluteDeviation(prev, currv, currav int) int { - return prev + divE(abs(currv-currav)-prev) -} - -func targetCapacity(av, dev int) int { - return av + mulE(dev) +func targetCapacity(av, dev float64) float64 { + return av + dev*math.E } func (a *adaptive) target(s Stats) int { - av := movingAverage(a.average, s.Active) - dev := movingAbsoluteDeviation(a.deviation, s.Active, av) + av := movingAverage(a.average, 
float64(s.Active)) + dev := movingAbsoluteDeviation(a.deviation, float64(s.Active), av) a.average = av a.deviation = dev - return targetCapacity(av, dev) + return int(targetCapacity(av, dev)) } func (a *adaptive) nightshift(s Stats) time.Duration { @@ -81,7 +66,7 @@ func (a *adaptive) nightshift(s Stats) time.Duration { } a.nsTO = now.Sub(a.activeTime) - a.nsTO = (3 * a.nsTO) >> 3 + a.nsTO = (3 * a.nsTO) / 8 if a.nsTO < minNightshiftTime { a.nsTO = minNightshiftTime } @@ -98,3 +83,9 @@ func (a *adaptive) Target(s Stats) (int, time.Duration) { ns := a.nightshift(s) return t, ns } + +func (a *adaptive) Load(n int) { + // we lie to the algorithm when adding the additional idle count to the average active count. This way + // we can adjust the calculated target capacity: + a.average += float64(n) +} diff --git a/adaptive_test.go b/adaptive_test.go index 5764bfe..2e165b6 100644 --- a/adaptive_test.go +++ b/adaptive_test.go @@ -1,6 +1,332 @@ package pool_test -import "testing" +import ( + "code.squareroundforest.org/arpio/pool" + "code.squareroundforest.org/arpio/syncbus" + "code.squareroundforest.org/arpio/times" + "testing" + "time" +) func TestAdaptive(t *testing.T) { + t.Run("basic", func(t *testing.T) { + t.Run("no concurrency", func(t *testing.T) { + testBasicSet(t, scenarioOptions{}) + }) + + t.Run("low concurrency", func(t *testing.T) { + testBasicSet(t, scenarioOptions{concurrency: 8}) + }) + + t.Run("high concurrency", func(t *testing.T) { + testBasicSet(t, scenarioOptions{concurrency: 256}) + }) + }) + + t.Run("cyclic", func(t *testing.T) { + t.Run("no concurrency", func(t *testing.T) { + testCyclicSet(t, scenarioOptions{}) + }) + + t.Run("low concurrency", func(t *testing.T) { + testCyclicSet(t, scenarioOptions{concurrency: 8}) + }) + + t.Run("high concurrency", func(t *testing.T) { + testCyclicSet(t, scenarioOptions{concurrency: 256}) + }) + }) + + t.Run("nightshift", func(t *testing.T) { + const ( + initial = 15 + variation = 10 + variationCycles = 3 + 
variationStepTime = 10 * time.Millisecond + dropStepTime = time.Millisecond + waitCycles = 36 + waitTime = time.Second + ) + + bus := syncbus.New(time.Second) + clock := times.Test() + o := pool.Options{ + Clock: clock, + TestBus: bus, + } + + alloc := func() ([]byte, error) { return make([]byte, 1<<9), nil } + p := pool.Make(alloc, nil, o) + + var active [][]byte + get := func() { + b, err := p.Get() + if err != nil { + t.Fatal(err) + } + + active = append(active, b) + } + + put := func() { + if len(active) == 0 { + t.Fatal("put called from empty active") + } + + var b []byte + b, active = active[0], active[1:] + p.Put(b) + } + + for i := 0; i < initial; i++ { + get() + } + + for i := 0; i <= variationCycles; i++ { + for j := 0; j < variation; j++ { + put() + clock.Pass(variationStepTime) + } + + for j := 0; j < variation; j++ { + get() + clock.Pass(variationStepTime) + } + } + + for len(active) > 0 { + put() + clock.Pass(dropStepTime) + } + + for i := 0; i < waitCycles; i++ { + if err := bus.Wait("background-job-waiting"); err != nil { + t.Fatal(err) + } + + bus.ResetSignals("background-job-waiting") + clock.Pass(waitTime) + if err := bus.Wait("free-idle-done"); err != nil { + t.Fatal(err) + } + + bus.ResetSignals("free-idle-done") + } + + if p.Stats().Idle != 0 { + t.Fatal(p.Stats()) + } + }) + + t.Run("external put", func(t *testing.T) { + t.Run("initial", func(t *testing.T) { + const ( + initialCount = 15 + steadyUseCycles = 8 + ) + + alloc := func() ([]byte, error) { return make([]byte, 1<<9), nil } + p := pool.Make(alloc, nil, pool.Options{}) + + var active [][]byte + get := func() { + b, err := p.Get() + if err != nil { + t.Fatal(err) + } + + active = append(active, b) + } + + put := func() { + if len(active) == 0 { + t.Fatal("put called from empty active") + } + + var b []byte + b, active = active[0], active[1:] + p.Put(b) + } + + for i := 0; i < initialCount; i++ { + p.Put(make([]byte, 1<<9)) + } + + for i := 0; i < steadyUseCycles; i++ { + get() + put() 
+ } + + s := p.Stats() + e := pool.Stats{Idle: 0, Active: 0, Get: 8, Put: 23, Alloc: 8, Free: 23} + if s != e { + t.Fatal(s) + } + }) + + t.Run("expect higher load", func(t *testing.T) { + const ( + initialCount = 15 + steadyUseCycles = 8 + adjustCount = 15 + highLoadCycles = 8 + ) + + alloc := func() ([]byte, error) { return make([]byte, 1<<9), nil } + p := pool.Make(alloc, nil, pool.Options{}) + + var active [][]byte + get := func() { + b, err := p.Get() + if err != nil { + t.Fatal(err) + } + + active = append(active, b) + } + + put := func() { + if len(active) == 0 { + t.Fatal("put called from empty active") + } + + var b []byte + b, active = active[0], active[1:] + p.Put(b) + } + + for i := 0; i < initialCount; i++ { + get() + } + + for i := 0; i < steadyUseCycles; i++ { + get() + put() + } + + for i := 0; i < adjustCount; i++ { + p.Put(make([]byte, 1<<9)) + } + + for i := 0; i < highLoadCycles; i++ { + get() + get() + put() + } + + s := p.Stats() + e := pool.Stats{Idle: 1, Active: 8, Get: 39, Put: 31, Alloc: 20, Free: 11} + if s != e { + t.Fatal(s) + } + }) + }) + + t.Run("load", func(t *testing.T) { + t.Run("prewarm", func(t *testing.T) { + const ( + initialCount = 15 + steadyUseCycles = 8 + ) + + alloc := func() ([]byte, error) { return make([]byte, 1<<9), nil } + p := pool.Make(alloc, nil, pool.Options{}) + + var active [][]byte + get := func() { + b, err := p.Get() + if err != nil { + t.Fatal(err) + } + + active = append(active, b) + } + + put := func() { + if len(active) == 0 { + t.Fatal("put called from empty active") + } + + var b []byte + b, active = active[0], active[1:] + p.Put(b) + } + + l := make([][]byte, initialCount) + for i := 0; i < initialCount; i++ { + l[i] = make([]byte, 1<<9) + } + + p.Load(l) + for i := 0; i < steadyUseCycles; i++ { + get() + put() + } + + s := p.Stats() + e := pool.Stats{Idle: 3, Active: 0, Get: 8, Put: 8, Alloc: 0, Free: 12} + if s != e { + t.Fatal(s) + } + }) + + t.Run("expect higher load", func(t *testing.T) { + const 
( + initialCount = 15 + steadyUseCycles = 8 + adjustCount = 15 + highLoadCycles = 8 + ) + + alloc := func() ([]byte, error) { return make([]byte, 1<<9), nil } + p := pool.Make(alloc, nil, pool.Options{}) + + var active [][]byte + get := func() { + b, err := p.Get() + if err != nil { + t.Fatal(err) + } + + active = append(active, b) + } + + put := func() { + if len(active) == 0 { + t.Fatal("put called from empty active") + } + + var b []byte + b, active = active[0], active[1:] + p.Put(b) + } + + for i := 0; i < initialCount; i++ { + get() + } + + for i := 0; i < steadyUseCycles; i++ { + get() + put() + } + + l := make([][]byte, adjustCount) + for i := 0; i < adjustCount; i++ { + l[i] = make([]byte, 1<<9) + } + + p.Load(l) + for i := 0; i < highLoadCycles; i++ { + get() + get() + put() + } + + s := p.Stats() + e := pool.Stats{Idle: 8, Active: 23, Get: 39, Put: 16, Alloc: 16, Free: 0} + if s != e { + t.Fatal(s) + } + }) + }) } diff --git a/go.mod b/go.mod index e112747..2b2aae0 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,5 @@ go 1.25.6 require ( code.squareroundforest.org/arpio/syncbus v0.0.0-20260222175441-f7da66ad4045 - code.squareroundforest.org/arpio/times v0.0.0-20260304202452-0bdc043a8aa6 + code.squareroundforest.org/arpio/times v0.0.0-20260305173954-03e9105ce6bb ) - -replace code.squareroundforest.org/arpio/times => ../times diff --git a/go.sum b/go.sum index 649789f..0628c13 100644 --- a/go.sum +++ b/go.sum @@ -1,2 +1,4 @@ code.squareroundforest.org/arpio/syncbus v0.0.0-20260222175441-f7da66ad4045 h1:eSg4fnu8x6/7B6aem2ibxHX8SxFs9Mo2n2etWg4eGFY= code.squareroundforest.org/arpio/syncbus v0.0.0-20260222175441-f7da66ad4045/go.mod h1:xZqPFR30EESkog+JzR40zDKVoBc7zmrV1X+Wo0v86p4= +code.squareroundforest.org/arpio/times v0.0.0-20260305173954-03e9105ce6bb h1:1zN3YFn1vBRh+ePthL6aW5ohjQEIDxhwv0r3yPUDYhw= +code.squareroundforest.org/arpio/times v0.0.0-20260305173954-03e9105ce6bb/go.mod h1:ca765bCK+zdje0bPWmOO/8psbaU4qkBrvE+8he883Pg= diff --git a/lib.go b/lib.go index 
57f92a0..2528c4a 100644 --- a/lib.go +++ b/lib.go @@ -50,7 +50,15 @@ type Algo interface { // a single pool instance only calls it from a single goroutine at a time // items need to be allocated always by calling Get // second return argument for requested next check - Target(Stats) (int, time.Duration) + // the Target function of a single Algo implementation is not called concurrently + // called on all put, regardless of nextCheck + // not all nextChecks result in a call if a previously request nextCheck is still pending + Target(Stats) (target int, nextCheck time.Duration) + + // called when Pool.Load + // can be used to adjust internal state + // can be noop if not required + Load(int) } type Options struct { @@ -116,6 +124,10 @@ func (s Stats) String() string { } // zero-config +// potential caveats: +// - a caveaat depending on the expectations, since no absolute time input is used, identifies frequent +// spikes from zero and slow grow and shrink from and to zero cycles are considered the same and the pool cleans +// up idle items accordingly. In short: _|_|_|_ = __/\__/\__/\__ func Adaptive() Algo { return makeAdaptiveAlgo() } @@ -137,7 +149,7 @@ func Timeout(to time.Duration) Algo { return makeMaxTimeout(0, to) } -// the user code can decide not to put back items to the pool +// the user code can decide not to put back items to the pool, however, the primary purpose is testing func NoShrink() Algo { return makeMaxTimeout(0, 0) } @@ -154,10 +166,17 @@ func (p Pool[R]) Get() (R, error) { return p.pool.get() } +// it is allowed to put items that were not received by get, but the selected algorithm may produce unexpected +// behavior. In most cases, it is recommended to use Load instead, and using Put to put back only those items +// that were received via Get. 
func (p Pool[R]) Put(i R) { p.pool.put(i) } +func (p Pool[R]) Load(i []R) { + p.pool.load(i) +} + func (p Pool[R]) Stats() Stats { return p.pool.stats() } diff --git a/maxto.go b/maxto.go index 3d59eeb..83c1c34 100644 --- a/maxto.go +++ b/maxto.go @@ -85,3 +85,14 @@ func (a *maxTimeout) Target(s Stats) (int, time.Duration) { return t, a.items[0].Add(a.to).Sub(now) } + +func (a *maxTimeout) Load(n int) { + if a.to <= 0 { + return + } + + now := a.clock.Now() + for i := 0; i < n; i++ { + a.items = append(a.items, now) + } +} diff --git a/maxto_test.go b/maxto_test.go new file mode 100644 index 0000000..13f9c45 --- /dev/null +++ b/maxto_test.go @@ -0,0 +1,464 @@ +package pool_test + +import ( + "code.squareroundforest.org/arpio/pool" + "code.squareroundforest.org/arpio/times" + "testing" + "time" +) + +func TestMaxTO(t *testing.T) { + t.Run("noshrink", func(t *testing.T) { + base := scenarioOptions{algo: pool.NoShrink()} + t.Run("basic", func(t *testing.T) { + t.Run("no concurrency", func(t *testing.T) { + o := base + testBasicSet(t, o) + }) + + t.Run("low concurrency", func(t *testing.T) { + o := base + o.concurrency = 8 + testBasicSet(t, o) + }) + + t.Run("high concurrency", func(t *testing.T) { + o := base + o.concurrency = 256 + testBasicSet(t, o) + }) + }) + + t.Run("cyclic", func(t *testing.T) { + t.Run("no concurrency", func(t *testing.T) { + o := base + testCyclicSet(t, o) + }) + + t.Run("low concurrency", func(t *testing.T) { + o := base + o.concurrency = 8 + testCyclicSet(t, o) + }) + + t.Run("high concurrency", func(t *testing.T) { + o := base + o.concurrency = 256 + testCyclicSet(t, o) + }) + }) + }) + + t.Run("max", func(t *testing.T) { + base := scenarioOptions{algo: pool.Max(60)} + t.Run("basic", func(t *testing.T) { + t.Run("no concurrency", func(t *testing.T) { + o := base + testBasicSet(t, o) + }) + + t.Run("low concurrency", func(t *testing.T) { + o := base + o.concurrency = 8 + testBasicSet(t, o) + }) + + t.Run("high concurrency", func(t 
*testing.T) { + o := base + o.concurrency = 256 + o.exclude = []string{ + "steady_step_up_small", + "steady_step_up_large", + "slow_rise_from_zero_small", + "slow_rise_from_zero_large", + } + + testBasicSet(t, o) + }) + }) + + t.Run("cyclic", func(t *testing.T) { + t.Run("no concurrency", func(t *testing.T) { + o := base + testCyclicSet(t, o) + }) + + t.Run("low concurrency", func(t *testing.T) { + o := base + o.concurrency = 8 + testCyclicSet(t, o) + }) + + t.Run("high concurrency", func(t *testing.T) { + o := base + o.concurrency = 256 + testCyclicSet(t, o) + }) + }) + }) + + t.Run("to", func(t *testing.T) { + base := scenarioOptions{ + algo: pool.Timeout(300 * time.Millisecond), + minDelay: 10 * time.Millisecond, + maxDelay: 100 * time.Millisecond, + } + + t.Run("basic", func(t *testing.T) { + t.Run("no concurrency", func(t *testing.T) { + o := base + o.exclude = []string{ + "steady_step_up_large", + "slow_rise_from_zero_small", + "slow_rise_from_zero_large", + } + + testBasicSet(t, o) + }) + + t.Run("low concurrency", func(t *testing.T) { + o := base + o.concurrency = 8 + testBasicSet(t, o) + }) + + t.Run("high concurrency", func(t *testing.T) { + o := base + o.concurrency = 256 + testBasicSet(t, o) + }) + }) + + t.Run("cyclic", func(t *testing.T) { + t.Run("no concurrency", func(t *testing.T) { + o := base + o.exclude = []string{"sinus_large"} + testCyclicSet(t, o) + }) + + t.Run("low concurrency", func(t *testing.T) { + o := base + o.concurrency = 8 + testCyclicSet(t, o) + }) + + t.Run("high concurrency", func(t *testing.T) { + o := base + o.concurrency = 256 + testCyclicSet(t, o) + }) + }) + }) + + t.Run("maxto", func(t *testing.T) { + base := scenarioOptions{ + algo: pool.MaxTimeout(60, 300*time.Millisecond), + minDelay: 10 * time.Millisecond, + maxDelay: 100 * time.Millisecond, + } + + t.Run("basic", func(t *testing.T) { + t.Run("no concurrency", func(t *testing.T) { + o := base + o.exclude = []string{ + "steady_step_up_large", + 
"slow_rise_from_zero_small", + "slow_rise_from_zero_large", + } + + testBasicSet(t, o) + }) + + t.Run("low concurrency", func(t *testing.T) { + o := base + o.concurrency = 8 + testBasicSet(t, o) + }) + + t.Run("high concurrency", func(t *testing.T) { + o := base + o.concurrency = 256 + o.exclude = []string{ + "steady_step_up_large", + "steady_step_up_small", + } + + testBasicSet(t, o) + }) + }) + + t.Run("cyclic", func(t *testing.T) { + t.Run("no concurrency", func(t *testing.T) { + o := base + o.exclude = []string{"sinus_large"} + testCyclicSet(t, o) + }) + + t.Run("low concurrency", func(t *testing.T) { + o := base + o.concurrency = 8 + testCyclicSet(t, o) + }) + + t.Run("high concurrency", func(t *testing.T) { + o := base + o.concurrency = 256 + testCyclicSet(t, o) + }) + }) + }) + + t.Run("external put", func(t *testing.T) { + t.Run("initial", func(t *testing.T) { + const ( + initialCount = 15 + steadyUseCycles = 8 + stepDuration = 30 * time.Millisecond + ) + + clock := times.Test() + o := pool.Options{ + Clock: clock, + Algo: pool.MaxTimeout(15, 300*time.Millisecond), + } + + alloc := func() ([]byte, error) { return make([]byte, 1<<9), nil } + p := pool.Make(alloc, nil, o) + + var active [][]byte + get := func() { + b, err := p.Get() + if err != nil { + t.Fatal(err) + } + + active = append(active, b) + } + + put := func() { + if len(active) == 0 { + t.Fatal("put called from empty active") + } + + var b []byte + b, active = active[0], active[1:] + p.Put(b) + } + + for i := 0; i < initialCount; i++ { + p.Put(make([]byte, 1<<9)) + } + + for i := 0; i < steadyUseCycles; i++ { + get() + clock.Pass(stepDuration) + put() + clock.Pass(stepDuration) + } + + s := p.Stats() + e := pool.Stats{Idle: 1, Active: 0, Get: 8, Put: 23, Alloc: 0, Free: 14} + if s != e { + t.Fatal(s) + } + }) + + t.Run("expect higher load", func(t *testing.T) { + const ( + initialCount = 15 + steadyUseCycles = 8 + adjustCount = 15 + highLoadCycles = 8 + stepDuration = 30 * time.Millisecond + ) + + 
clock := times.Test() + o := pool.Options{ + Clock: clock, + Algo: pool.MaxTimeout(15, 300*time.Millisecond), + } + + alloc := func() ([]byte, error) { return make([]byte, 1<<9), nil } + p := pool.Make(alloc, nil, o) + + var active [][]byte + get := func() { + b, err := p.Get() + if err != nil { + t.Fatal(err) + } + + active = append(active, b) + } + + put := func() { + if len(active) == 0 { + t.Fatal("put called from empty active") + } + + var b []byte + b, active = active[0], active[1:] + p.Put(b) + } + + for i := 0; i < initialCount; i++ { + get() + } + + for i := 0; i < steadyUseCycles; i++ { + put() + clock.Pass(stepDuration) + get() + clock.Pass(stepDuration) + } + + for i := 0; i < adjustCount; i++ { + p.Put(make([]byte, 1<<9)) + } + + for i := 0; i < highLoadCycles; i++ { + get() + clock.Pass(stepDuration) + get() + clock.Pass(stepDuration) + put() + clock.Pass(stepDuration) + } + + s := p.Stats() + e := pool.Stats{Idle: 1, Active: 8, Get: 39, Put: 31, Alloc: 19, Free: 10} + if s != e { + t.Fatal(s) + } + }) + }) + + t.Run("load", func(t *testing.T) { + t.Run("prewarm", func(t *testing.T) { + const ( + initialCount = 15 + steadyUseCycles = 8 + stepDuration = 30 * time.Millisecond + ) + + clock := times.Test() + o := pool.Options{ + Clock: clock, + Algo: pool.MaxTimeout(15, 300*time.Millisecond), + } + + alloc := func() ([]byte, error) { return make([]byte, 1<<9), nil } + p := pool.Make(alloc, nil, o) + + var active [][]byte + get := func() { + b, err := p.Get() + if err != nil { + t.Fatal(err) + } + + active = append(active, b) + } + + put := func() { + if len(active) == 0 { + t.Fatal("put called from empty active") + } + + var b []byte + b, active = active[0], active[1:] + p.Put(b) + } + + l := make([][]byte, initialCount) + for i := 0; i < initialCount; i++ { + l[i] = make([]byte, 1<<9) + } + + p.Load(l) + for i := 0; i < steadyUseCycles; i++ { + get() + clock.Pass(stepDuration) + put() + clock.Pass(stepDuration) + } + + s := p.Stats() + e := 
pool.Stats{Idle: 1, Active: 0, Get: 8, Put: 8, Alloc: 0, Free: 14} + if s != e { + t.Fatal(s) + } + }) + + t.Run("expect higher load", func(t *testing.T) { + const ( + initialCount = 15 + steadyUseCycles = 8 + adjustCount = 15 + highLoadCycles = 8 + stepDuration = 30 * time.Millisecond + ) + + clock := times.Test() + o := pool.Options{ + Clock: clock, + Algo: pool.MaxTimeout(15, 300*time.Millisecond), + } + + alloc := func() ([]byte, error) { return make([]byte, 1<<9), nil } + p := pool.Make(alloc, nil, o) + + var active [][]byte + get := func() { + b, err := p.Get() + if err != nil { + t.Fatal(err) + } + + active = append(active, b) + } + + put := func() { + if len(active) == 0 { + t.Fatal("put called from empty active") + } + + var b []byte + b, active = active[0], active[1:] + p.Put(b) + } + + for i := 0; i < initialCount; i++ { + get() + } + + for i := 0; i < steadyUseCycles; i++ { + get() + clock.Pass(stepDuration) + put() + clock.Pass(stepDuration) + } + + l := make([][]byte, adjustCount) + for i := 0; i < adjustCount; i++ { + l[i] = make([]byte, 1<<9) + } + + p.Load(l) + for i := 0; i < highLoadCycles; i++ { + get() + clock.Pass(stepDuration) + get() + clock.Pass(stepDuration) + put() + clock.Pass(stepDuration) + } + + s := p.Stats() + e := pool.Stats{Idle: 1, Active: 23, Get: 39, Put: 16, Alloc: 20, Free: 11} + if s != e { + t.Fatal(s) + } + }) + }) +} diff --git a/pool.go b/pool.go index f548c26..6af6980 100644 --- a/pool.go +++ b/pool.go @@ -146,6 +146,17 @@ func (p pool[R]) put(r R) { } } +func (p pool[R]) load(i []R) { + s := <-p.state + defer func() { + p.state <- s + }() + + s.items = append(s.items, i...) 
+ s.stats.Idle = len(s.items) + p.options.Algo.Load(len(i)) +} + func (p pool[R]) forcedCheck(s state[R], timeout time.Duration) state[R] { if s.forcedCheckPending { return s @@ -153,8 +164,9 @@ func (p pool[R]) forcedCheck(s state[R], timeout time.Duration) state[R] { s.forcedCheckPending = true go func(to time.Duration) { - p.options.TestBus.Signal("background-job-running") - <-p.options.Clock.After(to) + c := p.options.Clock.After(to) + p.options.TestBus.Signal("background-job-waiting") + <-c p.freeIdle() }(timeout) diff --git a/pool_test.go b/pool_test.go index 76cacf6..30e3490 100644 --- a/pool_test.go +++ b/pool_test.go @@ -152,6 +152,20 @@ func TestPool(t *testing.T) { } }) + t.Run("load items", func(t *testing.T) { + p := pool.Make[[]byte](nil, nil, pool.Options{Algo: pool.NoShrink()}) + l := make([][]byte, 9) + for i := 0; i < len(l); i++ { + l[i] = make([]byte, 1<<9) + } + + p.Load(l) + s := p.Stats() + if s.Idle != 9 { + t.Fatal(s) + } + }) + t.Run("release on put no free", func(t *testing.T) { p := pool.Make[[]byte](nil, nil, pool.Options{Algo: pool.Max(2)}) p.Put(make([]byte, 1<<9)) @@ -236,22 +250,21 @@ func TestPool(t *testing.T) { p := pool.Make[[]byte](nil, nil, o) p.Put(make([]byte, 1<<9)) - if err := b.Wait("background-job-running"); err != nil { + if err := b.Wait("background-job-waiting"); err != nil { t.Fatal(err) } c.Pass(2 * time.Millisecond) p.Put(make([]byte, 1<<9)) - if err := b.Wait("background-job-running"); err != nil { + c.Pass(2 * time.Millisecond) + if err := b.Wait("free-idle-done"); err != nil { t.Fatal(err) } - c.Pass(2 * time.Millisecond) - b.Wait("free-idle-done") s := p.Stats() e := pool.Stats{Put: 2, Idle: 1, Free: 1} - if s == e { - return + if s != e { + t.Fatal(s) } }) @@ -268,26 +281,21 @@ func TestPool(t *testing.T) { p := pool.Make[[]byte](nil, f, o) p.Put(make([]byte, 1<<9)) - if err := b.Wait("background-job-running"); err != nil { + if err := b.Wait("background-job-waiting"); err != nil { t.Fatal(err) } c.Pass(2 * 
time.Millisecond) p.Put(make([]byte, 1<<9)) - if err := b.Wait("background-job-running"); err != nil { + c.Pass(2 * time.Millisecond) + if err := b.Wait("free-idle-done"); err != nil { t.Fatal(err) } - c.Pass(2 * time.Millisecond) - b.Wait("free-idle-done") s := p.Stats() e := pool.Stats{Put: 2, Idle: 1, Free: 1} - if s == e { - if freeCount != 1 { - t.Fatal(freeCount) - } - - return + if s != e || freeCount != 1 { + t.Fatal(s, freeCount) } }) @@ -310,7 +318,7 @@ func TestPool(t *testing.T) { } s := p.Stats() - e := pool.Stats{Alloc: 9, Get: 9, Put: 6, Active: 3, Idle: 3, Free: 3} + e := pool.Stats{Alloc: 9, Get: 9, Put: 6, Active: 3, Idle: 6, Free: 0} if s != e { t.Fatal(s) } diff --git a/scenario_test.go b/scenario_test.go new file mode 100644 index 0000000..f81054d --- /dev/null +++ b/scenario_test.go @@ -0,0 +1,813 @@ +package pool_test + +import ( + "code.squareroundforest.org/arpio/pool" + "code.squareroundforest.org/arpio/times" + "math" + "math/rand" + "strings" + "testing" + "time" +) + +type scenarioOptions struct { + algo pool.Algo + initial int + ops int + deviation int + changeRate int + minDelay time.Duration + maxDelay time.Duration + concurrency int + plot bool + exclude []string +} + +type ( + resource[T any] chan T + active resource[[][]byte] + stats resource[[]pool.Stats] + scenarioStep func(scenarioOptions, *rand.Rand, int, int, func(), func()) + verifyScenario func(*testing.T, scenarioOptions, []pool.Stats) +) + +func initResource[T any]() resource[T] { + var zero T + r := make(resource[T], 1) + r <- zero + return r +} + +func (r resource[T]) apply(f func(T) T) { + v := <-r + defer func() { + r <- v + }() + + v = f(v) +} + +func (a active) count() int { + var c int + resource[[][]byte](a).apply(func(a [][]byte) [][]byte { + c = len(a) + return a + }) + + return c +} + +func (a active) shift() []byte { + var b []byte + resource[[][]byte](a).apply(func(a [][]byte) [][]byte { + if len(a) == 0 { + return a + } + + b, a = a[len(a)-1], a[:len(a)-1] + 
return a + }) + + return b +} + +func (a active) push(b []byte) { + resource[[][]byte](a).apply(func(a [][]byte) [][]byte { + return append(a, b) + }) +} + +func (s stats) get() []pool.Stats { + var a []pool.Stats + resource[[]pool.Stats](s).apply(func(r []pool.Stats) []pool.Stats { + a = r + return r + }) + + return a +} + +func (s stats) push(a pool.Stats) { + resource[[]pool.Stats](s).apply(func(r []pool.Stats) []pool.Stats { + return append(r, a) + }) +} + +func testScenario(t *testing.T, o scenarioOptions, step scenarioStep, verify verifyScenario) { + for _, n := range o.exclude { + if strings.HasSuffix(t.Name(), n) { + t.Skip() + } + } + + var ( + testClock times.TestClock + clock times.Clock + ) + + if o.concurrency <= 0 { + o.concurrency = 1 + } + + o.initial *= o.concurrency + if o.minDelay > 0 { + if o.maxDelay < o.minDelay { + o.maxDelay = o.minDelay + } + + testClock = times.Test() + clock = testClock + } + + alloc := func() ([]byte, error) { return make([]byte, 1<<9), nil } + po := pool.Options{ + Algo: o.algo, + Clock: clock, + } + + p := pool.Make(alloc, nil, po) + active := active(initResource[[][]byte]()) + stats := stats(initResource[[]pool.Stats]()) + get := func() { + b, err := p.Get() + if err != nil { + t.Fatal(err) + } + + active.push(b) + } + + put := func() { + b := active.shift() + if len(b) == 0 { + return + } + + p.Put(b) + } + + for i := 0; i < o.initial; i++ { + get() + } + + stats.push(p.Stats()) + rnd := rand.New(rand.NewSource(0)) + c := make(chan struct{}, o.concurrency) + iter := func(i int, localClock times.TestClock) { + if o.minDelay > 0 { + d := o.minDelay + if o.maxDelay > o.minDelay { + diff := o.maxDelay - o.minDelay + rdiff := rand.Intn(int(diff)) + d += time.Duration(rdiff) + } + + localClock.Pass(d) + testClock.Jump(localClock.Now()) + } + + step(o, rnd, i, active.count(), get, put) + stats.push(p.Stats()) + } + + for i := 0; i < o.concurrency; i++ { + go func() { + var localClock times.TestClock + if o.minDelay > 0 { + 
localClock = times.Test() + } + + for i := 0; i < o.ops; i++ { + iter(i, localClock) + } + + c <- struct{}{} + }() + } + + for i := 0; i < o.concurrency; i++ { + <-c + } + + if o.plot { + for i, s := range stats.get() { + t.Log(i, s) + } + } + + if verify != nil { + verify(t, o, stats.get()) + } +} + +func testCyclicScenario(t *testing.T, o scenarioOptions, step scenarioStep, verify verifyScenario) { + cyclicStep := func(o scenarioOptions, rnd *rand.Rand, i, active int, get, put func()) { + o.ops = o.ops / 4 + i = i % o.ops + step(o, rnd, i, active, get, put) + } + + testScenario(t, o, cyclicStep, verify) +} + +func steadyStep(o scenarioOptions, rnd *rand.Rand, i, active int, get, put func(), target int) { + switch { + case active > target: + put() + case active < target-o.deviation: + get() + default: + if rnd.Intn(2) == 1 { + get() + } else { + put() + } + } +} + +func steady(o scenarioOptions, rnd *rand.Rand, i, active int, get, put func()) { + steadyStep(o, rnd, i, active, get, put, o.initial) +} + +func jumpStep(o scenarioOptions, rnd *rand.Rand, i, active int, get, put func()) { + get() +} + +func dropStep(o scenarioOptions, rnd *rand.Rand, i, active int, get, put func()) { + put() +} + +func steadyStepUp(o scenarioOptions, rnd *rand.Rand, i, active int, get, put func()) { + switch { + case i < o.ops/3: + steadyStep(o, rnd, i, active, get, put, o.initial) + case i >= o.ops/3 && i < 2*o.ops/3: + jumpStep(o, rnd, i, active, get, put) + default: + steadyStep(o, rnd, i, active, get, put, o.initial/3) + } +} + +func steadyStepDown(o scenarioOptions, rnd *rand.Rand, i, active int, get, put func()) { + switch { + case i < o.ops/3: + steadyStep(o, rnd, i, active, get, put, o.initial) + case i >= o.ops/3 && i < 2*o.ops/3: + dropStep(o, rnd, i, active, get, put) + default: + steadyStep(o, rnd, i, active, get, put, o.initial/3) + } +} + +func steadyStepDownToZero(o scenarioOptions, rnd *rand.Rand, i, active int, get, put func()) { + switch { + case i < o.ops/3: + 
steadyStep(o, rnd, i, active, get, put, o.initial) + case i >= o.ops/3 && i < 2*o.ops/3: + dropStep(o, rnd, i, active, get, put) + default: + steadyStep(o, rnd, i, active, get, put, 0) + } +} + +func change(o scenarioOptions, rnd *rand.Rand, i, active int, get, put func()) { + cr := o.changeRate + inc := get + dec := put + if cr < 0 { + cr = 0 - cr + inc, dec = put, get + } + + v := rnd.Intn(cr + 2) + if v == 0 { + dec() + return + } + + inc() +} + +func changeAndJump(o scenarioOptions, rnd *rand.Rand, i, active int, get, put func()) { + var f scenarioStep + switch { + case 3*i < 2*o.ops: + f = change + default: + f = jumpStep + } + + f(o, rnd, i, active, get, put) +} + +func cyclicSpikes(o scenarioOptions, rnd *rand.Rand, i, active int, get, put func()) { + var f scenarioStep + switch { + case 3*i < o.ops: + o.initial = active + f = steady + case 3*i < 2*o.ops: + f = jumpStep + default: + f = dropStep + } + + f(o, rnd, i, active, get, put) +} + +func cyclicDrops(o scenarioOptions, rnd *rand.Rand, i, active int, get, put func()) { + var f scenarioStep + switch { + case 3*i < o.ops: + o.initial = active + f = steady + case 3*i < 2*o.ops: + f = dropStep + default: + f = jumpStep + } + + f(o, rnd, i, active, get, put) +} + +func sigmaSteps(o scenarioOptions, rnd *rand.Rand, i, active int, get, put func()) { + var f scenarioStep + switch { + case 6*i < o.ops: + f = jumpStep + case 2*i < o.ops: + o.initial = active + f = steady + case 3*i < 2*o.ops: + f = dropStep + default: + o.initial = active + f = steady + } + + f(o, rnd, i, active, get, put) +} + +func chainSaw(o scenarioOptions, rnd *rand.Rand, i, active int, get, put func()) { + var f scenarioStep + switch { + case 3*i < o.ops: + f = change + case 2*i < o.ops: + f = dropStep + case 6*i < 5*o.ops: + f = change + default: + f = dropStep + } + + f(o, rnd, i, active, get, put) +} + +func inverseChainSaw(o scenarioOptions, rnd *rand.Rand, i, active int, get, put func()) { + var f scenarioStep + switch { + case 6*i < 
o.ops:
		f = jumpStep
	case 2*i < o.ops:
		f = change
	case 3*i < 2*o.ops:
		f = jumpStep
	default:
		f = change
	}

	f(o, rnd, i, active, get, put)
}

// sinusStep drives the load along one full sine wave over the whole run:
// the target active count oscillates around o.initial with amplitude
// o.ops/4, completing a single period in o.ops steps. Random noise of up
// to o.deviation is mixed in so the pool never sees a perfectly smooth
// curve. Exactly one of get or put is called per step.
func sinusStep(o scenarioOptions, rnd *rand.Rand, i, active int, get, put func()) {
	cycle := float64(o.ops) // one sine period spans the entire scenario
	amp := cycle / 4
	x := float64(i)
	sin := amp * math.Sin(x*2*math.Pi/cycle)
	target := o.initial + int(sin)
	if target < 0 {
		target = 0 // the wave dips below zero whenever o.initial < amp; clamp
	}

	// delta > 0 means we are below target and should lean towards get.
	delta := target - active
	absDelta := delta
	if absDelta < 0 {
		absDelta = 0 - absDelta
	}

	// r falls in [-o.deviation, absDelta): negative values act as noise,
	// non-negative values follow the trend towards the target.
	// NOTE(review): rnd.Intn panics when o.deviation+absDelta == 0; all
	// current callers pass o.deviation > 0 — confirm before reusing with 0.
	absR := rnd.Intn(o.deviation + absDelta)
	r := absR - o.deviation

	// Follow the trend for in-band draws and do the opposite for noise
	// draws; the two cases mirror each other for the growing (delta >= 0)
	// and shrinking (delta <= 0) phases of the wave.
	var grow bool
	switch {
	case delta >= 0 && r >= 0:
		grow = true
	case delta <= 0 && r < 0:
		grow = true
	}

	f := put
	if grow {
		f = get
	}

	f()
}

// noopVerify accepts any recorded stats; used by scenarios that have no
// meaningful invariant to assert.
func noopVerify(*testing.T, scenarioOptions, []pool.Stats) {}

// verifySteady fails as soon as any sample's idle count exceeds 1.5x the
// expected noise band (deviation scaled by concurrency).
func verifySteady(t *testing.T, o scenarioOptions, s []pool.Stats) {
	for i, stats := range s {
		// 2*Idle > 3*band is the integer form of Idle > 1.5*band.
		if 2*stats.Idle > 3*o.deviation*o.concurrency {
			t.Fatal(i, stats)
		}
	}
}

// verifyAllocRate fails when more than two thirds of the Get calls ended up
// allocating a fresh instance (beyond the o.initial warm-up allocations),
// i.e. when the pool reused too little.
func verifyAllocRate(t *testing.T, o scenarioOptions, s []pool.Stats) {
	if len(s) == 0 {
		t.Fatal("no stats")
	}

	last := s[len(s)-1]
	// (Alloc-initial)/Get > 2/3, kept in integer arithmetic.
	if (last.Alloc-o.initial)*3 > last.Get*2 {
		t.Fatal("too many allocations", last)
	}
}

// verifyAllocRateLax is verifyAllocRate with a looser bound (9/10 instead
// of 2/3) for scenarios where frequent allocation is expected.
func verifyAllocRateLax(t *testing.T, o scenarioOptions, s []pool.Stats) {
	if len(s) == 0 {
		t.Fatal("no stats")
	}

	last := s[len(s)-1]
	// (Alloc-initial)/Get > 9/10, kept in integer arithmetic.
	if (last.Alloc-o.initial)*10 > last.Get*9 {
		t.Fatal("too many allocations", last)
	}
}

// verifyDealloc fails when the pool freed more instances than the noise
// band (deviation scaled by concurrency) justifies.
func verifyDealloc(t *testing.T, o scenarioOptions, s []pool.Stats) {
	if len(s) == 0 {
		t.Fatal("no stats")
	}

	last := s[len(s)-1]

	if last.Free > o.deviation*o.concurrency {
		t.Fatal("too many deallocations", last)
	}
}

// testSteadyUsage runs the steady scenario and checks the idle bound.
func testSteadyUsage(t *testing.T, o scenarioOptions) {
	testScenario(t, o, steady, verifySteady)
}

// testSteadyStepUp runs the step-up scenario and checks the allocation rate.
func testSteadyStepUp(t *testing.T, o scenarioOptions) {
	testScenario(t, o, steadyStepUp, verifyAllocRate)
}

// testSteadyStepDown runs the step-down scenario and checks the allocation
// rate.
func testSteadyStepDown(t *testing.T, o scenarioOptions) {
	testScenario(t, o, steadyStepDown, verifyAllocRate)
}

// testSteadyStepDownToZero runs the step-down-to-zero scenario and checks
// the allocation rate.
func testSteadyStepDownToZero(t *testing.T, o scenarioOptions) {
	testScenario(t, o, steadyStepDownToZero, verifyAllocRate)
}

// testChange runs the linear change scenario. A shrinking load
// (o.changeRate < 0) is verified against the allocation bound, a growing
// one against the deallocation bound.
func testChange(t *testing.T, o scenarioOptions) {
	verify := verifyDealloc
	if o.changeRate < 0 {
		verify = verifyAllocRate
	}

	testScenario(t, o, change, verify)
}

// testChangeAndJump runs the change-then-jump scenario and checks the
// allocation rate.
func testChangeAndJump(t *testing.T, o scenarioOptions) {
	testScenario(t, o, changeAndJump, verifyAllocRate)
}

// testCyclicSpikes runs the cyclic spike scenario; the allocation bound is
// only meaningful when there is a steady base load (o.initial > 0),
// otherwise nothing is asserted.
func testCyclicSpikes(t *testing.T, o scenarioOptions) {
	verify := noopVerify
	if o.initial > 0 {
		verify = verifyAllocRate
	}

	testCyclicScenario(t, o, cyclicSpikes, verify)
}

// testCyclicDrops runs the cyclic drop scenario and checks the allocation
// rate.
func testCyclicDrops(t *testing.T, o scenarioOptions) {
	testCyclicScenario(t, o, cyclicDrops, verifyAllocRate)
}

// testCyclicSigmaSteps runs the sigma-step scenario and checks the
// allocation rate.
func testCyclicSigmaSteps(t *testing.T, o scenarioOptions) {
	testCyclicScenario(t, o, sigmaSteps, verifyAllocRate)
}

// testCyclicChainSaw runs the chain-saw scenario and checks the allocation
// rate.
func testCyclicChainSaw(t *testing.T, o scenarioOptions) {
	testCyclicScenario(t, o, chainSaw, verifyAllocRate)
}

// testCyclicInverseChainSaw runs the inverse chain-saw scenario; the lax
// allocation bound is used because this shape forces more fresh
// allocations.
func testCyclicInverseChainSaw(t *testing.T, o scenarioOptions) {
	testCyclicScenario(t, o, inverseChainSaw, verifyAllocRateLax)
}

// testCyclicSinus runs the sine-wave scenario, first raising the base load
// so that the wave's trough stays meaningful relative to the amplitude.
func testCyclicSinus(t *testing.T, o scenarioOptions) {
	o.initial += o.ops / 16 // a single cycle is o.ops / 4, the amp in sinusStep is cycle ops / 4
	testCyclicScenario(t, o, sinusStep, verifyAllocRate)
}

// testBasicSet runs the non-cyclic scenario suite (steady load, steps up
// and down, slow rises/decreases, decrease-then-jump), each in a small and
// a large variant, on top of the shared base options (notably concurrency).
func testBasicSet(t *testing.T, base scenarioOptions) {
	t.Run("steady small", func(t *testing.T) {
		o := base
		o.initial = 8
		o.ops = 60
		o.deviation = 2
		testSteadyUsage(t, o)
	})

	t.Run("steady large", func(t *testing.T) {
		o := base
		o.initial = 60
		o.ops = 1200
		o.deviation = 10
		testSteadyUsage(t, o)
	})

	t.Run("steady step up small", func(t *testing.T) {
		o := base
		o.initial = 8
		o.ops = 60
		o.deviation = 2
		testSteadyStepUp(t, o)
	})

	t.Run("steady step up large", func(t *testing.T) {
		o := base
		o.initial = 60
		o.ops = 1200
		o.deviation = 10
		testSteadyStepUp(t, o)
	})

	t.Run("steady step down small", func(t *testing.T) {
		o := base
		o.initial = 20
		o.ops = 60
		o.deviation = 2
		testSteadyStepDown(t, o)
	})

	t.Run("steady step down large", func(t *testing.T) {
		o := base
		o.initial = 450
		o.ops = 1200
		o.deviation = 10
		testSteadyStepDown(t, o)
	})

	t.Run("steady step down small to zero", func(t *testing.T) {
		o := base
		o.initial = 20
		o.ops = 60
		o.deviation = 2
		testSteadyStepDownToZero(t, o)
	})

	t.Run("steady step down large to zero", func(t *testing.T) {
		o := base
		o.initial = 450
		o.ops = 1200
		o.deviation = 10
		testSteadyStepDownToZero(t, o)
	})

	t.Run("slow rise from zero small", func(t *testing.T) {
		o := base
		o.ops = 60
		o.changeRate = 1
		o.deviation = 3
		testChange(t, o)
	})

	t.Run("slow rise from zero large", func(t *testing.T) {
		o := base
		o.ops = 1200
		o.changeRate = 1
		o.deviation = 10
		testChange(t, o)
	})

	t.Run("slow decrease to zero small", func(t *testing.T) {
		o := base
		o.initial = 15
		o.ops = 60
		o.changeRate = -1
		o.deviation = 3
		testChange(t, o)
	})

	t.Run("slow decrease to zero large", func(t *testing.T) {
		o := base
		o.initial = 300
		o.ops = 1200
		o.changeRate = -1
		o.deviation = 10
		testChange(t, o)
	})

	t.Run("slow decrease and jump small", func(t *testing.T) {
		o := base
		o.initial = 15
		o.ops = 60
		o.changeRate = -2
		o.deviation = 3
		testChangeAndJump(t, o)
	})

	t.Run("slow decrease and jump large", func(t *testing.T) {
		o := base
		o.initial = 300
		o.ops = 1200
		o.changeRate = -2
		o.deviation = 10
		testChangeAndJump(t, o)
	})
}

// testCyclicSet runs the cyclic scenario suite (spikes, drops, sigma
// steps, chain saws, sine waves), each from zero and from a steady base
// load, in small and large variants, on top of the shared base options.
func testCyclicSet(t *testing.T, base scenarioOptions) {
	t.Run("cyclic spikes from zero small", func(t *testing.T) {
		o := base
		o.ops = 300
		o.deviation = 3
		testCyclicSpikes(t, o)
	})

	t.Run("cyclic spikes from zero large", func(t *testing.T) {
		o := base
		o.ops = 6000
		o.deviation = 10
		testCyclicSpikes(t, o)
	})

	t.Run("steady and cyclic spikes small", func(t *testing.T) {
		o := base
		o.initial = 30
		o.ops = 300
		o.deviation = 3
		testCyclicSpikes(t, o)
	})

	t.Run("steady and cyclic spikes large", func(t *testing.T) {
		o := base
		o.initial = 300
		o.ops = 6000
		o.deviation = 10
		testCyclicSpikes(t, o)
	})

	t.Run("steady and cyclic drops to zero small", func(t *testing.T) {
		o := base
		o.initial = 30
		o.ops = 300
		o.deviation = 3
		testCyclicDrops(t, o)
	})

	t.Run("steady and cyclic drops to zero large", func(t *testing.T) {
		o := base
		o.initial = 420
		o.ops = 6000
		o.deviation = 10
		testCyclicDrops(t, o)
	})

	t.Run("sigma steps from zero small", func(t *testing.T) {
		o := base
		o.ops = 300
		o.deviation = 3
		testCyclicSigmaSteps(t, o)
	})

	t.Run("sigma steps from zero large", func(t *testing.T) {
		o := base
		o.ops = 6000
		o.deviation = 10
		testCyclicSigmaSteps(t, o)
	})

	t.Run("sigma steps small", func(t *testing.T) {
		o := base
		o.initial = 60
		o.ops = 300
		o.deviation = 3
		testCyclicSigmaSteps(t, o)
	})

	t.Run("sigma steps large", func(t *testing.T) {
		o := base
		o.initial = 600
		o.ops = 6000
		o.deviation = 10
		testCyclicSigmaSteps(t, o)
	})

	t.Run("chain saw from zero small", func(t *testing.T) {
		o := base
		o.ops = 300
		o.deviation = 3
		o.changeRate = 2
		testCyclicChainSaw(t, o)
	})

	t.Run("chain saw from zero large", func(t *testing.T) {
		o := base
		o.ops = 6000
		o.deviation = 3
		o.changeRate = 2
		testCyclicChainSaw(t, o)
	})

	t.Run("chain saw small", func(t *testing.T) {
		o := base
		o.initial = 60
		o.ops = 300
		o.deviation = 3
		o.changeRate = 2
		testCyclicChainSaw(t, o)
	})

	t.Run("chain saw large", func(t *testing.T) {
		o := base
		o.initial = 600
		o.ops = 6000
		o.deviation = 3
		o.changeRate = 2
		testCyclicChainSaw(t, o)
	})

	t.Run("inverse chain saw from zero small", func(t *testing.T) {
		o := base
		o.ops = 300
		o.deviation = 3
		o.changeRate = -3
		testCyclicInverseChainSaw(t, o)
	})

	t.Run("inverse chain saw from zero large", func(t *testing.T) {
		o := base
		o.ops = 6000
		o.deviation = 3
		o.changeRate = -3
		testCyclicInverseChainSaw(t, o)
	})

	t.Run("inverse chain saw small", func(t *testing.T) {
		o := base
		o.initial = 60
		o.ops = 300
		o.deviation = 3
		o.changeRate = -3
		testCyclicInverseChainSaw(t, o)
	})

	t.Run("inverse chain saw large", func(t *testing.T) {
		o := base
		o.initial = 600
		o.ops = 6000
		o.deviation = 3
		o.changeRate = -3
		testCyclicInverseChainSaw(t, o)
	})

	t.Run("sinus to zero small", func(t *testing.T) {
		o := base
		o.ops = 300
		o.deviation = 3
		testCyclicSinus(t, o)
	})

	t.Run("sinus to zero large", func(t *testing.T) {
		o := base
		o.ops = 6000
		o.deviation = 10
		testCyclicSinus(t, o)
	})

	t.Run("sinus small", func(t *testing.T) {
		o := base
		o.initial = 15
		o.ops = 300
		o.deviation = 3
		testCyclicSinus(t, o)
	})

	t.Run("sinus large", func(t *testing.T) {
		o := base
		o.initial = 60
		o.ops = 6000
		o.deviation = 10
		testCyclicSinus(t, o)
	})
}