Merge pull request #1165 from crosbymichael/delete-hang
Fix process and task io Wait hang when fails to start
This commit is contained in:
commit
052a8d03ed
@ -23,6 +23,7 @@ var (
|
||||
ErrNoRunningTask = errors.New("no running task")
|
||||
ErrDeleteRunningTask = errors.New("cannot delete container with running task")
|
||||
ErrProcessExited = errors.New("process already exited")
|
||||
ErrNoExecID = errors.New("exec id must be provided")
|
||||
)
|
||||
|
||||
type DeleteOpts func(context.Context, *Client, containers.Container) error
|
||||
|
@ -796,3 +796,104 @@ func TestContainerUpdate(t *testing.T) {
|
||||
|
||||
<-statusC
|
||||
}
|
||||
|
||||
func TestContainerNoBinaryExists(t *testing.T) {
|
||||
client, err := newClient(t, address)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer client.Close()
|
||||
|
||||
var (
|
||||
ctx, cancel = testContext()
|
||||
id = t.Name()
|
||||
)
|
||||
defer cancel()
|
||||
|
||||
image, err := client.GetImage(ctx, testImage)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
}
|
||||
spec, err := GenerateSpec(WithImageConfig(ctx, image), WithProcessArgs("nothing"))
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
}
|
||||
container, err := client.NewContainer(ctx, id, WithSpec(spec), WithNewRootFS(id, image))
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
}
|
||||
defer container.Delete(ctx, WithRootFSDeletion)
|
||||
|
||||
if _, err := container.NewTask(ctx, Stdio); err == nil {
|
||||
t.Error("NewTask should return an error when binary does not exist")
|
||||
}
|
||||
}
|
||||
|
||||
func TestContainerExecNoBinaryExists(t *testing.T) {
|
||||
client, err := newClient(t, address)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer client.Close()
|
||||
|
||||
var (
|
||||
ctx, cancel = testContext()
|
||||
id = t.Name()
|
||||
)
|
||||
defer cancel()
|
||||
|
||||
image, err := client.GetImage(ctx, testImage)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
}
|
||||
spec, err := GenerateSpec(WithImageConfig(ctx, image), WithProcessArgs("sleep", "100"))
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
}
|
||||
container, err := client.NewContainer(ctx, id, WithSpec(spec), WithNewRootFS(id, image))
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
}
|
||||
defer container.Delete(ctx, WithRootFSDeletion)
|
||||
|
||||
task, err := container.NewTask(ctx, empty())
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
}
|
||||
defer task.Delete(ctx)
|
||||
|
||||
finished := make(chan struct{}, 1)
|
||||
go func() {
|
||||
if _, err := task.Wait(ctx); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
close(finished)
|
||||
}()
|
||||
|
||||
// start an exec process without running the original container process info
|
||||
processSpec := spec.Process
|
||||
processSpec.Args = []string{
|
||||
"none",
|
||||
}
|
||||
execID := t.Name() + "_exec"
|
||||
process, err := task.Exec(ctx, execID, processSpec, empty())
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
}
|
||||
defer process.Delete(ctx)
|
||||
if err := process.Start(ctx); err == nil {
|
||||
t.Error("Process.Start should fail when process does not exist")
|
||||
}
|
||||
if err := task.Kill(ctx, syscall.SIGKILL); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
<-finished
|
||||
}
|
||||
|
18
io.go
18
io.go
@ -15,7 +15,14 @@ type IO struct {
|
||||
Stdout string
|
||||
Stderr string
|
||||
|
||||
closer io.Closer
|
||||
closer *wgCloser
|
||||
}
|
||||
|
||||
func (i *IO) Wait() {
|
||||
if i.closer == nil {
|
||||
return
|
||||
}
|
||||
i.closer.Wait()
|
||||
}
|
||||
|
||||
func (i *IO) Close() error {
|
||||
@ -129,10 +136,17 @@ type ioSet struct {
|
||||
type wgCloser struct {
|
||||
wg *sync.WaitGroup
|
||||
dir string
|
||||
set []io.Closer
|
||||
}
|
||||
|
||||
func (g *wgCloser) Wait() {
|
||||
g.wg.Wait()
|
||||
}
|
||||
|
||||
func (g *wgCloser) Close() error {
|
||||
g.wg.Wait()
|
||||
for _, f := range g.set {
|
||||
f.Close()
|
||||
}
|
||||
if g.dir != "" {
|
||||
return os.RemoveAll(g.dir)
|
||||
}
|
||||
|
32
io_unix.go
32
io_unix.go
@ -11,21 +11,25 @@ import (
|
||||
"github.com/containerd/fifo"
|
||||
)
|
||||
|
||||
func copyIO(fifos *FIFOSet, ioset *ioSet, tty bool) (closer io.Closer, err error) {
|
||||
func copyIO(fifos *FIFOSet, ioset *ioSet, tty bool) (_ *wgCloser, err error) {
|
||||
var (
|
||||
f io.ReadWriteCloser
|
||||
set []io.Closer
|
||||
ctx = context.Background()
|
||||
wg = &sync.WaitGroup{}
|
||||
)
|
||||
defer func() {
|
||||
if err != nil {
|
||||
for _, f := range set {
|
||||
f.Close()
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
if f, err = fifo.OpenFifo(ctx, fifos.In, syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer func(c io.Closer) {
|
||||
if err != nil {
|
||||
c.Close()
|
||||
}
|
||||
}(f)
|
||||
set = append(set, f)
|
||||
go func(w io.WriteCloser) {
|
||||
io.Copy(w, ioset.in)
|
||||
w.Close()
|
||||
@ -34,11 +38,7 @@ func copyIO(fifos *FIFOSet, ioset *ioSet, tty bool) (closer io.Closer, err error
|
||||
if f, err = fifo.OpenFifo(ctx, fifos.Out, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer func(c io.Closer) {
|
||||
if err != nil {
|
||||
c.Close()
|
||||
}
|
||||
}(f)
|
||||
set = append(set, f)
|
||||
wg.Add(1)
|
||||
go func(r io.ReadCloser) {
|
||||
io.Copy(ioset.out, r)
|
||||
@ -49,23 +49,19 @@ func copyIO(fifos *FIFOSet, ioset *ioSet, tty bool) (closer io.Closer, err error
|
||||
if f, err = fifo.OpenFifo(ctx, fifos.Err, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer func(c io.Closer) {
|
||||
if err != nil {
|
||||
c.Close()
|
||||
}
|
||||
}(f)
|
||||
set = append(set, f)
|
||||
|
||||
if !tty {
|
||||
wg.Add(1)
|
||||
go func(r io.ReadCloser) {
|
||||
io.Copy(ioset.err, r)
|
||||
r.Close()
|
||||
wg.Done()
|
||||
r.Close()
|
||||
}(f)
|
||||
}
|
||||
|
||||
return &wgCloser{
|
||||
wg: wg,
|
||||
dir: fifos.Dir,
|
||||
set: set,
|
||||
}, nil
|
||||
}
|
||||
|
@ -10,7 +10,7 @@ import (
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
func copyIO(fifos *FIFOSet, ioset *ioSet, tty bool) (closer io.Closer, err error) {
|
||||
func copyIO(fifos *FIFOSet, ioset *ioSet, tty bool) (_ *wgCloser, err error) {
|
||||
var wg sync.WaitGroup
|
||||
|
||||
if fifos.In != "" {
|
||||
|
@ -45,6 +45,7 @@ func (p *process) Start(ctx context.Context) error {
|
||||
}
|
||||
response, err := p.task.client.TaskService().Exec(ctx, request)
|
||||
if err != nil {
|
||||
p.io.Close()
|
||||
return err
|
||||
}
|
||||
p.pid = response.Pid
|
||||
@ -110,7 +111,10 @@ func (p *process) Resize(ctx context.Context, w, h uint32) error {
|
||||
}
|
||||
|
||||
func (p *process) Delete(ctx context.Context) (uint32, error) {
|
||||
cerr := p.io.Close()
|
||||
if p.io != nil {
|
||||
p.io.Wait()
|
||||
p.io.Close()
|
||||
}
|
||||
r, err := p.task.client.TaskService().DeleteProcess(ctx, &tasks.DeleteProcessRequest{
|
||||
ContainerID: p.task.id,
|
||||
ExecID: p.id,
|
||||
@ -118,5 +122,5 @@ func (p *process) Delete(ctx context.Context) (uint32, error) {
|
||||
if err != nil {
|
||||
return UnknownExitStatus, err
|
||||
}
|
||||
return r.ExitStatus, cerr
|
||||
return r.ExitStatus, nil
|
||||
}
|
||||
|
13
task.go
13
task.go
@ -93,6 +93,7 @@ func (t *task) Start(ctx context.Context) error {
|
||||
response, err := t.client.TaskService().Create(ctx, t.deferred)
|
||||
t.deferred = nil
|
||||
if err != nil {
|
||||
t.io.closer.Close()
|
||||
return err
|
||||
}
|
||||
t.pid = response.Pid
|
||||
@ -101,6 +102,9 @@ func (t *task) Start(ctx context.Context) error {
|
||||
_, err := t.client.TaskService().Start(ctx, &tasks.StartTaskRequest{
|
||||
ContainerID: t.id,
|
||||
})
|
||||
if err != nil {
|
||||
t.io.closer.Close()
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
@ -170,9 +174,9 @@ func (t *task) Wait(ctx context.Context) (uint32, error) {
|
||||
// it returns the exit status of the task and any errors that were encountered
|
||||
// during cleanup
|
||||
func (t *task) Delete(ctx context.Context) (uint32, error) {
|
||||
var cerr error
|
||||
if t.io != nil {
|
||||
cerr = t.io.Close()
|
||||
t.io.Wait()
|
||||
t.io.Close()
|
||||
}
|
||||
r, err := t.client.TaskService().Delete(ctx, &tasks.DeleteTaskRequest{
|
||||
ContainerID: t.id,
|
||||
@ -180,10 +184,13 @@ func (t *task) Delete(ctx context.Context) (uint32, error) {
|
||||
if err != nil {
|
||||
return UnknownExitStatus, err
|
||||
}
|
||||
return r.ExitStatus, cerr
|
||||
return r.ExitStatus, nil
|
||||
}
|
||||
|
||||
func (t *task) Exec(ctx context.Context, id string, spec *specs.Process, ioCreate IOCreation) (Process, error) {
|
||||
if id == "" {
|
||||
return nil, ErrNoExecID
|
||||
}
|
||||
i, err := ioCreate()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
Loading…
Reference in New Issue
Block a user