From 951c129bf1dde8c3fba2d0400b540edb55b59fc4 Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Thu, 7 Sep 2017 14:29:58 -0400 Subject: [PATCH] Handle locking and errors for process state ref: #1464 This tries to solve issues with races around process state. First it adds the process mutex around the state call so that any state changes, deletions, etc will be handled in order. Second, for IsNoExist errors from the runtime, return a stopped state if a process has been removed from the underlying OCI runtime but not from the shim yet. This shouldn't happen with the lock from above but its hare to verify this issue. Third, handle shim disconnections and return an ErrNotFound. Forth, don't abort returning all tasks if one task is unable to return its state. Signed-off-by: Michael Crosby --- linux/shim/init.go | 5 +++++ linux/task.go | 7 ++++++- services/tasks/service.go | 19 ++++++++++++------- 3 files changed, 23 insertions(+), 8 deletions(-) diff --git a/linux/shim/init.go b/linux/shim/init.go index 34aee992b..19a76bbbf 100644 --- a/linux/shim/init.go +++ b/linux/shim/init.go @@ -222,8 +222,13 @@ func (p *initProcess) ExitedAt() time.Time { } func (p *initProcess) Status(ctx context.Context) (string, error) { + p.mu.Lock() + defer p.mu.Unlock() c, err := p.runtime.State(ctx, p.id) if err != nil { + if os.IsNotExist(err) { + return "stopped", nil + } return "", p.runtimeError(err, "OCI runtime state failed") } return c.Status, nil diff --git a/linux/task.go b/linux/task.go index f18ec529a..4b5ac3a01 100644 --- a/linux/task.go +++ b/linux/task.go @@ -5,6 +5,8 @@ package linux import ( "context" + "google.golang.org/grpc" + "github.com/containerd/cgroups" "github.com/containerd/containerd/api/types/task" "github.com/containerd/containerd/errdefs" @@ -63,7 +65,10 @@ func (t *Task) State(ctx context.Context) (runtime.State, error) { ID: t.id, }) if err != nil { - return runtime.State{}, errdefs.FromGRPC(err) + if err != grpc.ErrServerStopped { + return runtime.State{}, errdefs.FromGRPC(err) + } + return runtime.State{}, errdefs.ErrNotFound } var status runtime.Status switch response.Status { diff --git a/services/tasks/service.go b/services/tasks/service.go index 3cb6288a1..62dd90d1d 100644 --- a/services/tasks/service.go +++ b/services/tasks/service.go @@ -274,17 +274,22 @@ func (s *Service) List(ctx context.Context, r *api.ListTasksRequest) (*api.ListT if err != nil { return nil, errdefs.ToGRPC(err) } - for _, t := range tasks { - tt, err := processFromContainerd(ctx, t) - if err != nil { - return nil, err - } - resp.Tasks = append(resp.Tasks, tt) - } + addTasks(ctx, resp, tasks) } return resp, nil } +func addTasks(ctx context.Context, r *api.ListTasksResponse, tasks []runtime.Task) { + for _, t := range tasks { + tt, err := processFromContainerd(ctx, t) + if err != nil { + log.G(ctx).WithError(err).WithField("id", t.ID()).Error("converting task to protobuf") + continue + } + r.Tasks = append(r.Tasks, tt) + } +} + func (s *Service) Pause(ctx context.Context, r *api.PauseTaskRequest) (*google_protobuf.Empty, error) { t, err := s.getTask(ctx, r.ContainerID) if err != nil {