Merge pull request #1392 from cpuguy83/wait_async

Make Wait() async
This commit is contained in:
Michael Crosby 2017-08-22 11:33:10 -04:00 committed by GitHub
commit 10460647f2
12 changed files with 381 additions and 360 deletions

View File

@ -208,30 +208,23 @@ func (w *worker) runContainer(ctx context.Context, id string) error {
return err
}
defer task.Delete(ctx, containerd.WithProcessKill)
var (
start sync.WaitGroup
status = make(chan uint32, 1)
)
start.Add(1)
go func() {
start.Done()
s, err := task.Wait(w.waitContext)
if err != nil {
if err == context.DeadlineExceeded ||
err == context.Canceled {
close(status)
return
}
w.failures++
logrus.WithError(err).Errorf("wait task %s", id)
}
status <- s
}()
start.Wait()
statusC, err := task.Wait(ctx)
if err != nil {
return err
}
if err := task.Start(ctx); err != nil {
return err
}
<-status
status := <-statusC
_, _, err = status.Result()
if err != nil {
if err == context.DeadlineExceeded || err == context.Canceled {
return nil
}
w.failures++
}
return nil
}

View File

@ -44,6 +44,12 @@ var taskAttachCommand = cli.Command{
return err
}
defer task.Delete(ctx)
statusC, err := task.Wait(ctx)
if err != nil {
return err
}
if tty {
if err := handleConsoleResize(ctx, task, con); err != nil {
logrus.WithError(err).Error("console resize")
@ -52,12 +58,14 @@ var taskAttachCommand = cli.Command{
sigc := forwardAllSignals(ctx, task)
defer stopCatch(sigc)
}
status, err := task.Wait(ctx)
ec := <-statusC
code, _, err := ec.Result()
if err != nil {
return err
}
if status != 0 {
return cli.NewExitError("", int(status))
if code != 0 {
return cli.NewExitError("", int(code))
}
return nil
},

View File

@ -70,14 +70,11 @@ var taskExecCommand = cli.Command{
}
defer process.Delete(ctx)
statusC := make(chan uint32, 1)
go func() {
status, err := process.Wait(ctx)
if err != nil {
logrus.WithError(err).Error("wait process")
}
statusC <- status
}()
statusC, err := task.Wait(ctx)
if err != nil {
return err
}
var con console.Console
if tty {
con = console.Current()
@ -98,8 +95,12 @@ var taskExecCommand = cli.Command{
defer stopCatch(sigc)
}
status := <-statusC
if status != 0 {
return cli.NewExitError("", int(status))
code, _, err := status.Result()
if err != nil {
return err
}
if code != 0 {
return cli.NewExitError("", int(code))
}
return nil
},

View File

@ -129,14 +129,11 @@ var runCommand = cli.Command{
}
defer task.Delete(ctx)
statusC := make(chan uint32, 1)
go func() {
status, err := task.Wait(ctx)
if err != nil {
logrus.WithError(err).Error("wait process")
}
statusC <- status
}()
statusC, err := task.Wait(ctx)
if err != nil {
return err
}
var con console.Console
if tty {
con = console.Current()
@ -158,11 +155,16 @@ var runCommand = cli.Command{
}
status := <-statusC
code, _, err := status.Result()
if err != nil {
return err
}
if _, err := task.Delete(ctx); err != nil {
return err
}
if status != 0 {
return cli.NewExitError("", int(status))
if code != 0 {
return cli.NewExitError("", int(code))
}
return nil
},

View File

@ -47,14 +47,11 @@ var taskStartCommand = cli.Command{
}
defer task.Delete(ctx)
statusC := make(chan uint32, 1)
go func() {
status, err := task.Wait(ctx)
if err != nil {
logrus.WithError(err).Error("wait process")
}
statusC <- status
}()
statusC, err := task.Wait(ctx)
if err != nil {
return err
}
var con console.Console
if tty {
con = console.Current()
@ -76,11 +73,15 @@ var taskStartCommand = cli.Command{
}
status := <-statusC
code, _, err := status.Result()
if err != nil {
return err
}
if _, err := task.Delete(ctx); err != nil {
return err
}
if status != 0 {
return cli.NewExitError("", int(status))
if code != 0 {
return cli.NewExitError("", int(code))
}
return nil
},

View File

@ -47,14 +47,11 @@ func TestCheckpointRestore(t *testing.T) {
}
defer task.Delete(ctx)
statusC := make(chan uint32, 1)
go func() {
status, err := task.Wait(ctx)
if err != nil {
t.Error(err)
}
statusC <- status
}()
statusC, err := task.Wait(ctx)
if err != nil {
t.Error(err)
return
}
if err := task.Start(ctx); err != nil {
t.Error(err)
@ -79,13 +76,11 @@ func TestCheckpointRestore(t *testing.T) {
}
defer task.Delete(ctx)
go func() {
status, err := task.Wait(ctx)
if err != nil {
t.Error(err)
}
statusC <- status
}()
statusC, err = task.Wait(ctx)
if err != nil {
t.Error(err)
return
}
if err := task.Start(ctx); err != nil {
t.Error(err)
@ -137,14 +132,11 @@ func TestCheckpointRestoreNewContainer(t *testing.T) {
}
defer task.Delete(ctx)
statusC := make(chan uint32, 1)
go func() {
status, err := task.Wait(ctx)
if err != nil {
t.Error(err)
}
statusC <- status
}()
statusC, err := task.Wait(ctx)
if err != nil {
t.Error(err)
return
}
if err := task.Start(ctx); err != nil {
t.Error(err)
@ -177,13 +169,11 @@ func TestCheckpointRestoreNewContainer(t *testing.T) {
}
defer task.Delete(ctx)
go func() {
status, err := task.Wait(ctx)
if err != nil {
t.Error(err)
}
statusC <- status
}()
statusC, err = task.Wait(ctx)
if err != nil {
t.Error(err)
return
}
if err := task.Start(ctx); err != nil {
t.Error(err)
@ -240,14 +230,11 @@ func TestCheckpointLeaveRunning(t *testing.T) {
}
defer task.Delete(ctx)
statusC := make(chan uint32, 1)
go func() {
status, err := task.Wait(ctx)
if err != nil {
t.Error(err)
}
statusC <- status
}()
statusC, err := task.Wait(ctx)
if err != nil {
t.Error(err)
return
}
if err := task.Start(ctx); err != nil {
t.Error(err)

View File

@ -57,14 +57,11 @@ func TestContainerUpdate(t *testing.T) {
}
defer task.Delete(ctx)
statusC := make(chan uint32, 1)
go func() {
status, err := task.Wait(ctx)
if err != nil {
t.Error(err)
}
statusC <- status
}()
statusC, err := task.Wait(ctx)
if err != nil {
t.Error(err)
return
}
// check that the task has a limit of 32mb
cgroup, err := cgroups.Load(cgroups.V1, cgroups.PidPath(int(task.Pid())))
@ -157,14 +154,12 @@ func TestShimInCgroup(t *testing.T) {
}
defer task.Delete(ctx)
statusC := make(chan uint32, 1)
go func() {
status, err := task.Wait(ctx)
if err != nil {
t.Error(err)
}
statusC <- status
}()
statusC, err := task.Wait(ctx)
if err != nil {
t.Error(err)
return
}
// check to see if the shim is inside the cgroup
processes, err := cg.Processes(cgroups.Devices, false)
if err != nil {
@ -221,17 +216,11 @@ func TestDaemonRestart(t *testing.T) {
}
defer task.Delete(ctx)
synC := make(chan struct{})
statusC := make(chan uint32, 1)
go func() {
synC <- struct{}{}
status, err := task.Wait(ctx)
if err == nil {
t.Errorf(`first task.Wait() should have failed with "transport is closing"`)
}
statusC <- status
}()
<-synC
statusC, err := task.Wait(ctx)
if err != nil {
t.Error(err)
return
}
if err := task.Start(ctx); err != nil {
t.Error(err)
@ -242,7 +231,11 @@ func TestDaemonRestart(t *testing.T) {
t.Fatal(err)
}
<-statusC
status := <-statusC
_, _, err = status.Result()
if err == nil {
t.Errorf(`first task.Wait() should have failed with "transport is closing"`)
}
waitCtx, waitCancel := context.WithTimeout(ctx, 2*time.Second)
serving, err := client.IsServing(waitCtx)
@ -251,15 +244,11 @@ func TestDaemonRestart(t *testing.T) {
t.Fatalf("containerd did not start within 2s: %v", err)
}
go func() {
synC <- struct{}{}
status, err := task.Wait(ctx)
if err != nil {
t.Error(err)
}
statusC <- status
}()
<-synC
statusC, err = task.Wait(ctx)
if err != nil {
t.Error(err)
return
}
if err := task.Kill(ctx, syscall.SIGKILL); err != nil {
t.Fatal(err)

View File

@ -124,14 +124,11 @@ func TestContainerStart(t *testing.T) {
}
defer task.Delete(ctx)
statusC := make(chan uint32, 1)
go func() {
status, err := task.Wait(ctx)
if err != nil {
t.Error(err)
}
statusC <- status
}()
statusC, err := task.Wait(ctx)
if err != nil {
t.Error(err)
return
}
if pid := task.Pid(); pid <= 0 {
t.Errorf("invalid task pid %d", pid)
@ -142,15 +139,22 @@ func TestContainerStart(t *testing.T) {
return
}
status := <-statusC
if status != 7 {
t.Errorf("expected status 7 from wait but received %d", status)
}
if status, err = task.Delete(ctx); err != nil {
code, _, err := status.Result()
if err != nil {
t.Error(err)
return
}
if status != 7 {
t.Errorf("expected status 7 from delete but received %d", status)
if code != 7 {
t.Errorf("expected status 7 from wait but received %d", code)
}
deleteStatus, err := task.Delete(ctx)
if err != nil {
t.Error(err)
return
}
if deleteStatus != 7 {
t.Errorf("expected status 7 from delete but received %d", deleteStatus)
}
}
@ -199,14 +203,11 @@ func TestContainerOutput(t *testing.T) {
}
defer task.Delete(ctx)
statusC := make(chan uint32, 1)
go func() {
status, err := task.Wait(ctx)
if err != nil {
t.Error(err)
}
statusC <- status
}()
statusC, err := task.Wait(ctx)
if err != nil {
t.Error(err)
return
}
if err := task.Start(ctx); err != nil {
t.Error(err)
@ -214,8 +215,9 @@ func TestContainerOutput(t *testing.T) {
}
status := <-statusC
if status != 0 {
t.Errorf("expected status 0 but received %d", status)
code, _, _ := status.Result()
if code != 0 {
t.Errorf("expected status 0 but received %d", code)
}
if _, err := task.Delete(ctx); err != nil {
t.Error(err)
@ -273,13 +275,11 @@ func TestContainerExec(t *testing.T) {
}
defer task.Delete(ctx)
finished := make(chan struct{}, 1)
go func() {
if _, err := task.Wait(ctx); err != nil {
t.Error(err)
}
close(finished)
}()
finishedC, err := task.Wait(ctx)
if err != nil {
t.Error(err)
return
}
if err := task.Start(ctx); err != nil {
t.Error(err)
@ -295,14 +295,11 @@ func TestContainerExec(t *testing.T) {
t.Error(err)
return
}
processStatusC := make(chan uint32, 1)
go func() {
status, err := process.Wait(ctx)
if err != nil {
t.Error(err)
}
processStatusC <- status
}()
processStatusC, err := process.Wait(ctx)
if err != nil {
t.Error(err)
return
}
if err := process.Start(ctx); err != nil {
t.Error(err)
@ -311,9 +308,14 @@ func TestContainerExec(t *testing.T) {
// wait for the exec to return
status := <-processStatusC
code, _, err := status.Result()
if err != nil {
t.Error(err)
return
}
if status != 6 {
t.Errorf("expected exec exit code 6 but received %d", status)
if code != 6 {
t.Errorf("expected exec exit code 6 but received %d", code)
}
deleteStatus, err := process.Delete(ctx)
if err != nil {
@ -326,7 +328,7 @@ func TestContainerExec(t *testing.T) {
if err := task.Kill(ctx, syscall.SIGKILL); err != nil {
t.Error(err)
}
<-finished
<-finishedC
}
func TestContainerPids(t *testing.T) {
@ -372,14 +374,11 @@ func TestContainerPids(t *testing.T) {
}
defer task.Delete(ctx)
statusC := make(chan uint32, 1)
go func() {
status, err := task.Wait(ctx)
if err != nil {
t.Error(err)
}
statusC <- status
}()
statusC, err := task.Wait(ctx)
if err != nil {
t.Error(err)
return
}
if err := task.Start(ctx); err != nil {
t.Error(err)
@ -462,14 +461,11 @@ func TestContainerCloseIO(t *testing.T) {
}
defer task.Delete(ctx)
statusC := make(chan uint32, 1)
go func() {
status, err := task.Wait(ctx)
if err != nil {
t.Error(err)
}
statusC <- status
}()
statusC, err := task.Wait(ctx)
if err != nil {
t.Error(err)
return
}
if err := task.Start(ctx); err != nil {
t.Error(err)
@ -576,14 +572,10 @@ func TestContainerAttach(t *testing.T) {
defer task.Delete(ctx)
originalIO := task.IO()
statusC := make(chan uint32, 1)
go func() {
status, err := task.Wait(ctx)
if err != nil {
t.Error(err)
}
statusC <- status
}()
statusC, err := task.Wait(ctx)
if err != nil {
t.Error(err)
}
if err := task.Start(ctx); err != nil {
t.Error(err)
@ -620,7 +612,12 @@ func TestContainerAttach(t *testing.T) {
t.Error(err)
}
<-statusC
status := <-statusC
_, _, err = status.Result()
if err != nil {
t.Error(err)
return
}
originalIO.Close()
if _, err := task.Delete(ctx); err != nil {
@ -681,14 +678,11 @@ func TestDeleteRunningContainer(t *testing.T) {
}
defer task.Delete(ctx)
statusC := make(chan uint32, 1)
go func() {
status, err := task.Wait(ctx)
if err != nil {
t.Error(err)
}
statusC <- status
}()
statusC, err := task.Wait(ctx)
if err != nil {
t.Error(err)
return
}
if err := task.Start(ctx); err != nil {
t.Error(err)
@ -752,14 +746,11 @@ func TestContainerKill(t *testing.T) {
}
defer task.Delete(ctx)
statusC := make(chan uint32, 1)
go func() {
status, err := task.Wait(ctx)
if err != nil {
t.Error(err)
}
statusC <- status
}()
statusC, err := task.Wait(ctx)
if err != nil {
t.Error(err)
return
}
if err := task.Start(ctx); err != nil {
t.Error(err)
@ -878,19 +869,15 @@ func TestContainerExecNoBinaryExists(t *testing.T) {
}
defer task.Delete(ctx)
finishedC, err := task.Wait(ctx)
if err != nil {
t.Error(err)
}
if err := task.Start(ctx); err != nil {
t.Error(err)
return
}
finished := make(chan struct{}, 1)
go func() {
if _, err := task.Wait(ctx); err != nil {
t.Error(err)
}
close(finished)
}()
// start an exec process without running the original container process
processSpec := spec.Process
processSpec.Args = []string{
@ -909,7 +896,7 @@ func TestContainerExecNoBinaryExists(t *testing.T) {
if err := task.Kill(ctx, syscall.SIGKILL); err != nil {
t.Error(err)
}
<-finished
<-finishedC
}
func TestUserNamespaces(t *testing.T) {
@ -962,14 +949,11 @@ func TestUserNamespaces(t *testing.T) {
}
defer task.Delete(ctx)
statusC := make(chan uint32, 1)
go func() {
status, err := task.Wait(ctx)
if err != nil {
t.Error(err)
}
statusC <- status
}()
statusC, err := task.Wait(ctx)
if err != nil {
t.Error(err)
return
}
if pid := task.Pid(); pid <= 0 {
t.Errorf("invalid task pid %d", pid)
@ -980,15 +964,21 @@ func TestUserNamespaces(t *testing.T) {
return
}
status := <-statusC
if status != 7 {
t.Errorf("expected status 7 from wait but received %d", status)
}
if status, err = task.Delete(ctx); err != nil {
code, _, err := status.Result()
if err != nil {
t.Error(err)
return
}
if status != 7 {
t.Errorf("expected status 7 from delete but received %d", status)
if code != 7 {
t.Errorf("expected status 7 from wait but received %d", code)
}
deleteStatus, err := task.Delete(ctx)
if err != nil {
t.Error(err)
return
}
if deleteStatus != 7 {
t.Errorf("expected status 7 from delete but received %d", deleteStatus)
}
}
@ -1035,14 +1025,11 @@ func TestWaitStoppedTask(t *testing.T) {
}
defer task.Delete(ctx)
statusC := make(chan uint32, 1)
go func() {
status, err := task.Wait(ctx)
if err != nil {
t.Error(err)
}
statusC <- status
}()
statusC, err := task.Wait(ctx)
if err != nil {
t.Error(err)
return
}
if pid := task.Pid(); pid <= 0 {
t.Errorf("invalid task pid %d", pid)
@ -1052,15 +1039,22 @@ func TestWaitStoppedTask(t *testing.T) {
task.Delete(ctx)
return
}
// wait for the task to stop then call wait again
<-statusC
status, err := task.Wait(ctx)
statusC, err = task.Wait(ctx)
if err != nil {
t.Error(err)
return
}
if status != 7 {
t.Errorf("exit status from stopped task should be 7 but received %d", status)
status := <-statusC
code, _, err := status.Result()
if err != nil {
t.Error(err)
return
}
if code != 7 {
t.Errorf("exit status from stopped task should be 7 but received %d", code)
}
}
@ -1107,13 +1101,10 @@ func TestWaitStoppedProcess(t *testing.T) {
}
defer task.Delete(ctx)
finished := make(chan struct{}, 1)
go func() {
if _, err := task.Wait(ctx); err != nil {
t.Error(err)
}
close(finished)
}()
finishedC, err := task.Wait(ctx)
if err != nil {
t.Error(err)
}
if err := task.Start(ctx); err != nil {
t.Error(err)
@ -1130,14 +1121,12 @@ func TestWaitStoppedProcess(t *testing.T) {
return
}
defer process.Delete(ctx)
processStatusC := make(chan uint32, 1)
go func() {
status, err := process.Wait(ctx)
if err != nil {
t.Error(err)
}
processStatusC <- status
}()
statusC, err := process.Wait(ctx)
if err != nil {
t.Error(err)
return
}
if err := process.Start(ctx); err != nil {
t.Error(err)
@ -1145,20 +1134,28 @@ func TestWaitStoppedProcess(t *testing.T) {
}
// wait for the exec to return
<-processStatusC
<-statusC
// try to wait on the process after it has stopped
status, err := process.Wait(ctx)
statusC, err = process.Wait(ctx)
if err != nil {
t.Error(err)
return
}
if status != 6 {
t.Errorf("exit status from stopped process should be 6 but received %d", status)
status := <-statusC
code, _, err := status.Result()
if err != nil {
t.Error(err)
return
}
if code != 6 {
t.Errorf("exit status from stopped process should be 6 but received %d", code)
}
if err := task.Kill(ctx, syscall.SIGKILL); err != nil {
t.Error(err)
}
<-finished
<-finishedC
}
func TestTaskForceDelete(t *testing.T) {
@ -1256,14 +1253,11 @@ func TestProcessForceDelete(t *testing.T) {
}
defer task.Delete(ctx)
statusC := make(chan uint32, 1)
go func() {
status, err := task.Wait(ctx)
if err != nil {
t.Error(err)
}
statusC <- status
}()
statusC, err := task.Wait(ctx)
if err != nil {
t.Error(err)
return
}
// task must be started on windows
if err := task.Start(ctx); err != nil {
@ -1344,14 +1338,11 @@ func TestContainerHostname(t *testing.T) {
}
defer task.Delete(ctx)
statusC := make(chan uint32, 1)
go func() {
status, err := task.Wait(ctx)
if err != nil {
t.Error(err)
}
statusC <- status
}()
statusC, err := task.Wait(ctx)
if err != nil {
t.Error(err)
return
}
if err := task.Start(ctx); err != nil {
t.Error(err)
@ -1359,8 +1350,13 @@ func TestContainerHostname(t *testing.T) {
}
status := <-statusC
if status != 0 {
t.Errorf("expected status 0 but received %d", status)
code, _, err := status.Result()
if err != nil {
t.Error(err)
return
}
if code != 0 {
t.Errorf("expected status 0 but received %d", code)
}
if _, err := task.Delete(ctx); err != nil {
t.Error(err)
@ -1420,14 +1416,10 @@ func TestContainerExitedAtSet(t *testing.T) {
}
defer task.Delete(ctx)
statusC := make(chan uint32, 1)
go func() {
status, err := task.Wait(ctx)
if err != nil {
t.Error(err)
}
statusC <- status
}()
statusC, err := task.Wait(ctx)
if err != nil {
t.Error(err)
}
startTime := time.Now()
if err := task.Start(ctx); err != nil {
@ -1436,8 +1428,9 @@ func TestContainerExitedAtSet(t *testing.T) {
}
status := <-statusC
if status != 0 {
t.Errorf("expected status 0 but received %d", status)
code, _, _ := status.Result()
if code != 0 {
t.Errorf("expected status 0 but received %d", code)
}
if s, err := task.Status(ctx); err != nil {
@ -1495,13 +1488,10 @@ func TestDeleteContainerExecCreated(t *testing.T) {
}
defer task.Delete(ctx)
finished := make(chan struct{}, 1)
go func() {
if _, err := task.Wait(ctx); err != nil {
t.Error(err)
}
close(finished)
}()
finished, err := task.Wait(ctx)
if err != nil {
t.Error(err)
}
if err := task.Start(ctx); err != nil {
t.Error(err)

View File

@ -163,14 +163,10 @@ You always want to make sure you `Wait` before calling `Start` on a task.
This makes sure that you do not encounter any races if the task has a simple program like `/bin/true` that exits promptly after calling start.
```go
exitStatusC := make(chan uint32, 1)
go func() {
status, err := task.Wait(ctx)
if err != nil {
fmt.Println(err)
}
exitStatusC <- status
}()
exitStatusC, err := task.Wait(ctx)
if err != nil {
return err
}
if err := task.Start(ctx); err != nil {
return err
@ -192,7 +188,11 @@ To do this we will simply call `Kill` on the task after waiting a couple of seco
}
status := <-exitStatusC
fmt.Printf("redis-server exited with status: %d\n", status)
code, exitedAt, err := status.Result()
if err != nil {
return err
}
fmt.Printf("redis-server exited with status: %d\n", code)
```
We wait on our exit status channel that we setup to ensure the task has fully exited and we get the exit status.
@ -271,14 +271,10 @@ func redisExample() error {
defer task.Delete(ctx)
// make sure we wait before calling start
exitStatusC := make(chan uint32, 1)
go func() {
status, err := task.Wait(ctx)
if err != nil {
fmt.Println(err)
}
exitStatusC <- status
}()
exitStatusC, err := task.Wait(ctx)
if err != nil {
fmt.Println(err)
}
// call start on the task to execute the redis server
if err := task.Start(ctx); err != nil {
@ -296,7 +292,11 @@ func redisExample() error {
// wait for the process to fully exit and print out the exit status
status := <-exitStatusC
fmt.Printf("redis-server exited with status: %d\n", status)
code, _, err := status.Result()
if err != nil {
return err
}
fmt.Printf("redis-server exited with status: %d\n", code)
return nil
}

View File

@ -4,6 +4,7 @@ import (
"context"
"strings"
"syscall"
"time"
eventsapi "github.com/containerd/containerd/api/services/events/v1"
"github.com/containerd/containerd/api/services/tasks/v1"
@ -24,8 +25,8 @@ type Process interface {
Delete(context.Context, ...ProcessDeleteOpts) (uint32, error)
// Kill sends the provided signal to the process
Kill(context.Context, syscall.Signal) error
// Wait blocks until the process has exited returning the exit status
Wait(context.Context) (uint32, error)
// Wait asynchronously waits for the process to exit, and sends the exit code to the returned channel
Wait(context.Context) (<-chan ExitStatus, error)
// CloseIO allows various pipes to be closed on the process
CloseIO(context.Context, ...IOCloserOpts) error
// Resize changes the width and heigh of the process's terminal
@ -36,6 +37,23 @@ type Process interface {
Status(context.Context) (Status, error)
}
// ExitStatus encapsulates a process' exit status.
// It is used by `Wait()` to return either a process exit code or an error
type ExitStatus struct {
code uint32
exitedAt time.Time
err error
}
// Result returns the exit code and time of the exit status.
// An error may be returned here to which indicates there was an error
// at some point while waiting for the exit status. It does not signify
// an error with the process itself.
// If an error is returned, the process may still be running.
func (s ExitStatus) Result() (uint32, time.Time, error) {
return s.code, s.exitedAt, s.err
}
type process struct {
id string
task *task
@ -79,39 +97,55 @@ func (p *process) Kill(ctx context.Context, s syscall.Signal) error {
return errdefs.FromGRPC(err)
}
func (p *process) Wait(ctx context.Context) (uint32, error) {
func (p *process) Wait(ctx context.Context) (<-chan ExitStatus, error) {
cancellable, cancel := context.WithCancel(ctx)
defer cancel()
eventstream, err := p.task.client.EventService().Subscribe(cancellable, &eventsapi.SubscribeRequest{
Filters: []string{"topic==" + runtime.TaskExitEventTopic},
})
if err != nil {
return UnknownExitStatus, err
cancel()
return nil, err
}
// first check if the task has exited
status, err := p.Status(ctx)
if err != nil {
return UnknownExitStatus, errdefs.FromGRPC(err)
cancel()
return nil, errdefs.FromGRPC(err)
}
chStatus := make(chan ExitStatus, 1)
if status.Status == Stopped {
return status.ExitStatus, nil
cancel()
chStatus <- ExitStatus{code: status.ExitStatus, exitedAt: status.ExitTime}
return chStatus, nil
}
for {
evt, err := eventstream.Recv()
if err != nil {
return UnknownExitStatus, err
}
if typeurl.Is(evt.Event, &eventsapi.TaskExit{}) {
v, err := typeurl.UnmarshalAny(evt.Event)
go func() {
defer cancel()
chStatus <- ExitStatus{} // signal that the goroutine is running
for {
evt, err := eventstream.Recv()
if err != nil {
return UnknownExitStatus, err
chStatus <- ExitStatus{code: UnknownExitStatus, err: err}
return
}
e := v.(*eventsapi.TaskExit)
if e.ID == p.id && e.ContainerID == p.task.id {
return e.ExitStatus, nil
if typeurl.Is(evt.Event, &eventsapi.TaskExit{}) {
v, err := typeurl.UnmarshalAny(evt.Event)
if err != nil {
chStatus <- ExitStatus{code: UnknownExitStatus, err: err}
return
}
e := v.(*eventsapi.TaskExit)
if e.ID == p.id && e.ContainerID == p.task.id {
chStatus <- ExitStatus{code: e.ExitStatus, exitedAt: e.ExitedAt}
return
}
}
}
}
}()
<-chStatus // wait for the goroutine to be running
return chStatus, nil
}
func (p *process) CloseIO(ctx context.Context, opts ...IOCloserOpts) error {

51
task.go
View File

@ -201,15 +201,18 @@ func (t *task) Status(ctx context.Context) (Status, error) {
}, nil
}
func (t *task) Wait(ctx context.Context) (uint32, error) {
func (t *task) Wait(ctx context.Context) (<-chan ExitStatus, error) {
cancellable, cancel := context.WithCancel(ctx)
defer cancel()
eventstream, err := t.client.EventService().Subscribe(cancellable, &eventsapi.SubscribeRequest{
Filters: []string{"topic==" + runtime.TaskExitEventTopic},
})
if err != nil {
return UnknownExitStatus, errdefs.FromGRPC(err)
cancel()
return nil, errdefs.FromGRPC(err)
}
chStatus := make(chan ExitStatus, 1)
t.mu.Lock()
checkpoint := t.deferred != nil
t.mu.Unlock()
@ -217,28 +220,42 @@ func (t *task) Wait(ctx context.Context) (uint32, error) {
// first check if the task has exited
status, err := t.Status(ctx)
if err != nil {
return UnknownExitStatus, errdefs.FromGRPC(err)
cancel()
return nil, errdefs.FromGRPC(err)
}
if status.Status == Stopped {
return status.ExitStatus, nil
cancel()
chStatus <- ExitStatus{code: status.ExitStatus, exitedAt: status.ExitTime}
return chStatus, nil
}
}
for {
evt, err := eventstream.Recv()
if err != nil {
return UnknownExitStatus, errdefs.FromGRPC(err)
}
if typeurl.Is(evt.Event, &eventsapi.TaskExit{}) {
v, err := typeurl.UnmarshalAny(evt.Event)
go func() {
defer cancel()
chStatus <- ExitStatus{} // signal that goroutine is running
for {
evt, err := eventstream.Recv()
if err != nil {
return UnknownExitStatus, err
chStatus <- ExitStatus{code: UnknownExitStatus, err: errdefs.FromGRPC(err)}
return
}
e := v.(*eventsapi.TaskExit)
if e.ContainerID == t.id && e.Pid == t.pid {
return e.ExitStatus, nil
if typeurl.Is(evt.Event, &eventsapi.TaskExit{}) {
v, err := typeurl.UnmarshalAny(evt.Event)
if err != nil {
chStatus <- ExitStatus{code: UnknownExitStatus, err: err}
return
}
e := v.(*eventsapi.TaskExit)
if e.ContainerID == t.id && e.Pid == t.pid {
chStatus <- ExitStatus{code: e.ExitStatus, exitedAt: e.ExitedAt}
return
}
}
}
}
}()
<-chStatus // wait for the goroutine to be running
return chStatus, nil
}
// Delete deletes the task and its runtime state

View File

@ -33,15 +33,14 @@ type ProcessDeleteOpts func(context.Context, Process) error
// WithProcessKill will forcefully kill and delete a process
func WithProcessKill(ctx context.Context, p Process) error {
s := make(chan struct{}, 1)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
// ignore errors to wait and kill as we are forcefully killing
// the process and don't care about the exit status
go func() {
p.Wait(ctx)
close(s)
}()
s, err := p.Wait(ctx)
if err != nil {
return err
}
if err := p.Kill(ctx, syscall.SIGKILL); err != nil {
if errdefs.IsFailedPrecondition(err) || errdefs.IsNotFound(err) {
return nil