Improve shim locking

Only lock around shim state and not on actions

Signed-off-by: Michael Crosby <crosbymichael@gmail.com>
Michael Crosby 2018-08-28 12:49:27 -04:00
parent d50e25360c
commit 2205e8d67a
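The pattern the commit message describes is sketched below in a minimal, stand-alone form. This is an illustration only, not the shim's real code: the `process` interface, the `service` fields, and the `Start` signature are simplified stand-ins. The idea is that the mutex guards only reads and writes of shared shim state, and the action itself runs after the lock has been released.

```go
package shim

// Minimal sketch of the locking pattern this commit moves toward: s.mu is held
// only while reading or writing shared state (the task and the exec map), never
// across a potentially slow action such as starting a process. All names below
// are simplified stand-ins, not the actual containerd types.

import (
	"context"
	"errors"
	"sync"
)

// process stands in for the shim's process abstraction.
type process interface {
	Start(ctx context.Context) error
}

type service struct {
	mu        sync.Mutex
	task      process            // init process of the container
	processes map[string]process // exec processes keyed by exec ID
}

// getProcess locks only around the state lookup, mirroring the helper in the diff.
func (s *service) getProcess(execID string) (process, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if execID == "" {
		return s.task, nil
	}
	p, ok := s.processes[execID]
	if !ok {
		return nil, errors.New("process does not exist: " + execID)
	}
	return p, nil
}

// Start resolves the process under the lock (inside getProcess) and then runs
// the action with no lock held, so other requests are not serialized behind it.
func (s *service) Start(ctx context.Context, execID string) error {
	p, err := s.getProcess(execID)
	if err != nil {
		return err
	}
	return p.Start(ctx) // action executes outside the critical section
}
```

Compare this with the `defer s.mu.Unlock()` calls the diff drops from the RPC handlers, where the lock used to be held for the duration of each request.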


@@ -304,7 +304,7 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *
if err != nil {
logrus.WithError(err).Errorf("loading cgroup for %d", pid)
}
s.setCgroup(cg)
s.cg = cg
}
s.task = process
return &taskAPI.CreateTaskResponse{
@@ -315,8 +315,6 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *
// Start a process
func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.StartResponse, error) {
s.mu.Lock()
defer s.mu.Unlock()
p, err := s.getProcess(r.ExecID)
if err != nil {
return nil, err
@@ -324,7 +322,8 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.
if err := p.Start(ctx); err != nil {
return nil, err
}
if s.cg == nil && p.Pid() > 0 {
// case for restore
if s.getCgroup() == nil && p.Pid() > 0 {
cg, err := cgroups.Load(cgroups.V1, cgroups.PidPath(p.Pid()))
if err != nil {
logrus.WithError(err).Errorf("loading cgroup for %d", p.Pid())
@@ -338,8 +337,6 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.
// Delete the initial process and container
func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAPI.DeleteResponse, error) {
s.mu.Lock()
defer s.mu.Unlock()
p, err := s.getProcess(r.ExecID)
if err != nil {
return nil, err
@@ -352,7 +349,9 @@ func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAP
}
isTask := r.ExecID == ""
if !isTask {
s.mu.Lock()
delete(s.processes, r.ExecID)
s.mu.Unlock()
}
if isTask && s.platform != nil {
s.platform.Close()
@@ -367,11 +366,12 @@ func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAP
// Exec an additional process inside the container
func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*ptypes.Empty, error) {
s.mu.Lock()
defer s.mu.Unlock()
if p := s.processes[r.ExecID]; p != nil {
p := s.processes[r.ExecID]
s.mu.Unlock()
if p != nil {
return nil, errdefs.ToGRPCf(errdefs.ErrAlreadyExists, "id %s", r.ExecID)
}
p := s.task
p = s.task
if p == nil {
return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
}
@@ -386,14 +386,14 @@ func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*pty
if err != nil {
return nil, errdefs.ToGRPC(err)
}
s.mu.Lock()
s.processes[r.ExecID] = process
s.mu.Unlock()
return empty, nil
}
// ResizePty of a process
func (s *service) ResizePty(ctx context.Context, r *taskAPI.ResizePtyRequest) (*ptypes.Empty, error) {
s.mu.Lock()
defer s.mu.Unlock()
p, err := s.getProcess(r.ExecID)
if err != nil {
return nil, err
@@ -410,8 +410,6 @@ func (s *service) ResizePty(ctx context.Context, r *taskAPI.ResizePtyRequest) (*
// State returns runtime state information for a process
func (s *service) State(ctx context.Context, r *taskAPI.StateRequest) (*taskAPI.StateResponse, error) {
s.mu.Lock()
defer s.mu.Unlock()
p, err := s.getProcess(r.ExecID)
if err != nil {
return nil, err
@@ -451,8 +449,8 @@ func (s *service) State(ctx context.Context, r *taskAPI.StateRequest) (*taskAPI.
// Pause the container
func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (*ptypes.Empty, error) {
s.mu.Lock()
defer s.mu.Unlock()
p := s.task
s.mu.Unlock()
if p == nil {
return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
}
@@ -465,8 +463,8 @@ func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (*ptypes.E
// Resume the container
func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (*ptypes.Empty, error) {
s.mu.Lock()
defer s.mu.Unlock()
p := s.task
s.mu.Unlock()
if p == nil {
return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
}
@@ -478,8 +476,6 @@ func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (*ptypes
// Kill a process with the provided signal
func (s *service) Kill(ctx context.Context, r *taskAPI.KillRequest) (*ptypes.Empty, error) {
s.mu.Lock()
defer s.mu.Unlock()
p, err := s.getProcess(r.ExecID)
if err != nil {
return nil, err
@@ -526,8 +522,6 @@ func (s *service) Pids(ctx context.Context, r *taskAPI.PidsRequest) (*taskAPI.Pi
// CloseIO of a process
func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (*ptypes.Empty, error) {
s.mu.Lock()
defer s.mu.Unlock()
p, err := s.getProcess(r.ExecID)
if err != nil {
return nil, err
@@ -543,8 +537,8 @@ func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (*ptyp
// Checkpoint the container
func (s *service) Checkpoint(ctx context.Context, r *taskAPI.CheckpointTaskRequest) (*ptypes.Empty, error) {
s.mu.Lock()
defer s.mu.Unlock()
p := s.task
s.mu.Unlock()
if p == nil {
return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
}
@@ -589,13 +583,11 @@ func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (*pt
}
func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (*taskAPI.StatsResponse, error) {
s.mu.Lock()
defer s.mu.Unlock()
if s.cg == nil {
cg := s.getCgroup()
if cg == nil {
return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "cgroup does not exist")
}
stats, err := s.cg.Stat(cgroups.IgnoreNotExist)
stats, err := cg.Stat(cgroups.IgnoreNotExist)
if err != nil {
return nil, err
}
@@ -611,8 +603,8 @@ func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (*taskAPI.
// Update a running container
func (s *service) Update(ctx context.Context, r *taskAPI.UpdateTaskRequest) (*ptypes.Empty, error) {
s.mu.Lock()
defer s.mu.Unlock()
p := s.task
s.mu.Unlock()
if p == nil {
return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
}
@@ -624,9 +616,7 @@ func (s *service) Update(ctx context.Context, r *taskAPI.UpdateTaskRequest) (*pt
// Wait for a process to exit
func (s *service) Wait(ctx context.Context, r *taskAPI.WaitRequest) (*taskAPI.WaitResponse, error) {
s.mu.Lock()
p, err := s.getProcess(r.ExecID)
s.mu.Unlock()
if err != nil {
return nil, err
}
@@ -648,9 +638,6 @@ func (s *service) processExits() {
}
func (s *service) checkProcesses(e runcC.Exit) {
s.mu.Lock()
defer s.mu.Unlock()
for _, p := range s.allProcesses() {
if p.Pid() == e.Pid {
if ip, ok := p.(*proc.Init); ok {
@@ -674,6 +661,8 @@ func (s *service) checkProcesses(e runcC.Exit) {
}
func (s *service) allProcesses() (o []rproc.Process) {
s.mu.Lock()
defer s.mu.Unlock()
for _, p := range s.processes {
o = append(o, p)
}
@@ -685,8 +674,8 @@ func (s *service) allProcesses() (o []rproc.Process) {
func (s *service) getContainerPids(ctx context.Context, id string) ([]uint32, error) {
s.mu.Lock()
defer s.mu.Unlock()
p := s.task
s.mu.Unlock()
if p == nil {
return nil, errors.Wrapf(errdefs.ErrFailedPrecondition, "container must be created")
}
@@ -713,6 +702,8 @@ func (s *service) forward(publisher events.Publisher) {
}
func (s *service) getProcess(execID string) (rproc.Process, error) {
s.mu.Lock()
defer s.mu.Unlock()
if execID == "" {
return s.task, nil
}
@@ -723,8 +714,16 @@ func (s *service) getProcess(execID string) (rproc.Process, error) {
return p, nil
}
func (s *service) getCgroup() cgroups.Cgroup {
s.mu.Lock()
defer s.mu.Unlock()
return s.cg
}
func (s *service) setCgroup(cg cgroups.Cgroup) {
s.mu.Lock()
s.cg = cg
s.mu.Unlock()
if err := s.ep.add(s.id, cg); err != nil {
logrus.WithError(err).Error("add cg to OOM monitor")
}