package documentation
This commit is contained in:
parent
4dea0194c4
commit
e77941b52e
2
Makefile
2
Makefile
@ -28,4 +28,4 @@ bench: $(sources)
|
|||||||
|
|
||||||
clean:
|
clean:
|
||||||
go clean
|
go clean
|
||||||
rm .cover
|
rm -f .cover
|
||||||
|
|||||||
@ -264,7 +264,7 @@ func TestAdaptive(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
s := p.Stats()
|
s := p.Stats()
|
||||||
e := pool.Stats{Idle: 3, Active: 0, Get: 8, Put: 8, Alloc: 0, Free: 12}
|
e := pool.Stats{Idle: 3, Active: 0, Get: 8, Put: 8, Alloc: 0, Load: initialCount, Free: 12}
|
||||||
if s != e {
|
if s != e {
|
||||||
t.Fatal(s)
|
t.Fatal(s)
|
||||||
}
|
}
|
||||||
@ -323,7 +323,7 @@ func TestAdaptive(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
s := p.Stats()
|
s := p.Stats()
|
||||||
e := pool.Stats{Idle: 8, Active: 23, Get: 39, Put: 16, Alloc: 16, Free: 0}
|
e := pool.Stats{Idle: 8, Active: 23, Get: 39, Put: 16, Alloc: 16, Load: adjustCount, Free: 0}
|
||||||
if s != e {
|
if s != e {
|
||||||
t.Fatal(s)
|
t.Fatal(s)
|
||||||
}
|
}
|
||||||
|
|||||||
184
lib.go
184
lib.go
@ -1,8 +1,10 @@
|
|||||||
// pool with support for:
|
// Package pool provides a resource pool implementation that is safe to access from multiple goroutines.
|
||||||
// - finalizing items
|
//
|
||||||
// - monitoring: events and/or stats
|
// The pool supports finalizing items when shrinking the pool. It helps monitoring the pool usage and state with
|
||||||
// - adaptive shrinking algorithm
|
// events and statistics. While it implements max idle size and timeout based shrinking algorithms to release
|
||||||
// - custom shrinking algorithms
|
// idle resources from the pool, it also provides a zero-config adaptive algorithm for this purpose that can
|
||||||
|
// automatically adapt to changing resource usage characteristics. It also accepts custom algorithm
|
||||||
|
// implementations.
|
||||||
package pool
|
package pool
|
||||||
|
|
||||||
import (
|
import (
|
||||||
@ -14,70 +16,137 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// Stats provides information about the pool state.
|
||||||
type Stats struct {
|
type Stats struct {
|
||||||
|
|
||||||
|
// Idle is the number of resources currently held by the pool.
|
||||||
Idle int
|
Idle int
|
||||||
|
|
||||||
|
// Active is the nubmer of resources that are currently in use as known by the pool.
|
||||||
Active int
|
Active int
|
||||||
|
|
||||||
|
// Get is the number of get operations during the entire life cycle of the pool.
|
||||||
Get int
|
Get int
|
||||||
|
|
||||||
|
// Put is the number of put operations during the entire life cycle of the pool.
|
||||||
Put int
|
Put int
|
||||||
|
|
||||||
|
// Alloc is the number of allocations executed by the pool during the entire life cycle of the pool.
|
||||||
Alloc int
|
Alloc int
|
||||||
|
|
||||||
|
// Load is the total number of items explicitly added to the pool via the Load method.
|
||||||
|
Load int
|
||||||
|
|
||||||
|
// Free is the number of deallocations executed by the pool during the entire life cycle of the pool.
|
||||||
Free int
|
Free int
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// EventType is a binary flag categorizing the reason of an event. The types of events can be combined together,
|
||||||
|
// e.g. if a get operation requires an allocate operation, then the event type will be
|
||||||
|
// GetOperation|AllocateOperation.
|
||||||
type EventType int
|
type EventType int
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
// None can be used to mask out all event types and not receiving any events.
|
||||||
None EventType = 0
|
None EventType = 0
|
||||||
|
|
||||||
|
// GetOperation is the type of events sent after a get operation.
|
||||||
GetOperation EventType = 1 << iota
|
GetOperation EventType = 1 << iota
|
||||||
|
|
||||||
|
// PutOperation is the type of events sent after a put operation.
|
||||||
PutOperation
|
PutOperation
|
||||||
|
|
||||||
|
// AllocateOperation is the type of events sent after an allocate operation.
|
||||||
AllocateOperation
|
AllocateOperation
|
||||||
|
|
||||||
|
// LoadOperation is the type of events sent after a load operation.
|
||||||
|
LoadOperation
|
||||||
|
|
||||||
|
// FreeOperation is the type of events sent after a free operation.
|
||||||
FreeOperation
|
FreeOperation
|
||||||
|
|
||||||
|
// AllocateError is the type of events sent after a failed allocation. The error will not be included
|
||||||
|
// with the event, but it will be returned by the failed Get function call.
|
||||||
AllocateError
|
AllocateError
|
||||||
|
|
||||||
|
// AllEvents can be used as a mask that includes all the event types.
|
||||||
AllEvents = GetOperation | PutOperation | AllocateOperation | FreeOperation | AllocateError
|
AllEvents = GetOperation | PutOperation | AllocateOperation | FreeOperation | AllocateError
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// Event values are sent by the pool after various operations, if it is configured to use a channel the send the
|
||||||
|
// events to.
|
||||||
type Event struct {
|
type Event struct {
|
||||||
|
|
||||||
|
// Type is the binary flag depicting the type of the event.
|
||||||
Type EventType
|
Type EventType
|
||||||
|
|
||||||
|
// Stats contains the statistics about the pool at the time of the event.
|
||||||
Stats Stats
|
Stats Stats
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Algo implementations control when the shrinking of the idle items in the pool happens. The pool can be used
|
||||||
|
// with the implementations provided by this package or custom ones. The implementation can hold internal state,
|
||||||
|
// the pool guarantees that the algo instance is accessed only from one goroutine at a time.
|
||||||
type Algo interface {
|
type Algo interface {
|
||||||
|
|
||||||
// always called
|
// Target is called on every Put operation with the current pool state as the input. It is expected to
|
||||||
// desired idle items
|
// return the target idle count, as dictated by the implementing algorithm. Optionally, a timeout value
|
||||||
// implementations should consider the cost of freeing the stored resources
|
// can be returned (nextCheck), and if it is a positive value, the pool will call Target again after the
|
||||||
// must support being called from a goroutine other than it was created in
|
// defined time expires to see if the next target idle count. In each case, when Target returns a
|
||||||
// a single pool instance only calls it from a single goroutine at a time
|
// smaller number than the current idle count, it shrinks the pool to the defined target.
|
||||||
// items need to be allocated always by calling Get
|
//
|
||||||
// second return argument for requested next check
|
// When using nextCheck, not every returned nextCheck results in calling Target by the pool, only the
|
||||||
// the Target function of a single Algo implementation is not called concurrently
|
// ones that were set after the previous one expired.
|
||||||
// called on all put, regardless of nextCheck
|
//
|
||||||
// not all nextChecks result in a call if a previously request nextCheck is still pending
|
// Implementations should consider that while the recommended way of using the pool is to only call Put
|
||||||
|
// with items that were received by calling Get, the pool itself doesn't prohibit calling Put with
|
||||||
|
// 'foreign' items.
|
||||||
Target(Stats) (target int, nextCheck time.Duration)
|
Target(Stats) (target int, nextCheck time.Duration)
|
||||||
|
|
||||||
// called when Pool.Load
|
// Load is called by the pool when Pool.Load is used, passing in the number of items that were loaded as
|
||||||
// can be used to adjust internal state
|
// the result of a 'prewarm' or other preallocation. The number of loaded items is passed in as
|
||||||
// can be noop if not required
|
// argument, which can be used to adjust the algorithm's internal state. If the implemented algorithm is
|
||||||
|
// stateless, or it is not sensitive to loading items this way, Load can be implemented as a noop.
|
||||||
Load(int)
|
Load(int)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Options can be used to configure the pool. Some of the options are provided to support testing various
|
||||||
|
// scenarios.
|
||||||
type Options struct {
|
type Options struct {
|
||||||
|
|
||||||
// events can be dropped if the consumer is blocked
|
// Events is a channel that, when set, the pool is using for sending events. The channel needs to be
|
||||||
|
// used together with a non-default event mask set. When using events, we should consider to use a
|
||||||
|
// buffered channel. Events can be dropped if the consumer is blocked and the channel is not ready to
|
||||||
|
// communicate at the time of the event.
|
||||||
Events chan<- Event
|
Events chan<- Event
|
||||||
|
|
||||||
|
// EventMask is a binary flag that defines which events will be sent to the provided channel. The
|
||||||
|
// default is no events.
|
||||||
EventMask EventType
|
EventMask EventType
|
||||||
|
|
||||||
|
// Algo is the algorithm implementation used for shrinking the pool. The default is Adaptive().
|
||||||
Algo Algo
|
Algo Algo
|
||||||
|
|
||||||
|
// Clock is an optional clock meant to be used with testing. The main purpose is to avoid time sensitive
|
||||||
|
// tests running for a too long time.
|
||||||
Clock times.Clock
|
Clock times.Clock
|
||||||
|
|
||||||
|
// TestBus is an optional signal bus meant to be used with testing. The main purpose is to ensure that
|
||||||
|
// specific blocks of code are executed in a predefiend order during concurrent tests.
|
||||||
TestBus *syncbus.SyncBus
|
TestBus *syncbus.SyncBus
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Pool is a synchronized resource pool of resources, that are considered expensive to allocate. Initialize the
|
||||||
|
// pool with the Make() function. Methods of uninitialized Pool instances may block forever. For the usage of
|
||||||
|
// the pool, see the docs of its method, initialization options and the provided algorithms.
|
||||||
type Pool[R any] struct {
|
type Pool[R any] struct {
|
||||||
pool pool[R]
|
pool pool[R]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ErrEmpty is returned on Get calls when the pool is empty and it was initialized without an allocate function.
|
||||||
var ErrEmpty = errors.New("empty pool")
|
var ErrEmpty = errors.New("empty pool")
|
||||||
|
|
||||||
|
// String returns the string representation of the EventType binary flag, including all the flags that are set.
|
||||||
func (et EventType) String() string {
|
func (et EventType) String() string {
|
||||||
var s []string
|
var s []string
|
||||||
if et&GetOperation != 0 {
|
if et&GetOperation != 0 {
|
||||||
@ -92,6 +161,10 @@ func (et EventType) String() string {
|
|||||||
s = append(s, "allocate")
|
s = append(s, "allocate")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if et&LoadOperation != 0 {
|
||||||
|
s = append(s, "load")
|
||||||
|
}
|
||||||
|
|
||||||
if et&FreeOperation != 0 {
|
if et&FreeOperation != 0 {
|
||||||
s = append(s, "free")
|
s = append(s, "free")
|
||||||
}
|
}
|
||||||
@ -107,80 +180,115 @@ func (et EventType) String() string {
|
|||||||
return strings.Join(s, "|")
|
return strings.Join(s, "|")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// String returns the string representation of an Event value, including the type and statistics.
|
||||||
func (ev Event) String() string {
|
func (ev Event) String() string {
|
||||||
return fmt.Sprintf("%v; %v", ev.Type, ev.Stats)
|
return fmt.Sprintf("%v; %v", ev.Type, ev.Stats)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// String returns the string representation of a set of statistics about the pool.
|
||||||
func (s Stats) String() string {
|
func (s Stats) String() string {
|
||||||
return fmt.Sprintf(
|
return fmt.Sprintf(
|
||||||
"idle: %d, active: %d, get: %d, put: %d, alloc: %d, free: %d",
|
"idle: %d, active: %d, get: %d, put: %d, alloc: %d, load: %d, free: %d",
|
||||||
s.Idle,
|
s.Idle,
|
||||||
s.Active,
|
s.Active,
|
||||||
s.Get,
|
s.Get,
|
||||||
s.Put,
|
s.Put,
|
||||||
s.Alloc,
|
s.Alloc,
|
||||||
|
s.Load,
|
||||||
s.Free,
|
s.Free,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
// zero-config
|
// Adaptive creates a zero-config pool shrink algorithm instance. It is the default algorithm used by the pool.
|
||||||
// potential caveats:
|
//
|
||||||
// - a caveaat depending on the expectations, since no absolute time input is used, identifies frequent
|
// It is based on exponential moving average of the active items and the deviation of it. This way it can react
|
||||||
// spikes from zero and slow grow and shrink from and to zero cycles are considered the same and the pool cleans
|
// to, and to some extent overbuild, on the perceived stress. It decays the number of idle items gradually, and
|
||||||
// up idle items accordingly. In short: _|_|_|_ = __/\__/\__/\__
|
// on very sudden drops in traffic, it ensures the eventual release of all pooled items with a background job,
|
||||||
|
// that is timed based on the duration of the last active usage session, which is the time while there were
|
||||||
|
// active items. Together with the pool implementation, it always reuses the most recent items, as in LIFO for
|
||||||
|
// Get and FIFO for Free.
|
||||||
|
//
|
||||||
|
// We need to be aware of some potential caveats due to its zero-config nature. It doesn't use any absolute
|
||||||
|
// values, like a timing parameter. It only considers the sequence of the pool states. This can result in
|
||||||
|
// behaviour that, without understanding this zero-config nature, might be unexpected. A trivial example is that
|
||||||
|
// the algorithm doesn't differentiate between periodic long grow/shrink/steady patterns and periodic short
|
||||||
|
// spikes/steady patterns. In short, it can happen that: __/\__/\__/\__ = _|_|_|_. Instead, it optimizes for
|
||||||
|
// having enough but no more 'capacity' (idle items) for the predicated 'load' (active items).
|
||||||
func Adaptive() Algo {
|
func Adaptive() Algo {
|
||||||
return makeAdaptiveAlgo()
|
return makeAdaptiveAlgo()
|
||||||
}
|
}
|
||||||
|
|
||||||
// enfoces a max pool size and a timeout for the items
|
// MaxTimeout creates a pool shrink algorithm instance, that releases items whenever the number of idle items
|
||||||
// when adding items to the pool via Put that were not fetched via Get, there can discrepancies occur in which
|
// would be greater than max, and it also releases those items that were idle for too long. Together with the
|
||||||
// items get timed out, but the general pool limitations get still consistently enforced eventually
|
// pool, it ensures that the Get operation is LIFO and the Free operation is FIFO.
|
||||||
|
//
|
||||||
|
// If max <= 0, the max pool size is not enforced. If to <= 0, the timeout is not enforced.
|
||||||
func MaxTimeout(max int, to time.Duration) Algo {
|
func MaxTimeout(max int, to time.Duration) Algo {
|
||||||
return makeMaxTimeout(max, to)
|
return makeMaxTimeout(max, to)
|
||||||
}
|
}
|
||||||
|
|
||||||
// like MaxTimeout but without enforcing timeouts
|
// Max is like MaxTimeout, but without the mas idle time.
|
||||||
func Max(max int) Algo {
|
func Max(max int) Algo {
|
||||||
return makeMaxTimeout(max, 0)
|
return makeMaxTimeout(max, 0)
|
||||||
}
|
}
|
||||||
|
|
||||||
// like MaxTimeout but without enforcing max pool size
|
// Timeout is like MaxTimeout, but without the max pool size.
|
||||||
func Timeout(to time.Duration) Algo {
|
func Timeout(to time.Duration) Algo {
|
||||||
return makeMaxTimeout(0, to)
|
return makeMaxTimeout(0, to)
|
||||||
}
|
}
|
||||||
|
|
||||||
// the user code can decide not to put back items to the pool, however, the primary purpose is testing
|
// NoShrink is a noop shrink algorithm, it doesn't release any idle items. The user code can decide whether to
|
||||||
|
// put back items in the pool or not. It might be useful in certain testing scenarios.
|
||||||
func NoShrink() Algo {
|
func NoShrink() Algo {
|
||||||
return makeMaxTimeout(0, 0)
|
return makeMaxTimeout(0, 0)
|
||||||
}
|
}
|
||||||
|
|
||||||
// alloc and free need to support calls from goroutines other than they were created in
|
// Make initializes a Pool instance.
|
||||||
// a single pool instance only calls them from a single goroutine at a time
|
//
|
||||||
// free happens synchronously, user code may execute it in the background, in which case it is the user code's
|
// The paramter alloc is used on Get operations when the pool is empty. If alloc is nil, and the pool is empty
|
||||||
// responsibility to ensure that free is fully carried out before the application exits, if that's necessary
|
// at the time of calling Get, Get will return ErrEmpty. If alloc returns an error, the same error is returned
|
||||||
|
// by Get. If events were configured, alloc triggers AllocateOperation event. This event is typically the same
|
||||||
|
// as the GetOperation event.
|
||||||
|
//
|
||||||
|
// The parameter free is called when an item is released from the pool, with the item being released as the
|
||||||
|
// argument. It can be nil for resource types that don't need explicit deallocation. If events were configured,
|
||||||
|
// releasing an item triggers a FreeOperation event, regardless if the free parameter is nil.
|
||||||
func Make[R any](alloc func() (R, error), free func(R), o Options) Pool[R] {
|
func Make[R any](alloc func() (R, error), free func(R), o Options) Pool[R] {
|
||||||
return Pool[R]{pool: makePool(alloc, free, o)}
|
return Pool[R]{pool: makePool(alloc, free, o)}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Get returns an item from the pool. If the pool is empty and no allocation function was configured, it returns
|
||||||
|
// ErrEmpty. If the pool is empty, and the allocation function returns an error, it returns that error. If
|
||||||
|
// events were configured, Get triggers a GetOperation event.
|
||||||
func (p Pool[R]) Get() (R, error) {
|
func (p Pool[R]) Get() (R, error) {
|
||||||
return p.pool.get()
|
return p.pool.get()
|
||||||
}
|
}
|
||||||
|
|
||||||
// it is allowed to put items that were not received by get, but the selected algorithm may produce unexpected
|
// Put stores an item in the pool. If events were configured, it triggers a PutOperation event.
|
||||||
// behavior. In most cases, it is recommended to use Load instead, and using Put to put back only those items
|
//
|
||||||
// that were received via Get.
|
// It is recommended to use it only with items that were received by the Get method. While it is allowed to put
|
||||||
|
// other items in the pool, it may change the way the shrinking algorithm works. E.g. it can be considered as a
|
||||||
|
// sudden drop in the number of active items. If the pool needs to be prewarmed, or prepared for an expected
|
||||||
|
// spike of traffic, consider using the Load method.
|
||||||
func (p Pool[R]) Put(i R) {
|
func (p Pool[R]) Put(i R) {
|
||||||
p.pool.put(i)
|
p.pool.put(i)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Load can be used to populate the pool with items that were not allocated as the result of the Get operation.
|
||||||
|
// It can be useful in scenarios where prewarming or preparation for an expected sudden traffic spike is
|
||||||
|
// expected. If events were configured, it triggers a LoadOperation event.
|
||||||
func (p Pool[R]) Load(i []R) {
|
func (p Pool[R]) Load(i []R) {
|
||||||
p.pool.load(i)
|
p.pool.load(i)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Stats returns statistics about the current state of the pool. It contains the current number of active/idle
|
||||||
|
// items, and perpetual counters for the various pool operations.
|
||||||
func (p Pool[R]) Stats() Stats {
|
func (p Pool[R]) Stats() Stats {
|
||||||
return p.pool.stats()
|
return p.pool.stats()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Free releases all idle items in the pool. While the pool stays operational, Free is meant to be used when the
|
||||||
|
// pool is not required anymore.
|
||||||
func (p Pool[R]) Free() {
|
func (p Pool[R]) Free() {
|
||||||
p.pool.freePool()
|
p.pool.freePool()
|
||||||
}
|
}
|
||||||
|
|||||||
2
license
2
license
@ -1,6 +1,6 @@
|
|||||||
MIT License
|
MIT License
|
||||||
|
|
||||||
Copyright (c) 2018 Arpad Ryszka
|
Copyright (c) 2026 Arpad Ryszka
|
||||||
|
|
||||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||||
of this software and associated documentation files (the "Software"), to deal
|
of this software and associated documentation files (the "Software"), to deal
|
||||||
|
|||||||
@ -384,7 +384,7 @@ func TestMaxTO(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
s := p.Stats()
|
s := p.Stats()
|
||||||
e := pool.Stats{Idle: 1, Active: 0, Get: 8, Put: 8, Alloc: 0, Free: 14}
|
e := pool.Stats{Idle: 1, Active: 0, Get: 8, Put: 8, Alloc: 0, Load: initialCount, Free: 14}
|
||||||
if s != e {
|
if s != e {
|
||||||
t.Fatal(s)
|
t.Fatal(s)
|
||||||
}
|
}
|
||||||
@ -455,7 +455,7 @@ func TestMaxTO(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
s := p.Stats()
|
s := p.Stats()
|
||||||
e := pool.Stats{Idle: 1, Active: 23, Get: 39, Put: 16, Alloc: 20, Free: 11}
|
e := pool.Stats{Idle: 1, Active: 23, Get: 39, Put: 16, Alloc: 20, Load: adjustCount, Free: 11}
|
||||||
if s != e {
|
if s != e {
|
||||||
t.Fatal(s)
|
t.Fatal(s)
|
||||||
}
|
}
|
||||||
|
|||||||
3
pool.go
3
pool.go
@ -154,7 +154,10 @@ func (p pool[R]) load(i []R) {
|
|||||||
|
|
||||||
s.items = append(s.items, i...)
|
s.items = append(s.items, i...)
|
||||||
s.stats.Idle = len(s.items)
|
s.stats.Idle = len(s.items)
|
||||||
|
s.stats.Load += len(i)
|
||||||
p.options.Algo.Load(len(i))
|
p.options.Algo.Load(len(i))
|
||||||
|
event := LoadOperation
|
||||||
|
p.sendEvent(event, s.stats)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p pool[R]) forcedCheck(s state[R], timeout time.Duration) state[R] {
|
func (p pool[R]) forcedCheck(s state[R], timeout time.Duration) state[R] {
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user