add reset signal feature

This commit is contained in:
Arpad Ryszka 2018-03-18 21:05:53 +01:00
parent 5fe5d8d8fb
commit e0a56abdd1
3 changed files with 136 additions and 17 deletions

View File

@ -30,16 +30,17 @@ func (s *Server) Resource() int {
} }
func Example() { func Example() {
s := &Server{} bus := syncbus.New(120 * time.Millisecond)
s.testBus = syncbus.New(120 * time.Millisecond)
s := &Server{}
s.testBus = bus
s.AsyncInit() s.AsyncInit()
if err := s.testBus.Wait("initialized"); err != nil {
if err := bus.Wait("initialized"); err != nil {
fmt.Println("failed:", err) fmt.Println("failed:", err)
} }
fmt.Println(s.Resource()) fmt.Println(s.Resource())
// Output: // Output:
// 42 // 42
} }

View File

@ -6,7 +6,7 @@ to be used as a test hook shared between the production and the test code, or le
It provides a wait function for processes in goroutines that should continue only when a predefined signal is It provides a wait function for processes in goroutines that should continue only when a predefined signal is
set, or the timeout expires. If the predefined signals are already set at the time of calling wait, the set, or the timeout expires. If the predefined signals are already set at the time of calling wait, the
calling goroutine continues immediately. Once a signal is set, it stays so until it's cleared. calling goroutine continues immediately. Once a signal is set, it stays so until it's cleared (reset).
Wait can expect one or more signals represented by keys. The signals don't need to be set simultaneously in Wait can expect one or more signals represented by keys. The signals don't need to be set simultaneously in
order to release a waiting goroutine. A wait continues once all the signals that it depends on were set. order to release a waiting goroutine. A wait continues once all the signals that it depends on were set.
@ -26,12 +26,14 @@ type waitItem struct {
// SyncBus can be used to synchronize goroutines through signals. // SyncBus can be used to synchronize goroutines through signals.
type SyncBus struct { type SyncBus struct {
timeout time.Duration timeout time.Duration
waiting []waitItem waiting []waitItem
signals map[string]bool signals map[string]bool
wait chan waitItem wait chan waitItem
signal chan []string signal chan []string
quit chan struct{} reset chan []string
resetAll chan struct{}
quit chan struct{}
} }
// ErrTimeout is returned by Wait() when failed to receive all the signals in time. // ErrTimeout is returned by Wait() when failed to receive all the signals in time.
@ -40,11 +42,13 @@ var ErrTimeout = errors.New("timeout")
// New creates and initializes a new SyncBus. It uses a shared timeout for all the Wait calls. // New creates and initializes a new SyncBus. It uses a shared timeout for all the Wait calls.
func New(timeout time.Duration) *SyncBus { func New(timeout time.Duration) *SyncBus {
b := &SyncBus{ b := &SyncBus{
timeout: timeout, timeout: timeout,
signals: make(map[string]bool), signals: make(map[string]bool),
wait: make(chan waitItem), wait: make(chan waitItem),
signal: make(chan []string), signal: make(chan []string),
quit: make(chan struct{}), reset: make(chan []string),
resetAll: make(chan struct{}),
quit: make(chan struct{}),
} }
go b.run() go b.run()
@ -106,6 +110,16 @@ func (b *SyncBus) signalWaiting(now time.Time) {
b.waiting = keep b.waiting = keep
} }
func (b *SyncBus) resetSignals(keys []string) {
for i := range keys {
delete(b.signals, keys[i])
}
}
func (b *SyncBus) resetAllSignals() {
b.signals = make(map[string]bool)
}
func (b *SyncBus) run() { func (b *SyncBus) run() {
var to <-chan time.Time var to <-chan time.Time
for { for {
@ -124,6 +138,10 @@ func (b *SyncBus) run() {
b.setSignal(signal) b.setSignal(signal)
b.signalWaiting(now) b.signalWaiting(now)
to = b.nextTimeout(now) to = b.nextTimeout(now)
case reset := <-b.reset:
b.resetSignals(reset)
case <-b.resetAll:
b.resetAllSignals()
case <-b.quit: case <-b.quit:
return return
} }
@ -165,7 +183,30 @@ func (b *SyncBus) Signal(keys ...string) {
b.signal <- keys b.signal <- keys
} }
// Close tears down the SyncBus. // ResetSignals clears the set signals defined by the provided keys.
//
// If the receiver *SyncBus is nil, or no key argument is passed to it,
// it is a noop.
func (b *SyncBus) ResetSignals(keys ...string) {
if b == nil || len(keys) == 0 {
return
}
b.reset <- keys
}
// ResetAllSignals clears all the set signals.
//
// If the receiver *SyncBus is nil, it is a noop.
func (b *SyncBus) ResetAllSignals() {
if b == nil {
return
}
b.resetAll <- struct{}{}
}
// Close tears down the SyncBus. If the receiver is nil, it is a noop.
func (b *SyncBus) Close() { func (b *SyncBus) Close() {
if b == nil { if b == nil {
return return

View File

@ -76,6 +76,34 @@ func TestNilSignal(t *testing.T) {
} }
} }
func TestNilReset(t *testing.T) {
var bus *SyncBus
tw := newTestWait(1)
go func() {
bus.Signal("test")
tw.done()
}()
bus.ResetSignals("test")
if err := tw.wait(); err != nil {
t.Error(err)
}
}
func TestNilResetAll(t *testing.T) {
var bus *SyncBus
tw := newTestWait(1)
go func() {
bus.Signal("test")
tw.done()
}()
bus.ResetAllSignals()
if err := tw.wait(); err != nil {
t.Error(err)
}
}
func TestNilClose(t *testing.T) { func TestNilClose(t *testing.T) {
var bus *SyncBus var bus *SyncBus
bus.Close() bus.Close()
@ -101,6 +129,20 @@ func TestEmptySignal(t *testing.T) {
} }
} }
func TestEmptyRest(t *testing.T) {
bus := New(120 * time.Millisecond)
tw := newTestWait(1)
go func() {
bus.Signal()
tw.done()
}()
bus.ResetSignals()
if err := tw.wait(); err != nil {
t.Error(err)
}
}
func TestTimeout(t *testing.T) { func TestTimeout(t *testing.T) {
bus := New(12 * time.Millisecond) bus := New(12 * time.Millisecond)
defer bus.Close() defer bus.Close()
@ -212,3 +254,38 @@ func TestMultiKeySignal(t *testing.T) {
t.Error(err) t.Error(err)
} }
} }
func TestSignalBeforeWait(t *testing.T) {
bus := New(120 * time.Millisecond)
defer bus.Close()
bus.Signal("foo")
if err := bus.Wait("foo"); err != nil {
t.Error(err)
}
}
func TestReset(t *testing.T) {
bus := New(12 * time.Millisecond)
defer bus.Close()
bus.Signal("foo")
bus.ResetSignals("foo")
if err := bus.Wait("foo"); err != ErrTimeout {
t.Error("failed to timeout")
}
}
func TestResetAll(t *testing.T) {
bus := New(12 * time.Millisecond)
defer bus.Close()
bus.Signal("foo")
bus.Signal("bar")
bus.Signal("baz")
bus.ResetAllSignals()
if err := bus.Wait("foo", "bar", "baz"); err != ErrTimeout {
t.Error("failed to timeout")
}
}