From e0a56abdd1cb4aefd093366bd2b068f26c405061 Mon Sep 17 00:00:00 2001 From: Arpad Ryszka Date: Sun, 18 Mar 2018 21:05:53 +0100 Subject: [PATCH] add reset signal feature --- example_test.go | 9 +++--- syncbus.go | 67 +++++++++++++++++++++++++++++++++--------- syncbus_test.go | 77 +++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 136 insertions(+), 17 deletions(-) diff --git a/example_test.go b/example_test.go index 6d58df1..52d0813 100644 --- a/example_test.go +++ b/example_test.go @@ -30,16 +30,17 @@ func (s *Server) Resource() int { } func Example() { - s := &Server{} - s.testBus = syncbus.New(120 * time.Millisecond) + bus := syncbus.New(120 * time.Millisecond) + s := &Server{} + s.testBus = bus s.AsyncInit() - if err := s.testBus.Wait("initialized"); err != nil { + + if err := bus.Wait("initialized"); err != nil { fmt.Println("failed:", err) } fmt.Println(s.Resource()) - // Output: // 42 } diff --git a/syncbus.go b/syncbus.go index dd1139c..d1d1ea5 100644 --- a/syncbus.go +++ b/syncbus.go @@ -6,7 +6,7 @@ to be used as a test hook shared between the production and the test code, or le It provides a wait function for processes in goroutines that should continue only when a predefined signal is set, or the timeout expires. If the predefined signals are already set at the time of calling wait, the -calling goroutine continues immediately. Once a signal is set, it stays so until it's cleared. +calling goroutine continues immediately. Once a signal is set, it stays so until it's cleared (reset). Wait can expect one or more signals represented by keys. The signals don't need to be set simultaneously in order to release a waiting goroutine. A wait continues once all the signals that it depends on were set. @@ -26,12 +26,14 @@ type waitItem struct { // SyncBus can be used to synchronize goroutines through signals. type SyncBus struct { - timeout time.Duration - waiting []waitItem - signals map[string]bool - wait chan waitItem - signal chan []string - quit chan struct{} + timeout time.Duration + waiting []waitItem + signals map[string]bool + wait chan waitItem + signal chan []string + reset chan []string + resetAll chan struct{} + quit chan struct{} } // ErrTimeout is returned by Wait() when failed to receive all the signals in time. @@ -40,11 +42,13 @@ var ErrTimeout = errors.New("timeout") // New creates and initializes a new SyncBus. It uses a shared timeout for all the Wait calls. func New(timeout time.Duration) *SyncBus { b := &SyncBus{ - timeout: timeout, - signals: make(map[string]bool), - wait: make(chan waitItem), - signal: make(chan []string), - quit: make(chan struct{}), + timeout: timeout, + signals: make(map[string]bool), + wait: make(chan waitItem), + signal: make(chan []string), + reset: make(chan []string), + resetAll: make(chan struct{}), + quit: make(chan struct{}), } go b.run() @@ -106,6 +110,16 @@ func (b *SyncBus) signalWaiting(now time.Time) { b.waiting = keep } +func (b *SyncBus) resetSignals(keys []string) { + for i := range keys { + delete(b.signals, keys[i]) + } +} + +func (b *SyncBus) resetAllSignals() { + b.signals = make(map[string]bool) +} + func (b *SyncBus) run() { var to <-chan time.Time for { @@ -124,6 +138,10 @@ func (b *SyncBus) run() { b.setSignal(signal) b.signalWaiting(now) to = b.nextTimeout(now) + case reset := <-b.reset: + b.resetSignals(reset) + case <-b.resetAll: + b.resetAllSignals() case <-b.quit: return } @@ -165,7 +183,30 @@ func (b *SyncBus) Signal(keys ...string) { b.signal <- keys } -// Close tears down the SyncBus. +// ResetSignals clears the set signals defined by the provided keys. +// +// If the receiver *SyncBus is nil, or no key argument is passed to it, +// it is a noop. +func (b *SyncBus) ResetSignals(keys ...string) { + if b == nil || len(keys) == 0 { + return + } + + b.reset <- keys +} + +// ResetAllSignals clears all the set signals. +// +// If the receiver *SyncBus is nil, it is a noop. +func (b *SyncBus) ResetAllSignals() { + if b == nil { + return + } + + b.resetAll <- struct{}{} +} + +// Close tears down the SyncBus. If the receiver is nil, it is a noop. func (b *SyncBus) Close() { if b == nil { return diff --git a/syncbus_test.go b/syncbus_test.go index 224015a..9934af1 100644 --- a/syncbus_test.go +++ b/syncbus_test.go @@ -76,6 +76,34 @@ func TestNilSignal(t *testing.T) { } } +func TestNilReset(t *testing.T) { + var bus *SyncBus + tw := newTestWait(1) + go func() { + bus.Signal("test") + tw.done() + }() + + bus.ResetSignals("test") + if err := tw.wait(); err != nil { + t.Error(err) + } +} + +func TestNilResetAll(t *testing.T) { + var bus *SyncBus + tw := newTestWait(1) + go func() { + bus.Signal("test") + tw.done() + }() + + bus.ResetAllSignals() + if err := tw.wait(); err != nil { + t.Error(err) + } +} + func TestNilClose(t *testing.T) { var bus *SyncBus bus.Close() @@ -101,6 +129,20 @@ func TestEmptySignal(t *testing.T) { } } +func TestEmptyRest(t *testing.T) { + bus := New(120 * time.Millisecond) + tw := newTestWait(1) + go func() { + bus.Signal() + tw.done() + }() + + bus.ResetSignals() + if err := tw.wait(); err != nil { + t.Error(err) + } +} + func TestTimeout(t *testing.T) { bus := New(12 * time.Millisecond) defer bus.Close() @@ -212,3 +254,38 @@ func TestMultiKeySignal(t *testing.T) { t.Error(err) } } + +func TestSignalBeforeWait(t *testing.T) { + bus := New(120 * time.Millisecond) + defer bus.Close() + + bus.Signal("foo") + if err := bus.Wait("foo"); err != nil { + t.Error(err) + } +} + +func TestReset(t *testing.T) { + bus := New(12 * time.Millisecond) + defer bus.Close() + + bus.Signal("foo") + bus.ResetSignals("foo") + if err := bus.Wait("foo"); err != ErrTimeout { + t.Error("failed to timeout") + } +} + +func TestResetAll(t *testing.T) { + bus := New(12 * time.Millisecond) + defer bus.Close() + + bus.Signal("foo") + bus.Signal("bar") + bus.Signal("baz") + + bus.ResetAllSignals() + if err := bus.Wait("foo", "bar", "baz"); err != ErrTimeout { + t.Error("failed to timeout") + } +}