Merge pull request #15023 from mesosphere/jdef-fix14898-miniontestflake2
Auto commit by PR queue bot
This commit is contained in:
		@@ -281,7 +281,7 @@ func taskRunning(t *Task) taskStateFn {
 | 
			
		||||
 | 
			
		||||
	select {
 | 
			
		||||
	case <-t.shouldQuit:
 | 
			
		||||
		t.tryComplete(t.awaitDeath(defaultKillGracePeriod, waitCh))
 | 
			
		||||
		t.tryComplete(t.awaitDeath(&realTimer{}, defaultKillGracePeriod, waitCh))
 | 
			
		||||
	case wr := <-waitCh:
 | 
			
		||||
		t.tryComplete(wr)
 | 
			
		||||
	}
 | 
			
		||||
@@ -290,7 +290,9 @@ func taskRunning(t *Task) taskStateFn {
 | 
			
		||||
 | 
			
		||||
// awaitDeath waits for the process to complete, or else for a "quit" signal on the task-
 | 
			
		||||
// at which point we'll attempt to kill manually.
 | 
			
		||||
func (t *Task) awaitDeath(gracePeriod time.Duration, waitCh <-chan *Completion) *Completion {
 | 
			
		||||
func (t *Task) awaitDeath(timer timer, gracePeriod time.Duration, waitCh <-chan *Completion) *Completion {
 | 
			
		||||
	defer timer.discard()
 | 
			
		||||
 | 
			
		||||
	select {
 | 
			
		||||
	case wr := <-waitCh:
 | 
			
		||||
		// got a signal to quit, but we're already finished
 | 
			
		||||
@@ -318,10 +320,11 @@ waitLoop:
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		// Wait for the kill to be processed, and child proc resources cleaned up; try to avoid zombies!
 | 
			
		||||
		timer.set(gracePeriod)
 | 
			
		||||
		select {
 | 
			
		||||
		case wr = <-waitCh:
 | 
			
		||||
			break waitLoop
 | 
			
		||||
		case <-time.After(gracePeriod):
 | 
			
		||||
		case <-timer.await():
 | 
			
		||||
			// want a timeout, but a shorter one than we used initially.
 | 
			
		||||
			// using /= 2 is deterministic and yields the desirable effect.
 | 
			
		||||
			gracePeriod /= 2
 | 
			
		||||
 
 | 
			
		||||
@@ -24,6 +24,7 @@ import (
 | 
			
		||||
	"sync"
 | 
			
		||||
	"syscall"
 | 
			
		||||
	"testing"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	log "github.com/golang/glog"
 | 
			
		||||
	"github.com/stretchr/testify/assert"
 | 
			
		||||
@@ -222,15 +223,28 @@ func TestMergeOutput(t *testing.T) {
 | 
			
		||||
	<-te.Done() // wait for the merge to complete
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type fakeTimer struct {
 | 
			
		||||
	ch chan time.Time
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (t *fakeTimer) set(d time.Duration)     {}
 | 
			
		||||
func (t *fakeTimer) discard()                {}
 | 
			
		||||
func (t *fakeTimer) await() <-chan time.Time { return t.ch }
 | 
			
		||||
func (t *fakeTimer) expire()                 { t.ch = make(chan time.Time); close(t.ch) }
 | 
			
		||||
func (t *fakeTimer) reset()                  { t.ch = nil }
 | 
			
		||||
 | 
			
		||||
func TestAfterDeath(t *testing.T) {
 | 
			
		||||
	// test kill escalation since that's not covered by other unit tests
 | 
			
		||||
	t1 := New("foo", "", nil, nil, devNull)
 | 
			
		||||
	kills := 0
 | 
			
		||||
	waitCh := make(chan *Completion, 1)
 | 
			
		||||
	timer := &fakeTimer{}
 | 
			
		||||
	timer.expire()
 | 
			
		||||
	t1.killFunc = func(force bool) (int, error) {
 | 
			
		||||
		// > 0 is intentional, multiple calls to close() should panic
 | 
			
		||||
		if kills > 0 {
 | 
			
		||||
			assert.True(t, force)
 | 
			
		||||
			timer.reset() // don't want to race w/ waitCh
 | 
			
		||||
			waitCh <- &Completion{name: t1.name, code: 123}
 | 
			
		||||
			close(waitCh)
 | 
			
		||||
		} else {
 | 
			
		||||
@@ -239,7 +253,7 @@ func TestAfterDeath(t *testing.T) {
 | 
			
		||||
		kills++
 | 
			
		||||
		return 0, nil
 | 
			
		||||
	}
 | 
			
		||||
	wr := t1.awaitDeath(0, waitCh)
 | 
			
		||||
	wr := t1.awaitDeath(timer, 0, waitCh)
 | 
			
		||||
	assert.Equal(t, "foo", wr.name)
 | 
			
		||||
	assert.Equal(t, 123, wr.code)
 | 
			
		||||
	assert.NoError(t, wr.err)
 | 
			
		||||
@@ -252,7 +266,9 @@ func TestAfterDeath(t *testing.T) {
 | 
			
		||||
		t.Fatalf("should not attempt to kill a task that has already reported completion")
 | 
			
		||||
		return 0, nil
 | 
			
		||||
	}
 | 
			
		||||
	wr = t1.awaitDeath(0, waitCh)
 | 
			
		||||
 | 
			
		||||
	timer.reset() // don't race w/ waitCh
 | 
			
		||||
	wr = t1.awaitDeath(timer, 0, waitCh)
 | 
			
		||||
	assert.Equal(t, 456, wr.code)
 | 
			
		||||
	assert.NoError(t, wr.err)
 | 
			
		||||
 | 
			
		||||
@@ -270,7 +286,8 @@ func TestAfterDeath(t *testing.T) {
 | 
			
		||||
		kills++
 | 
			
		||||
		return 0, nil
 | 
			
		||||
	}
 | 
			
		||||
	wr = t1.awaitDeath(0, nil)
 | 
			
		||||
	timer.expire()
 | 
			
		||||
	wr = t1.awaitDeath(timer, 0, nil)
 | 
			
		||||
	assert.Equal(t, "foo", wr.name)
 | 
			
		||||
	assert.Error(t, wr.err)
 | 
			
		||||
 | 
			
		||||
@@ -287,7 +304,8 @@ func TestAfterDeath(t *testing.T) {
 | 
			
		||||
		kills++
 | 
			
		||||
		return 0, killFailed
 | 
			
		||||
	}
 | 
			
		||||
	wr = t1.awaitDeath(0, nil)
 | 
			
		||||
	timer.expire()
 | 
			
		||||
	wr = t1.awaitDeath(timer, 0, nil)
 | 
			
		||||
	assert.Equal(t, "foo", wr.name)
 | 
			
		||||
	assert.Error(t, wr.err)
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										52
									
								
								contrib/mesos/pkg/minion/tasks/timer.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										52
									
								
								contrib/mesos/pkg/minion/tasks/timer.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,52 @@
 | 
			
		||||
/*
 | 
			
		||||
Copyright 2015 The Kubernetes Authors All rights reserved.
 | 
			
		||||
 | 
			
		||||
Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
you may not use this file except in compliance with the License.
 | 
			
		||||
You may obtain a copy of the License at
 | 
			
		||||
 | 
			
		||||
    http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 | 
			
		||||
Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
See the License for the specific language governing permissions and
 | 
			
		||||
limitations under the License.
 | 
			
		||||
*/
 | 
			
		||||
 | 
			
		||||
package tasks
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"time"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type timer interface {
 | 
			
		||||
	set(time.Duration)
 | 
			
		||||
	discard()
 | 
			
		||||
	await() <-chan time.Time
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type realTimer struct {
 | 
			
		||||
	*time.Timer
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (t *realTimer) set(d time.Duration) {
 | 
			
		||||
	if t.Timer == nil {
 | 
			
		||||
		t.Timer = time.NewTimer(d)
 | 
			
		||||
	} else {
 | 
			
		||||
		t.Reset(d)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (t *realTimer) await() <-chan time.Time {
 | 
			
		||||
	if t.Timer == nil {
 | 
			
		||||
		return nil
 | 
			
		||||
	}
 | 
			
		||||
	return t.C
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (t *realTimer) discard() {
 | 
			
		||||
	if t.Timer != nil {
 | 
			
		||||
		t.Stop()
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
		Reference in New Issue
	
	Block a user