windows: Add servicing code

Signed-off-by: Kenfe-Mickael Laventure <mickael.laventure@gmail.com>
This commit is contained in:
Kenfe-Mickael Laventure 2017-07-18 11:23:22 +02:00
parent 4bb9ac2828
commit 5c8e0efb63
No known key found for this signature in database
GPG Key ID: 40CF16616B361216
2 changed files with 101 additions and 47 deletions

View File

@ -10,6 +10,7 @@ import (
"time"
"github.com/Microsoft/hcsshim"
"github.com/Sirupsen/logrus"
"github.com/boltdb/bolt"
eventsapi "github.com/containerd/containerd/api/services/events/v1"
containerdtypes "github.com/containerd/containerd/api/types"
@ -122,27 +123,7 @@ func (r *windowsRuntime) Create(ctx context.Context, id string, opts runtime.Cre
createOpts.TerminateDuration = defaultTerminateDuration
}
t, err := r.newTask(ctx, r.emitter, namespace, id, spec, opts.IO, createOpts)
if err != nil {
return nil, err
}
r.tasks.Add(ctx, t)
r.emitter.Post(events.WithTopic(ctx, "/tasks/create"), &eventsapi.TaskCreate{
ContainerID: id,
IO: &eventsapi.TaskIO{
Stdin: opts.IO.Stdin,
Stdout: opts.IO.Stdout,
Stderr: opts.IO.Stderr,
Terminal: opts.IO.Terminal,
},
Pid: t.pid,
Rootfs: t.rootfs,
// TODO: what should be in Bundle for windows?
})
return t, nil
return r.newTask(ctx, namespace, id, spec, opts.IO, createOpts)
}
func (r *windowsRuntime) Get(ctx context.Context, id string) (runtime.Task, error) {
@ -161,12 +142,29 @@ func (r *windowsRuntime) Delete(ctx context.Context, t runtime.Task) (*runtime.E
// TODO(mlaventure): stop monitor on this task
state, _ := wt.State(ctx)
var (
err error
needServicing bool
state, _ = wt.State(ctx)
)
switch state.Status {
case runtime.StoppedStatus, runtime.CreatedStatus:
case runtime.StoppedStatus:
// Only try to service a container if it was started and it's not a
// servicing task itself
if wt.servicing == false {
needServicing, err = wt.hcsContainer.HasPendingUpdates()
if err != nil {
needServicing = false
log.G(ctx).WithError(err).
WithFields(logrus.Fields{"id": wt.id, "pid": wt.pid}).
Error("failed to check if container needs servicing")
}
}
fallthrough
case runtime.CreatedStatus:
// if it's stopped or in created state, we need to shutdown the
// container before removing it
if err := wt.stop(ctx); err != nil {
if err = wt.stop(ctx); err != nil {
return nil, err
}
default:
@ -203,11 +201,18 @@ func (r *windowsRuntime) Delete(ctx context.Context, t runtime.Task) (*runtime.E
ExitedAt: rtExit.Timestamp,
})
if needServicing {
ns, _ := namespaces.Namespace(ctx)
serviceCtx := log.WithLogger(context.Background(), log.GetLogger(ctx))
serviceCtx = namespaces.WithNamespace(serviceCtx, ns)
r.serviceTask(serviceCtx, ns, wt.id+"_servicing", wt.spec)
}
// We were never started, return failure
return rtExit, nil
}
func (r *windowsRuntime) newTask(ctx context.Context, emitter *events.Emitter, namespace, id string, spec *specs.Spec, io runtime.IO, createOpts *hcsshimopts.CreateOptions) (*task, error) {
func (r *windowsRuntime) newTask(ctx context.Context, namespace, id string, spec *specs.Spec, io runtime.IO, createOpts *hcsshimopts.CreateOptions) (*task, error) {
var (
err error
pset *pipeSet
@ -281,29 +286,45 @@ func (r *windowsRuntime) newTask(ctx context.Context, emitter *events.Emitter, n
return nil, errors.Wrap(err, "hcsshim failed to spawn task")
}
t := &task{
id: id,
namespace: namespace,
pid: pid,
io: pset,
status: runtime.CreatedStatus,
spec: spec,
processes: make(map[string]*process),
hyperV: spec.Windows.HyperV != nil,
emitter: r.emitter,
rwLayer: conf.LayerFolderPath,
pidPool: r.pidPool,
hcsContainer: ctr,
terminateDuration: createOpts.TerminateDuration,
}
r.tasks.Add(ctx, t)
var rootfs []*containerdtypes.Mount
for _, l := range append([]string{conf.LayerFolderPath}, spec.Windows.LayerFolders...) {
for _, l := range append([]string{t.rwLayer}, spec.Windows.LayerFolders...) {
rootfs = append(rootfs, &containerdtypes.Mount{
Type: "windows-layer",
Source: l,
})
}
return &task{
id: id,
namespace: namespace,
pid: pid,
io: pset,
status: runtime.CreatedStatus,
initSpec: spec.Process,
processes: make(map[string]*process),
hyperV: spec.Windows.HyperV != nil,
rootfs: rootfs,
emitter: emitter,
pidPool: r.pidPool,
hcsContainer: ctr,
terminateDuration: createOpts.TerminateDuration,
}, nil
r.emitter.Post(events.WithTopic(ctx, "/tasks/create"), &eventsapi.TaskCreate{
ContainerID: id,
IO: &eventsapi.TaskIO{
Stdin: io.Stdin,
Stdout: io.Stdout,
Stderr: io.Stderr,
Terminal: io.Terminal,
},
Pid: t.pid,
Rootfs: rootfs,
// TODO: what should be in Bundle for windows?
})
return t, nil
}
func (r *windowsRuntime) cleanup(ctx context.Context) {
@ -354,3 +375,36 @@ func (r *windowsRuntime) cleanup(ctx context.Context) {
}
}
func (r *windowsRuntime) serviceTask(ctx context.Context, namespace, id string, spec *specs.Spec) {
var (
err error
t *task
io runtime.IO
createOpts = &hcsshimopts.CreateOptions{
TerminateDuration: defaultTerminateDuration,
}
)
t, err = r.newTask(ctx, namespace, id, spec, io, createOpts)
if err != nil {
log.G(ctx).WithError(err).WithField("id", id).
Warn("failed to created servicing task")
return
}
t.servicing = true
err = t.Start(ctx)
switch err {
case nil:
<-t.getProcess(id).exitCh
default:
log.G(ctx).WithError(err).WithField("id", id).
Warn("failed to start servicing task")
}
if _, err = r.Delete(ctx, t); err != nil {
log.G(ctx).WithError(err).WithField("id", id).
Warn("failed to stop servicing task")
}
}

View File

@ -12,7 +12,6 @@ import (
"github.com/Microsoft/hcsshim"
"github.com/Sirupsen/logrus"
eventsapi "github.com/containerd/containerd/api/services/events/v1"
containerdtypes "github.com/containerd/containerd/api/types"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/log"
@ -31,16 +30,17 @@ type task struct {
pid uint32
io *pipeSet
status runtime.Status
initSpec *specs.Process
spec *specs.Spec
processes map[string]*process
hyperV bool
rootfs []*containerdtypes.Mount
emitter *events.Emitter
rwLayer string
pidPool *pidPool
hcsContainer hcsshim.Container
terminateDuration time.Duration
servicing bool
}
func (t *task) ID() string {
@ -107,7 +107,7 @@ func (t *task) Info() runtime.TaskInfo {
}
func (t *task) Start(ctx context.Context) error {
conf := newProcessConfig(t.initSpec, t.io)
conf := newProcessConfig(t.spec.Process, t.io)
if _, err := t.newProcess(ctx, t.id, conf, t.io); err != nil {
return err
}
@ -173,7 +173,7 @@ func (t *task) Exec(ctx context.Context, id string, opts runtime.ExecOpts) (runt
}
spec := s.(*specs.Process)
if spec.Cwd == "" {
spec.Cwd = t.initSpec.Cwd
spec.Cwd = t.spec.Process.Cwd
}
var pset *pipeSet
@ -431,6 +431,6 @@ func (t *task) cleanup() {
for _, p := range t.processes {
t.removeProcessNL(p.id)
}
removeLayer(context.Background(), t.rootfs[len(t.rootfs)-1].Source)
removeLayer(context.Background(), t.rwLayer)
t.Unlock()
}