1
0

support small sporadic traffic

This commit is contained in:
Arpad Ryszka 2026-03-15 17:18:20 +01:00
parent 389e91147c
commit 2866936856
4 changed files with 232 additions and 53 deletions

View File

@ -8,21 +8,22 @@ import (
const ( const (
// arbitrary values to be most likely out of sync with anything else: // arbitrary values to be most likely out of sync with anything else:
minNightshiftTime = 729 * time.Millisecond // ~1sec minNightshiftTime = 6561 * time.Millisecond // ~6sec
maxNightshiftTime = 59049 * time.Second // ~2/3day maxNightshiftTime = 59049 * time.Second // ~2/3day
) )
type adaptive struct { type adaptive struct {
clock times.Clock clock times.Clock
activeTime time.Time prevState Stats
nsTO time.Duration initialized bool
idle bool activeStart time.Time
average float64 activeEnd time.Time
deviation float64 average float64
deviation float64
} }
func makeAdaptiveAlgo() *adaptive { func makeAdaptiveAlgo() *adaptive {
return &adaptive{idle: true} return &adaptive{}
} }
func (a *adaptive) setClock(c times.Clock) { func (a *adaptive) setClock(c times.Clock) {
@ -49,38 +50,198 @@ func (a *adaptive) target(s Stats) int {
return int(targetCapacity(av, dev)) return int(targetCapacity(av, dev))
} }
func (a *adaptive) nightshift(s Stats) time.Duration { func (a *adaptive) nightshiftTO() time.Duration {
if a.idle && s.Active == 0 { to := a.activeEnd.Sub(a.activeStart)
return a.nsTO to = (3 * to) / 8
if to < minNightshiftTime {
to = minNightshiftTime
} }
if !a.idle && s.Active > 0 { if to > maxNightshiftTime {
to = maxNightshiftTime
}
return to
}
// ensure that a single client does not get freed if using it
// ensure that eventually gets collected if not using it
// ensure that the background job is not running forever
// handle load as well
func (a *adaptive) nightshift(s Stats, now time.Time) time.Duration {
// idle, inactive, uninitialized => not idle, initialize, 0
// idle, inactive, initialized => to
// idle, active, uninitialized => not idle, initialize, 0
// idle, active, initialized => not idle, initialize, 0
// not idle, inactive, uninitialized => not idle, initialize, 0
// not idle, inactive, initialized => idle, new to
// not idle, active, uninitialized => not idle, initialize, 0
// not idle, active, initialized => 0
// idle := a.idle
// active := s.Active > 0
// initialized := !a.activeTime.IsZero()
// if !initialized {
// a.idle = false
// a.activeTime = now
// return 0
// }
// if idle && !active {
// return a.nsTO
// }
// if idle {
// a.idle = false
// a.nsTO = 0
// a.activeTime = now
// return 0
// }
// if active {
// return 0
// }
// a.idle = true
// a.nsTO = nightshiftTO(a.activeTime, now)
// return a.nsTO
// --
// state flags:
// - initialized
// - pempty
// - pactive
// - empty
// - active
// actions:
// - start active
// - call to
// - update prev state on every call
// states:
// X not initialized, not pempty, not pactive, not empty, not active
// X not initialized, not pempty, not pactive, not empty, active
// X not initialized, not pempty, not pactive, empty, not active
// X not initialized, not pempty, not pactive, empty, active
// X not initialized, not pempty, pactive, not empty, not active
// X not initialized, not pempty, pactive, not empty, active
// X not initialized, not pempty, pactive, empty, not active
// X not initialized, not pempty, pactive, empty, active
// * not initialized, pempty, not pactive, not empty, not active => start active, call to
// * not initialized, pempty, not pactive, not empty, active => start active
// X not initialized, pempty, not pactive, empty, not active
// X not initialized, pempty, not pactive, empty, active
// X not initialized, pempty, pactive, not empty, not active
// X not initialized, pempty, pactive, not empty, active
// X not initialized, pempty, pactive, empty, not active
// X not initialized, pempty, pactive, empty, active
// * initialized, not pempty, not pactive, not empty, not active => end active, call to
// * initialized, not pempty, not pactive, not empty, active => start active
// * initialized, not pempty, not pactive, empty, not active => noop
// * initialized, not pempty, not pactive, empty, active => start active
// * initialized, not pempty, pactive, not empty, not active => end active, call to
// * initialized, not pempty, pactive, not empty, active => noop
// * initialized, not pempty, pactive, empty, not active => end active, noop
// * initialized, not pempty, pactive, empty, active => noop
// * initialized, pempty, not pactive, not empty, not active => call to
// * initialized, pempty, not pactive, not empty, active => start active
// * initialized, pempty, not pactive, empty, not active => noop
// * initialized, pempty, not pactive, empty, active => start active
// * initialized, pempty, pactive, not empty, not active => end active, call to
// * initialized, pempty, pactive, not empty, active => noop
// - initialized, pempty, pactive, empty, not active => end active, noop
// * initialized, pempty, pactive, empty, active => noop
pempty := a.prevState.Idle == 0
pactive := a.prevState.Active > 0
empty := s.Idle == 0
active := s.Active > 0
a.prevState = s
// not initialized, pempty, not pactive, not empty, not active => start active, call to
if !a.initialized && pempty && !pactive && !empty && !active {
a.initialized = true
a.activeStart = now
return a.nightshiftTO()
}
// not initialized, pempty, not pactive, not empty, active => start active
if !a.initialized && pempty && !pactive && !empty && active {
a.initialized = true
a.activeStart = now
return 0 return 0
} }
now := a.clock.Now() if !a.initialized {
a.idle = !a.idle
if !a.idle {
a.activeTime = now
return 0 return 0
} }
a.nsTO = now.Sub(a.activeTime) // initialized, not pempty, not pactive, not empty, not active => end active, call to
a.nsTO = (3 * a.nsTO) / 8 if !pempty && !pactive && !empty && !active {
if a.nsTO < minNightshiftTime { ns := a.nightshiftTO()
a.nsTO = minNightshiftTime return ns
} }
if a.nsTO > maxNightshiftTime { // initialized, not pempty, not pactive, not empty, active => start active
a.nsTO = maxNightshiftTime // initialized, not pempty, not pactive, empty, active => start active
if !pempty && !pactive && active {
a.activeStart = now
return 0
} }
return a.nsTO // initialized, not pempty, pactive, not empty, not active => end active, call to
if !pempty && pactive && !empty && !active {
a.activeEnd = now
ns := a.nightshiftTO()
return ns
}
// initialized, not pempty, pactive, empty, not active => end active, noop
if !pempty && pactive && empty && !active {
a.activeEnd = now
return 0
}
// initialized, pempty, not pactive, not empty, not active => call to
if pempty && !pactive && !empty && !active {
return a.nightshiftTO()
}
// initialized, pempty, not pactive, not empty, active => start active
// initialized, pempty, not pactive, empty, active => start active
if pempty && !pactive && active {
a.activeStart = now
return 0
}
// initialized, pempty, pactive, not empty, not active => end active, call to
if pempty && pactive && !empty && !active {
a.activeEnd = now
return a.nightshiftTO()
}
// initialized, pempty, pactive, empty, not active => end active, noop
if pempty && pactive && empty && !active {
a.activeEnd = now
return 0
}
return 0
} }
func (a *adaptive) Target(s Stats) (int, time.Duration) { func (a *adaptive) Target(s Stats) (int, time.Duration) {
t := a.target(s) t := a.target(s) // handle when t < 2
ns := a.nightshift(s)
// magic number 2: we allow max 2 idle items to be collected only by the nightshift, to provide better
// support for sporadic requests, when it's active or just going inactive:
if t < 2 && a.activeStart.After(a.activeEnd) {
t = 2
}
// TODO: optimize, only take the clock when necessary
ns := a.nightshift(s, a.clock.Now())
return t, ns return t, ns
} }
@ -88,4 +249,7 @@ func (a *adaptive) Load(n int) {
// we lie to the algorithm when adding the additional idle count to the average active count. This way // we lie to the algorithm when adding the additional idle count to the average active count. This way
// we can adjust the calculated target capacity: // we can adjust the calculated target capacity:
a.average += float64(n) a.average += float64(n)
s := a.prevState
s.Idle += n
a.nightshift(s, a.clock.Now())
} }

View File

@ -45,7 +45,7 @@ func TestAdaptive(t *testing.T) {
variationStepTime = 10 * time.Millisecond variationStepTime = 10 * time.Millisecond
dropStepTime = time.Millisecond dropStepTime = time.Millisecond
waitCycles = 36 waitCycles = 36
waitTime = time.Second waitTime = 9 * time.Second
) )
bus := syncbus.New(time.Second) bus := syncbus.New(time.Second)
@ -111,6 +111,9 @@ func TestAdaptive(t *testing.T) {
} }
bus.ResetSignals("free-idle-done") bus.ResetSignals("free-idle-done")
if p.Stats().Idle == 0 {
break
}
} }
if p.Stats().Idle != 0 { if p.Stats().Idle != 0 {
@ -158,7 +161,7 @@ func TestAdaptive(t *testing.T) {
} }
s := p.Stats() s := p.Stats()
e := pool.Stats{Idle: 0, Active: 0, Get: 8, Put: 23, Alloc: 8, Free: 23} e := pool.Stats{Idle: 2, Active: 0, Get: 8, Put: 23, Alloc: 0, Free: 13}
if s != e { if s != e {
t.Fatal(s) t.Fatal(s)
} }

View File

@ -8,8 +8,10 @@ import (
) )
func TestMaxTO(t *testing.T) { func TestMaxTO(t *testing.T) {
maxTOBase := scenarioOptions{exclude: []string{"steady_minimal"}}
t.Run("noshrink", func(t *testing.T) { t.Run("noshrink", func(t *testing.T) {
base := scenarioOptions{algo: pool.NoShrink()} base := maxTOBase
base.algo = pool.NoShrink()
t.Run("basic", func(t *testing.T) { t.Run("basic", func(t *testing.T) {
t.Run("no concurrency", func(t *testing.T) { t.Run("no concurrency", func(t *testing.T) {
o := base o := base
@ -50,7 +52,8 @@ func TestMaxTO(t *testing.T) {
}) })
t.Run("max", func(t *testing.T) { t.Run("max", func(t *testing.T) {
base := scenarioOptions{algo: pool.Max(60)} base := maxTOBase
base.algo = pool.Max(60)
t.Run("basic", func(t *testing.T) { t.Run("basic", func(t *testing.T) {
t.Run("no concurrency", func(t *testing.T) { t.Run("no concurrency", func(t *testing.T) {
o := base o := base
@ -66,12 +69,13 @@ func TestMaxTO(t *testing.T) {
t.Run("high concurrency", func(t *testing.T) { t.Run("high concurrency", func(t *testing.T) {
o := base o := base
o.concurrency = 256 o.concurrency = 256
o.exclude = []string{ o.exclude = append(
o.exclude,
"steady_step_up_small", "steady_step_up_small",
"steady_step_up_large", "steady_step_up_large",
"slow_rise_from_zero_small", "slow_rise_from_zero_small",
"slow_rise_from_zero_large", "slow_rise_from_zero_large",
} )
testBasicSet(t, o) testBasicSet(t, o)
}) })
@ -98,20 +102,19 @@ func TestMaxTO(t *testing.T) {
}) })
t.Run("to", func(t *testing.T) { t.Run("to", func(t *testing.T) {
base := scenarioOptions{ base := maxTOBase
algo: pool.Timeout(300 * time.Millisecond), base.algo = pool.Timeout(300 * time.Millisecond)
minDelay: 10 * time.Millisecond, base.minDelay = 10 * time.Millisecond
maxDelay: 100 * time.Millisecond, base.maxDelay = 100 * time.Millisecond
}
t.Run("basic", func(t *testing.T) { t.Run("basic", func(t *testing.T) {
t.Run("no concurrency", func(t *testing.T) { t.Run("no concurrency", func(t *testing.T) {
o := base o := base
o.exclude = []string{ o.exclude = append(
o.exclude,
"steady_step_up_large", "steady_step_up_large",
"slow_rise_from_zero_small", "slow_rise_from_zero_small",
"slow_rise_from_zero_large", "slow_rise_from_zero_large",
} )
testBasicSet(t, o) testBasicSet(t, o)
}) })
@ -132,7 +135,7 @@ func TestMaxTO(t *testing.T) {
t.Run("cyclic", func(t *testing.T) { t.Run("cyclic", func(t *testing.T) {
t.Run("no concurrency", func(t *testing.T) { t.Run("no concurrency", func(t *testing.T) {
o := base o := base
o.exclude = []string{"sinus_large"} o.exclude = append(o.exclude, "sinus_large")
testCyclicSet(t, o) testCyclicSet(t, o)
}) })
@ -151,20 +154,19 @@ func TestMaxTO(t *testing.T) {
}) })
t.Run("maxto", func(t *testing.T) { t.Run("maxto", func(t *testing.T) {
base := scenarioOptions{ base := maxTOBase
algo: pool.MaxTimeout(60, 300*time.Millisecond), base.algo = pool.MaxTimeout(60, 300*time.Millisecond)
minDelay: 10 * time.Millisecond, base.minDelay = 10 * time.Millisecond
maxDelay: 100 * time.Millisecond, base.maxDelay = 100 * time.Millisecond
}
t.Run("basic", func(t *testing.T) { t.Run("basic", func(t *testing.T) {
t.Run("no concurrency", func(t *testing.T) { t.Run("no concurrency", func(t *testing.T) {
o := base o := base
o.exclude = []string{ o.exclude = append(
o.exclude,
"steady_step_up_large", "steady_step_up_large",
"slow_rise_from_zero_small", "slow_rise_from_zero_small",
"slow_rise_from_zero_large", "slow_rise_from_zero_large",
} )
testBasicSet(t, o) testBasicSet(t, o)
}) })
@ -178,10 +180,12 @@ func TestMaxTO(t *testing.T) {
t.Run("high concurrency", func(t *testing.T) { t.Run("high concurrency", func(t *testing.T) {
o := base o := base
o.concurrency = 256 o.concurrency = 256
o.exclude = []string{ o.exclude = append(
o.exclude,
"steady_step_up_large", "steady_step_up_large",
"steady_step_up_small", "steady_step_up_small",
} "slow_rise_from_zero_large",
)
testBasicSet(t, o) testBasicSet(t, o)
}) })
@ -190,7 +194,7 @@ func TestMaxTO(t *testing.T) {
t.Run("cyclic", func(t *testing.T) { t.Run("cyclic", func(t *testing.T) {
t.Run("no concurrency", func(t *testing.T) { t.Run("no concurrency", func(t *testing.T) {
o := base o := base
o.exclude = []string{"sinus_large"} o.exclude = append(o.exclude, "sinus_large")
testCyclicSet(t, o) testCyclicSet(t, o)
}) })

View File

@ -419,7 +419,7 @@ func noopVerify(*testing.T, scenarioOptions, []pool.Stats) {}
func verifySteady(t *testing.T, o scenarioOptions, s []pool.Stats) { func verifySteady(t *testing.T, o scenarioOptions, s []pool.Stats) {
for i, stats := range s { for i, stats := range s {
if 2*stats.Idle > 3*o.deviation*o.concurrency { if 2*stats.Idle > 3*(o.deviation+1)*o.concurrency {
t.Fatal(i, stats) t.Fatal(i, stats)
} }
} }
@ -519,6 +519,14 @@ func testCyclicSinus(t *testing.T, o scenarioOptions) {
} }
func testBasicSet(t *testing.T, base scenarioOptions) { func testBasicSet(t *testing.T, base scenarioOptions) {
t.Run("steady minimal", func(t *testing.T) {
o := base
o.initial = 1
o.ops = 60
o.deviation = 1
testSteadyUsage(t, o)
})
t.Run("steady small", func(t *testing.T) { t.Run("steady small", func(t *testing.T) {
o := base o := base
o.initial = 8 o.initial = 8