diff --git a/Makefile b/Makefile index afb28e9..8dda6d7 100644 --- a/Makefile +++ b/Makefile @@ -28,4 +28,4 @@ bench: $(sources) clean: go clean - rm .cover + rm -f .cover diff --git a/adaptive_test.go b/adaptive_test.go index 2e165b6..78777ad 100644 --- a/adaptive_test.go +++ b/adaptive_test.go @@ -264,7 +264,7 @@ func TestAdaptive(t *testing.T) { } s := p.Stats() - e := pool.Stats{Idle: 3, Active: 0, Get: 8, Put: 8, Alloc: 0, Free: 12} + e := pool.Stats{Idle: 3, Active: 0, Get: 8, Put: 8, Alloc: 0, Load: initialCount, Free: 12} if s != e { t.Fatal(s) } @@ -323,7 +323,7 @@ func TestAdaptive(t *testing.T) { } s := p.Stats() - e := pool.Stats{Idle: 8, Active: 23, Get: 39, Put: 16, Alloc: 16, Free: 0} + e := pool.Stats{Idle: 8, Active: 23, Get: 39, Put: 16, Alloc: 16, Load: adjustCount, Free: 0} if s != e { t.Fatal(s) } diff --git a/lib.go b/lib.go index 2528c4a..8e50d2c 100644 --- a/lib.go +++ b/lib.go @@ -1,8 +1,10 @@ -// pool with support for: -// - finalizing items -// - monitoring: events and/or stats -// - adaptive shrinking algorithm -// - custom shrinking algorithms +// Package pool provides a resource pool implementation that is safe to access from multiple goroutines. +// +// The pool supports finalizing items when shrinking the pool. It helps monitoring the pool usage and state with +// events and statistics. While it implements max idle size and timeout based shrinking algorithms to release +// idle resources from the pool, it also provides a zero-config adaptive algorithm for this purpose that can +// automatically adapt to changing resource usage characteristics. It also accepts custom algorithm +// implementations. package pool import ( @@ -14,70 +16,137 @@ import ( "time" ) +// Stats provides information about the pool state. type Stats struct { - Idle int + + // Idle is the number of resources currently held by the pool. + Idle int + + // Active is the nubmer of resources that are currently in use as known by the pool. Active int - Get int - Put int - Alloc int - Free int + + // Get is the number of get operations during the entire life cycle of the pool. + Get int + + // Put is the number of put operations during the entire life cycle of the pool. + Put int + + // Alloc is the number of allocations executed by the pool during the entire life cycle of the pool. + Alloc int + + // Load is the total number of items explicitly added to the pool via the Load method. + Load int + + // Free is the number of deallocations executed by the pool during the entire life cycle of the pool. + Free int } +// EventType is a binary flag categorizing the reason of an event. The types of events can be combined together, +// e.g. if a get operation requires an allocate operation, then the event type will be +// GetOperation|AllocateOperation. type EventType int const ( - None EventType = 0 + // None can be used to mask out all event types and not receiving any events. + None EventType = 0 + + // GetOperation is the type of events sent after a get operation. GetOperation EventType = 1 << iota + + // PutOperation is the type of events sent after a put operation. PutOperation + + // AllocateOperation is the type of events sent after an allocate operation. AllocateOperation + + // LoadOperation is the type of events sent after a load operation. + LoadOperation + + // FreeOperation is the type of events sent after a free operation. FreeOperation + + // AllocateError is the type of events sent after a failed allocation. The error will not be included + // with the event, but it will be returned by the failed Get function call. AllocateError + // AllEvents can be used as a mask that includes all the event types. AllEvents = GetOperation | PutOperation | AllocateOperation | FreeOperation | AllocateError ) +// Event values are sent by the pool after various operations, if it is configured to use a channel the send the +// events to. type Event struct { - Type EventType + + // Type is the binary flag depicting the type of the event. + Type EventType + + // Stats contains the statistics about the pool at the time of the event. Stats Stats } +// Algo implementations control when the shrinking of the idle items in the pool happens. The pool can be used +// with the implementations provided by this package or custom ones. The implementation can hold internal state, +// the pool guarantees that the algo instance is accessed only from one goroutine at a time. type Algo interface { - // always called - // desired idle items - // implementations should consider the cost of freeing the stored resources - // must support being called from a goroutine other than it was created in - // a single pool instance only calls it from a single goroutine at a time - // items need to be allocated always by calling Get - // second return argument for requested next check - // the Target function of a single Algo implementation is not called concurrently - // called on all put, regardless of nextCheck - // not all nextChecks result in a call if a previously request nextCheck is still pending + // Target is called on every Put operation with the current pool state as the input. It is expected to + // return the target idle count, as dictated by the implementing algorithm. Optionally, a timeout value + // can be returned (nextCheck), and if it is a positive value, the pool will call Target again after the + // defined time expires to see if the next target idle count. In each case, when Target returns a + // smaller number than the current idle count, it shrinks the pool to the defined target. + // + // When using nextCheck, not every returned nextCheck results in calling Target by the pool, only the + // ones that were set after the previous one expired. + // + // Implementations should consider that while the recommended way of using the pool is to only call Put + // with items that were received by calling Get, the pool itself doesn't prohibit calling Put with + // 'foreign' items. Target(Stats) (target int, nextCheck time.Duration) - // called when Pool.Load - // can be used to adjust internal state - // can be noop if not required + // Load is called by the pool when Pool.Load is used, passing in the number of items that were loaded as + // the result of a 'prewarm' or other preallocation. The number of loaded items is passed in as + // argument, which can be used to adjust the algorithm's internal state. If the implemented algorithm is + // stateless, or it is not sensitive to loading items this way, Load can be implemented as a noop. Load(int) } +// Options can be used to configure the pool. Some of the options are provided to support testing various +// scenarios. type Options struct { - // events can be dropped if the consumer is blocked + // Events is a channel that, when set, the pool is using for sending events. The channel needs to be + // used together with a non-default event mask set. When using events, we should consider to use a + // buffered channel. Events can be dropped if the consumer is blocked and the channel is not ready to + // communicate at the time of the event. Events chan<- Event + // EventMask is a binary flag that defines which events will be sent to the provided channel. The + // default is no events. EventMask EventType - Algo Algo - Clock times.Clock - TestBus *syncbus.SyncBus + + // Algo is the algorithm implementation used for shrinking the pool. The default is Adaptive(). + Algo Algo + + // Clock is an optional clock meant to be used with testing. The main purpose is to avoid time sensitive + // tests running for a too long time. + Clock times.Clock + + // TestBus is an optional signal bus meant to be used with testing. The main purpose is to ensure that + // specific blocks of code are executed in a predefiend order during concurrent tests. + TestBus *syncbus.SyncBus } +// Pool is a synchronized resource pool of resources, that are considered expensive to allocate. Initialize the +// pool with the Make() function. Methods of uninitialized Pool instances may block forever. For the usage of +// the pool, see the docs of its method, initialization options and the provided algorithms. type Pool[R any] struct { pool pool[R] } +// ErrEmpty is returned on Get calls when the pool is empty and it was initialized without an allocate function. var ErrEmpty = errors.New("empty pool") +// String returns the string representation of the EventType binary flag, including all the flags that are set. func (et EventType) String() string { var s []string if et&GetOperation != 0 { @@ -92,6 +161,10 @@ func (et EventType) String() string { s = append(s, "allocate") } + if et&LoadOperation != 0 { + s = append(s, "load") + } + if et&FreeOperation != 0 { s = append(s, "free") } @@ -107,80 +180,115 @@ func (et EventType) String() string { return strings.Join(s, "|") } +// String returns the string representation of an Event value, including the type and statistics. func (ev Event) String() string { return fmt.Sprintf("%v; %v", ev.Type, ev.Stats) } +// String returns the string representation of a set of statistics about the pool. func (s Stats) String() string { return fmt.Sprintf( - "idle: %d, active: %d, get: %d, put: %d, alloc: %d, free: %d", + "idle: %d, active: %d, get: %d, put: %d, alloc: %d, load: %d, free: %d", s.Idle, s.Active, s.Get, s.Put, s.Alloc, + s.Load, s.Free, ) } -// zero-config -// potential caveats: -// - a caveaat depending on the expectations, since no absolute time input is used, identifies frequent -// spikes from zero and slow grow and shrink from and to zero cycles are considered the same and the pool cleans -// up idle items accordingly. In short: _|_|_|_ = __/\__/\__/\__ +// Adaptive creates a zero-config pool shrink algorithm instance. It is the default algorithm used by the pool. +// +// It is based on exponential moving average of the active items and the deviation of it. This way it can react +// to, and to some extent overbuild, on the perceived stress. It decays the number of idle items gradually, and +// on very sudden drops in traffic, it ensures the eventual release of all pooled items with a background job, +// that is timed based on the duration of the last active usage session, which is the time while there were +// active items. Together with the pool implementation, it always reuses the most recent items, as in LIFO for +// Get and FIFO for Free. +// +// We need to be aware of some potential caveats due to its zero-config nature. It doesn't use any absolute +// values, like a timing parameter. It only considers the sequence of the pool states. This can result in +// behaviour that, without understanding this zero-config nature, might be unexpected. A trivial example is that +// the algorithm doesn't differentiate between periodic long grow/shrink/steady patterns and periodic short +// spikes/steady patterns. In short, it can happen that: __/\__/\__/\__ = _|_|_|_. Instead, it optimizes for +// having enough but no more 'capacity' (idle items) for the predicated 'load' (active items). func Adaptive() Algo { return makeAdaptiveAlgo() } -// enfoces a max pool size and a timeout for the items -// when adding items to the pool via Put that were not fetched via Get, there can discrepancies occur in which -// items get timed out, but the general pool limitations get still consistently enforced eventually +// MaxTimeout creates a pool shrink algorithm instance, that releases items whenever the number of idle items +// would be greater than max, and it also releases those items that were idle for too long. Together with the +// pool, it ensures that the Get operation is LIFO and the Free operation is FIFO. +// +// If max <= 0, the max pool size is not enforced. If to <= 0, the timeout is not enforced. func MaxTimeout(max int, to time.Duration) Algo { return makeMaxTimeout(max, to) } -// like MaxTimeout but without enforcing timeouts +// Max is like MaxTimeout, but without the mas idle time. func Max(max int) Algo { return makeMaxTimeout(max, 0) } -// like MaxTimeout but without enforcing max pool size +// Timeout is like MaxTimeout, but without the max pool size. func Timeout(to time.Duration) Algo { return makeMaxTimeout(0, to) } -// the user code can decide not to put back items to the pool, however, the primary purpose is testing +// NoShrink is a noop shrink algorithm, it doesn't release any idle items. The user code can decide whether to +// put back items in the pool or not. It might be useful in certain testing scenarios. func NoShrink() Algo { return makeMaxTimeout(0, 0) } -// alloc and free need to support calls from goroutines other than they were created in -// a single pool instance only calls them from a single goroutine at a time -// free happens synchronously, user code may execute it in the background, in which case it is the user code's -// responsibility to ensure that free is fully carried out before the application exits, if that's necessary +// Make initializes a Pool instance. +// +// The paramter alloc is used on Get operations when the pool is empty. If alloc is nil, and the pool is empty +// at the time of calling Get, Get will return ErrEmpty. If alloc returns an error, the same error is returned +// by Get. If events were configured, alloc triggers AllocateOperation event. This event is typically the same +// as the GetOperation event. +// +// The parameter free is called when an item is released from the pool, with the item being released as the +// argument. It can be nil for resource types that don't need explicit deallocation. If events were configured, +// releasing an item triggers a FreeOperation event, regardless if the free parameter is nil. func Make[R any](alloc func() (R, error), free func(R), o Options) Pool[R] { return Pool[R]{pool: makePool(alloc, free, o)} } +// Get returns an item from the pool. If the pool is empty and no allocation function was configured, it returns +// ErrEmpty. If the pool is empty, and the allocation function returns an error, it returns that error. If +// events were configured, Get triggers a GetOperation event. func (p Pool[R]) Get() (R, error) { return p.pool.get() } -// it is allowed to put items that were not received by get, but the selected algorithm may produce unexpected -// behavior. In most cases, it is recommended to use Load instead, and using Put to put back only those items -// that were received via Get. +// Put stores an item in the pool. If events were configured, it triggers a PutOperation event. +// +// It is recommended to use it only with items that were received by the Get method. While it is allowed to put +// other items in the pool, it may change the way the shrinking algorithm works. E.g. it can be considered as a +// sudden drop in the number of active items. If the pool needs to be prewarmed, or prepared for an expected +// spike of traffic, consider using the Load method. func (p Pool[R]) Put(i R) { p.pool.put(i) } +// Load can be used to populate the pool with items that were not allocated as the result of the Get operation. +// It can be useful in scenarios where prewarming or preparation for an expected sudden traffic spike is +// expected. If events were configured, it triggers a LoadOperation event. func (p Pool[R]) Load(i []R) { p.pool.load(i) } +// Stats returns statistics about the current state of the pool. It contains the current number of active/idle +// items, and perpetual counters for the various pool operations. func (p Pool[R]) Stats() Stats { return p.pool.stats() } +// Free releases all idle items in the pool. While the pool stays operational, Free is meant to be used when the +// pool is not required anymore. func (p Pool[R]) Free() { p.pool.freePool() } diff --git a/license b/license index 00e6290..c1ca00a 100644 --- a/license +++ b/license @@ -1,6 +1,6 @@ MIT License -Copyright (c) 2018 Arpad Ryszka +Copyright (c) 2026 Arpad Ryszka Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/maxto_test.go b/maxto_test.go index 13f9c45..0c80fff 100644 --- a/maxto_test.go +++ b/maxto_test.go @@ -384,7 +384,7 @@ func TestMaxTO(t *testing.T) { } s := p.Stats() - e := pool.Stats{Idle: 1, Active: 0, Get: 8, Put: 8, Alloc: 0, Free: 14} + e := pool.Stats{Idle: 1, Active: 0, Get: 8, Put: 8, Alloc: 0, Load: initialCount, Free: 14} if s != e { t.Fatal(s) } @@ -455,7 +455,7 @@ func TestMaxTO(t *testing.T) { } s := p.Stats() - e := pool.Stats{Idle: 1, Active: 23, Get: 39, Put: 16, Alloc: 20, Free: 11} + e := pool.Stats{Idle: 1, Active: 23, Get: 39, Put: 16, Alloc: 20, Load: adjustCount, Free: 11} if s != e { t.Fatal(s) } diff --git a/pool.go b/pool.go index 6af6980..43c6c00 100644 --- a/pool.go +++ b/pool.go @@ -154,7 +154,10 @@ func (p pool[R]) load(i []R) { s.items = append(s.items, i...) s.stats.Idle = len(s.items) + s.stats.Load += len(i) p.options.Algo.Load(len(i)) + event := LoadOperation + p.sendEvent(event, s.stats) } func (p pool[R]) forcedCheck(s state[R], timeout time.Duration) state[R] {