initial implementation
This commit is contained in:
commit
99e8b6f6b6
1
.gitignore
vendored
Normal file
1
.gitignore
vendored
Normal file
@ -0,0 +1 @@
|
||||
.cover
|
||||
31
Makefile
Normal file
31
Makefile
Normal file
@ -0,0 +1,31 @@
|
||||
sources = $(shell find . -name "*.go")
|
||||
|
||||
default: build
|
||||
|
||||
build: $(sources)
|
||||
go build
|
||||
|
||||
fmt: $(sources)
|
||||
go fmt
|
||||
|
||||
check: $(sources)
|
||||
go test -count 1
|
||||
|
||||
race: $(source)
|
||||
go test -count 1 -race
|
||||
|
||||
.cover: $(sources)
|
||||
go test -count 1 -coverprofile .cover
|
||||
|
||||
cover: .cover
|
||||
go tool cover -func .cover
|
||||
|
||||
showcover: .cover
|
||||
go tool cover -html .cover
|
||||
|
||||
bench: $(sources)
|
||||
go test -bench Benchmark -run ^$
|
||||
|
||||
clean:
|
||||
go clean
|
||||
rm .cover
|
||||
163
algo.go
Normal file
163
algo.go
Normal file
@ -0,0 +1,163 @@
|
||||
package pool
|
||||
|
||||
import "time"
|
||||
|
||||
const (
|
||||
// arbitrary values to be most likely out of sync with anything else:
|
||||
minNightshiftTime = 729 * time.Millisecond // ~1sec
|
||||
maxNightshiftTime = 59049 * time.Second // ~2/3day
|
||||
)
|
||||
|
||||
type adaptive struct {
|
||||
lastTurnedActive time.Time
|
||||
nightshiftTime time.Duration
|
||||
idle bool
|
||||
average int
|
||||
deviation int
|
||||
}
|
||||
|
||||
type maxTimeout struct {
|
||||
max int
|
||||
to time.Duration
|
||||
items []time.Time
|
||||
}
|
||||
|
||||
func makeAdaptiveAlgo() *adaptive {
|
||||
return &adaptive{idle: true}
|
||||
}
|
||||
|
||||
func abs(v int) int {
|
||||
if v >= 0 {
|
||||
return v
|
||||
}
|
||||
|
||||
return 0 - v
|
||||
}
|
||||
|
||||
func divE(v int) int {
|
||||
return (3 * v) >> 3 // 1 / 2.72 => 3 / 8
|
||||
}
|
||||
|
||||
func mulE(v int) int {
|
||||
return (11 * v) >> 2 // 2.72 => 11 / 4
|
||||
}
|
||||
|
||||
func movingAverage(prev, currv int) int {
|
||||
return prev + divE(currv-prev)
|
||||
}
|
||||
|
||||
func movingAbsoluteDeviation(prev, currv, currav int) int {
|
||||
return prev + divE(abs(currv-currav)-prev)
|
||||
}
|
||||
|
||||
func targetCapacity(av, dev int) int {
|
||||
return av + mulE(dev)
|
||||
}
|
||||
|
||||
func (a *adaptive) target(s Stats) int {
|
||||
av := movingAverage(a.average, s.Active)
|
||||
dev := movingAbsoluteDeviation(a.deviation, s.Active, av)
|
||||
a.average = av
|
||||
a.deviation = dev
|
||||
return targetCapacity(av, dev)
|
||||
}
|
||||
|
||||
func (a *adaptive) nightshift(s Stats) time.Duration {
|
||||
if a.idle && s.Active == 0 {
|
||||
return a.nightshiftTime
|
||||
}
|
||||
|
||||
if !a.idle && s.Active > 0 {
|
||||
return 0
|
||||
}
|
||||
|
||||
now := time.Now()
|
||||
a.idle = !a.idle
|
||||
if !a.idle {
|
||||
a.lastTurnedActive = now
|
||||
return 0
|
||||
}
|
||||
|
||||
a.nightshiftTime = now.Sub(a.lastTurnedActive)
|
||||
a.nightshiftTime = (3 * a.nightshiftTime) >> 3
|
||||
if a.nightshiftTime < minNightshiftTime {
|
||||
a.nightshiftTime = minNightshiftTime
|
||||
}
|
||||
|
||||
if a.nightshiftTime > maxNightshiftTime {
|
||||
a.nightshiftTime = maxNightshiftTime
|
||||
}
|
||||
|
||||
return a.nightshiftTime
|
||||
}
|
||||
|
||||
func (a *adaptive) Target(s Stats) (int, time.Duration) {
|
||||
t := a.target(s)
|
||||
ns := a.nightshift(s)
|
||||
return t, ns
|
||||
}
|
||||
|
||||
func makeMaxTimeout(max int, to time.Duration) *maxTimeout {
|
||||
return &maxTimeout{
|
||||
max: max,
|
||||
to: to,
|
||||
}
|
||||
}
|
||||
|
||||
func (a *maxTimeout) Target(s Stats) (int, time.Duration) {
|
||||
if a.max <= 0 && a.to <= 0 {
|
||||
return s.Idle, 0
|
||||
}
|
||||
|
||||
if a.max > 0 && a.to <= 0 {
|
||||
t := s.Idle
|
||||
if t > a.max {
|
||||
t = a.max
|
||||
}
|
||||
|
||||
return t, 0
|
||||
}
|
||||
|
||||
var zero time.Time
|
||||
if len(a.items) > s.Idle {
|
||||
for i := s.Idle; i < len(a.items); i++ {
|
||||
a.items[i] = zero
|
||||
}
|
||||
|
||||
a.items = a.items[:s.Idle]
|
||||
if len(a.items) == 0 {
|
||||
a.items = nil
|
||||
}
|
||||
}
|
||||
|
||||
now := time.Now()
|
||||
for len(a.items) < s.Idle {
|
||||
a.items = append(a.items, now)
|
||||
}
|
||||
|
||||
var drop int
|
||||
for drop < len(a.items) && a.items[drop].Add(a.to).Before(now) {
|
||||
a.items[drop] = zero
|
||||
drop++
|
||||
}
|
||||
|
||||
if drop > 0 {
|
||||
for i := 0; i < drop; i++ {
|
||||
a.items[i] = zero
|
||||
}
|
||||
|
||||
a.items = a.items[drop:]
|
||||
}
|
||||
|
||||
if len(a.items) == 0 {
|
||||
a.items = nil
|
||||
return 0, 0
|
||||
}
|
||||
|
||||
t := len(a.items)
|
||||
if a.max > 0 && t > a.max {
|
||||
t = a.max
|
||||
}
|
||||
|
||||
return t, a.items[0].Add(a.to).Sub(now)
|
||||
}
|
||||
44
algo_test.go
Normal file
44
algo_test.go
Normal file
@ -0,0 +1,44 @@
|
||||
package pool_test
|
||||
|
||||
import (
|
||||
"code.squareroundforest.org/arpio/pool"
|
||||
"time"
|
||||
)
|
||||
|
||||
type maxAlgo int
|
||||
|
||||
type timeoutAlgo struct {
|
||||
to time.Duration
|
||||
items []time.Time
|
||||
}
|
||||
|
||||
func (a maxAlgo) Target(s pool.Stats) (int, time.Duration) {
|
||||
if s.Idle <= int(a) {
|
||||
return s.Idle, 0
|
||||
}
|
||||
|
||||
return int(a), 0
|
||||
}
|
||||
|
||||
func (a *timeoutAlgo) Target(s pool.Stats) (int, time.Duration) {
|
||||
now := time.Now()
|
||||
for len(a.items) < s.Idle {
|
||||
a.items = append(a.items, now)
|
||||
}
|
||||
|
||||
t := s.Idle
|
||||
for len(a.items) > 0 && a.items[0].Add(a.to).Before(now) {
|
||||
t--
|
||||
a.items = a.items[1:]
|
||||
}
|
||||
|
||||
if t < 0 {
|
||||
t = 0
|
||||
}
|
||||
|
||||
if len(a.items) == 0 {
|
||||
return t, 0
|
||||
}
|
||||
|
||||
return t, a.items[0].Add(a.to).Sub(now)
|
||||
}
|
||||
1
event_test.go
Normal file
1
event_test.go
Normal file
@ -0,0 +1 @@
|
||||
package pool_test
|
||||
3
go.mod
Normal file
3
go.mod
Normal file
@ -0,0 +1,3 @@
|
||||
module code.squareroundforest.org/arpio/pool
|
||||
|
||||
go 1.25.6
|
||||
114
lib.go
Normal file
114
lib.go
Normal file
@ -0,0 +1,114 @@
|
||||
// pool with support for:
|
||||
// - finalizing items
|
||||
// - monitoring: events and/or stats
|
||||
// - adaptive shrinking algorithm
|
||||
// - custom shrinking algorithms
|
||||
package pool
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"time"
|
||||
)
|
||||
|
||||
type Stats struct {
|
||||
Idle int
|
||||
Active int
|
||||
Get int
|
||||
Put int
|
||||
Alloc int
|
||||
Free int
|
||||
}
|
||||
|
||||
type EventType int
|
||||
|
||||
const (
|
||||
None EventType = 0
|
||||
GetOperation EventType = 1 << iota
|
||||
PutOperation
|
||||
AllocateOperation
|
||||
FreeOperation
|
||||
AllocateError
|
||||
)
|
||||
|
||||
type Event struct {
|
||||
Type EventType
|
||||
Stats Stats
|
||||
}
|
||||
|
||||
type Algo interface {
|
||||
|
||||
// always called
|
||||
// desired idle items
|
||||
// implementations should consider the cost of freeing the stored resources
|
||||
// must support being called from a goroutine other than it was created in
|
||||
// a single pool instance only calls it from a single goroutine at a time
|
||||
// items need to be allocated always by calling Get
|
||||
// second return argument for requested next check
|
||||
Target(Stats) (int, time.Duration)
|
||||
}
|
||||
|
||||
type Options struct {
|
||||
|
||||
// events can be dropped if the consumer is blocked
|
||||
Events chan<- Event
|
||||
|
||||
EventMask EventType
|
||||
Algo Algo
|
||||
}
|
||||
|
||||
type Pool[R any] struct {
|
||||
pool pool[R]
|
||||
}
|
||||
|
||||
var ErrEmptyPool = errors.New("empty pool")
|
||||
|
||||
// zero-config
|
||||
func Adaptive() Algo {
|
||||
return makeAdaptiveAlgo()
|
||||
}
|
||||
|
||||
// enfoces a max pool size and a timeout for the items
|
||||
// when adding items to the pool via Put that were not fetched via Get, there can discrepancies occur in which
|
||||
// items get timed out, but the general pool limitations get still consistently enforced eventually
|
||||
func MaxTimeout(max int, to time.Duration) Algo {
|
||||
return makeMaxTimeout(max, to)
|
||||
}
|
||||
|
||||
// like MaxTimeout but without enforcing timeouts
|
||||
func Max(max int) Algo {
|
||||
return makeMaxTimeout(max, 0)
|
||||
}
|
||||
|
||||
// like MaxTimeout but without enforcing max pool size
|
||||
func Timeout(to time.Duration) Algo {
|
||||
return makeMaxTimeout(0, to)
|
||||
}
|
||||
|
||||
// the user code can decide not to put back items to the pool
|
||||
func NoShrink() Algo {
|
||||
return makeMaxTimeout(0, 0)
|
||||
}
|
||||
|
||||
// alloc and free need to support calls from goroutines other than they were created in
|
||||
// a single pool instance only calls them from a single goroutine at a time
|
||||
// free happens synchronously, user code may execute it in the background, in which case it is the user code's
|
||||
// responsibility to ensure that free is fully carried out before the application exits, if that's necessary
|
||||
func Make[R any](alloc func() (R, error), free func(R), o Options) Pool[R] {
|
||||
return Pool[R]{pool: makePool(alloc, free, o)}
|
||||
}
|
||||
|
||||
func (p Pool[R]) Get() (R, error) {
|
||||
return p.pool.get()
|
||||
}
|
||||
|
||||
func (p Pool[R]) Put(i R) {
|
||||
p.pool.put(i)
|
||||
}
|
||||
|
||||
func (p Pool[R]) Stats() Stats {
|
||||
return p.pool.stats()
|
||||
}
|
||||
|
||||
func (p Pool[R]) Free() {
|
||||
p.pool.freePool()
|
||||
}
|
||||
21
license
Normal file
21
license
Normal file
@ -0,0 +1,21 @@
|
||||
MIT License
|
||||
|
||||
Copyright (c) 2018 Arpad Ryszka
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all
|
||||
copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
SOFTWARE.
|
||||
226
pool.go
Normal file
226
pool.go
Normal file
@ -0,0 +1,226 @@
|
||||
package pool
|
||||
|
||||
import "time"
|
||||
|
||||
type state[R any] struct {
|
||||
items []R
|
||||
stats Stats
|
||||
forcedCheckPending bool
|
||||
}
|
||||
|
||||
type pool[R any] struct {
|
||||
state chan state[R]
|
||||
alloc func() (R, error)
|
||||
free func(r R)
|
||||
options Options
|
||||
}
|
||||
|
||||
func makePool[R any](alloc func() (R, error), free func(r R), o Options) pool[R] {
|
||||
if o.Algo == nil {
|
||||
o.Algo = Adaptive()
|
||||
}
|
||||
|
||||
s := make(chan state[R], 1)
|
||||
s <- state[R]{}
|
||||
return pool[R]{
|
||||
state: s,
|
||||
alloc: alloc,
|
||||
free: free,
|
||||
options: o,
|
||||
}
|
||||
}
|
||||
|
||||
func (p pool[R]) stats() Stats {
|
||||
s := <-p.state
|
||||
defer func() {
|
||||
p.state <- s
|
||||
}()
|
||||
|
||||
return s.stats
|
||||
}
|
||||
|
||||
func (p pool[R]) sendEvent(e EventType, s Stats) {
|
||||
if p.options.Events == nil {
|
||||
return
|
||||
}
|
||||
|
||||
if p.options.EventMask&e == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
ev := Event{
|
||||
Type: e,
|
||||
Stats: s,
|
||||
}
|
||||
|
||||
select {
|
||||
case p.options.Events <- ev:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
func (p pool[R]) get() (R, error) {
|
||||
s := <-p.state
|
||||
defer func() {
|
||||
p.state <- s
|
||||
}()
|
||||
|
||||
var (
|
||||
r R
|
||||
event EventType
|
||||
err error
|
||||
)
|
||||
|
||||
event |= GetOperation
|
||||
s.stats.Get++
|
||||
switch {
|
||||
case len(s.items) == 0 && p.alloc == nil:
|
||||
s.stats.Alloc++
|
||||
event |= AllocateOperation
|
||||
event |= AllocateError
|
||||
err = ErrEmptyPool
|
||||
case len(s.items) == 0:
|
||||
s.stats.Alloc++
|
||||
event |= AllocateOperation
|
||||
r, err = p.alloc()
|
||||
if err != nil {
|
||||
event |= AllocateError
|
||||
} else {
|
||||
s.stats.Active++
|
||||
}
|
||||
default:
|
||||
r, s.items = s.items[len(s.items)-1], s.items[:len(s.items)-1]
|
||||
if len(s.items) == 0 {
|
||||
s.items = nil
|
||||
}
|
||||
|
||||
s.stats.Active++
|
||||
s.stats.Idle = len(s.items)
|
||||
}
|
||||
|
||||
p.sendEvent(event, s.stats)
|
||||
return r, err
|
||||
}
|
||||
|
||||
func (p pool[R]) put(r R) {
|
||||
s := <-p.state
|
||||
defer func() {
|
||||
p.state <- s
|
||||
}()
|
||||
|
||||
var event EventType
|
||||
event |= PutOperation
|
||||
s.stats.Put++
|
||||
s.stats.Idle++
|
||||
|
||||
// one may put in items that were allocated outside of the pool:
|
||||
if s.stats.Active > 0 {
|
||||
s.stats.Active--
|
||||
}
|
||||
|
||||
t, f := p.options.Algo.Target(s.stats)
|
||||
switch {
|
||||
case t > len(s.items):
|
||||
s.items = append(s.items, r)
|
||||
default:
|
||||
event |= FreeOperation
|
||||
s = p.freeItems(s, t, r)
|
||||
}
|
||||
|
||||
// fix provisioned idle count:
|
||||
s.stats.Idle = len(s.items)
|
||||
p.sendEvent(event, s.stats)
|
||||
if f > 0 {
|
||||
s = p.forcedCheck(s, f)
|
||||
}
|
||||
}
|
||||
|
||||
func (p pool[R]) forcedCheck(s state[R], timeout time.Duration) state[R] {
|
||||
if s.forcedCheckPending {
|
||||
return s
|
||||
}
|
||||
|
||||
s.forcedCheckPending = true
|
||||
go func(to time.Duration) {
|
||||
<-time.After(to)
|
||||
p.freeIdle()
|
||||
}(timeout)
|
||||
|
||||
return s
|
||||
}
|
||||
|
||||
func (p pool[R]) freeItems(s state[R], target int, additional ...R) state[R] {
|
||||
if len(s.items)+len(additional) <= target {
|
||||
return s
|
||||
}
|
||||
|
||||
s.stats.Free += len(additional)
|
||||
if p.free == nil && len(s.items) <= target {
|
||||
return s
|
||||
}
|
||||
|
||||
var f []R
|
||||
if p.free != nil {
|
||||
f = additional
|
||||
if len(s.items) > target {
|
||||
f = append(f, s.items[:len(s.items)-target]...)
|
||||
}
|
||||
}
|
||||
|
||||
if len(s.items) > target {
|
||||
s.stats.Free += len(s.items) - target
|
||||
|
||||
var zero R
|
||||
for i := 0; i < len(s.items)-target; i++ {
|
||||
s.items[i] = zero
|
||||
}
|
||||
|
||||
s.items = s.items[len(s.items)-target:]
|
||||
if len(s.items) == 0 {
|
||||
s.items = nil
|
||||
}
|
||||
}
|
||||
|
||||
if p.free == nil {
|
||||
return s
|
||||
}
|
||||
|
||||
for _, fi := range f {
|
||||
p.free(fi)
|
||||
}
|
||||
|
||||
return s
|
||||
}
|
||||
|
||||
func (p pool[R]) freeIdle() {
|
||||
s := <-p.state
|
||||
defer func() {
|
||||
p.state <- s
|
||||
}()
|
||||
|
||||
s.forcedCheckPending = false
|
||||
t, f := p.options.Algo.Target(s.stats)
|
||||
prev := s.stats.Free
|
||||
s = p.freeItems(s, t)
|
||||
if s.stats.Free > prev {
|
||||
p.sendEvent(FreeOperation, s.stats)
|
||||
}
|
||||
|
||||
if f > 0 {
|
||||
s = p.forcedCheck(s, f)
|
||||
}
|
||||
}
|
||||
|
||||
func (p pool[R]) freePool() {
|
||||
s := <-p.state
|
||||
defer func() {
|
||||
p.state <- s
|
||||
}()
|
||||
|
||||
s.stats.Idle = 0
|
||||
prev := s.stats.Free
|
||||
s = p.freeItems(s, 0)
|
||||
if s.stats.Free > prev {
|
||||
p.sendEvent(FreeOperation, s.stats)
|
||||
}
|
||||
}
|
||||
22
pool_test.go
Normal file
22
pool_test.go
Normal file
@ -0,0 +1,22 @@
|
||||
package pool_test
|
||||
|
||||
import "testing"
|
||||
|
||||
func TestPool(t *testing.T) {
|
||||
// initial stats
|
||||
// get empty
|
||||
// get pooled own
|
||||
// get pooled foreign
|
||||
// get own no alloc
|
||||
// get foreign no alloc
|
||||
// put own
|
||||
// put foreign empty
|
||||
// put foreign not empty
|
||||
// release on put no free
|
||||
// release on put with free
|
||||
// release all no free
|
||||
// release all with free
|
||||
// release on timeout no free
|
||||
// release on timeout with free
|
||||
// use default algo
|
||||
}
|
||||
Loading…
Reference in New Issue
Block a user