Reserve exec id to prevent race
ref #2820 Signed-off-by: Michael Crosby <crosbymichael@gmail.com>
This commit is contained in:
parent
574bde08ba
commit
1a8df3f237
@ -125,10 +125,11 @@ func NewContainer(ctx context.Context, platform rproc.Platform, r *task.CreateTa
|
||||
return nil, errdefs.ToGRPC(err)
|
||||
}
|
||||
container := &Container{
|
||||
ID: r.ID,
|
||||
Bundle: r.Bundle,
|
||||
process: process,
|
||||
processes: make(map[string]rproc.Process),
|
||||
ID: r.ID,
|
||||
Bundle: r.Bundle,
|
||||
process: process,
|
||||
processes: make(map[string]rproc.Process),
|
||||
reservedProcess: make(map[string]struct{}),
|
||||
}
|
||||
pid := process.Pid()
|
||||
if pid > 0 {
|
||||
@ -189,9 +190,10 @@ type Container struct {
|
||||
// Bundle path
|
||||
Bundle string
|
||||
|
||||
cgroup cgroups.Cgroup
|
||||
process rproc.Process
|
||||
processes map[string]rproc.Process
|
||||
cgroup cgroups.Cgroup
|
||||
process rproc.Process
|
||||
processes map[string]rproc.Process
|
||||
reservedProcess map[string]struct{}
|
||||
}
|
||||
|
||||
// All processes in the container
|
||||
@ -256,18 +258,35 @@ func (c *Container) Process(id string) (rproc.Process, error) {
|
||||
return p, nil
|
||||
}
|
||||
|
||||
// ProcessExists returns true if the process by id exists
|
||||
func (c *Container) ProcessExists(id string) bool {
|
||||
// ReserveProcess checks for the existence of an id and atomically
|
||||
// reserves the process id if it does not already exist
|
||||
//
|
||||
// Returns true if the process id was sucessfully reserved and a
|
||||
// cancel func to release the reservation
|
||||
func (c *Container) ReserveProcess(id string) (bool, func()) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
_, ok := c.processes[id]
|
||||
return ok
|
||||
|
||||
if _, ok := c.processes[id]; ok {
|
||||
return false, nil
|
||||
}
|
||||
if _, ok := c.reservedProcess[id]; ok {
|
||||
return false, nil
|
||||
}
|
||||
c.reservedProcess[id] = struct{}{}
|
||||
return true, func() {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
delete(c.reservedProcess, id)
|
||||
}
|
||||
}
|
||||
|
||||
// ProcessAdd adds a new process to the container
|
||||
func (c *Container) ProcessAdd(process rproc.Process) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
delete(c.reservedProcess, process.ID())
|
||||
c.processes[process.ID()] = process
|
||||
}
|
||||
|
||||
|
@ -326,11 +326,13 @@ func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*pty
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if container.ProcessExists(r.ExecID) {
|
||||
ok, cancel := container.ReserveProcess(r.ExecID)
|
||||
if !ok {
|
||||
return nil, errdefs.ToGRPCf(errdefs.ErrAlreadyExists, "id %s", r.ExecID)
|
||||
}
|
||||
process, err := container.Exec(ctx, r)
|
||||
if err != nil {
|
||||
cancel()
|
||||
return nil, errdefs.ToGRPC(err)
|
||||
}
|
||||
|
||||
|
@ -375,11 +375,13 @@ func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*pty
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if container.ProcessExists(r.ExecID) {
|
||||
ok, cancel := container.ReserveProcess(r.ExecID)
|
||||
if !ok {
|
||||
return nil, errdefs.ToGRPCf(errdefs.ErrAlreadyExists, "id %s", r.ExecID)
|
||||
}
|
||||
process, err := container.Exec(ctx, r)
|
||||
if err != nil {
|
||||
cancel()
|
||||
return nil, errdefs.ToGRPC(err)
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user