1
0

allow handling multiple resource items at a time

This commit is contained in:
Arpad Ryszka 2026-04-15 18:31:44 +02:00
parent aa169bc832
commit 087b58e5dd
6 changed files with 192 additions and 141 deletions

View File

@ -50,11 +50,10 @@ func TestAdaptive(t *testing.T) {
bus := syncbus.New(time.Second)
clock := times.Test()
o := pool.Options{
Clock: clock,
TestBus: bus,
}
var o pool.Options
o.Testing.Clock = clock
o.Testing.Bus = bus
alloc := func() ([]byte, error) { return make([]byte, 1<<9), nil }
p := pool.Make(alloc, nil, o)

58
lib.go
View File

@ -139,13 +139,16 @@ type Options struct {
// Algo is the algorithm implementation used for shrinking the pool. The default is Adaptive().
Algo Algo
// Clock is an optional clock meant to be used with testing. The main purpose is to avoid time sensitive
// tests running for a too long time.
Clock times.Clock
Testing struct {
// TestBus is an optional signal bus meant to be used with testing. The main purpose is to ensure that
// specific blocks of code are executed in a predefined order during concurrent tests.
TestBus *syncbus.SyncBus
// Clock is an optional clock meant to be used with testing. The main purpose is to avoid time sensitive
// tests running for a too long time.
Clock times.Clock
// Bus is an optional signal bus meant to be used with testing. The main purpose is to ensure that
// specific blocks of code are executed in a predefined order during concurrent tests.
Bus *syncbus.SyncBus
}
}
// Pool is a synchronized pool of resources that are considered expensive to allocate. Initialize the pool with
@ -155,8 +158,9 @@ type Pool[R any] struct {
pool pool[R]
}
// ErrEmpty is returned on Get calls when the pool is empty and it was initialized without an allocate function.
var ErrEmpty = errors.New("empty pool")
// ErrNoItems is returned on Get calls when the pool does not enough items and it was initialized without an
// allocate function.
var ErrNoItems = errors.New("not enough items in the pool")
// String returns the string representation of the EventType binary flag, including all the flags that are set.
func (et EventType) String() string {
@ -273,30 +277,50 @@ func Make[R any](alloc func() (R, error), free func(R), o Options) Pool[R] {
return Pool[R]{pool: makePool(alloc, free, o)}
}
// GetN returns n items from the pool. If the pool is empty and no allocation function was configured, it
// returns ErrEmpty. If the pool has less than n items and no allocation function was configured, it returns the
// available items and ErrEtmpy. If the pool is empty or has less than the requested items, and the allocation
// function returns an error, it returns the available items and that error. If events were configured, GetN
// triggers a GetOperation event.
func (p Pool[R]) GetN(n int) ([]R, error) {
return p.pool.get(n)
}
// Get returns an item from the pool. If the pool is empty and no allocation function was configured, it returns
// ErrEmpty. If the pool is empty, and the allocation function returns an error, it returns that error. If
// events were configured, Get triggers a GetOperation event.
func (p Pool[R]) Get() (R, error) {
return p.pool.get()
r, err := p.pool.get(1)
if err != nil {
var rr R
return rr, err
}
return r[0], nil
}
// Put stores an item in the pool. If events were configured, it triggers a PutOperation event.
// Put stores one or more items in the pool. If events were configured, it triggers a PutOperation event.
//
// It is recommended to use it only with items that were received by the Get method. While it is allowed to put
// other items in the pool, it may change the way the shrinking algorithm works. E.g. it can be considered as a
// sudden drop in the number of active items. If the pool needs to be prewarmed, or prepared for an expected
// spike of traffic, consider using the Load method.
func (p Pool[R]) Put(i R) {
p.pool.put(i)
func (p Pool[R]) Put(i ...R) {
p.pool.put(i...)
}
// Drop indicates to the pool that an item received by Get was dropped and will not be put back into the pool.
// DropN indicates to the pool that N items received by Get were dropped and will not be put back into the pool.
//
// While it's not mandatory call it when item was dropped, it has a twofold purpose. Depending on the shrinking
// algorithm, it may work more consistently, if the algorithm is notified about the change in the active items.
// The other purpose is to allow the pool to reflect these cases in the events and the stats.
// While it's not mandatory call it when some items were dropped, it has a twofold purpose. Depending on the
// shrinking algorithm, it may work more consistently, if the algorithm is notified about the change in the
// active items. The other purpose is to allow the pool to reflect these cases in the events and the stats.
func (p Pool[R]) DropN(n int) {
p.pool.drop(n)
}
// Drop is like DropN, but for a single item.
func (p Pool[R]) Drop() {
p.pool.drop()
p.pool.drop(1)
}
// Load can be used to populate the pool with items that were not allocated as the result of the Get operation.

View File

@ -221,11 +221,10 @@ func TestMaxTO(t *testing.T) {
)
clock := times.Test()
o := pool.Options{
Clock: clock,
Algo: pool.MaxTimeout(15, 300*time.Millisecond),
}
var o pool.Options
o.Algo = pool.MaxTimeout(15, 300*time.Millisecond)
o.Testing.Clock = clock
alloc := func() ([]byte, error) { return make([]byte, 1<<9), nil }
p := pool.Make(alloc, nil, o)
@ -277,11 +276,10 @@ func TestMaxTO(t *testing.T) {
)
clock := times.Test()
o := pool.Options{
Clock: clock,
Algo: pool.MaxTimeout(15, 300*time.Millisecond),
}
var o pool.Options
o.Algo = pool.MaxTimeout(15, 300*time.Millisecond)
o.Testing.Clock = clock
alloc := func() ([]byte, error) { return make([]byte, 1<<9), nil }
p := pool.Make(alloc, nil, o)
@ -346,11 +344,10 @@ func TestMaxTO(t *testing.T) {
)
clock := times.Test()
o := pool.Options{
Clock: clock,
Algo: pool.MaxTimeout(15, 300*time.Millisecond),
}
var o pool.Options
o.Algo = pool.MaxTimeout(15, 300*time.Millisecond)
o.Testing.Clock = clock
alloc := func() ([]byte, error) { return make([]byte, 1<<9), nil }
p := pool.Make(alloc, nil, o)
@ -404,11 +401,10 @@ func TestMaxTO(t *testing.T) {
)
clock := times.Test()
o := pool.Options{
Clock: clock,
Algo: pool.MaxTimeout(15, 300*time.Millisecond),
}
var o pool.Options
o.Algo = pool.MaxTimeout(15, 300*time.Millisecond)
o.Testing.Clock = clock
alloc := func() ([]byte, error) { return make([]byte, 1<<9), nil }
p := pool.Make(alloc, nil, o)

211
pool.go
View File

@ -23,12 +23,12 @@ func makePool[R any](alloc func() (R, error), free func(r R), o Options) pool[R]
o.Algo = Adaptive()
}
if o.Clock == nil {
o.Clock = times.Sys()
if o.Testing.Clock == nil {
o.Testing.Clock = times.Sys()
}
if sc, ok := o.Algo.(interface{ setClock(times.Clock) }); ok {
sc.setClock(o.Clock)
sc.setClock(o.Testing.Clock)
}
s := make(chan state[R], 1)
@ -70,50 +70,80 @@ func (p pool[R]) sendEvent(e EventType, s Stats) {
}
}
func (p pool[R]) get() (R, error) {
func (p pool[R]) get(n int) ([]R, error) {
if n <= 0 {
return nil, nil
}
s := <-p.state
defer func() {
p.state <- s
}()
var (
r R
event EventType
err error
)
var event EventType
event |= GetOperation
s.stats.Get++
switch {
case len(s.items) == 0 && p.alloc == nil:
s.stats.Alloc++
event |= AllocateOperation
event |= AllocateError
err = ErrEmpty
case len(s.items) == 0:
s.stats.Alloc++
event |= AllocateOperation
r, err = p.alloc()
if err != nil {
event |= AllocateError
} else {
s.stats.Active++
}
default:
r, s.items = s.items[len(s.items)-1], s.items[:len(s.items)-1]
if len(s.items) == 0 {
s.items = nil
}
s.stats.Active++
s.stats.Idle = len(s.items)
if len(s.items) < n && p.alloc == nil {
return nil, ErrNoItems
}
r := make([]R, n)
if len(s.items) <= n {
copy(r, s.items)
}
if len(s.items) > n {
// using the items from the end of the list:
copy(r, s.items[len(s.items)-n:])
}
if len(s.items) < n {
var err error
event |= AllocateOperation
for i := len(s.items); i < n; i++ {
s.stats.Alloc++
r[i], err = p.alloc()
if err == nil {
continue
}
s.items = append(s.items, r[len(s.items):i]...)
s.stats.Idle = len(s.items)
event |= AllocateError
p.sendEvent(event, s.stats)
return nil, err
}
}
if len(s.items) <= n {
var zero R
for i := 0; i < len(s.items); i++ {
s.items[i] = zero
}
s.items = nil
}
if len(s.items) > n {
var zero R
for i := len(s.items) - n; i < len(s.items); i++ {
s.items[i] = zero
}
s.items = s.items[:len(s.items)-n]
}
s.stats.Idle = len(s.items)
s.stats.Active += n
p.sendEvent(event, s.stats)
return r, err
return r, nil
}
func (p pool[R]) put(r R) {
func (p pool[R]) put(r ...R) {
if len(r) == 0 {
return
}
s := <-p.state
defer func() {
p.state <- s
@ -122,23 +152,28 @@ func (p pool[R]) put(r R) {
var event EventType
event |= PutOperation
s.stats.Put++
s.stats.Idle++
s.stats.Idle += len(r)
s.stats.Active -= len(r)
// one may put in items that were allocated outside of the pool:
if s.stats.Active > 0 {
s.stats.Active--
if s.stats.Active < 0 {
s.stats.Active = 0
}
t, f := p.options.Algo.Target(s.stats)
switch {
case t > len(s.items):
s.items = append(s.items, r)
case t >= len(s.items)+len(r):
s.items = append(s.items, r...)
case t > len(s.items) && t < len(s.items)+len(r):
event |= FreeOperation
s = p.freeItems(s, r[t-len(s.items):])
s.items = append(s.items, r[:t-len(s.items)]...)
default:
event |= FreeOperation
s = p.freeItems(s, t, r)
s = p.freeItems(s, r)
s = p.freeAboveTarget(s, t)
}
// fix provisioned idle count:
s.stats.Idle = len(s.items)
p.sendEvent(event, s.stats)
if f > 0 {
@ -146,7 +181,11 @@ func (p pool[R]) put(r R) {
}
}
func (p pool[R]) drop() {
func (p pool[R]) drop(n int) {
if n <= 0 {
return
}
s := <-p.state
defer func() {
p.state <- s
@ -154,15 +193,16 @@ func (p pool[R]) drop() {
var event EventType
event |= DropOperation
s.stats.Drop++
if s.stats.Active > 0 {
s.stats.Active--
s.stats.Drop += n
s.stats.Active -= n
if s.stats.Active < 0 {
s.stats.Active = 0
}
t, f := p.options.Algo.Target(s.stats)
if t < len(s.items) {
event |= FreeOperation
s = p.freeItems(s, t)
s = p.freeAboveTarget(s, t)
}
s.stats.Idle = len(s.items)
@ -173,6 +213,10 @@ func (p pool[R]) drop() {
}
func (p pool[R]) load(i []R) {
if len(i) == 0 {
return
}
s := <-p.state
defer func() {
p.state <- s
@ -193,59 +237,49 @@ func (p pool[R]) forcedCheck(s state[R], timeout time.Duration) state[R] {
s.forcedCheckPending = true
go func(to time.Duration) {
c := p.options.Clock.After(to)
p.options.TestBus.Signal("background-job-waiting")
c := p.options.Testing.Clock.After(to)
p.options.Testing.Bus.Signal("background-job-waiting")
<-c
p.freeIdle()
p.checkAndFree()
}(timeout)
return s
}
func (p pool[R]) freeItems(s state[R], target int, additional ...R) state[R] {
if len(s.items)+len(additional) <= target {
return s
}
s.stats.Free += len(additional)
if p.free == nil && len(s.items) <= target {
return s
}
var f []R
if p.free != nil {
f = additional
if len(s.items) > target {
f = append(f, s.items[:len(s.items)-target]...)
}
}
if len(s.items) > target {
s.stats.Free += len(s.items) - target
var zero R
for i := 0; i < len(s.items)-target; i++ {
s.items[i] = zero
}
s.items = s.items[len(s.items)-target:]
if len(s.items) == 0 {
s.items = nil
}
}
func (p pool[R]) freeItems(s state[R], r []R) state[R] {
s.stats.Free += len(r)
if p.free == nil {
return s
}
for _, fi := range f {
p.free(fi)
for _, ri := range r {
p.free(ri)
}
return s
}
func (p pool[R]) freeIdle() {
func (p pool[R]) freeAboveTarget(s state[R], target int) state[R] {
if len(s.items) <= target {
return s
}
f := s.items[:len(s.items)-target]
var zero R
for i := 0; i < len(s.items)-target; i++ {
s.items[i] = zero
}
s.items = s.items[len(s.items)-target:]
if len(s.items) == 0 {
s.items = nil
}
return p.freeItems(s, f)
}
func (p pool[R]) checkAndFree() {
s := <-p.state
defer func() {
p.state <- s
@ -254,7 +288,7 @@ func (p pool[R]) freeIdle() {
s.forcedCheckPending = false
t, f := p.options.Algo.Target(s.stats)
prev := s.stats.Free
s = p.freeItems(s, t)
s = p.freeAboveTarget(s, t)
s.stats.Idle = len(s.items)
if s.stats.Free > prev {
p.sendEvent(FreeOperation, s.stats)
@ -264,7 +298,7 @@ func (p pool[R]) freeIdle() {
s = p.forcedCheck(s, f)
}
p.options.TestBus.Signal("free-idle-done")
p.options.Testing.Bus.Signal("free-idle-done")
}
func (p pool[R]) freePool() {
@ -275,7 +309,8 @@ func (p pool[R]) freePool() {
s.stats.Idle = 0
prev := s.stats.Free
s = p.freeItems(s, 0)
s = p.freeItems(s, s.items)
s.items = nil
if s.stats.Free > prev {
p.sendEvent(FreeOperation, s.stats)
}

View File

@ -93,12 +93,12 @@ func TestPool(t *testing.T) {
t.Run("get own alloc not available", func(t *testing.T) {
p := pool.Make[[]byte](nil, nil, pool.Options{Algo: pool.NoShrink()})
_, err := p.Get()
if !errors.Is(err, pool.ErrEmpty) {
if !errors.Is(err, pool.ErrNoItems) {
t.Fatal(err)
}
s := p.Stats()
e := pool.Stats{Alloc: 1, Get: 1}
e := pool.Stats{Get: 1}
if s != e {
t.Fatal(s)
}
@ -242,12 +242,11 @@ func TestPool(t *testing.T) {
t.Run("release on timeout no free", func(t *testing.T) {
c := times.Test()
b := syncbus.New(time.Second)
o := pool.Options{
Algo: pool.Timeout(3 * time.Millisecond),
Clock: c,
TestBus: b,
}
var o pool.Options
o.Algo = pool.Timeout(3 * time.Millisecond)
o.Testing.Clock = c
o.Testing.Bus = b
p := pool.Make[[]byte](nil, nil, o)
p.Put(make([]byte, 1<<9))
if err := b.Wait("background-job-waiting"); err != nil {
@ -273,12 +272,11 @@ func TestPool(t *testing.T) {
f := func([]byte) { freeCount++ }
c := times.Test()
b := syncbus.New(time.Second)
o := pool.Options{
Algo: pool.Timeout(3 * time.Millisecond),
Clock: c,
TestBus: b,
}
var o pool.Options
o.Algo = pool.Timeout(3 * time.Millisecond)
o.Testing.Clock = c
o.Testing.Bus = b
p := pool.Make[[]byte](nil, f, o)
p.Put(make([]byte, 1<<9))
if err := b.Wait("background-job-waiting"); err != nil {

View File

@ -120,11 +120,10 @@ func testScenario(t *testing.T, o scenarioOptions, step scenarioStep, verify ver
}
alloc := func() ([]byte, error) { return make([]byte, 1<<9), nil }
po := pool.Options{
Algo: o.algo,
Clock: clock,
}
var po pool.Options
po.Algo = o.algo
po.Testing.Clock = clock
p := pool.Make(alloc, nil, po)
active := active(initResource[[][]byte]())
stats := stats(initResource[[]pool.Stats]())