Merge pull request #3365 from crosbymichael/exec-lk

Reserve exec id to prevent race
This commit is contained in:
Phil Estes 2019-06-25 08:59:41 +08:00 committed by GitHub
commit 287582585f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 36 additions and 13 deletions

View File

@ -125,10 +125,11 @@ func NewContainer(ctx context.Context, platform rproc.Platform, r *task.CreateTa
return nil, errdefs.ToGRPC(err)
}
container := &Container{
ID: r.ID,
Bundle: r.Bundle,
process: process,
processes: make(map[string]rproc.Process),
ID: r.ID,
Bundle: r.Bundle,
process: process,
processes: make(map[string]rproc.Process),
reservedProcess: make(map[string]struct{}),
}
pid := process.Pid()
if pid > 0 {
@ -189,9 +190,10 @@ type Container struct {
// Bundle path
Bundle string
cgroup cgroups.Cgroup
process rproc.Process
processes map[string]rproc.Process
cgroup cgroups.Cgroup
process rproc.Process
processes map[string]rproc.Process
reservedProcess map[string]struct{}
}
// All processes in the container
@ -256,18 +258,35 @@ func (c *Container) Process(id string) (rproc.Process, error) {
return p, nil
}
// ProcessExists returns true if the process by id exists
func (c *Container) ProcessExists(id string) bool {
// ReserveProcess checks for the existence of an id and atomically
// reserves the process id if it does not already exist
//
// Returns true if the process id was sucessfully reserved and a
// cancel func to release the reservation
func (c *Container) ReserveProcess(id string) (bool, func()) {
c.mu.Lock()
defer c.mu.Unlock()
_, ok := c.processes[id]
return ok
if _, ok := c.processes[id]; ok {
return false, nil
}
if _, ok := c.reservedProcess[id]; ok {
return false, nil
}
c.reservedProcess[id] = struct{}{}
return true, func() {
c.mu.Lock()
defer c.mu.Unlock()
delete(c.reservedProcess, id)
}
}
// ProcessAdd adds a new process to the container
func (c *Container) ProcessAdd(process rproc.Process) {
c.mu.Lock()
defer c.mu.Unlock()
delete(c.reservedProcess, process.ID())
c.processes[process.ID()] = process
}

View File

@ -326,11 +326,13 @@ func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*pty
if err != nil {
return nil, err
}
if container.ProcessExists(r.ExecID) {
ok, cancel := container.ReserveProcess(r.ExecID)
if !ok {
return nil, errdefs.ToGRPCf(errdefs.ErrAlreadyExists, "id %s", r.ExecID)
}
process, err := container.Exec(ctx, r)
if err != nil {
cancel()
return nil, errdefs.ToGRPC(err)
}

View File

@ -375,11 +375,13 @@ func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*pty
if err != nil {
return nil, err
}
if container.ProcessExists(r.ExecID) {
ok, cancel := container.ReserveProcess(r.ExecID)
if !ok {
return nil, errdefs.ToGRPCf(errdefs.ErrAlreadyExists, "id %s", r.ExecID)
}
process, err := container.Exec(ctx, r)
if err != nil {
cancel()
return nil, errdefs.ToGRPC(err)
}