Merge pull request #1982 from stevvooe/deletion-race-handling

services/tasks, linux: ignore shutdown tasks
This commit is contained in:
Phil Estes 2018-01-10 16:53:42 -05:00 committed by GitHub
commit 0b449a4a51
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 20 additions and 10 deletions

View File

@ -10,6 +10,8 @@ import (
"github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/errdefs"
shim "github.com/containerd/containerd/linux/shim/v1" shim "github.com/containerd/containerd/linux/shim/v1"
"github.com/containerd/containerd/runtime" "github.com/containerd/containerd/runtime"
"github.com/pkg/errors"
"github.com/stevvooe/ttrpc"
) )
// Process implements a linux process // Process implements a linux process
@ -44,8 +46,15 @@ func (p *Process) State(ctx context.Context) (runtime.State, error) {
ID: p.id, ID: p.id,
}) })
if err != nil { if err != nil {
if errors.Cause(err) != ttrpc.ErrClosed {
return runtime.State{}, errdefs.FromGRPC(err) return runtime.State{}, errdefs.FromGRPC(err)
} }
// We treat ttrpc.ErrClosed as the shim being closed, but really this
// likely means that the process no longer exists. We'll have to plumb
// the connection differently if this causes problems.
return runtime.State{}, errdefs.ErrNotFound
}
var status runtime.Status var status runtime.Status
switch response.Status { switch response.Status {
case task.StatusCreated: case task.StatusCreated:

View File

@ -129,7 +129,7 @@ func (s *Service) Start(ctx context.Context, r *shimapi.StartRequest) (*shimapi.
defer s.mu.Unlock() defer s.mu.Unlock()
p := s.processes[r.ID] p := s.processes[r.ID]
if p == nil { if p == nil {
return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "process %s not found", r.ID) return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "process %s", r.ID)
} }
if err := p.Start(ctx); err != nil { if err := p.Start(ctx); err != nil {
return nil, err return nil, err
@ -238,7 +238,7 @@ func (s *Service) State(ctx context.Context, r *shimapi.StateRequest) (*shimapi.
defer s.mu.Unlock() defer s.mu.Unlock()
p := s.processes[r.ID] p := s.processes[r.ID]
if p == nil { if p == nil {
return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "process id %s not found", r.ID) return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "process id %s", r.ID)
} }
st, err := p.Status(ctx) st, err := p.Status(ctx)
if err != nil { if err != nil {

View File

@ -6,9 +6,6 @@ import (
"context" "context"
"sync" "sync"
"github.com/pkg/errors"
"google.golang.org/grpc"
"github.com/containerd/cgroups" "github.com/containerd/cgroups"
eventstypes "github.com/containerd/containerd/api/events" eventstypes "github.com/containerd/containerd/api/events"
"github.com/containerd/containerd/api/types/task" "github.com/containerd/containerd/api/types/task"
@ -20,6 +17,8 @@ import (
"github.com/containerd/containerd/runtime" "github.com/containerd/containerd/runtime"
runc "github.com/containerd/go-runc" runc "github.com/containerd/go-runc"
"github.com/gogo/protobuf/types" "github.com/gogo/protobuf/types"
"github.com/pkg/errors"
"github.com/stevvooe/ttrpc"
) )
// Task on a linux based system // Task on a linux based system
@ -109,7 +108,7 @@ func (t *Task) State(ctx context.Context) (runtime.State, error) {
ID: t.id, ID: t.id,
}) })
if err != nil { if err != nil {
if err != grpc.ErrServerStopped { if errors.Cause(err) != ttrpc.ErrClosed {
return runtime.State{}, errdefs.FromGRPC(err) return runtime.State{}, errdefs.FromGRPC(err)
} }
return runtime.State{}, errdefs.ErrNotFound return runtime.State{}, errdefs.ErrNotFound

View File

@ -224,7 +224,7 @@ func (s *service) DeleteProcess(ctx context.Context, r *api.DeleteProcessRequest
func processFromContainerd(ctx context.Context, p runtime.Process) (*task.Process, error) { func processFromContainerd(ctx context.Context, p runtime.Process) (*task.Process, error) {
state, err := p.State(ctx) state, err := p.State(ctx)
if err != nil { if err != nil {
return nil, errdefs.ToGRPC(err) return nil, err
} }
var status task.Status var status task.Status
switch state.Status { switch state.Status {
@ -267,7 +267,7 @@ func (s *service) Get(ctx context.Context, r *api.GetRequest) (*api.GetResponse,
} }
t, err := processFromContainerd(ctx, p) t, err := processFromContainerd(ctx, p)
if err != nil { if err != nil {
return nil, err return nil, errdefs.ToGRPC(err)
} }
return &api.GetResponse{ return &api.GetResponse{
Process: t, Process: t,
@ -290,7 +290,9 @@ func addTasks(ctx context.Context, r *api.ListTasksResponse, tasks []runtime.Tas
for _, t := range tasks { for _, t := range tasks {
tt, err := processFromContainerd(ctx, t) tt, err := processFromContainerd(ctx, t)
if err != nil { if err != nil {
if !errdefs.IsNotFound(err) { // handle race with deletion
log.G(ctx).WithError(err).WithField("id", t.ID()).Error("converting task to protobuf") log.G(ctx).WithError(err).WithField("id", t.ID()).Error("converting task to protobuf")
}
continue continue
} }
r.Tasks = append(r.Tasks, tt) r.Tasks = append(r.Tasks, tt)