2018-03-18 20:42:20 +01:00
|
|
|
/*
|
|
|
|
Package syncbus provides an event bus for testing.
|
|
|
|
|
|
|
|
SyncBus can be used to execute test instructions in a concurrent program in a predefined order. It is expected
|
|
|
|
to be used as a test hook shared between the production and the test code, or left nil if not required.
|
|
|
|
|
|
|
|
It provides a wait function for processes in goroutines that should continue only when a predefined signal is
|
|
|
|
set, or the timeout expires. If the predefined signals are already set at the time of calling wait, the
|
2018-03-18 21:05:53 +01:00
|
|
|
calling goroutine continues immediately. Once a signal is set, it stays so until it's cleared (reset).
|
2018-03-18 20:42:20 +01:00
|
|
|
|
|
|
|
Wait can expect one or more signals represented by keys. The signals don't need to be set simultaneously in
|
|
|
|
order to release a waiting goroutine. A wait continues once all the signals that it depends on were set.
|
|
|
|
*/
|
|
|
|
package syncbus
|
|
|
|
|
|
|
|
import (
|
|
|
|
"errors"
|
|
|
|
"time"
|
|
|
|
)
|
|
|
|
|
|
|
|
type waitItem struct {
|
|
|
|
keys []string
|
|
|
|
deadline time.Time
|
|
|
|
signal chan error
|
|
|
|
}
|
|
|
|
|
|
|
|
// SyncBus can be used to synchronize goroutines through signals.
|
|
|
|
type SyncBus struct {
|
2018-03-18 21:05:53 +01:00
|
|
|
timeout time.Duration
|
|
|
|
waiting []waitItem
|
|
|
|
signals map[string]bool
|
|
|
|
wait chan waitItem
|
|
|
|
signal chan []string
|
|
|
|
reset chan []string
|
|
|
|
resetAll chan struct{}
|
|
|
|
quit chan struct{}
|
2018-03-18 20:42:20 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
// ErrTimeout is returned by Wait() when failed to receive all the signals in time.
|
|
|
|
var ErrTimeout = errors.New("timeout")
|
|
|
|
|
|
|
|
// New creates and initializes a new SyncBus. It uses a shared timeout for all the Wait calls.
|
|
|
|
func New(timeout time.Duration) *SyncBus {
|
|
|
|
b := &SyncBus{
|
2018-03-18 21:05:53 +01:00
|
|
|
timeout: timeout,
|
|
|
|
signals: make(map[string]bool),
|
|
|
|
wait: make(chan waitItem),
|
|
|
|
signal: make(chan []string),
|
|
|
|
reset: make(chan []string),
|
|
|
|
resetAll: make(chan struct{}),
|
|
|
|
quit: make(chan struct{}),
|
2018-03-18 20:42:20 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
go b.run()
|
|
|
|
return b
|
|
|
|
}
|
|
|
|
|
|
|
|
func (b *SyncBus) nextTimeout(now time.Time) <-chan time.Time {
|
|
|
|
if len(b.waiting) == 0 {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
to := b.waiting[0].deadline.Sub(time.Now())
|
|
|
|
return time.After(to)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (b *SyncBus) addWaiting(now time.Time, w waitItem) {
|
|
|
|
w.deadline = now.Add(b.timeout)
|
|
|
|
b.waiting = append(b.waiting, w)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (b *SyncBus) setSignal(keys []string) {
|
|
|
|
for _, key := range keys {
|
|
|
|
b.signals[key] = true
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (b *SyncBus) timeoutWaiting(now time.Time) {
|
|
|
|
for i, w := range b.waiting {
|
|
|
|
if w.deadline.After(now) {
|
|
|
|
b.waiting = b.waiting[i:]
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
w.signal <- ErrTimeout
|
|
|
|
}
|
|
|
|
|
|
|
|
b.waiting = nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (b *SyncBus) signalWaiting(now time.Time) {
|
|
|
|
var keep []waitItem
|
|
|
|
for _, w := range b.waiting {
|
|
|
|
var keepItem bool
|
|
|
|
for _, key := range w.keys {
|
|
|
|
if !b.signals[key] {
|
|
|
|
keepItem = true
|
|
|
|
break
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if keepItem {
|
|
|
|
keep = append(keep, w)
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
w.signal <- nil
|
|
|
|
}
|
|
|
|
|
|
|
|
b.waiting = keep
|
|
|
|
}
|
|
|
|
|
2018-03-18 21:05:53 +01:00
|
|
|
func (b *SyncBus) resetSignals(keys []string) {
|
|
|
|
for i := range keys {
|
|
|
|
delete(b.signals, keys[i])
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (b *SyncBus) resetAllSignals() {
|
|
|
|
b.signals = make(map[string]bool)
|
|
|
|
}
|
|
|
|
|
2018-03-18 20:42:20 +01:00
|
|
|
func (b *SyncBus) run() {
|
|
|
|
var to <-chan time.Time
|
|
|
|
for {
|
|
|
|
select {
|
|
|
|
case <-to:
|
|
|
|
now := time.Now()
|
|
|
|
b.timeoutWaiting(now)
|
|
|
|
to = b.nextTimeout(now)
|
|
|
|
case wait := <-b.wait:
|
|
|
|
now := time.Now()
|
|
|
|
b.addWaiting(now, wait)
|
|
|
|
b.signalWaiting(now)
|
|
|
|
to = b.nextTimeout(now)
|
|
|
|
case signal := <-b.signal:
|
|
|
|
now := time.Now()
|
|
|
|
b.setSignal(signal)
|
|
|
|
b.signalWaiting(now)
|
|
|
|
to = b.nextTimeout(now)
|
2018-03-18 21:05:53 +01:00
|
|
|
case reset := <-b.reset:
|
|
|
|
b.resetSignals(reset)
|
|
|
|
case <-b.resetAll:
|
|
|
|
b.resetAllSignals()
|
2018-03-18 20:42:20 +01:00
|
|
|
case <-b.quit:
|
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Wait blocks until all the signals represented by the keys are set, or
|
|
|
|
// returns an ErrTimeout if the timeout, counted from the call to Wait,
|
|
|
|
// expires.
|
|
|
|
//
|
|
|
|
// It returns only ErrTimeout or nil.
|
|
|
|
//
|
|
|
|
// If the receiver *SyncBus is nil, or no key argument is passed to it,
|
|
|
|
// it is a noop.
|
|
|
|
func (b *SyncBus) Wait(keys ...string) error {
|
|
|
|
if b == nil || len(keys) == 0 {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
w := waitItem{
|
|
|
|
keys: keys,
|
|
|
|
signal: make(chan error, 1),
|
|
|
|
}
|
|
|
|
|
|
|
|
b.wait <- w
|
|
|
|
err := <-w.signal
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
// Signal sets one or more signals represented by the keys.
|
|
|
|
//
|
|
|
|
// If the receiver *SyncBus is nil, or no key argument is passed to it,
|
|
|
|
// it is a noop.
|
|
|
|
func (b *SyncBus) Signal(keys ...string) {
|
|
|
|
if b == nil || len(keys) == 0 {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
b.signal <- keys
|
|
|
|
}
|
|
|
|
|
2018-03-18 21:05:53 +01:00
|
|
|
// ResetSignals clears the set signals defined by the provided keys.
|
|
|
|
//
|
|
|
|
// If the receiver *SyncBus is nil, or no key argument is passed to it,
|
|
|
|
// it is a noop.
|
|
|
|
func (b *SyncBus) ResetSignals(keys ...string) {
|
|
|
|
if b == nil || len(keys) == 0 {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
b.reset <- keys
|
|
|
|
}
|
|
|
|
|
|
|
|
// ResetAllSignals clears all the set signals.
|
|
|
|
//
|
|
|
|
// If the receiver *SyncBus is nil, it is a noop.
|
|
|
|
func (b *SyncBus) ResetAllSignals() {
|
|
|
|
if b == nil {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
b.resetAll <- struct{}{}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Close tears down the SyncBus. If the receiver is nil, it is a noop.
|
2018-03-18 20:42:20 +01:00
|
|
|
func (b *SyncBus) Close() {
|
|
|
|
if b == nil {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
close(b.quit)
|
|
|
|
}
|