diff --git a/contrib/mesos/pkg/proc/once.go b/contrib/mesos/pkg/proc/once.go index 09993659f7..8bf5c1a0bb 100644 --- a/contrib/mesos/pkg/proc/once.go +++ b/contrib/mesos/pkg/proc/once.go @@ -55,12 +55,22 @@ func (b *errorOnce) Report(err error) { } func (b *errorOnce) Send(errIn <-chan error) ErrorOnce { - go b.forward(errIn) + if errIn == nil { + // don't execute this in a goroutine; save resources AND the caller + // likely wants this executed ASAP because some of some operation + // ordering semantics. forward() will not block here on a nil input + // so this is safe to do. + b.forward(nil) + } else { + go b.forward(errIn) + } return b } func (b *errorOnce) forward(errIn <-chan error) { if errIn == nil { + // important: nil never blocks; Report(nil) is guaranteed to be a + // non-blocking operation. b.Report(nil) return } diff --git a/contrib/mesos/pkg/proc/proc_test.go b/contrib/mesos/pkg/proc/proc_test.go index 19dc8bf465..2548e27e65 100644 --- a/contrib/mesos/pkg/proc/proc_test.go +++ b/contrib/mesos/pkg/proc/proc_test.go @@ -131,7 +131,7 @@ func TestProc_multiAction(t *testing.T) { defer called.Done() log.Infof("deferred action invoked") if next != idx { - t.Fatalf("expected index %d instead of %d", idx, next) + t.Errorf("expected index %d instead of %d", idx, next) } next++ }) @@ -181,7 +181,7 @@ func TestProc_doWith(t *testing.T) { err := decorated.Do(func() { defer close(executed) if !delegated { - t.Fatalf("expected delegated execution") + t.Errorf("expected delegated execution") } }) if err == nil { @@ -215,7 +215,7 @@ func TestProc_doWithNestedTwice(t *testing.T) { err := decorated2.Do(func() { defer close(executed) if !delegated { - t.Fatalf("expected delegated execution") + t.Errorf("expected delegated execution") } }) if err == nil { @@ -254,7 +254,7 @@ func TestProc_doWithNestedErrorPropagation(t *testing.T) { errCh := decorated2.Do(func() { defer close(executed) if !delegated { - t.Fatalf("expected delegated execution") + t.Errorf("expected delegated execution") } errOnce.Report(expectedErr) }) @@ -333,6 +333,9 @@ func runDelegationTest(t *testing.T, p Process, name string, errOnce ErrorOnce) <-executed t.Logf("runDelegationTest received executed signal at " + time.Now().String()) + + wg.Wait() + t.Logf("runDelegationTest nested decorators finished at " + time.Now().String()) } func TestProc_doWithNestedX(t *testing.T) { @@ -367,7 +370,7 @@ func TestProc_doWithNestedXConcurrent(t *testing.T) { go func() { err, _ := <-errOnce.Err() if err != nil { - t.Fatalf("delegate %d: unexpected error: %v", i, err) + t.Errorf("delegate %d: unexpected error: %v", i, err) } }() }