From c80ca4f4a22244d6d7b3f061e8d30ad7cc272439 Mon Sep 17 00:00:00 2001 From: Stephen J Day Date: Tue, 9 Jan 2018 18:16:07 -0800 Subject: [PATCH] services/tasks, linux: ignore shutdown tasks Because tasks may be deleted while listing containers, we need to ignore errors from state requests that are due to a closed error. All of these get mapped to ErrNotFound, which can be used to filter the entries. There may be a better fix that does a better job of keeping track of the intended state of a backend task. The current condition of assuming that a closed client is a shutdown task may be too naive. Signed-off-by: Stephen J Day --- linux/process.go | 11 ++++++++++- linux/shim/service.go | 4 ++-- linux/task.go | 7 +++---- services/tasks/service.go | 8 +++++--- 4 files changed, 20 insertions(+), 10 deletions(-) diff --git a/linux/process.go b/linux/process.go index 10acc69d5..94541ec66 100644 --- a/linux/process.go +++ b/linux/process.go @@ -10,6 +10,8 @@ import ( "github.com/containerd/containerd/errdefs" shim "github.com/containerd/containerd/linux/shim/v1" "github.com/containerd/containerd/runtime" + "github.com/pkg/errors" + "github.com/stevvooe/ttrpc" ) // Process implements a linux process @@ -44,7 +46,14 @@ func (p *Process) State(ctx context.Context) (runtime.State, error) { ID: p.id, }) if err != nil { - return runtime.State{}, errdefs.FromGRPC(err) + if errors.Cause(err) != ttrpc.ErrClosed { + return runtime.State{}, errdefs.FromGRPC(err) + } + + // We treat ttrpc.ErrClosed as the shim being closed, but really this + // likely means that the process no longer exists. We'll have to plumb + // the connection differently if this causes problems. + return runtime.State{}, errdefs.ErrNotFound } var status runtime.Status switch response.Status { diff --git a/linux/shim/service.go b/linux/shim/service.go index 1150d1cc8..96fc0ec4a 100644 --- a/linux/shim/service.go +++ b/linux/shim/service.go @@ -129,7 +129,7 @@ func (s *Service) Start(ctx context.Context, r *shimapi.StartRequest) (*shimapi. defer s.mu.Unlock() p := s.processes[r.ID] if p == nil { - return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "process %s not found", r.ID) + return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "process %s", r.ID) } if err := p.Start(ctx); err != nil { return nil, err @@ -238,7 +238,7 @@ func (s *Service) State(ctx context.Context, r *shimapi.StateRequest) (*shimapi. defer s.mu.Unlock() p := s.processes[r.ID] if p == nil { - return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "process id %s not found", r.ID) + return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "process id %s", r.ID) } st, err := p.Status(ctx) if err != nil { diff --git a/linux/task.go b/linux/task.go index 4d1e93fb1..023c7b19a 100644 --- a/linux/task.go +++ b/linux/task.go @@ -6,9 +6,6 @@ import ( "context" "sync" - "github.com/pkg/errors" - "google.golang.org/grpc" - "github.com/containerd/cgroups" eventstypes "github.com/containerd/containerd/api/events" "github.com/containerd/containerd/api/types/task" @@ -20,6 +17,8 @@ import ( "github.com/containerd/containerd/runtime" runc "github.com/containerd/go-runc" "github.com/gogo/protobuf/types" + "github.com/pkg/errors" + "github.com/stevvooe/ttrpc" ) // Task on a linux based system @@ -109,7 +108,7 @@ func (t *Task) State(ctx context.Context) (runtime.State, error) { ID: t.id, }) if err != nil { - if err != grpc.ErrServerStopped { + if errors.Cause(err) != ttrpc.ErrClosed { return runtime.State{}, errdefs.FromGRPC(err) } return runtime.State{}, errdefs.ErrNotFound diff --git a/services/tasks/service.go b/services/tasks/service.go index 21b1813f4..4236f63fd 100644 --- a/services/tasks/service.go +++ b/services/tasks/service.go @@ -224,7 +224,7 @@ func (s *service) DeleteProcess(ctx context.Context, r *api.DeleteProcessRequest func processFromContainerd(ctx context.Context, p runtime.Process) (*task.Process, error) { state, err := p.State(ctx) if err != nil { - return nil, errdefs.ToGRPC(err) + return nil, err } var status task.Status switch state.Status { @@ -267,7 +267,7 @@ func (s *service) Get(ctx context.Context, r *api.GetRequest) (*api.GetResponse, } t, err := processFromContainerd(ctx, p) if err != nil { - return nil, err + return nil, errdefs.ToGRPC(err) } return &api.GetResponse{ Process: t, @@ -290,7 +290,9 @@ func addTasks(ctx context.Context, r *api.ListTasksResponse, tasks []runtime.Tas for _, t := range tasks { tt, err := processFromContainerd(ctx, t) if err != nil { - log.G(ctx).WithError(err).WithField("id", t.ID()).Error("converting task to protobuf") + if !errdefs.IsNotFound(err) { // handle race with deletion + log.G(ctx).WithError(err).WithField("id", t.ID()).Error("converting task to protobuf") + } continue } r.Tasks = append(r.Tasks, tt)