Merge pull request #2589 from crosbymichael/shim-robo
Shim locking improvements and context cancel
This commit is contained in:
commit
0649e38c57
@ -62,6 +62,11 @@ func (e *epoller) Close() error {
|
|||||||
func (e *epoller) run(ctx context.Context) {
|
func (e *epoller) run(ctx context.Context) {
|
||||||
var events [128]unix.EpollEvent
|
var events [128]unix.EpollEvent
|
||||||
for {
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
e.Close()
|
||||||
|
return
|
||||||
|
default:
|
||||||
n, err := unix.EpollWait(e.fd, events[:], -1)
|
n, err := unix.EpollWait(e.fd, events[:], -1)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err == unix.EINTR {
|
if err == unix.EINTR {
|
||||||
@ -73,6 +78,7 @@ func (e *epoller) run(ctx context.Context) {
|
|||||||
e.process(ctx, uintptr(events[i].Fd))
|
e.process(ctx, uintptr(events[i].Fd))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *epoller) add(id string, cg cgroups.Cgroup) error {
|
func (e *epoller) add(id string, cg cgroups.Cgroup) error {
|
||||||
|
@ -68,6 +68,7 @@ func New(ctx context.Context, id string, publisher events.Publisher) (shim.Shim,
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
go ep.run(ctx)
|
go ep.run(ctx)
|
||||||
s := &service{
|
s := &service{
|
||||||
id: id,
|
id: id,
|
||||||
@ -76,10 +77,12 @@ func New(ctx context.Context, id string, publisher events.Publisher) (shim.Shim,
|
|||||||
events: make(chan interface{}, 128),
|
events: make(chan interface{}, 128),
|
||||||
ec: shim.Default.Subscribe(),
|
ec: shim.Default.Subscribe(),
|
||||||
ep: ep,
|
ep: ep,
|
||||||
|
cancel: cancel,
|
||||||
}
|
}
|
||||||
go s.processExits()
|
go s.processExits()
|
||||||
runcC.Monitor = shim.Default
|
runcC.Monitor = shim.Default
|
||||||
if err := s.initPlatform(); err != nil {
|
if err := s.initPlatform(); err != nil {
|
||||||
|
cancel()
|
||||||
return nil, errors.Wrap(err, "failed to initialized platform behavior")
|
return nil, errors.Wrap(err, "failed to initialized platform behavior")
|
||||||
}
|
}
|
||||||
go s.forward(publisher)
|
go s.forward(publisher)
|
||||||
@ -101,6 +104,7 @@ type service struct {
|
|||||||
id string
|
id string
|
||||||
bundle string
|
bundle string
|
||||||
cg cgroups.Cgroup
|
cg cgroups.Cgroup
|
||||||
|
cancel func()
|
||||||
}
|
}
|
||||||
|
|
||||||
func newCommand(ctx context.Context, containerdBinary, containerdAddress string) (*exec.Cmd, error) {
|
func newCommand(ctx context.Context, containerdBinary, containerdAddress string) (*exec.Cmd, error) {
|
||||||
@ -300,7 +304,7 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.WithError(err).Errorf("loading cgroup for %d", pid)
|
logrus.WithError(err).Errorf("loading cgroup for %d", pid)
|
||||||
}
|
}
|
||||||
s.setCgroup(cg)
|
s.cg = cg
|
||||||
}
|
}
|
||||||
s.task = process
|
s.task = process
|
||||||
return &taskAPI.CreateTaskResponse{
|
return &taskAPI.CreateTaskResponse{
|
||||||
@ -311,8 +315,6 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *
|
|||||||
|
|
||||||
// Start a process
|
// Start a process
|
||||||
func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.StartResponse, error) {
|
func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.StartResponse, error) {
|
||||||
s.mu.Lock()
|
|
||||||
defer s.mu.Unlock()
|
|
||||||
p, err := s.getProcess(r.ExecID)
|
p, err := s.getProcess(r.ExecID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -320,7 +322,8 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.
|
|||||||
if err := p.Start(ctx); err != nil {
|
if err := p.Start(ctx); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if s.cg == nil && p.Pid() > 0 {
|
// case for restore
|
||||||
|
if s.getCgroup() == nil && p.Pid() > 0 {
|
||||||
cg, err := cgroups.Load(cgroups.V1, cgroups.PidPath(p.Pid()))
|
cg, err := cgroups.Load(cgroups.V1, cgroups.PidPath(p.Pid()))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.WithError(err).Errorf("loading cgroup for %d", p.Pid())
|
logrus.WithError(err).Errorf("loading cgroup for %d", p.Pid())
|
||||||
@ -334,8 +337,6 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.
|
|||||||
|
|
||||||
// Delete the initial process and container
|
// Delete the initial process and container
|
||||||
func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAPI.DeleteResponse, error) {
|
func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAPI.DeleteResponse, error) {
|
||||||
s.mu.Lock()
|
|
||||||
defer s.mu.Unlock()
|
|
||||||
p, err := s.getProcess(r.ExecID)
|
p, err := s.getProcess(r.ExecID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -348,7 +349,9 @@ func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAP
|
|||||||
}
|
}
|
||||||
isTask := r.ExecID == ""
|
isTask := r.ExecID == ""
|
||||||
if !isTask {
|
if !isTask {
|
||||||
|
s.mu.Lock()
|
||||||
delete(s.processes, r.ExecID)
|
delete(s.processes, r.ExecID)
|
||||||
|
s.mu.Unlock()
|
||||||
}
|
}
|
||||||
if isTask && s.platform != nil {
|
if isTask && s.platform != nil {
|
||||||
s.platform.Close()
|
s.platform.Close()
|
||||||
@ -363,11 +366,12 @@ func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAP
|
|||||||
// Exec an additional process inside the container
|
// Exec an additional process inside the container
|
||||||
func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*ptypes.Empty, error) {
|
func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*ptypes.Empty, error) {
|
||||||
s.mu.Lock()
|
s.mu.Lock()
|
||||||
defer s.mu.Unlock()
|
p := s.processes[r.ExecID]
|
||||||
if p := s.processes[r.ExecID]; p != nil {
|
s.mu.Unlock()
|
||||||
|
if p != nil {
|
||||||
return nil, errdefs.ToGRPCf(errdefs.ErrAlreadyExists, "id %s", r.ExecID)
|
return nil, errdefs.ToGRPCf(errdefs.ErrAlreadyExists, "id %s", r.ExecID)
|
||||||
}
|
}
|
||||||
p := s.task
|
p = s.task
|
||||||
if p == nil {
|
if p == nil {
|
||||||
return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
|
return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
|
||||||
}
|
}
|
||||||
@ -382,14 +386,14 @@ func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*pty
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errdefs.ToGRPC(err)
|
return nil, errdefs.ToGRPC(err)
|
||||||
}
|
}
|
||||||
|
s.mu.Lock()
|
||||||
s.processes[r.ExecID] = process
|
s.processes[r.ExecID] = process
|
||||||
|
s.mu.Unlock()
|
||||||
return empty, nil
|
return empty, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// ResizePty of a process
|
// ResizePty of a process
|
||||||
func (s *service) ResizePty(ctx context.Context, r *taskAPI.ResizePtyRequest) (*ptypes.Empty, error) {
|
func (s *service) ResizePty(ctx context.Context, r *taskAPI.ResizePtyRequest) (*ptypes.Empty, error) {
|
||||||
s.mu.Lock()
|
|
||||||
defer s.mu.Unlock()
|
|
||||||
p, err := s.getProcess(r.ExecID)
|
p, err := s.getProcess(r.ExecID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -406,8 +410,6 @@ func (s *service) ResizePty(ctx context.Context, r *taskAPI.ResizePtyRequest) (*
|
|||||||
|
|
||||||
// State returns runtime state information for a process
|
// State returns runtime state information for a process
|
||||||
func (s *service) State(ctx context.Context, r *taskAPI.StateRequest) (*taskAPI.StateResponse, error) {
|
func (s *service) State(ctx context.Context, r *taskAPI.StateRequest) (*taskAPI.StateResponse, error) {
|
||||||
s.mu.Lock()
|
|
||||||
defer s.mu.Unlock()
|
|
||||||
p, err := s.getProcess(r.ExecID)
|
p, err := s.getProcess(r.ExecID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -447,8 +449,8 @@ func (s *service) State(ctx context.Context, r *taskAPI.StateRequest) (*taskAPI.
|
|||||||
// Pause the container
|
// Pause the container
|
||||||
func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (*ptypes.Empty, error) {
|
func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (*ptypes.Empty, error) {
|
||||||
s.mu.Lock()
|
s.mu.Lock()
|
||||||
defer s.mu.Unlock()
|
|
||||||
p := s.task
|
p := s.task
|
||||||
|
s.mu.Unlock()
|
||||||
if p == nil {
|
if p == nil {
|
||||||
return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
|
return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
|
||||||
}
|
}
|
||||||
@ -461,8 +463,8 @@ func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (*ptypes.E
|
|||||||
// Resume the container
|
// Resume the container
|
||||||
func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (*ptypes.Empty, error) {
|
func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (*ptypes.Empty, error) {
|
||||||
s.mu.Lock()
|
s.mu.Lock()
|
||||||
defer s.mu.Unlock()
|
|
||||||
p := s.task
|
p := s.task
|
||||||
|
s.mu.Unlock()
|
||||||
if p == nil {
|
if p == nil {
|
||||||
return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
|
return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
|
||||||
}
|
}
|
||||||
@ -474,8 +476,6 @@ func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (*ptypes
|
|||||||
|
|
||||||
// Kill a process with the provided signal
|
// Kill a process with the provided signal
|
||||||
func (s *service) Kill(ctx context.Context, r *taskAPI.KillRequest) (*ptypes.Empty, error) {
|
func (s *service) Kill(ctx context.Context, r *taskAPI.KillRequest) (*ptypes.Empty, error) {
|
||||||
s.mu.Lock()
|
|
||||||
defer s.mu.Unlock()
|
|
||||||
p, err := s.getProcess(r.ExecID)
|
p, err := s.getProcess(r.ExecID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -522,8 +522,6 @@ func (s *service) Pids(ctx context.Context, r *taskAPI.PidsRequest) (*taskAPI.Pi
|
|||||||
|
|
||||||
// CloseIO of a process
|
// CloseIO of a process
|
||||||
func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (*ptypes.Empty, error) {
|
func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (*ptypes.Empty, error) {
|
||||||
s.mu.Lock()
|
|
||||||
defer s.mu.Unlock()
|
|
||||||
p, err := s.getProcess(r.ExecID)
|
p, err := s.getProcess(r.ExecID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -539,8 +537,8 @@ func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (*ptyp
|
|||||||
// Checkpoint the container
|
// Checkpoint the container
|
||||||
func (s *service) Checkpoint(ctx context.Context, r *taskAPI.CheckpointTaskRequest) (*ptypes.Empty, error) {
|
func (s *service) Checkpoint(ctx context.Context, r *taskAPI.CheckpointTaskRequest) (*ptypes.Empty, error) {
|
||||||
s.mu.Lock()
|
s.mu.Lock()
|
||||||
defer s.mu.Unlock()
|
|
||||||
p := s.task
|
p := s.task
|
||||||
|
s.mu.Unlock()
|
||||||
if p == nil {
|
if p == nil {
|
||||||
return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
|
return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
|
||||||
}
|
}
|
||||||
@ -579,18 +577,17 @@ func (s *service) Connect(ctx context.Context, r *taskAPI.ConnectRequest) (*task
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (*ptypes.Empty, error) {
|
func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (*ptypes.Empty, error) {
|
||||||
|
s.cancel()
|
||||||
os.Exit(0)
|
os.Exit(0)
|
||||||
return empty, nil
|
return empty, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (*taskAPI.StatsResponse, error) {
|
func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (*taskAPI.StatsResponse, error) {
|
||||||
s.mu.Lock()
|
cg := s.getCgroup()
|
||||||
defer s.mu.Unlock()
|
if cg == nil {
|
||||||
|
|
||||||
if s.cg == nil {
|
|
||||||
return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "cgroup does not exist")
|
return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "cgroup does not exist")
|
||||||
}
|
}
|
||||||
stats, err := s.cg.Stat(cgroups.IgnoreNotExist)
|
stats, err := cg.Stat(cgroups.IgnoreNotExist)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -606,8 +603,8 @@ func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (*taskAPI.
|
|||||||
// Update a running container
|
// Update a running container
|
||||||
func (s *service) Update(ctx context.Context, r *taskAPI.UpdateTaskRequest) (*ptypes.Empty, error) {
|
func (s *service) Update(ctx context.Context, r *taskAPI.UpdateTaskRequest) (*ptypes.Empty, error) {
|
||||||
s.mu.Lock()
|
s.mu.Lock()
|
||||||
defer s.mu.Unlock()
|
|
||||||
p := s.task
|
p := s.task
|
||||||
|
s.mu.Unlock()
|
||||||
if p == nil {
|
if p == nil {
|
||||||
return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
|
return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
|
||||||
}
|
}
|
||||||
@ -619,9 +616,7 @@ func (s *service) Update(ctx context.Context, r *taskAPI.UpdateTaskRequest) (*pt
|
|||||||
|
|
||||||
// Wait for a process to exit
|
// Wait for a process to exit
|
||||||
func (s *service) Wait(ctx context.Context, r *taskAPI.WaitRequest) (*taskAPI.WaitResponse, error) {
|
func (s *service) Wait(ctx context.Context, r *taskAPI.WaitRequest) (*taskAPI.WaitResponse, error) {
|
||||||
s.mu.Lock()
|
|
||||||
p, err := s.getProcess(r.ExecID)
|
p, err := s.getProcess(r.ExecID)
|
||||||
s.mu.Unlock()
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -643,9 +638,6 @@ func (s *service) processExits() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *service) checkProcesses(e runcC.Exit) {
|
func (s *service) checkProcesses(e runcC.Exit) {
|
||||||
s.mu.Lock()
|
|
||||||
defer s.mu.Unlock()
|
|
||||||
|
|
||||||
for _, p := range s.allProcesses() {
|
for _, p := range s.allProcesses() {
|
||||||
if p.Pid() == e.Pid {
|
if p.Pid() == e.Pid {
|
||||||
if ip, ok := p.(*proc.Init); ok {
|
if ip, ok := p.(*proc.Init); ok {
|
||||||
@ -669,6 +661,8 @@ func (s *service) checkProcesses(e runcC.Exit) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *service) allProcesses() (o []rproc.Process) {
|
func (s *service) allProcesses() (o []rproc.Process) {
|
||||||
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
for _, p := range s.processes {
|
for _, p := range s.processes {
|
||||||
o = append(o, p)
|
o = append(o, p)
|
||||||
}
|
}
|
||||||
@ -680,8 +674,8 @@ func (s *service) allProcesses() (o []rproc.Process) {
|
|||||||
|
|
||||||
func (s *service) getContainerPids(ctx context.Context, id string) ([]uint32, error) {
|
func (s *service) getContainerPids(ctx context.Context, id string) ([]uint32, error) {
|
||||||
s.mu.Lock()
|
s.mu.Lock()
|
||||||
defer s.mu.Unlock()
|
|
||||||
p := s.task
|
p := s.task
|
||||||
|
s.mu.Unlock()
|
||||||
if p == nil {
|
if p == nil {
|
||||||
return nil, errors.Wrapf(errdefs.ErrFailedPrecondition, "container must be created")
|
return nil, errors.Wrapf(errdefs.ErrFailedPrecondition, "container must be created")
|
||||||
}
|
}
|
||||||
@ -698,13 +692,18 @@ func (s *service) getContainerPids(ctx context.Context, id string) ([]uint32, er
|
|||||||
|
|
||||||
func (s *service) forward(publisher events.Publisher) {
|
func (s *service) forward(publisher events.Publisher) {
|
||||||
for e := range s.events {
|
for e := range s.events {
|
||||||
if err := publisher.Publish(s.context, getTopic(s.context, e), e); err != nil {
|
ctx, cancel := context.WithTimeout(s.context, 5*time.Second)
|
||||||
|
err := publisher.Publish(ctx, getTopic(e), e)
|
||||||
|
cancel()
|
||||||
|
if err != nil {
|
||||||
logrus.WithError(err).Error("post event")
|
logrus.WithError(err).Error("post event")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *service) getProcess(execID string) (rproc.Process, error) {
|
func (s *service) getProcess(execID string) (rproc.Process, error) {
|
||||||
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
if execID == "" {
|
if execID == "" {
|
||||||
return s.task, nil
|
return s.task, nil
|
||||||
}
|
}
|
||||||
@ -715,14 +714,22 @@ func (s *service) getProcess(execID string) (rproc.Process, error) {
|
|||||||
return p, nil
|
return p, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *service) getCgroup() cgroups.Cgroup {
|
||||||
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
|
return s.cg
|
||||||
|
}
|
||||||
|
|
||||||
func (s *service) setCgroup(cg cgroups.Cgroup) {
|
func (s *service) setCgroup(cg cgroups.Cgroup) {
|
||||||
|
s.mu.Lock()
|
||||||
s.cg = cg
|
s.cg = cg
|
||||||
|
s.mu.Unlock()
|
||||||
if err := s.ep.add(s.id, cg); err != nil {
|
if err := s.ep.add(s.id, cg); err != nil {
|
||||||
logrus.WithError(err).Error("add cg to OOM monitor")
|
logrus.WithError(err).Error("add cg to OOM monitor")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func getTopic(ctx context.Context, e interface{}) string {
|
func getTopic(e interface{}) string {
|
||||||
switch e.(type) {
|
switch e.(type) {
|
||||||
case *eventstypes.TaskCreate:
|
case *eventstypes.TaskCreate:
|
||||||
return runtime.TaskCreateEventTopic
|
return runtime.TaskCreateEventTopic
|
||||||
|
Loading…
Reference in New Issue
Block a user