syncbus/syncbus.go

217 lines
4.6 KiB
Go
Raw Permalink Normal View History

2018-03-18 20:42:20 +01:00
/*
Package syncbus provides an event bus for testing.
SyncBus can be used to execute test instructions in a concurrent program in a predefined order. It is expected
to be used as a test hook shared between the production and the test code, or left nil if not required.
It provides a wait function for processes in goroutines that should continue only when a predefined signal is
set, or the timeout expires. If the predefined signals are already set at the time of calling wait, the
2018-03-18 21:05:53 +01:00
calling goroutine continues immediately. Once a signal is set, it stays so until it's cleared (reset).
2018-03-18 20:42:20 +01:00
Wait can expect one or more signals represented by keys. The signals don't need to be set simultaneously in
order to release a waiting goroutine. A wait continues once all the signals that it depends on were set.
*/
package syncbus
import (
"errors"
"time"
)
type waitItem struct {
keys []string
deadline time.Time
signal chan error
}
// SyncBus can be used to synchronize goroutines through signals.
type SyncBus struct {
2018-03-18 21:05:53 +01:00
timeout time.Duration
waiting []waitItem
signals map[string]bool
wait chan waitItem
signal chan []string
reset chan []string
resetAll chan struct{}
quit chan struct{}
2018-03-18 20:42:20 +01:00
}
// ErrTimeout is returned by Wait() when failed to receive all the signals in time.
var ErrTimeout = errors.New("timeout")
// New creates and initializes a new SyncBus. It uses a shared timeout for all the Wait calls.
func New(timeout time.Duration) *SyncBus {
b := &SyncBus{
2018-03-18 21:05:53 +01:00
timeout: timeout,
signals: make(map[string]bool),
wait: make(chan waitItem),
signal: make(chan []string),
reset: make(chan []string),
resetAll: make(chan struct{}),
quit: make(chan struct{}),
2018-03-18 20:42:20 +01:00
}
go b.run()
return b
}
func (b *SyncBus) nextTimeout(now time.Time) <-chan time.Time {
if len(b.waiting) == 0 {
return nil
}
to := b.waiting[0].deadline.Sub(time.Now())
return time.After(to)
}
func (b *SyncBus) addWaiting(now time.Time, w waitItem) {
w.deadline = now.Add(b.timeout)
b.waiting = append(b.waiting, w)
}
func (b *SyncBus) setSignal(keys []string) {
for _, key := range keys {
b.signals[key] = true
}
}
func (b *SyncBus) timeoutWaiting(now time.Time) {
for i, w := range b.waiting {
if w.deadline.After(now) {
b.waiting = b.waiting[i:]
return
}
w.signal <- ErrTimeout
}
b.waiting = nil
}
func (b *SyncBus) signalWaiting(now time.Time) {
var keep []waitItem
for _, w := range b.waiting {
var keepItem bool
for _, key := range w.keys {
if !b.signals[key] {
keepItem = true
break
}
}
if keepItem {
keep = append(keep, w)
continue
}
w.signal <- nil
}
b.waiting = keep
}
2018-03-18 21:05:53 +01:00
func (b *SyncBus) resetSignals(keys []string) {
for i := range keys {
delete(b.signals, keys[i])
}
}
func (b *SyncBus) resetAllSignals() {
b.signals = make(map[string]bool)
}
2018-03-18 20:42:20 +01:00
func (b *SyncBus) run() {
var to <-chan time.Time
for {
select {
case <-to:
now := time.Now()
b.timeoutWaiting(now)
to = b.nextTimeout(now)
case wait := <-b.wait:
now := time.Now()
b.addWaiting(now, wait)
b.signalWaiting(now)
to = b.nextTimeout(now)
case signal := <-b.signal:
now := time.Now()
b.setSignal(signal)
b.signalWaiting(now)
to = b.nextTimeout(now)
2018-03-18 21:05:53 +01:00
case reset := <-b.reset:
b.resetSignals(reset)
case <-b.resetAll:
b.resetAllSignals()
2018-03-18 20:42:20 +01:00
case <-b.quit:
return
}
}
}
// Wait blocks until all the signals represented by the keys are set, or
// returns an ErrTimeout if the timeout, counted from the call to Wait,
// expires.
//
// It returns only ErrTimeout or nil.
//
// If the receiver *SyncBus is nil, or no key argument is passed to it,
// it is a noop.
func (b *SyncBus) Wait(keys ...string) error {
if b == nil || len(keys) == 0 {
return nil
}
w := waitItem{
keys: keys,
signal: make(chan error, 1),
}
b.wait <- w
err := <-w.signal
return err
}
// Signal sets one or more signals represented by the keys.
//
// If the receiver *SyncBus is nil, or no key argument is passed to it,
// it is a noop.
func (b *SyncBus) Signal(keys ...string) {
if b == nil || len(keys) == 0 {
return
}
b.signal <- keys
}
2018-03-18 21:05:53 +01:00
// ResetSignals clears the set signals defined by the provided keys.
//
// If the receiver *SyncBus is nil, or no key argument is passed to it,
// it is a noop.
func (b *SyncBus) ResetSignals(keys ...string) {
if b == nil || len(keys) == 0 {
return
}
b.reset <- keys
}
2018-03-18 22:55:10 +01:00
// Reset clears all the signals.
2018-03-18 21:05:53 +01:00
//
// If the receiver *SyncBus is nil, it is a noop.
2018-03-18 22:55:10 +01:00
func (b *SyncBus) Reset() {
2018-03-18 21:05:53 +01:00
if b == nil {
return
}
b.resetAll <- struct{}{}
}
// Close tears down the SyncBus. If the receiver is nil, it is a noop.
2018-03-18 20:42:20 +01:00
func (b *SyncBus) Close() {
if b == nil {
return
}
close(b.quit)
}