syncbus/syncbus.go
2018-03-18 20:42:20 +01:00

176 lines
3.8 KiB
Go

/*
Package syncbus provides an event bus for testing.
SyncBus can be used to execute test instructions in a concurrent program in a predefined order. It is expected
to be used as a test hook shared between the production and the test code, or left nil if not required.
It provides a wait function for processes in goroutines that should continue only when a predefined signal is
set, or the timeout expires. If the predefined signals are already set at the time of calling wait, the
calling goroutine continues immediately. Once a signal is set, it stays so until it's cleared.
Wait can expect one or more signals represented by keys. The signals don't need to be set simultaneously in
order to release a waiting goroutine. A wait continues once all the signals that it depends on were set.
*/
package syncbus
import (
"errors"
"time"
)
type waitItem struct {
keys []string
deadline time.Time
signal chan error
}
// SyncBus can be used to synchronize goroutines through signals.
type SyncBus struct {
timeout time.Duration
waiting []waitItem
signals map[string]bool
wait chan waitItem
signal chan []string
quit chan struct{}
}
// ErrTimeout is returned by Wait() when failed to receive all the signals in time.
var ErrTimeout = errors.New("timeout")
// New creates and initializes a new SyncBus. It uses a shared timeout for all the Wait calls.
func New(timeout time.Duration) *SyncBus {
b := &SyncBus{
timeout: timeout,
signals: make(map[string]bool),
wait: make(chan waitItem),
signal: make(chan []string),
quit: make(chan struct{}),
}
go b.run()
return b
}
func (b *SyncBus) nextTimeout(now time.Time) <-chan time.Time {
if len(b.waiting) == 0 {
return nil
}
to := b.waiting[0].deadline.Sub(time.Now())
return time.After(to)
}
func (b *SyncBus) addWaiting(now time.Time, w waitItem) {
w.deadline = now.Add(b.timeout)
b.waiting = append(b.waiting, w)
}
func (b *SyncBus) setSignal(keys []string) {
for _, key := range keys {
b.signals[key] = true
}
}
func (b *SyncBus) timeoutWaiting(now time.Time) {
for i, w := range b.waiting {
if w.deadline.After(now) {
b.waiting = b.waiting[i:]
return
}
w.signal <- ErrTimeout
}
b.waiting = nil
}
func (b *SyncBus) signalWaiting(now time.Time) {
var keep []waitItem
for _, w := range b.waiting {
var keepItem bool
for _, key := range w.keys {
if !b.signals[key] {
keepItem = true
break
}
}
if keepItem {
keep = append(keep, w)
continue
}
w.signal <- nil
}
b.waiting = keep
}
func (b *SyncBus) run() {
var to <-chan time.Time
for {
select {
case <-to:
now := time.Now()
b.timeoutWaiting(now)
to = b.nextTimeout(now)
case wait := <-b.wait:
now := time.Now()
b.addWaiting(now, wait)
b.signalWaiting(now)
to = b.nextTimeout(now)
case signal := <-b.signal:
now := time.Now()
b.setSignal(signal)
b.signalWaiting(now)
to = b.nextTimeout(now)
case <-b.quit:
return
}
}
}
// Wait blocks until all the signals represented by the keys are set, or
// returns an ErrTimeout if the timeout, counted from the call to Wait,
// expires.
//
// It returns only ErrTimeout or nil.
//
// If the receiver *SyncBus is nil, or no key argument is passed to it,
// it is a noop.
func (b *SyncBus) Wait(keys ...string) error {
if b == nil || len(keys) == 0 {
return nil
}
w := waitItem{
keys: keys,
signal: make(chan error, 1),
}
b.wait <- w
err := <-w.signal
return err
}
// Signal sets one or more signals represented by the keys.
//
// If the receiver *SyncBus is nil, or no key argument is passed to it,
// it is a noop.
func (b *SyncBus) Signal(keys ...string) {
if b == nil || len(keys) == 0 {
return
}
b.signal <- keys
}
// Close tears down the SyncBus.
func (b *SyncBus) Close() {
if b == nil {
return
}
close(b.quit)
}