fix intermittent deadlock w/ nested concurrent action scheduling and
give each delegate its own errOnce
This commit is contained in:
@@ -39,7 +39,8 @@ func errorAfter(errOnce ErrorOnce, done <-chan struct{}, d time.Duration, msg st
|
||||
select {
|
||||
case <-done:
|
||||
case <-time.After(d):
|
||||
errOnce.Reportf(msg, args...)
|
||||
//errOnce.Reportf(msg, args...)
|
||||
panic(fmt.Sprintf(msg, args...))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -277,7 +278,8 @@ func TestProc_doWithNestedErrorPropagation(t *testing.T) {
|
||||
fatalAfter(t, p.Done(), 5*time.Second, "timed out waiting for process death")
|
||||
}
|
||||
|
||||
func runDelegationTest(t *testing.T, p Process, name string, errOnce ErrorOnce) {
|
||||
func runDelegationTest(t *testing.T, p Process, name string, errOnce ErrorOnce, timeout time.Duration) {
|
||||
t.Logf("starting test case " + name + " at " + time.Now().String())
|
||||
defer func() {
|
||||
t.Logf("runDelegationTest finished at " + time.Now().String())
|
||||
}()
|
||||
@@ -293,11 +295,11 @@ func runDelegationTest(t *testing.T, p Process, name string, errOnce ErrorOnce)
|
||||
x := x
|
||||
nextp := DoWith(decorated, DoerFunc(func(a Action) <-chan error {
|
||||
if x == 1 {
|
||||
t.Logf("delegate chain invoked for " + name)
|
||||
t.Logf("delegate chain invoked for " + name + " at " + time.Now().String())
|
||||
}
|
||||
y++
|
||||
if y != x {
|
||||
return ErrorChanf("out of order delegated execution")
|
||||
return ErrorChanf("out of order delegated execution for " + name)
|
||||
}
|
||||
defer wg.Done()
|
||||
a()
|
||||
@@ -310,7 +312,7 @@ func runDelegationTest(t *testing.T, p Process, name string, errOnce ErrorOnce)
|
||||
errCh := decorated.Do(func() {
|
||||
defer close(executed)
|
||||
if y != DEPTH {
|
||||
errOnce.Reportf("expected delegated execution")
|
||||
errOnce.Reportf("expected delegated execution for " + name)
|
||||
}
|
||||
t.Logf("executing deferred action: " + name + " at " + time.Now().String())
|
||||
errOnce.Send(nil) // we completed without error, let the listener know
|
||||
@@ -323,7 +325,7 @@ func runDelegationTest(t *testing.T, p Process, name string, errOnce ErrorOnce)
|
||||
// from errCh after this point
|
||||
errOnce.Send(errCh)
|
||||
|
||||
errorAfter(errOnce, executed, 5*time.Second, "timed out waiting deferred execution")
|
||||
errorAfter(errOnce, executed, timeout, "timed out waiting deferred execution of "+name)
|
||||
t.Logf("runDelegationTest received executed signal at " + time.Now().String())
|
||||
}
|
||||
|
||||
@@ -331,14 +333,15 @@ func TestProc_doWithNestedX(t *testing.T) {
|
||||
t.Logf("starting test case at " + time.Now().String())
|
||||
p := New()
|
||||
errOnce := NewErrorOnce(p.Done())
|
||||
runDelegationTest(t, p, "nested", errOnce)
|
||||
timeout := 5 * time.Second
|
||||
runDelegationTest(t, p, "nested", errOnce, timeout)
|
||||
<-p.End()
|
||||
select {
|
||||
case err := <-errOnce.Err():
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
case <-time.After(5 * time.Second):
|
||||
case <-time.After(2 * timeout):
|
||||
t.Fatalf("timed out waiting for doer result")
|
||||
}
|
||||
fatalAfter(t, p.Done(), 5*time.Second, "timed out waiting for process death")
|
||||
@@ -346,28 +349,53 @@ func TestProc_doWithNestedX(t *testing.T) {
|
||||
|
||||
// intended to be run with -race
|
||||
func TestProc_doWithNestedXConcurrent(t *testing.T) {
|
||||
p := New()
|
||||
errOnce := NewErrorOnce(p.Done())
|
||||
config := defaultConfig
|
||||
config.actionQueueDepth = 0
|
||||
p := newConfigured(config)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
const CONC = 20
|
||||
wg.Add(CONC)
|
||||
timeout := 3 * time.Second
|
||||
|
||||
for i := 0; i < CONC; i++ {
|
||||
i := i
|
||||
runtime.After(func() { runDelegationTest(t, p, fmt.Sprintf("nested%d", i), errOnce) }).Then(wg.Done)
|
||||
errOnce := NewErrorOnce(p.Done())
|
||||
runtime.After(func() { runDelegationTest(t, p, fmt.Sprintf("nested%d", i), errOnce, timeout) }).Then(wg.Done)
|
||||
go func() {
|
||||
select {
|
||||
case err := <-errOnce.Err():
|
||||
if err != nil {
|
||||
t.Fatalf("delegate %d: unexpected error: %v", i, err)
|
||||
}
|
||||
case <-time.After(2 * timeout):
|
||||
t.Fatalf("delegate %d: timed out waiting for doer result", i)
|
||||
}
|
||||
}()
|
||||
}
|
||||
ch := runtime.After(wg.Wait)
|
||||
fatalAfter(t, ch, 10*time.Second, "timed out waiting for concurrent delegates")
|
||||
fatalAfter(t, ch, 2*timeout, "timed out waiting for concurrent delegates")
|
||||
|
||||
<-p.End()
|
||||
fatalAfter(t, p.Done(), 5*time.Second, "timed out waiting for process death")
|
||||
}
|
||||
|
||||
func TestProcWithExceededActionQueueDepth(t *testing.T) {
|
||||
config := defaultConfig
|
||||
config.actionQueueDepth = 0
|
||||
p := newConfigured(config)
|
||||
|
||||
errOnce := NewErrorOnce(p.Done())
|
||||
timeout := 5 * time.Second
|
||||
runDelegationTest(t, p, "nested", errOnce, timeout)
|
||||
<-p.End()
|
||||
select {
|
||||
case err := <-errOnce.Err():
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
case <-time.After(5 * time.Second):
|
||||
case <-time.After(2 * timeout):
|
||||
t.Fatalf("timed out waiting for doer result")
|
||||
}
|
||||
|
||||
fatalAfter(t, p.Done(), 5*time.Second, "timed out waiting for process death")
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user