diff --git a/runtime/v2/runc/epoll.go b/runtime/v2/runc/epoll.go
index e57ec7b4c..6aea9b8c1 100644
--- a/runtime/v2/runc/epoll.go
+++ b/runtime/v2/runc/epoll.go
@@ -62,15 +62,21 @@ func (e *epoller) Close() error {
 func (e *epoller) run(ctx context.Context) {
 	var events [128]unix.EpollEvent
 	for {
-		n, err := unix.EpollWait(e.fd, events[:], -1)
-		if err != nil {
-			if err == unix.EINTR {
-				continue
+		select {
+		case <-ctx.Done():
+			e.Close()
+			return
+		default:
+			n, err := unix.EpollWait(e.fd, events[:], -1)
+			if err != nil {
+				if err == unix.EINTR {
+					continue
+				}
+				logrus.WithError(err).Error("cgroups: epoll wait")
+			}
+			for i := 0; i < n; i++ {
+				e.process(ctx, uintptr(events[i].Fd))
 			}
-			logrus.WithError(err).Error("cgroups: epoll wait")
-		}
-		for i := 0; i < n; i++ {
-			e.process(ctx, uintptr(events[i].Fd))
 		}
 	}
 }
diff --git a/runtime/v2/runc/service.go b/runtime/v2/runc/service.go
index 55dbeb069..aeb48cfe5 100644
--- a/runtime/v2/runc/service.go
+++ b/runtime/v2/runc/service.go
@@ -68,6 +68,7 @@ func New(ctx context.Context, id string, publisher events.Publisher) (shim.Shim,
 	if err != nil {
 		return nil, err
 	}
+	ctx, cancel := context.WithCancel(ctx)
 	go ep.run(ctx)
 	s := &service{
 		id:      id,
@@ -76,10 +77,12 @@ func New(ctx context.Context, id string, publisher events.Publisher) (shim.Shim,
 		events:  make(chan interface{}, 128),
 		ec:      shim.Default.Subscribe(),
 		ep:      ep,
+		cancel:  cancel,
 	}
 	go s.processExits()
 	runcC.Monitor = shim.Default
 	if err := s.initPlatform(); err != nil {
+		cancel()
 		return nil, errors.Wrap(err, "failed to initialized platform behavior")
 	}
 	go s.forward(publisher)
@@ -101,6 +104,7 @@ type service struct {
 	id     string
 	bundle string
 	cg     cgroups.Cgroup
+	cancel func()
 }
 
 func newCommand(ctx context.Context, containerdBinary, containerdAddress string) (*exec.Cmd, error) {
@@ -300,7 +304,7 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *
 		if err != nil {
 			logrus.WithError(err).Errorf("loading cgroup for %d", pid)
 		}
-		s.setCgroup(cg)
+		s.cg = cg
 	}
 	s.task = process
 	return &taskAPI.CreateTaskResponse{
@@ -311,8 +315,6 @@
 
 // Start a process
 func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.StartResponse, error) {
-	s.mu.Lock()
-	defer s.mu.Unlock()
 	p, err := s.getProcess(r.ExecID)
 	if err != nil {
 		return nil, err
@@ -320,7 +322,8 @@ func (s *service) Start(ctx context.Context, r *taskAPI.
 	if err := p.Start(ctx); err != nil {
 		return nil, err
 	}
-	if s.cg == nil && p.Pid() > 0 {
+	// case for restore
+	if s.getCgroup() == nil && p.Pid() > 0 {
 		cg, err := cgroups.Load(cgroups.V1, cgroups.PidPath(p.Pid()))
 		if err != nil {
 			logrus.WithError(err).Errorf("loading cgroup for %d", p.Pid())
@@ -334,8 +337,6 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.
 // Delete the initial process and container
 func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAPI.DeleteResponse, error) {
-	s.mu.Lock()
-	defer s.mu.Unlock()
 	p, err := s.getProcess(r.ExecID)
 	if err != nil {
 		return nil, err
 	}
@@ -348,7 +349,9 @@ func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAP
 	}
 	isTask := r.ExecID == ""
 	if !isTask {
+		s.mu.Lock()
 		delete(s.processes, r.ExecID)
+		s.mu.Unlock()
 	}
 	if isTask && s.platform != nil {
 		s.platform.Close()
@@ -363,11 +366,12 @@ func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAP
 // Exec an additional process inside the container
 func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*ptypes.Empty, error) {
 	s.mu.Lock()
-	defer s.mu.Unlock()
-	if p := s.processes[r.ExecID]; p != nil {
+	p := s.processes[r.ExecID]
+	s.mu.Unlock()
+	if p != nil {
 		return nil, errdefs.ToGRPCf(errdefs.ErrAlreadyExists, "id %s", r.ExecID)
 	}
-	p := s.task
+	p = s.task
 	if p == nil {
 		return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
 	}
@@ -382,14 +386,14 @@ func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*pty
 	if err != nil {
 		return nil, errdefs.ToGRPC(err)
 	}
+	s.mu.Lock()
 	s.processes[r.ExecID] = process
+	s.mu.Unlock()
 	return empty, nil
 }
 
 // ResizePty of a process
 func (s *service) ResizePty(ctx context.Context, r *taskAPI.ResizePtyRequest) (*ptypes.Empty, error) {
-	s.mu.Lock()
-	defer s.mu.Unlock()
 	p, err := s.getProcess(r.ExecID)
 	if err != nil {
 		return nil, err
@@ -406,8 +410,6 @@ func (s *service) ResizePty(ctx context.Context, r *taskAPI.ResizePtyRequest) (*
 
 // State returns runtime state information for a process
 func (s *service) State(ctx context.Context, r *taskAPI.StateRequest) (*taskAPI.StateResponse, error) {
-	s.mu.Lock()
-	defer s.mu.Unlock()
 	p, err := s.getProcess(r.ExecID)
 	if err != nil {
 		return nil, err
@@ -447,8 +449,8 @@ func (s *service) State(ctx context.Context, r *taskAPI.StateRequest) (*taskAPI.
 // Pause the container
 func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (*ptypes.Empty, error) {
 	s.mu.Lock()
-	defer s.mu.Unlock()
 	p := s.task
+	s.mu.Unlock()
 	if p == nil {
 		return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
 	}
@@ -461,8 +463,8 @@ func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (*ptypes.E
 // Resume the container
 func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (*ptypes.Empty, error) {
 	s.mu.Lock()
-	defer s.mu.Unlock()
 	p := s.task
+	s.mu.Unlock()
 	if p == nil {
 		return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
 	}
@@ -474,8 +476,6 @@ func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (*ptypes
 
 // Kill a process with the provided signal
 func (s *service) Kill(ctx context.Context, r *taskAPI.KillRequest) (*ptypes.Empty, error) {
-	s.mu.Lock()
-	defer s.mu.Unlock()
 	p, err := s.getProcess(r.ExecID)
 	if err != nil {
 		return nil, err
@@ -522,8 +522,6 @@ func (s *service) Pids(ctx context.Context, r *taskAPI.PidsRequest) (*taskAPI.Pi
 
 // CloseIO of a process
 func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (*ptypes.Empty, error) {
-	s.mu.Lock()
-	defer s.mu.Unlock()
 	p, err := s.getProcess(r.ExecID)
 	if err != nil {
 		return nil, err
@@ -539,8 +537,8 @@ func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (*ptyp
 // Checkpoint the container
 func (s *service) Checkpoint(ctx context.Context, r *taskAPI.CheckpointTaskRequest) (*ptypes.Empty, error) {
 	s.mu.Lock()
-	defer s.mu.Unlock()
 	p := s.task
+	s.mu.Unlock()
 	if p == nil {
 		return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
 	}
@@ -579,18 +577,17 @@ func (s *service) Connect(ctx context.Context, r *taskAPI.ConnectRequest) (*task
 }
 
 func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (*ptypes.Empty, error) {
+	s.cancel()
 	os.Exit(0)
 	return empty, nil
 }
 
 func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (*taskAPI.StatsResponse, error) {
-	s.mu.Lock()
-	defer s.mu.Unlock()
-
-	if s.cg == nil {
+	cg := s.getCgroup()
+	if cg == nil {
 		return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "cgroup does not exist")
 	}
-	stats, err := s.cg.Stat(cgroups.IgnoreNotExist)
+	stats, err := cg.Stat(cgroups.IgnoreNotExist)
 	if err != nil {
 		return nil, err
 	}
@@ -606,8 +603,8 @@ func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (*taskAPI.
 // Update a running container
 func (s *service) Update(ctx context.Context, r *taskAPI.UpdateTaskRequest) (*ptypes.Empty, error) {
 	s.mu.Lock()
-	defer s.mu.Unlock()
 	p := s.task
+	s.mu.Unlock()
 	if p == nil {
 		return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
 	}
@@ -619,9 +616,7 @@
 
 // Wait for a process to exit
 func (s *service) Wait(ctx context.Context, r *taskAPI.WaitRequest) (*taskAPI.WaitResponse, error) {
-	s.mu.Lock()
 	p, err := s.getProcess(r.ExecID)
-	s.mu.Unlock()
 	if err != nil {
 		return nil, err
 	}
@@ -643,9 +638,6 @@ func (s *service) processExits() {
 }
 
 func (s *service) checkProcesses(e runcC.Exit) {
-	s.mu.Lock()
-	defer s.mu.Unlock()
-
 	for _, p := range s.allProcesses() {
 		if p.Pid() == e.Pid {
 			if ip, ok := p.(*proc.Init); ok {
@@ -669,6 +661,8 @@ func (s *service) checkProcesses(e runcC.Exit) {
 }
 
 func (s *service) allProcesses() (o []rproc.Process) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
 	for _, p := range s.processes {
 		o = append(o, p)
 	}
@@ -680,8 +674,8 @@ func (s *service) allProcesses() (o []rproc.Process) {
 
 func (s *service) getContainerPids(ctx context.Context, id string) ([]uint32, error) {
 	s.mu.Lock()
-	defer s.mu.Unlock()
 	p := s.task
+	s.mu.Unlock()
 	if p == nil {
 		return nil, errors.Wrapf(errdefs.ErrFailedPrecondition, "container must be created")
 	}
@@ -698,13 +692,18 @@ func (s *service) getContainerPids(ctx context.Context, id string) ([]uint32, er
 
 func (s *service) forward(publisher events.Publisher) {
 	for e := range s.events {
-		if err := publisher.Publish(s.context, getTopic(s.context, e), e); err != nil {
+		ctx, cancel := context.WithTimeout(s.context, 5*time.Second)
+		err := publisher.Publish(ctx, getTopic(e), e)
+		cancel()
+		if err != nil {
 			logrus.WithError(err).Error("post event")
 		}
 	}
 }
 
 func (s *service) getProcess(execID string) (rproc.Process, error) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
 	if execID == "" {
 		return s.task, nil
 	}
@@ -715,14 +714,22 @@ func (s *service) getProcess(execID string) (rproc.Process, error) {
 	return p, nil
 }
 
+func (s *service) getCgroup() cgroups.Cgroup {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	return s.cg
+}
+
 func (s *service) setCgroup(cg cgroups.Cgroup) {
+	s.mu.Lock()
 	s.cg = cg
+	s.mu.Unlock()
 	if err := s.ep.add(s.id, cg); err != nil {
 		logrus.WithError(err).Error("add cg to OOM monitor")
 	}
 }
 
-func getTopic(ctx context.Context, e interface{}) string {
+func getTopic(e interface{}) string {
 	switch e.(type) {
 	case *eventstypes.TaskCreate:
 		return runtime.TaskCreateEventTopic
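Editorial sketch (not part of the patch): the snippet below illustrates the two patterns the diff applies in the shim, short mutex-guarded sections around shared state (getCgroup/setCgroup, allProcesses, getProcess) instead of holding s.mu across whole RPC handlers, and a cancellable context that Shutdown uses to stop the background OOM-monitor goroutine. The service and monitor names here are simplified stand-ins, not the containerd types.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// service is a simplified stand-in for the shim service in the patch.
type service struct {
	mu     sync.Mutex
	cg     string // stands in for cgroups.Cgroup
	cancel context.CancelFunc
}

// getCgroup mirrors the accessor added by the patch: hold the lock only
// long enough to read the shared field, never across blocking work.
func (s *service) getCgroup() string {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.cg
}

// setCgroup takes the lock around the write and releases it before any
// follow-up work (the real patch registers with the OOM monitor afterwards).
func (s *service) setCgroup(cg string) {
	s.mu.Lock()
	s.cg = cg
	s.mu.Unlock()
}

// monitor mirrors the epoller loop: each iteration checks ctx.Done() so the
// goroutine exits once Shutdown calls cancel.
func monitor(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			fmt.Println("monitor: stopped")
			return
		default:
			time.Sleep(50 * time.Millisecond) // stands in for the epoll wait
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	s := &service{cancel: cancel}
	go monitor(ctx)

	s.setCgroup("/shim/example")
	fmt.Println("cgroup:", s.getCgroup())

	// Shutdown path from the patch: cancel the monitor before exiting.
	s.cancel()
	time.Sleep(100 * time.Millisecond)
}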