Close io when Start fails
Signed-off-by: Michael Crosby <crosbymichael@gmail.com>
This commit is contained in:
parent
4e8943f7bb
commit
96b041e1f9
@ -796,3 +796,104 @@ func TestContainerUpdate(t *testing.T) {
|
|||||||
|
|
||||||
<-statusC
|
<-statusC
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestContainerNoBinaryExists(t *testing.T) {
|
||||||
|
client, err := newClient(t, address)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
defer client.Close()
|
||||||
|
|
||||||
|
var (
|
||||||
|
ctx, cancel = testContext()
|
||||||
|
id = t.Name()
|
||||||
|
)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
image, err := client.GetImage(ctx, testImage)
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
spec, err := GenerateSpec(WithImageConfig(ctx, image), WithProcessArgs("nothing"))
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
container, err := client.NewContainer(ctx, id, WithSpec(spec), WithNewRootFS(id, image))
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer container.Delete(ctx, WithRootFSDeletion)
|
||||||
|
|
||||||
|
if _, err := container.NewTask(ctx, Stdio); err == nil {
|
||||||
|
t.Error("NewTask should return an error when binary does not exist")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestContainerExecNoBinaryExists(t *testing.T) {
|
||||||
|
client, err := newClient(t, address)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
defer client.Close()
|
||||||
|
|
||||||
|
var (
|
||||||
|
ctx, cancel = testContext()
|
||||||
|
id = t.Name()
|
||||||
|
)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
image, err := client.GetImage(ctx, testImage)
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
spec, err := GenerateSpec(WithImageConfig(ctx, image), WithProcessArgs("sleep", "100"))
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
container, err := client.NewContainer(ctx, id, WithSpec(spec), WithNewRootFS(id, image))
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer container.Delete(ctx, WithRootFSDeletion)
|
||||||
|
|
||||||
|
task, err := container.NewTask(ctx, empty())
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer task.Delete(ctx)
|
||||||
|
|
||||||
|
finished := make(chan struct{}, 1)
|
||||||
|
go func() {
|
||||||
|
if _, err := task.Wait(ctx); err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
close(finished)
|
||||||
|
}()
|
||||||
|
|
||||||
|
// start an exec process without running the original container process info
|
||||||
|
processSpec := spec.Process
|
||||||
|
processSpec.Args = []string{
|
||||||
|
"none",
|
||||||
|
}
|
||||||
|
execID := t.Name() + "_exec"
|
||||||
|
process, err := task.Exec(ctx, execID, processSpec, empty())
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer process.Delete(ctx)
|
||||||
|
if err := process.Start(ctx); err == nil {
|
||||||
|
t.Error("Process.Start should fail when process does not exist")
|
||||||
|
}
|
||||||
|
if err := task.Kill(ctx, syscall.SIGKILL); err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
<-finished
|
||||||
|
}
|
||||||
|
18
io.go
18
io.go
@ -15,7 +15,14 @@ type IO struct {
|
|||||||
Stdout string
|
Stdout string
|
||||||
Stderr string
|
Stderr string
|
||||||
|
|
||||||
closer io.Closer
|
closer *wgCloser
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *IO) Wait() {
|
||||||
|
if i.closer == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
i.closer.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (i *IO) Close() error {
|
func (i *IO) Close() error {
|
||||||
@ -129,10 +136,17 @@ type ioSet struct {
|
|||||||
type wgCloser struct {
|
type wgCloser struct {
|
||||||
wg *sync.WaitGroup
|
wg *sync.WaitGroup
|
||||||
dir string
|
dir string
|
||||||
|
set []io.Closer
|
||||||
|
}
|
||||||
|
|
||||||
|
func (g *wgCloser) Wait() {
|
||||||
|
g.wg.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *wgCloser) Close() error {
|
func (g *wgCloser) Close() error {
|
||||||
g.wg.Wait()
|
for _, f := range g.set {
|
||||||
|
f.Close()
|
||||||
|
}
|
||||||
if g.dir != "" {
|
if g.dir != "" {
|
||||||
return os.RemoveAll(g.dir)
|
return os.RemoveAll(g.dir)
|
||||||
}
|
}
|
||||||
|
32
io_unix.go
32
io_unix.go
@ -11,21 +11,25 @@ import (
|
|||||||
"github.com/containerd/fifo"
|
"github.com/containerd/fifo"
|
||||||
)
|
)
|
||||||
|
|
||||||
func copyIO(fifos *FIFOSet, ioset *ioSet, tty bool) (closer io.Closer, err error) {
|
func copyIO(fifos *FIFOSet, ioset *ioSet, tty bool) (_ *wgCloser, err error) {
|
||||||
var (
|
var (
|
||||||
f io.ReadWriteCloser
|
f io.ReadWriteCloser
|
||||||
|
set []io.Closer
|
||||||
ctx = context.Background()
|
ctx = context.Background()
|
||||||
wg = &sync.WaitGroup{}
|
wg = &sync.WaitGroup{}
|
||||||
)
|
)
|
||||||
|
defer func() {
|
||||||
|
if err != nil {
|
||||||
|
for _, f := range set {
|
||||||
|
f.Close()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
if f, err = fifo.OpenFifo(ctx, fifos.In, syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
|
if f, err = fifo.OpenFifo(ctx, fifos.In, syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
defer func(c io.Closer) {
|
set = append(set, f)
|
||||||
if err != nil {
|
|
||||||
c.Close()
|
|
||||||
}
|
|
||||||
}(f)
|
|
||||||
go func(w io.WriteCloser) {
|
go func(w io.WriteCloser) {
|
||||||
io.Copy(w, ioset.in)
|
io.Copy(w, ioset.in)
|
||||||
w.Close()
|
w.Close()
|
||||||
@ -34,11 +38,7 @@ func copyIO(fifos *FIFOSet, ioset *ioSet, tty bool) (closer io.Closer, err error
|
|||||||
if f, err = fifo.OpenFifo(ctx, fifos.Out, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
|
if f, err = fifo.OpenFifo(ctx, fifos.Out, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
defer func(c io.Closer) {
|
set = append(set, f)
|
||||||
if err != nil {
|
|
||||||
c.Close()
|
|
||||||
}
|
|
||||||
}(f)
|
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go func(r io.ReadCloser) {
|
go func(r io.ReadCloser) {
|
||||||
io.Copy(ioset.out, r)
|
io.Copy(ioset.out, r)
|
||||||
@ -49,23 +49,19 @@ func copyIO(fifos *FIFOSet, ioset *ioSet, tty bool) (closer io.Closer, err error
|
|||||||
if f, err = fifo.OpenFifo(ctx, fifos.Err, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
|
if f, err = fifo.OpenFifo(ctx, fifos.Err, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
defer func(c io.Closer) {
|
set = append(set, f)
|
||||||
if err != nil {
|
|
||||||
c.Close()
|
|
||||||
}
|
|
||||||
}(f)
|
|
||||||
|
|
||||||
if !tty {
|
if !tty {
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go func(r io.ReadCloser) {
|
go func(r io.ReadCloser) {
|
||||||
io.Copy(ioset.err, r)
|
io.Copy(ioset.err, r)
|
||||||
r.Close()
|
|
||||||
wg.Done()
|
wg.Done()
|
||||||
|
r.Close()
|
||||||
}(f)
|
}(f)
|
||||||
}
|
}
|
||||||
|
|
||||||
return &wgCloser{
|
return &wgCloser{
|
||||||
wg: wg,
|
wg: wg,
|
||||||
dir: fifos.Dir,
|
dir: fifos.Dir,
|
||||||
|
set: set,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
@ -10,7 +10,7 @@ import (
|
|||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
)
|
)
|
||||||
|
|
||||||
func copyIO(fifos *FIFOSet, ioset *ioSet, tty bool) (closer io.Closer, err error) {
|
func copyIO(fifos *FIFOSet, ioset *ioSet, tty bool) (_ *wgCloser, err error) {
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
|
|
||||||
if fifos.In != "" {
|
if fifos.In != "" {
|
||||||
|
@ -45,6 +45,7 @@ func (p *process) Start(ctx context.Context) error {
|
|||||||
}
|
}
|
||||||
response, err := p.task.client.TaskService().Exec(ctx, request)
|
response, err := p.task.client.TaskService().Exec(ctx, request)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
p.io.Close()
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
p.pid = response.Pid
|
p.pid = response.Pid
|
||||||
@ -114,7 +115,10 @@ func (p *process) Resize(ctx context.Context, w, h uint32) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (p *process) Delete(ctx context.Context) (uint32, error) {
|
func (p *process) Delete(ctx context.Context) (uint32, error) {
|
||||||
cerr := p.io.Close()
|
if p.io != nil {
|
||||||
|
p.io.Wait()
|
||||||
|
p.io.Close()
|
||||||
|
}
|
||||||
r, err := p.task.client.TaskService().DeleteProcess(ctx, &tasks.DeleteProcessRequest{
|
r, err := p.task.client.TaskService().DeleteProcess(ctx, &tasks.DeleteProcessRequest{
|
||||||
ContainerID: p.task.id,
|
ContainerID: p.task.id,
|
||||||
ExecID: p.id,
|
ExecID: p.id,
|
||||||
@ -122,5 +126,5 @@ func (p *process) Delete(ctx context.Context) (uint32, error) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return UnknownExitStatus, err
|
return UnknownExitStatus, err
|
||||||
}
|
}
|
||||||
return r.ExitStatus, cerr
|
return r.ExitStatus, nil
|
||||||
}
|
}
|
||||||
|
10
task.go
10
task.go
@ -93,6 +93,7 @@ func (t *task) Start(ctx context.Context) error {
|
|||||||
response, err := t.client.TaskService().Create(ctx, t.deferred)
|
response, err := t.client.TaskService().Create(ctx, t.deferred)
|
||||||
t.deferred = nil
|
t.deferred = nil
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
t.io.closer.Close()
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
t.pid = response.Pid
|
t.pid = response.Pid
|
||||||
@ -101,6 +102,9 @@ func (t *task) Start(ctx context.Context) error {
|
|||||||
_, err := t.client.TaskService().Start(ctx, &tasks.StartTaskRequest{
|
_, err := t.client.TaskService().Start(ctx, &tasks.StartTaskRequest{
|
||||||
ContainerID: t.id,
|
ContainerID: t.id,
|
||||||
})
|
})
|
||||||
|
if err != nil {
|
||||||
|
t.io.closer.Close()
|
||||||
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -173,9 +177,9 @@ func (t *task) Wait(ctx context.Context) (uint32, error) {
|
|||||||
// it returns the exit status of the task and any errors that were encountered
|
// it returns the exit status of the task and any errors that were encountered
|
||||||
// during cleanup
|
// during cleanup
|
||||||
func (t *task) Delete(ctx context.Context) (uint32, error) {
|
func (t *task) Delete(ctx context.Context) (uint32, error) {
|
||||||
var cerr error
|
|
||||||
if t.io != nil {
|
if t.io != nil {
|
||||||
cerr = t.io.Close()
|
t.io.Wait()
|
||||||
|
t.io.Close()
|
||||||
}
|
}
|
||||||
r, err := t.client.TaskService().Delete(ctx, &tasks.DeleteTaskRequest{
|
r, err := t.client.TaskService().Delete(ctx, &tasks.DeleteTaskRequest{
|
||||||
ContainerID: t.id,
|
ContainerID: t.id,
|
||||||
@ -183,7 +187,7 @@ func (t *task) Delete(ctx context.Context) (uint32, error) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return UnknownExitStatus, err
|
return UnknownExitStatus, err
|
||||||
}
|
}
|
||||||
return r.ExitStatus, cerr
|
return r.ExitStatus, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *task) Exec(ctx context.Context, id string, spec *specs.Process, ioCreate IOCreation) (Process, error) {
|
func (t *task) Exec(ctx context.Context, id string, spec *specs.Process, ioCreate IOCreation) (Process, error) {
|
||||||
|
Loading…
Reference in New Issue
Block a user