Merge pull request #2743 from Ace-Tang/shim_lock
bugfix: optimize shim lock in runtime v1 avoid dead lock
This commit is contained in:
commit
6f8edc2308
@ -114,9 +114,6 @@ type Service struct {
|
||||
|
||||
// Create a new initial process and container with the underlying OCI runtime
|
||||
func (s *Service) Create(ctx context.Context, r *shimapi.CreateTaskRequest) (_ *shimapi.CreateTaskResponse, err error) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
var mounts []proc.Mount
|
||||
for _, m := range r.Rootfs {
|
||||
mounts = append(mounts, proc.Mount{
|
||||
@ -158,6 +155,10 @@ func (s *Service) Create(ctx context.Context, r *shimapi.CreateTaskRequest) (_ *
|
||||
return nil, errors.Wrapf(err, "failed to mount rootfs component %v", m)
|
||||
}
|
||||
}
|
||||
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
process, err := newInit(
|
||||
ctx,
|
||||
s.config.Path,
|
||||
@ -187,11 +188,9 @@ func (s *Service) Create(ctx context.Context, r *shimapi.CreateTaskRequest) (_ *
|
||||
|
||||
// Start a process
|
||||
func (s *Service) Start(ctx context.Context, r *shimapi.StartRequest) (*shimapi.StartResponse, error) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
p := s.processes[r.ID]
|
||||
if p == nil {
|
||||
return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "process %s", r.ID)
|
||||
p, err := s.getExecProcess(r.ID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := p.Start(ctx); err != nil {
|
||||
return nil, err
|
||||
@ -204,16 +203,16 @@ func (s *Service) Start(ctx context.Context, r *shimapi.StartRequest) (*shimapi.
|
||||
|
||||
// Delete the initial process and container
|
||||
func (s *Service) Delete(ctx context.Context, r *ptypes.Empty) (*shimapi.DeleteResponse, error) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
p := s.processes[s.id]
|
||||
if p == nil {
|
||||
return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
|
||||
p, err := s.getInitProcess()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := p.Delete(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
s.mu.Lock()
|
||||
delete(s.processes, s.id)
|
||||
s.mu.Unlock()
|
||||
s.platform.Close()
|
||||
return &shimapi.DeleteResponse{
|
||||
ExitStatus: uint32(p.ExitStatus()),
|
||||
@ -227,11 +226,9 @@ func (s *Service) DeleteProcess(ctx context.Context, r *shimapi.DeleteProcessReq
|
||||
if r.ID == s.id {
|
||||
return nil, status.Errorf(codes.InvalidArgument, "cannot delete init process with DeleteProcess")
|
||||
}
|
||||
s.mu.Lock()
|
||||
p := s.processes[r.ID]
|
||||
s.mu.Unlock()
|
||||
if p == nil {
|
||||
return nil, errors.Wrapf(errdefs.ErrNotFound, "process %s", r.ID)
|
||||
p, err := s.getExecProcess(r.ID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := p.Delete(ctx); err != nil {
|
||||
return nil, err
|
||||
@ -249,13 +246,14 @@ func (s *Service) DeleteProcess(ctx context.Context, r *shimapi.DeleteProcessReq
|
||||
// Exec an additional process inside the container
|
||||
func (s *Service) Exec(ctx context.Context, r *shimapi.ExecProcessRequest) (*ptypes.Empty, error) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
if p := s.processes[r.ID]; p != nil {
|
||||
s.mu.Unlock()
|
||||
return nil, errdefs.ToGRPCf(errdefs.ErrAlreadyExists, "id %s", r.ID)
|
||||
}
|
||||
|
||||
p := s.processes[s.id]
|
||||
s.mu.Unlock()
|
||||
if p == nil {
|
||||
return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
|
||||
}
|
||||
@ -271,14 +269,14 @@ func (s *Service) Exec(ctx context.Context, r *shimapi.ExecProcessRequest) (*pty
|
||||
if err != nil {
|
||||
return nil, errdefs.ToGRPC(err)
|
||||
}
|
||||
s.mu.Lock()
|
||||
s.processes[r.ID] = process
|
||||
s.mu.Unlock()
|
||||
return empty, nil
|
||||
}
|
||||
|
||||
// ResizePty of a process
|
||||
func (s *Service) ResizePty(ctx context.Context, r *shimapi.ResizePtyRequest) (*ptypes.Empty, error) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
if r.ID == "" {
|
||||
return nil, errdefs.ToGRPCf(errdefs.ErrInvalidArgument, "id not provided")
|
||||
}
|
||||
@ -286,7 +284,9 @@ func (s *Service) ResizePty(ctx context.Context, r *shimapi.ResizePtyRequest) (*
|
||||
Width: uint16(r.Width),
|
||||
Height: uint16(r.Height),
|
||||
}
|
||||
s.mu.Lock()
|
||||
p := s.processes[r.ID]
|
||||
s.mu.Unlock()
|
||||
if p == nil {
|
||||
return nil, errors.Errorf("process does not exist %s", r.ID)
|
||||
}
|
||||
@ -298,11 +298,9 @@ func (s *Service) ResizePty(ctx context.Context, r *shimapi.ResizePtyRequest) (*
|
||||
|
||||
// State returns runtime state information for a process
|
||||
func (s *Service) State(ctx context.Context, r *shimapi.StateRequest) (*shimapi.StateResponse, error) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
p := s.processes[r.ID]
|
||||
if p == nil {
|
||||
return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "process id %s", r.ID)
|
||||
p, err := s.getExecProcess(r.ID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
st, err := p.Status(ctx)
|
||||
if err != nil {
|
||||
@ -338,11 +336,9 @@ func (s *Service) State(ctx context.Context, r *shimapi.StateRequest) (*shimapi.
|
||||
|
||||
// Pause the container
|
||||
func (s *Service) Pause(ctx context.Context, r *ptypes.Empty) (*ptypes.Empty, error) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
p := s.processes[s.id]
|
||||
if p == nil {
|
||||
return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
|
||||
p, err := s.getInitProcess()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := p.(*proc.Init).Pause(ctx); err != nil {
|
||||
return nil, err
|
||||
@ -352,11 +348,9 @@ func (s *Service) Pause(ctx context.Context, r *ptypes.Empty) (*ptypes.Empty, er
|
||||
|
||||
// Resume the container
|
||||
func (s *Service) Resume(ctx context.Context, r *ptypes.Empty) (*ptypes.Empty, error) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
p := s.processes[s.id]
|
||||
if p == nil {
|
||||
return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
|
||||
p, err := s.getInitProcess()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := p.(*proc.Init).Resume(ctx); err != nil {
|
||||
return nil, err
|
||||
@ -366,12 +360,10 @@ func (s *Service) Resume(ctx context.Context, r *ptypes.Empty) (*ptypes.Empty, e
|
||||
|
||||
// Kill a process with the provided signal
|
||||
func (s *Service) Kill(ctx context.Context, r *shimapi.KillRequest) (*ptypes.Empty, error) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
if r.ID == "" {
|
||||
p := s.processes[s.id]
|
||||
if p == nil {
|
||||
return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
|
||||
p, err := s.getInitProcess()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := p.Kill(ctx, r.Signal, r.All); err != nil {
|
||||
return nil, errdefs.ToGRPC(err)
|
||||
@ -379,9 +371,9 @@ func (s *Service) Kill(ctx context.Context, r *shimapi.KillRequest) (*ptypes.Emp
|
||||
return empty, nil
|
||||
}
|
||||
|
||||
p := s.processes[r.ID]
|
||||
if p == nil {
|
||||
return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "process id %s not found", r.ID)
|
||||
p, err := s.getExecProcess(r.ID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := p.Kill(ctx, r.Signal, r.All); err != nil {
|
||||
return nil, errdefs.ToGRPC(err)
|
||||
@ -422,11 +414,9 @@ func (s *Service) ListPids(ctx context.Context, r *shimapi.ListPidsRequest) (*sh
|
||||
|
||||
// CloseIO of a process
|
||||
func (s *Service) CloseIO(ctx context.Context, r *shimapi.CloseIORequest) (*ptypes.Empty, error) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
p := s.processes[r.ID]
|
||||
if p == nil {
|
||||
return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "process does not exist %s", r.ID)
|
||||
p, err := s.getExecProcess(r.ID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if stdin := p.Stdin(); stdin != nil {
|
||||
if err := stdin.Close(); err != nil {
|
||||
@ -438,11 +428,9 @@ func (s *Service) CloseIO(ctx context.Context, r *shimapi.CloseIORequest) (*ptyp
|
||||
|
||||
// Checkpoint the container
|
||||
func (s *Service) Checkpoint(ctx context.Context, r *shimapi.CheckpointTaskRequest) (*ptypes.Empty, error) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
p := s.processes[s.id]
|
||||
if p == nil {
|
||||
return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
|
||||
p, err := s.getInitProcess()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var options runctypes.CheckpointOptions
|
||||
if r.Options != nil {
|
||||
@ -475,11 +463,9 @@ func (s *Service) ShimInfo(ctx context.Context, r *ptypes.Empty) (*shimapi.ShimI
|
||||
|
||||
// Update a running container
|
||||
func (s *Service) Update(ctx context.Context, r *shimapi.UpdateTaskRequest) (*ptypes.Empty, error) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
p := s.processes[s.id]
|
||||
if p == nil {
|
||||
return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
|
||||
p, err := s.getInitProcess()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := p.(*proc.Init).Update(ctx, r.Resources); err != nil {
|
||||
return nil, errdefs.ToGRPC(err)
|
||||
@ -489,11 +475,9 @@ func (s *Service) Update(ctx context.Context, r *shimapi.UpdateTaskRequest) (*pt
|
||||
|
||||
// Wait for a process to exit
|
||||
func (s *Service) Wait(ctx context.Context, r *shimapi.WaitRequest) (*shimapi.WaitResponse, error) {
|
||||
s.mu.Lock()
|
||||
p := s.processes[r.ID]
|
||||
s.mu.Unlock()
|
||||
if p == nil {
|
||||
return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
|
||||
p, err := s.getExecProcess(r.ID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
p.Wait()
|
||||
|
||||
@ -563,11 +547,9 @@ func shouldKillAllOnExit(bundlePath string) (bool, error) {
|
||||
}
|
||||
|
||||
func (s *Service) getContainerPids(ctx context.Context, id string) ([]uint32, error) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
p := s.processes[s.id]
|
||||
if p == nil {
|
||||
return nil, errors.Wrapf(errdefs.ErrFailedPrecondition, "container must be created")
|
||||
p, err := s.getInitProcess()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ps, err := p.(*proc.Init).Runtime().Ps(ctx, id)
|
||||
@ -589,6 +571,30 @@ func (s *Service) forward(publisher events.Publisher) {
|
||||
}
|
||||
}
|
||||
|
||||
// getInitProcess returns initial process
|
||||
func (s *Service) getInitProcess() (rproc.Process, error) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
p := s.processes[s.id]
|
||||
if p == nil {
|
||||
return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
|
||||
}
|
||||
return p, nil
|
||||
}
|
||||
|
||||
// getExecProcess returns exec process
|
||||
func (s *Service) getExecProcess(id string) (rproc.Process, error) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
p := s.processes[id]
|
||||
if p == nil {
|
||||
return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "process %s does not exist", id)
|
||||
}
|
||||
return p, nil
|
||||
}
|
||||
|
||||
func getTopic(ctx context.Context, e interface{}) string {
|
||||
switch e.(type) {
|
||||
case *eventstypes.TaskCreate:
|
||||
|
Loading…
Reference in New Issue
Block a user