Update windows with exec create/start actions

Signed-off-by: Michael Crosby <crosbymichael@gmail.com>
This commit is contained in:
Michael Crosby 2017-07-31 16:49:25 -04:00
parent 83b27db923
commit e827adaf11
2 changed files with 87 additions and 83 deletions

View File

@ -4,12 +4,17 @@ package windows
import ( import (
"context" "context"
"io"
"syscall"
"time" "time"
"github.com/Microsoft/hcsshim" "github.com/Microsoft/hcsshim"
eventsapi "github.com/containerd/containerd/api/services/events/v1"
"github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/runtime" "github.com/containerd/containerd/runtime"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/sirupsen/logrus"
) )
// process implements containerd.Process and containerd.State // process implements containerd.Process and containerd.State
@ -25,6 +30,7 @@ type process struct {
exitCh chan struct{} exitCh chan struct{}
exitCode uint32 exitCode uint32
exitTime time.Time exitTime time.Time
conf *hcsshim.ProcessConfig
} }
func (p *process) ID() string { func (p *process) ID() string {
@ -81,3 +87,83 @@ func (p *process) ExitCode() (uint32, time.Time, error) {
} }
return p.exitCode, p.exitTime, nil return p.exitCode, p.exitTime, nil
} }
func (p *process) Start(ctx context.Context) (err error) {
var hp hcsshim.Process
if hp, err = p.task.hcsContainer.CreateProcess(p.conf); err != nil {
return errors.Wrapf(err, "failed to create process")
}
stdin, stdout, stderr, err := hp.Stdio()
if err != nil {
hp.Kill()
return errors.Wrapf(err, "failed to retrieve init process stdio")
}
ioCopy := func(name string, dst io.WriteCloser, src io.ReadCloser) {
log.G(ctx).WithFields(logrus.Fields{"id": p.id, "pid": p.pid}).
Debugf("%s: copy started", name)
io.Copy(dst, src)
log.G(ctx).WithFields(logrus.Fields{"id": p.id, "pid": p.pid}).
Debugf("%s: copy done", name)
dst.Close()
src.Close()
}
if p.io.stdin != nil {
go ioCopy("stdin", stdin, p.io.stdin)
}
if p.io.stdout != nil {
go ioCopy("stdout", p.io.stdout, stdout)
}
if p.io.stderr != nil {
go ioCopy("stderr", p.io.stderr, stderr)
}
// Wait for the process to exit to get the exit status
go func() {
if err := hp.Wait(); err != nil {
herr, ok := err.(*hcsshim.ProcessError)
if ok && herr.Err != syscall.ERROR_BROKEN_PIPE {
log.G(ctx).
WithError(err).
WithFields(logrus.Fields{"id": p.id, "pid": p.pid}).
Warnf("hcsshim wait failed (process may have been killed)")
}
// Try to get the exit code nonetheless
}
p.exitTime = time.Now()
ec, err := hp.ExitCode()
if err != nil {
log.G(ctx).
WithError(err).
WithFields(logrus.Fields{"id": p.id, "pid": p.pid}).
Warnf("hcsshim could not retrieve exit code")
// Use the unknown exit code
ec = 255
}
p.exitCode = uint32(ec)
p.task.publisher.Publish(ctx,
runtime.TaskExitEventTopic,
&eventsapi.TaskExit{
ContainerID: p.task.id,
ID: p.id,
Pid: p.pid,
ExitStatus: p.exitCode,
ExitedAt: p.exitTime,
})
close(p.exitCh)
// Ensure io's are closed
p.io.Close()
// Cleanup HCS resources
hp.Close()
}()
p.status = runtime.RunningStatus
p.hcs = hp
return nil
}

View File

@ -4,22 +4,18 @@ package windows
import ( import (
"context" "context"
"io"
"sync" "sync"
"syscall"
"time" "time"
"github.com/Microsoft/hcsshim" "github.com/Microsoft/hcsshim"
eventsapi "github.com/containerd/containerd/api/services/events/v1" eventsapi "github.com/containerd/containerd/api/services/events/v1"
"github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/events" "github.com/containerd/containerd/events"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/runtime" "github.com/containerd/containerd/runtime"
"github.com/containerd/containerd/typeurl" "github.com/containerd/containerd/typeurl"
"github.com/gogo/protobuf/types" "github.com/gogo/protobuf/types"
specs "github.com/opencontainers/runtime-spec/specs-go" specs "github.com/opencontainers/runtime-spec/specs-go"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/sirupsen/logrus"
) )
type task struct { type task struct {
@ -203,7 +199,6 @@ func (t *task) Exec(ctx context.Context, id string, opts runtime.ExecOpts) (runt
&eventsapi.TaskExecAdded{ &eventsapi.TaskExecAdded{
ContainerID: t.id, ContainerID: t.id,
ExecID: id, ExecID: id,
Pid: p.Pid(),
}) })
return p, nil return p, nil
@ -290,95 +285,18 @@ func (t *task) newProcess(ctx context.Context, id string, conf *hcsshim.ProcessC
}() }()
} }
t.Unlock() t.Unlock()
var p hcsshim.Process
if p, err = t.hcsContainer.CreateProcess(conf); err != nil {
return nil, errors.Wrapf(err, "failed to create process")
}
stdin, stdout, stderr, err := p.Stdio()
if err != nil {
p.Kill()
return nil, errors.Wrapf(err, "failed to retrieve init process stdio")
}
ioCopy := func(name string, dst io.WriteCloser, src io.ReadCloser) {
log.G(ctx).WithFields(logrus.Fields{"id": id, "pid": pid}).
Debugf("%s: copy started", name)
io.Copy(dst, src)
log.G(ctx).WithFields(logrus.Fields{"id": id, "pid": pid}).
Debugf("%s: copy done", name)
dst.Close()
src.Close()
}
if pset.stdin != nil {
go ioCopy("stdin", stdin, pset.stdin)
}
if pset.stdout != nil {
go ioCopy("stdout", pset.stdout, stdout)
}
if pset.stderr != nil {
go ioCopy("stderr", pset.stderr, stderr)
}
t.Lock() t.Lock()
wp := &process{ wp := &process{
id: id, id: id,
pid: pid, pid: pid,
io: pset, io: pset,
status: runtime.RunningStatus, status: runtime.CreatedStatus,
task: t, task: t,
hcs: p,
exitCh: make(chan struct{}), exitCh: make(chan struct{}),
} }
t.processes[id] = wp t.processes[id] = wp
t.Unlock() t.Unlock()
// Wait for the process to exit to get the exit status
go func() {
if err := p.Wait(); err != nil {
herr, ok := err.(*hcsshim.ProcessError)
if ok && herr.Err != syscall.ERROR_BROKEN_PIPE {
log.G(ctx).
WithError(err).
WithFields(logrus.Fields{"id": id, "pid": pid}).
Warnf("hcsshim wait failed (process may have been killed)")
}
// Try to get the exit code nonetheless
}
wp.exitTime = time.Now()
ec, err := p.ExitCode()
if err != nil {
log.G(ctx).
WithError(err).
WithFields(logrus.Fields{"id": id, "pid": pid}).
Warnf("hcsshim could not retrieve exit code")
// Use the unknown exit code
ec = 255
}
wp.exitCode = uint32(ec)
t.publisher.Publish(ctx,
runtime.TaskExitEventTopic,
&eventsapi.TaskExit{
ContainerID: t.id,
ID: id,
Pid: pid,
ExitStatus: wp.exitCode,
ExitedAt: wp.exitTime,
})
close(wp.exitCh)
// Ensure io's are closed
pset.Close()
// Cleanup HCS resources
p.Close()
}()
return wp, nil return wp, nil
} }