diff --git a/adaptive_test.go b/adaptive_test.go index 53e1ceb..f9ee7b0 100644 --- a/adaptive_test.go +++ b/adaptive_test.go @@ -50,11 +50,10 @@ func TestAdaptive(t *testing.T) { bus := syncbus.New(time.Second) clock := times.Test() - o := pool.Options{ - Clock: clock, - TestBus: bus, - } + var o pool.Options + o.Testing.Clock = clock + o.Testing.Bus = bus alloc := func() ([]byte, error) { return make([]byte, 1<<9), nil } p := pool.Make(alloc, nil, o) diff --git a/lib.go b/lib.go index 2d06516..0a6ebb5 100644 --- a/lib.go +++ b/lib.go @@ -139,13 +139,16 @@ type Options struct { // Algo is the algorithm implementation used for shrinking the pool. The default is Adaptive(). Algo Algo - // Clock is an optional clock meant to be used with testing. The main purpose is to avoid time sensitive - // tests running for a too long time. - Clock times.Clock + Testing struct { - // TestBus is an optional signal bus meant to be used with testing. The main purpose is to ensure that - // specific blocks of code are executed in a predefined order during concurrent tests. - TestBus *syncbus.SyncBus + // Clock is an optional clock meant to be used with testing. The main purpose is to avoid time sensitive + // tests running for a too long time. + Clock times.Clock + + // Bus is an optional signal bus meant to be used with testing. The main purpose is to ensure that + // specific blocks of code are executed in a predefined order during concurrent tests. + Bus *syncbus.SyncBus + } } // Pool is a synchronized pool of resources that are considered expensive to allocate. Initialize the pool with @@ -155,8 +158,9 @@ type Pool[R any] struct { pool pool[R] } -// ErrEmpty is returned on Get calls when the pool is empty and it was initialized without an allocate function. -var ErrEmpty = errors.New("empty pool") +// ErrNoItems is returned on Get calls when the pool does not enough items and it was initialized without an +// allocate function. +var ErrNoItems = errors.New("not enough items in the pool") // String returns the string representation of the EventType binary flag, including all the flags that are set. func (et EventType) String() string { @@ -273,30 +277,50 @@ func Make[R any](alloc func() (R, error), free func(R), o Options) Pool[R] { return Pool[R]{pool: makePool(alloc, free, o)} } +// GetN returns n items from the pool. If the pool is empty and no allocation function was configured, it +// returns ErrEmpty. If the pool has less than n items and no allocation function was configured, it returns the +// available items and ErrEtmpy. If the pool is empty or has less than the requested items, and the allocation +// function returns an error, it returns the available items and that error. If events were configured, GetN +// triggers a GetOperation event. +func (p Pool[R]) GetN(n int) ([]R, error) { + return p.pool.get(n) +} + // Get returns an item from the pool. If the pool is empty and no allocation function was configured, it returns // ErrEmpty. If the pool is empty, and the allocation function returns an error, it returns that error. If // events were configured, Get triggers a GetOperation event. func (p Pool[R]) Get() (R, error) { - return p.pool.get() + r, err := p.pool.get(1) + if err != nil { + var rr R + return rr, err + } + + return r[0], nil } -// Put stores an item in the pool. If events were configured, it triggers a PutOperation event. +// Put stores one or more items in the pool. If events were configured, it triggers a PutOperation event. // // It is recommended to use it only with items that were received by the Get method. While it is allowed to put // other items in the pool, it may change the way the shrinking algorithm works. E.g. it can be considered as a // sudden drop in the number of active items. If the pool needs to be prewarmed, or prepared for an expected // spike of traffic, consider using the Load method. -func (p Pool[R]) Put(i R) { - p.pool.put(i) +func (p Pool[R]) Put(i ...R) { + p.pool.put(i...) } -// Drop indicates to the pool that an item received by Get was dropped and will not be put back into the pool. +// DropN indicates to the pool that N items received by Get were dropped and will not be put back into the pool. // -// While it's not mandatory call it when item was dropped, it has a twofold purpose. Depending on the shrinking -// algorithm, it may work more consistently, if the algorithm is notified about the change in the active items. -// The other purpose is to allow the pool to reflect these cases in the events and the stats. +// While it's not mandatory call it when some items were dropped, it has a twofold purpose. Depending on the +// shrinking algorithm, it may work more consistently, if the algorithm is notified about the change in the +// active items. The other purpose is to allow the pool to reflect these cases in the events and the stats. +func (p Pool[R]) DropN(n int) { + p.pool.drop(n) +} + +// Drop is like DropN, but for a single item. func (p Pool[R]) Drop() { - p.pool.drop() + p.pool.drop(1) } // Load can be used to populate the pool with items that were not allocated as the result of the Get operation. diff --git a/maxto_test.go b/maxto_test.go index 38d86a0..46b4a3d 100644 --- a/maxto_test.go +++ b/maxto_test.go @@ -221,11 +221,10 @@ func TestMaxTO(t *testing.T) { ) clock := times.Test() - o := pool.Options{ - Clock: clock, - Algo: pool.MaxTimeout(15, 300*time.Millisecond), - } + var o pool.Options + o.Algo = pool.MaxTimeout(15, 300*time.Millisecond) + o.Testing.Clock = clock alloc := func() ([]byte, error) { return make([]byte, 1<<9), nil } p := pool.Make(alloc, nil, o) @@ -277,11 +276,10 @@ func TestMaxTO(t *testing.T) { ) clock := times.Test() - o := pool.Options{ - Clock: clock, - Algo: pool.MaxTimeout(15, 300*time.Millisecond), - } + var o pool.Options + o.Algo = pool.MaxTimeout(15, 300*time.Millisecond) + o.Testing.Clock = clock alloc := func() ([]byte, error) { return make([]byte, 1<<9), nil } p := pool.Make(alloc, nil, o) @@ -346,11 +344,10 @@ func TestMaxTO(t *testing.T) { ) clock := times.Test() - o := pool.Options{ - Clock: clock, - Algo: pool.MaxTimeout(15, 300*time.Millisecond), - } + var o pool.Options + o.Algo = pool.MaxTimeout(15, 300*time.Millisecond) + o.Testing.Clock = clock alloc := func() ([]byte, error) { return make([]byte, 1<<9), nil } p := pool.Make(alloc, nil, o) @@ -404,11 +401,10 @@ func TestMaxTO(t *testing.T) { ) clock := times.Test() - o := pool.Options{ - Clock: clock, - Algo: pool.MaxTimeout(15, 300*time.Millisecond), - } + var o pool.Options + o.Algo = pool.MaxTimeout(15, 300*time.Millisecond) + o.Testing.Clock = clock alloc := func() ([]byte, error) { return make([]byte, 1<<9), nil } p := pool.Make(alloc, nil, o) diff --git a/pool.go b/pool.go index 5020b6b..c73de2b 100644 --- a/pool.go +++ b/pool.go @@ -23,12 +23,12 @@ func makePool[R any](alloc func() (R, error), free func(r R), o Options) pool[R] o.Algo = Adaptive() } - if o.Clock == nil { - o.Clock = times.Sys() + if o.Testing.Clock == nil { + o.Testing.Clock = times.Sys() } if sc, ok := o.Algo.(interface{ setClock(times.Clock) }); ok { - sc.setClock(o.Clock) + sc.setClock(o.Testing.Clock) } s := make(chan state[R], 1) @@ -70,50 +70,80 @@ func (p pool[R]) sendEvent(e EventType, s Stats) { } } -func (p pool[R]) get() (R, error) { +func (p pool[R]) get(n int) ([]R, error) { + if n <= 0 { + return nil, nil + } + s := <-p.state defer func() { p.state <- s }() - var ( - r R - event EventType - err error - ) - + var event EventType event |= GetOperation s.stats.Get++ - switch { - case len(s.items) == 0 && p.alloc == nil: - s.stats.Alloc++ - event |= AllocateOperation - event |= AllocateError - err = ErrEmpty - case len(s.items) == 0: - s.stats.Alloc++ - event |= AllocateOperation - r, err = p.alloc() - if err != nil { - event |= AllocateError - } else { - s.stats.Active++ - } - default: - r, s.items = s.items[len(s.items)-1], s.items[:len(s.items)-1] - if len(s.items) == 0 { - s.items = nil - } - - s.stats.Active++ - s.stats.Idle = len(s.items) + if len(s.items) < n && p.alloc == nil { + return nil, ErrNoItems } + r := make([]R, n) + if len(s.items) <= n { + copy(r, s.items) + } + + if len(s.items) > n { + // using the items from the end of the list: + copy(r, s.items[len(s.items)-n:]) + } + + if len(s.items) < n { + var err error + event |= AllocateOperation + for i := len(s.items); i < n; i++ { + s.stats.Alloc++ + r[i], err = p.alloc() + if err == nil { + continue + } + + s.items = append(s.items, r[len(s.items):i]...) + s.stats.Idle = len(s.items) + event |= AllocateError + p.sendEvent(event, s.stats) + return nil, err + } + } + + if len(s.items) <= n { + var zero R + for i := 0; i < len(s.items); i++ { + s.items[i] = zero + } + + s.items = nil + } + + if len(s.items) > n { + var zero R + for i := len(s.items) - n; i < len(s.items); i++ { + s.items[i] = zero + } + + s.items = s.items[:len(s.items)-n] + } + + s.stats.Idle = len(s.items) + s.stats.Active += n p.sendEvent(event, s.stats) - return r, err + return r, nil } -func (p pool[R]) put(r R) { +func (p pool[R]) put(r ...R) { + if len(r) == 0 { + return + } + s := <-p.state defer func() { p.state <- s @@ -122,23 +152,28 @@ func (p pool[R]) put(r R) { var event EventType event |= PutOperation s.stats.Put++ - s.stats.Idle++ + s.stats.Idle += len(r) + s.stats.Active -= len(r) // one may put in items that were allocated outside of the pool: - if s.stats.Active > 0 { - s.stats.Active-- + if s.stats.Active < 0 { + s.stats.Active = 0 } t, f := p.options.Algo.Target(s.stats) switch { - case t > len(s.items): - s.items = append(s.items, r) + case t >= len(s.items)+len(r): + s.items = append(s.items, r...) + case t > len(s.items) && t < len(s.items)+len(r): + event |= FreeOperation + s = p.freeItems(s, r[t-len(s.items):]) + s.items = append(s.items, r[:t-len(s.items)]...) default: event |= FreeOperation - s = p.freeItems(s, t, r) + s = p.freeItems(s, r) + s = p.freeAboveTarget(s, t) } - // fix provisioned idle count: s.stats.Idle = len(s.items) p.sendEvent(event, s.stats) if f > 0 { @@ -146,7 +181,11 @@ func (p pool[R]) put(r R) { } } -func (p pool[R]) drop() { +func (p pool[R]) drop(n int) { + if n <= 0 { + return + } + s := <-p.state defer func() { p.state <- s @@ -154,15 +193,16 @@ func (p pool[R]) drop() { var event EventType event |= DropOperation - s.stats.Drop++ - if s.stats.Active > 0 { - s.stats.Active-- + s.stats.Drop += n + s.stats.Active -= n + if s.stats.Active < 0 { + s.stats.Active = 0 } t, f := p.options.Algo.Target(s.stats) if t < len(s.items) { event |= FreeOperation - s = p.freeItems(s, t) + s = p.freeAboveTarget(s, t) } s.stats.Idle = len(s.items) @@ -173,6 +213,10 @@ func (p pool[R]) drop() { } func (p pool[R]) load(i []R) { + if len(i) == 0 { + return + } + s := <-p.state defer func() { p.state <- s @@ -193,59 +237,49 @@ func (p pool[R]) forcedCheck(s state[R], timeout time.Duration) state[R] { s.forcedCheckPending = true go func(to time.Duration) { - c := p.options.Clock.After(to) - p.options.TestBus.Signal("background-job-waiting") + c := p.options.Testing.Clock.After(to) + p.options.Testing.Bus.Signal("background-job-waiting") <-c - p.freeIdle() + p.checkAndFree() }(timeout) return s } -func (p pool[R]) freeItems(s state[R], target int, additional ...R) state[R] { - if len(s.items)+len(additional) <= target { - return s - } - - s.stats.Free += len(additional) - if p.free == nil && len(s.items) <= target { - return s - } - - var f []R - if p.free != nil { - f = additional - if len(s.items) > target { - f = append(f, s.items[:len(s.items)-target]...) - } - } - - if len(s.items) > target { - s.stats.Free += len(s.items) - target - - var zero R - for i := 0; i < len(s.items)-target; i++ { - s.items[i] = zero - } - - s.items = s.items[len(s.items)-target:] - if len(s.items) == 0 { - s.items = nil - } - } - +func (p pool[R]) freeItems(s state[R], r []R) state[R] { + s.stats.Free += len(r) if p.free == nil { return s } - for _, fi := range f { - p.free(fi) + for _, ri := range r { + p.free(ri) } return s } -func (p pool[R]) freeIdle() { +func (p pool[R]) freeAboveTarget(s state[R], target int) state[R] { + if len(s.items) <= target { + return s + } + + f := s.items[:len(s.items)-target] + + var zero R + for i := 0; i < len(s.items)-target; i++ { + s.items[i] = zero + } + + s.items = s.items[len(s.items)-target:] + if len(s.items) == 0 { + s.items = nil + } + + return p.freeItems(s, f) +} + +func (p pool[R]) checkAndFree() { s := <-p.state defer func() { p.state <- s @@ -254,7 +288,7 @@ func (p pool[R]) freeIdle() { s.forcedCheckPending = false t, f := p.options.Algo.Target(s.stats) prev := s.stats.Free - s = p.freeItems(s, t) + s = p.freeAboveTarget(s, t) s.stats.Idle = len(s.items) if s.stats.Free > prev { p.sendEvent(FreeOperation, s.stats) @@ -264,7 +298,7 @@ func (p pool[R]) freeIdle() { s = p.forcedCheck(s, f) } - p.options.TestBus.Signal("free-idle-done") + p.options.Testing.Bus.Signal("free-idle-done") } func (p pool[R]) freePool() { @@ -275,7 +309,8 @@ func (p pool[R]) freePool() { s.stats.Idle = 0 prev := s.stats.Free - s = p.freeItems(s, 0) + s = p.freeItems(s, s.items) + s.items = nil if s.stats.Free > prev { p.sendEvent(FreeOperation, s.stats) } diff --git a/pool_test.go b/pool_test.go index 2c98e43..5ce70f1 100644 --- a/pool_test.go +++ b/pool_test.go @@ -93,12 +93,12 @@ func TestPool(t *testing.T) { t.Run("get own alloc not available", func(t *testing.T) { p := pool.Make[[]byte](nil, nil, pool.Options{Algo: pool.NoShrink()}) _, err := p.Get() - if !errors.Is(err, pool.ErrEmpty) { + if !errors.Is(err, pool.ErrNoItems) { t.Fatal(err) } s := p.Stats() - e := pool.Stats{Alloc: 1, Get: 1} + e := pool.Stats{Get: 1} if s != e { t.Fatal(s) } @@ -242,12 +242,11 @@ func TestPool(t *testing.T) { t.Run("release on timeout no free", func(t *testing.T) { c := times.Test() b := syncbus.New(time.Second) - o := pool.Options{ - Algo: pool.Timeout(3 * time.Millisecond), - Clock: c, - TestBus: b, - } + var o pool.Options + o.Algo = pool.Timeout(3 * time.Millisecond) + o.Testing.Clock = c + o.Testing.Bus = b p := pool.Make[[]byte](nil, nil, o) p.Put(make([]byte, 1<<9)) if err := b.Wait("background-job-waiting"); err != nil { @@ -273,12 +272,11 @@ func TestPool(t *testing.T) { f := func([]byte) { freeCount++ } c := times.Test() b := syncbus.New(time.Second) - o := pool.Options{ - Algo: pool.Timeout(3 * time.Millisecond), - Clock: c, - TestBus: b, - } + var o pool.Options + o.Algo = pool.Timeout(3 * time.Millisecond) + o.Testing.Clock = c + o.Testing.Bus = b p := pool.Make[[]byte](nil, f, o) p.Put(make([]byte, 1<<9)) if err := b.Wait("background-job-waiting"); err != nil { diff --git a/scenario_test.go b/scenario_test.go index 2946b93..d28bd65 100644 --- a/scenario_test.go +++ b/scenario_test.go @@ -120,11 +120,10 @@ func testScenario(t *testing.T, o scenarioOptions, step scenarioStep, verify ver } alloc := func() ([]byte, error) { return make([]byte, 1<<9), nil } - po := pool.Options{ - Algo: o.algo, - Clock: clock, - } + var po pool.Options + po.Algo = o.algo + po.Testing.Clock = clock p := pool.Make(alloc, nil, po) active := active(initResource[[][]byte]()) stats := stats(initResource[[]pool.Stats]())