From 5fe5d8d8fba5523176afe750b3e6de30f2e6bcf1 Mon Sep 17 00:00:00 2001 From: Arpad Ryszka Date: Sun, 18 Mar 2018 20:42:20 +0100 Subject: [PATCH] import initial version --- .gitignore | 1 + .travis.yml | 12 +++ LICENSE | 21 +++++ Makefile | 39 +++++++++ README.md | 13 +++ example_test.go | 45 ++++++++++ syncbus.go | 175 +++++++++++++++++++++++++++++++++++++++ syncbus_test.go | 214 ++++++++++++++++++++++++++++++++++++++++++++++++ 8 files changed, 520 insertions(+) create mode 100644 .gitignore create mode 100644 .travis.yml create mode 100644 LICENSE create mode 100644 Makefile create mode 100644 README.md create mode 100644 example_test.go create mode 100644 syncbus.go create mode 100644 syncbus_test.go diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..223cec9 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +.coverprofile diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..10c14f7 --- /dev/null +++ b/.travis.yml @@ -0,0 +1,12 @@ +sudo: false +language: go +go: +- 1.x +branches: + except: + - "/^v\\d+[.]\\d+[.]\\d+.*/" +script: +- make ci-trigger +env: + global: +- secure: i9SWpbE9UXbPQljlqVoRt7mGAr3GTIgcRZ/sNDWxGa8YYtdQMPI8ooCtSGdN0YJHyrqmYw+LbW8WypLkaxtdyeWGmoeMTKgJpYcDV7trCGBoeLIK+8kiKkiggCd+JmOpiMJpHIkSG3QPwRdWzxVr5hjVDxWpnlzX0eoYvYzDGCPaOZ4zbE9lAddDJnsRIt4gP3ZWuf+28ep7ivIKDW1ZTCewjiXt/tQmlX1WJ843TvWWmrJ//UOZM99nsHt1Tmai6+Trwszi7/AjIAPo8hzdAXvJNakx46XmKhJLGmDYEqOEwZhWWknL0/W8KSW9dASpb8lcPVdJUbIDdktQUrrK3c9qWlaQ8RDC9hPqRc2nJkXJIJCLhGkFAAZz3vP75/FBNco0NT73W4PyfKuk8fJwHA+T7in/+NGsuytSV0ZR7uXivsQmsySq/cSD+66estCjCgvQiCZwdOIL/LNojMWI1DsBBaesEOM7j9v5Xcf3FGRU1kT8S37+3MGjcX0XB+xJ6a16K23COtlSiMUahSwNMnAna/QkVmXrSTt14kMZZIHluBCUepuAX2yf8E3qeH35hoLAsGuBog/aMg/ny6SBmguRfptE9neimWCHZuCcB9hHTNlcfa1B+5ZUVw9tb0867m3giUC8RrgFnFquVW4kPNglrMrYxzm0KedCJJXbW6c= diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..00e6290 --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2018 Arpad Ryszka + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..f59ca51 --- /dev/null +++ b/Makefile @@ -0,0 +1,39 @@ +SOURCES = $(shell find . -name '*.go') + +.PHONY: .coverprofile + +default: build + +build: $(SOURCES) + go build + +check: build + go test + +.coverprofile: + go test -coverprofile .coverprofile + +cover: .coverprofile + go tool cover -func .coverprofile + +publishcoverage: .coverprofile + curl -s https://codecov.io/bash -o codecov + bash codecov -Zf .coverprofile + +showcover: .coverprofile + go tool cover -html .coverprofile + +fmt: + gofmt -s -w $(SOURCES) + +checkfmt: $(SOURCES) + @echo check fmt + @if [ "$$(gofmt -s -d $(SOURCES))" != "" ]; then false; else true; fi + +ci-trigger: checkfmt build check +ifeq ($(TRAVIS_BRANCH)_$(TRAVIS_PULL_REQUEST), master_false) + make publishcoverage +endif + +clean: + go clean -i -cache diff --git a/README.md b/README.md new file mode 100644 index 0000000..bc54cbc --- /dev/null +++ b/README.md @@ -0,0 +1,13 @@ +[![License](https://img.shields.io/badge/MIT-License-green.svg)](https://opensource.org/licenses/MIT) +[![Build Status](https://travis-ci.org/aryszka/treerack.svg)](https://travis-ci.org/aryszka/treerack) +[![codecov](https://codecov.io/gh/aryszka/treerack/branch/master/graph/badge.svg)](https://codecov.io/gh/aryszka/treerack) + +# SyncBus + +Event bus for testing concurrent Go programs. + +SyncBus provides a synchronization hook that can be used by multiple goroutines in Go tests to ensure and verify +the right order of execution of testing and production code. + +Please, find the documentation here: +[https://godoc.org/github.com/aryszka/syncbus](https://godoc.org/github.com/aryszka/syncbus). diff --git a/example_test.go b/example_test.go new file mode 100644 index 0000000..6d58df1 --- /dev/null +++ b/example_test.go @@ -0,0 +1,45 @@ +package syncbus_test + +import ( + "fmt" + "sync" + "time" + + "github.com/aryszka/syncbus" +) + +type Server struct { + resource int + mx sync.Mutex + testBus *syncbus.SyncBus +} + +func (s *Server) AsyncInit() { + go func() { + s.mx.Lock() + defer s.mx.Unlock() + s.resource = 42 + s.testBus.Signal("initialized") + }() +} + +func (s *Server) Resource() int { + s.mx.Lock() + defer s.mx.Unlock() + return s.resource +} + +func Example() { + s := &Server{} + s.testBus = syncbus.New(120 * time.Millisecond) + + s.AsyncInit() + if err := s.testBus.Wait("initialized"); err != nil { + fmt.Println("failed:", err) + } + + fmt.Println(s.Resource()) + + // Output: + // 42 +} diff --git a/syncbus.go b/syncbus.go new file mode 100644 index 0000000..dd1139c --- /dev/null +++ b/syncbus.go @@ -0,0 +1,175 @@ +/* +Package syncbus provides an event bus for testing. + +SyncBus can be used to execute test instructions in a concurrent program in a predefined order. It is expected +to be used as a test hook shared between the production and the test code, or left nil if not required. + +It provides a wait function for processes in goroutines that should continue only when a predefined signal is +set, or the timeout expires. If the predefined signals are already set at the time of calling wait, the +calling goroutine continues immediately. Once a signal is set, it stays so until it's cleared. + +Wait can expect one or more signals represented by keys. The signals don't need to be set simultaneously in +order to release a waiting goroutine. A wait continues once all the signals that it depends on were set. +*/ +package syncbus + +import ( + "errors" + "time" +) + +type waitItem struct { + keys []string + deadline time.Time + signal chan error +} + +// SyncBus can be used to synchronize goroutines through signals. +type SyncBus struct { + timeout time.Duration + waiting []waitItem + signals map[string]bool + wait chan waitItem + signal chan []string + quit chan struct{} +} + +// ErrTimeout is returned by Wait() when failed to receive all the signals in time. +var ErrTimeout = errors.New("timeout") + +// New creates and initializes a new SyncBus. It uses a shared timeout for all the Wait calls. +func New(timeout time.Duration) *SyncBus { + b := &SyncBus{ + timeout: timeout, + signals: make(map[string]bool), + wait: make(chan waitItem), + signal: make(chan []string), + quit: make(chan struct{}), + } + + go b.run() + return b +} + +func (b *SyncBus) nextTimeout(now time.Time) <-chan time.Time { + if len(b.waiting) == 0 { + return nil + } + + to := b.waiting[0].deadline.Sub(time.Now()) + return time.After(to) +} + +func (b *SyncBus) addWaiting(now time.Time, w waitItem) { + w.deadline = now.Add(b.timeout) + b.waiting = append(b.waiting, w) +} + +func (b *SyncBus) setSignal(keys []string) { + for _, key := range keys { + b.signals[key] = true + } +} + +func (b *SyncBus) timeoutWaiting(now time.Time) { + for i, w := range b.waiting { + if w.deadline.After(now) { + b.waiting = b.waiting[i:] + return + } + + w.signal <- ErrTimeout + } + + b.waiting = nil +} + +func (b *SyncBus) signalWaiting(now time.Time) { + var keep []waitItem + for _, w := range b.waiting { + var keepItem bool + for _, key := range w.keys { + if !b.signals[key] { + keepItem = true + break + } + } + + if keepItem { + keep = append(keep, w) + continue + } + + w.signal <- nil + } + + b.waiting = keep +} + +func (b *SyncBus) run() { + var to <-chan time.Time + for { + select { + case <-to: + now := time.Now() + b.timeoutWaiting(now) + to = b.nextTimeout(now) + case wait := <-b.wait: + now := time.Now() + b.addWaiting(now, wait) + b.signalWaiting(now) + to = b.nextTimeout(now) + case signal := <-b.signal: + now := time.Now() + b.setSignal(signal) + b.signalWaiting(now) + to = b.nextTimeout(now) + case <-b.quit: + return + } + } +} + +// Wait blocks until all the signals represented by the keys are set, or +// returns an ErrTimeout if the timeout, counted from the call to Wait, +// expires. +// +// It returns only ErrTimeout or nil. +// +// If the receiver *SyncBus is nil, or no key argument is passed to it, +// it is a noop. +func (b *SyncBus) Wait(keys ...string) error { + if b == nil || len(keys) == 0 { + return nil + } + + w := waitItem{ + keys: keys, + signal: make(chan error, 1), + } + + b.wait <- w + err := <-w.signal + return err +} + +// Signal sets one or more signals represented by the keys. +// +// If the receiver *SyncBus is nil, or no key argument is passed to it, +// it is a noop. +func (b *SyncBus) Signal(keys ...string) { + if b == nil || len(keys) == 0 { + return + } + + b.signal <- keys +} + +// Close tears down the SyncBus. +func (b *SyncBus) Close() { + if b == nil { + return + } + + close(b.quit) +} diff --git a/syncbus_test.go b/syncbus_test.go new file mode 100644 index 0000000..224015a --- /dev/null +++ b/syncbus_test.go @@ -0,0 +1,214 @@ +package syncbus + +import ( + "errors" + "testing" + "time" +) + +const testWaitTimeout = 120 * time.Millisecond + +type testWait struct { + n int + c chan struct{} + doneAll chan struct{} +} + +var ( + token = struct{}{} + errUnexpectedlyDone = errors.New("unexpectedly done") +) + +func newTestWait(n int) *testWait { + return &testWait{ + n: n, + c: make(chan struct{}, n), + doneAll: make(chan struct{}), + } +} + +func (tw *testWait) wait() error { + for { + if tw.n <= 0 { + close(tw.doneAll) + return nil + } + + select { + case <-tw.c: + tw.n-- + case <-time.After(testWaitTimeout): + return ErrTimeout + } + } +} + +func (tw *testWait) checkWaiting() error { + select { + case <-tw.doneAll: + return errUnexpectedlyDone + default: + return nil + } +} + +func (tw testWait) done() { + tw.c <- token +} + +func TestNilWait(t *testing.T) { + var bus *SyncBus + if err := bus.Wait("test"); err != nil { + t.Error(err) + } +} + +func TestNilSignal(t *testing.T) { + var bus *SyncBus + tw := newTestWait(1) + go func() { + bus.Signal("test") + tw.done() + }() + + if err := tw.wait(); err != nil { + t.Error(err) + } +} + +func TestNilClose(t *testing.T) { + var bus *SyncBus + bus.Close() +} + +func TestEmptyWait(t *testing.T) { + bus := New(120 * time.Millisecond) + if err := bus.Wait(); err != nil { + t.Error(err) + } +} + +func TestEmptySignal(t *testing.T) { + bus := New(120 * time.Millisecond) + tw := newTestWait(1) + go func() { + bus.Signal() + tw.done() + }() + + if err := tw.wait(); err != nil { + t.Error(err) + } +} + +func TestTimeout(t *testing.T) { + bus := New(12 * time.Millisecond) + defer bus.Close() + + if err := bus.Wait("test"); err != ErrTimeout { + t.Error("failed to timeout") + } +} + +func TestTimeoutOneOfTwo(t *testing.T) { + to := 12 * time.Millisecond + bus := New(to) + defer bus.Close() + + tw := newTestWait(2) + + go func() { + if err := bus.Wait("test1"); err != ErrTimeout { + t.Error("failed to timeout") + } + + tw.done() + }() + + go func() { + time.Sleep(2 * to / 3) + if err := bus.Wait("test2"); err != nil { + t.Error("unexpected error:", err) + } + + tw.done() + }() + + time.Sleep(4 * to / 3) + bus.Signal("test2") + if err := tw.wait(); err != nil { + t.Error(err) + } +} + +func TestSingleKeySignal(t *testing.T) { + bus := New(120 * time.Millisecond) + defer bus.Close() + + tw := newTestWait(2) + + go func() { + if err := bus.Wait("test"); err != nil { + t.Error(err) + } + + tw.done() + }() + + go func() { + if err := bus.Wait("test"); err != nil { + t.Error(err) + } + + tw.done() + }() + + bus.Signal("test") + if err := tw.wait(); err != nil { + t.Error(err) + } +} + +func TestMultiKeySignal(t *testing.T) { + bus := New(120 * time.Millisecond) + defer bus.Close() + + tw1 := newTestWait(1) + go func() { + if err := bus.Wait("foo", "bar"); err != nil { + t.Error(err) + } + + tw1.done() + }() + + tw2 := newTestWait(1) + go func() { + if err := bus.Wait("bar", "baz"); err != nil { + t.Error(err) + } + + tw2.done() + }() + + bus.Signal("foo") + if err := tw1.checkWaiting(); err != nil { + t.Error(err) + } + if err := tw2.checkWaiting(); err != nil { + t.Error(err) + } + + bus.Signal("bar") + if err := tw1.wait(); err != nil { + t.Error(err) + } + if err := tw2.checkWaiting(); err != nil { + t.Error(err) + } + + bus.Signal("baz") + if err := tw2.wait(); err != nil { + t.Error(err) + } +}