containerd/windows/task.go
Stephen J Day 09b5ca1072
api/events: split event types from events service
To avoid importing all of grpc when consuming events, the types of
events have been split into a separate package. This should allow a
reduction in memory usage in cases where a package is consuming events
but not using the grpc service directly.

Signed-off-by: Stephen J Day <stephen.day@docker.com>
2017-11-16 15:20:46 -08:00

412 lines
9.0 KiB
Go

// +build windows
package windows
import (
"context"
"sync"
"time"
"github.com/Microsoft/hcsshim"
eventstypes "github.com/containerd/containerd/api/events"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/runtime"
"github.com/containerd/containerd/windows/hcsshimtypes"
"github.com/containerd/typeurl"
"github.com/gogo/protobuf/types"
specs "github.com/opencontainers/runtime-spec/specs-go"
"github.com/pkg/errors"
)
// task is the Windows runtime's representation of a containerd task backed
// by an hcsshim container. The init process is stored in processes under the
// task's own id; additional (exec) processes are stored under their exec id.
type task struct {
// Embedded mutex; the methods of this type lock it around accesses to
// processes and status.
sync.Mutex
// id identifies the task and doubles as the key of the init process in
// processes.
id string
// namespace is reported back through Info.
namespace string
// pid is reported in State and in the start event, and is the pid given to
// the first process registered through newProcess.
pid uint32
// io is the pipe set whose source paths are reported in State.
io *pipeSet
// status is the task-level status, used by State when no init process is
// registered and updated by Pause/Resume.
status runtime.Status
// spec supplies the default working directory for Exec.
spec *specs.Spec
// processes maps a process id to its process; guarded by the mutex.
processes map[string]*process
// hyperV gates Pause/Resume, which are rejected when it is false.
hyperV bool
// publisher receives the task lifecycle events.
publisher events.Publisher
// rwLayer is handed to removeLayer during cleanup.
rwLayer string
// pidPool hands out pids for every process after the first one.
pidPool *pidPool
// hcsContainer is the underlying hcsshim container.
hcsContainer hcsshim.Container
// terminateDuration is the timeout passed to WaitTimeout when a stop
// request is reported as pending.
terminateDuration time.Duration
// NOTE(review): servicing is not read anywhere in this section of the
// file; presumably it marks a servicing container — confirm at the
// creation site.
servicing bool
}
// ID returns the identifier of the task.
func (t *task) ID() string {
return t.id
}
// State reports the current runtime state of the task.
//
// When the init process (registered under the task id) exists, its status,
// exit code and exit time are reported. Otherwise only the task-level status
// is available and the exit fields keep their zero values.
func (t *task) State(ctx context.Context) (runtime.State, error) {
	initProc := t.getProcess(t.id)

	state := runtime.State{
		Pid:      t.pid,
		Stdin:    t.io.src.Stdin,
		Stdout:   t.io.src.Stdout,
		Stderr:   t.io.src.Stderr,
		Terminal: t.io.src.Terminal,
	}

	if initProc == nil {
		// No init process registered: fall back to the task's own status.
		state.Status = t.getStatus()
		return state, nil
	}

	state.Status = initProc.Status()
	state.ExitStatus = initProc.exitCode
	state.ExitedAt = initProc.exitTime
	return state, nil
}
// Kill forwards the signal to the task's init process.
//
// It fails with ErrFailedPrecondition when there is no init process and with
// ErrNotFound when the init process has already stopped.
func (t *task) Kill(ctx context.Context, signal uint32, all bool) error {
	initProc := t.getProcess(t.id)
	switch {
	case initProc == nil:
		return errors.Wrapf(errdefs.ErrFailedPrecondition, "task is not running")
	case initProc.Status() == runtime.StoppedStatus:
		return errors.Wrapf(errdefs.ErrNotFound, "process is stopped")
	}
	return initProc.Kill(ctx, signal, all)
}
// ResizePty resizes the console of the task's init process.
// It fails with ErrFailedPrecondition when no init process is registered.
func (t *task) ResizePty(ctx context.Context, size runtime.ConsoleSize) error {
	if initProc := t.getProcess(t.id); initProc != nil {
		return initProc.ResizePty(ctx, size)
	}
	return errors.Wrap(errdefs.ErrFailedPrecondition, "task not started")
}
// CloseIO closes the stdin of the task's init process.
// It fails with ErrFailedPrecondition when no init process is registered.
func (t *task) CloseIO(ctx context.Context) error {
	if initProc := t.getProcess(t.id); initProc != nil {
		return initProc.hcs.CloseStdin()
	}
	return errors.Wrap(errdefs.ErrFailedPrecondition, "task not started")
}
// Info returns the static description of the task: its id, the runtime
// plugin that owns it and the namespace it lives in.
func (t *task) Info() runtime.TaskInfo {
	var info runtime.TaskInfo
	info.ID = t.id
	info.Runtime = pluginID
	info.Namespace = t.namespace
	// TODO(mlaventure): what about Spec? I think this could be removed from the info, the id is enough since it matches the one from the container
	return info
}
// Start starts the task's init process and publishes a TaskStart event.
//
// It panics when no init process is registered, and fails with
// ErrFailedPrecondition when the init process is not in the created state.
func (t *task) Start(ctx context.Context) error {
	initProc := t.getProcess(t.id)
	if initProc == nil {
		panic("init process is missing")
	}

	if status := initProc.Status(); status != runtime.CreatedStatus {
		return errors.Wrap(errdefs.ErrFailedPrecondition, "process was already started")
	}

	err := initProc.Start(ctx)
	if err != nil {
		return err
	}

	started := &eventstypes.TaskStart{
		ContainerID: t.id,
		Pid:         t.pid,
	}
	t.publisher.Publish(ctx, runtime.TaskStartEventTopic, started)
	return nil
}
// Pause pauses the underlying container, records the paused status and
// publishes a TaskPaused event.
//
// Only tasks flagged as hyperV can be paused; others get
// ErrFailedPrecondition.
func (t *task) Pause(ctx context.Context) error {
	if !t.hyperV {
		return errors.Wrap(errdefs.ErrFailedPrecondition, "not an hyperV task")
	}

	if err := t.hcsContainer.Pause(); err != nil {
		return errors.Wrap(err, "hcsshim failed to pause task")
	}

	t.Lock()
	t.status = runtime.PausedStatus
	t.Unlock()

	paused := &eventstypes.TaskPaused{
		ContainerID: t.id,
	}
	t.publisher.Publish(ctx, runtime.TaskPausedEventTopic, paused)
	return nil
}
// Resume resumes the underlying container, records the running status and
// publishes a TaskResumed event.
//
// Only tasks flagged as hyperV can be resumed; others get
// ErrFailedPrecondition.
func (t *task) Resume(ctx context.Context) error {
	if !t.hyperV {
		return errors.Wrap(errdefs.ErrFailedPrecondition, "not an hyperV task")
	}

	if err := t.hcsContainer.Resume(); err != nil {
		return errors.Wrap(err, "hcsshim failed to resume task")
	}

	t.Lock()
	t.status = runtime.RunningStatus
	t.Unlock()

	resumed := &eventstypes.TaskResumed{
		ContainerID: t.id,
	}
	t.publisher.Publish(ctx, runtime.TaskResumedEventTopic, resumed)
	return nil
}
// Exec registers an additional process in the task and publishes a
// TaskExecAdded event. The process is only registered here, not started.
//
// Errors:
//   - ErrFailedPrecondition when the task has no init process;
//   - ErrAlreadyExists when id is already used by a process of this task;
//   - ErrInvalidArgument when opts.Spec does not decode to a *specs.Process;
//   - any error returned while decoding the spec, creating the pipe set or
//     registering the process.
func (t *task) Exec(ctx context.Context, id string, opts runtime.ExecOpts) (runtime.Process, error) {
	if p := t.getProcess(t.id); p == nil {
		return nil, errors.Wrap(errdefs.ErrFailedPrecondition, "task not started")
	}

	if p := t.getProcess(id); p != nil {
		return nil, errors.Wrap(errdefs.ErrAlreadyExists, "id already in use")
	}

	s, err := typeurl.UnmarshalAny(opts.Spec)
	if err != nil {
		return nil, err
	}
	// The spec comes from the client; reject an unexpected payload type
	// instead of panicking on the type assertion.
	spec, ok := s.(*specs.Process)
	if !ok {
		return nil, errors.Wrapf(errdefs.ErrInvalidArgument, "unexpected exec spec type %T", s)
	}
	// Default the working directory to the one of the task's own process.
	if spec.Cwd == "" {
		spec.Cwd = t.spec.Process.Cwd
	}

	pset, err := newPipeSet(ctx, opts.IO)
	if err != nil {
		return nil, err
	}

	conf := newWindowsProcessConfig(spec, pset)
	// newProcess closes pset itself when it fails.
	p, err := t.newProcess(ctx, id, conf, pset)
	if err != nil {
		return nil, err
	}

	t.publisher.Publish(ctx,
		runtime.TaskExecAddedEventTopic,
		&eventstypes.TaskExecAdded{
			ContainerID: t.id,
			ExecID:      id,
		})

	return p, nil
}
// Pids lists the processes reported by the hcsshim container.
//
// Each entry carries the hcsshim process details; when the hcs pid matches
// one of the processes tracked by the task, the entry is tagged with that
// process' id as ExecID. The task lock is held for the whole call.
func (t *task) Pids(ctx context.Context) ([]runtime.ProcessInfo, error) {
	t.Lock()
	defer t.Unlock()

	hcsProcs, err := t.hcsContainer.ProcessList()
	if err != nil {
		return nil, err
	}

	var infos []runtime.ProcessInfo
	for _, hp := range hcsProcs {
		details := &hcsshimtypes.ProcessDetails{
			ImageName:                    hp.ImageName,
			CreatedAt:                    hp.CreateTimestamp,
			KernelTime_100Ns:             hp.KernelTime100ns,
			MemoryCommitBytes:            hp.MemoryCommitBytes,
			MemoryWorkingSetPrivateBytes: hp.MemoryWorkingSetPrivateBytes,
			MemoryWorkingSetSharedBytes:  hp.MemoryWorkingSetSharedBytes,
			ProcessID:                    hp.ProcessId,
			UserTime_100Ns:               hp.UserTime100ns,
		}

		// Tag the entry with the id of the tracked process, if any.
		for _, tracked := range t.processes {
			if tracked.HcsPid() != hp.ProcessId {
				continue
			}
			details.ExecID = tracked.ID()
			break
		}

		entry := runtime.ProcessInfo{
			Pid:  hp.ProcessId,
			Info: details,
		}
		infos = append(infos, entry)
	}

	return infos, nil
}
// Checkpoint is not supported by this runtime; it always returns
// ErrUnavailable and ignores its arguments.
func (t *task) Checkpoint(_ context.Context, _ string, _ *types.Any) error {
return errors.Wrap(errdefs.ErrUnavailable, "not supported")
}
// DeleteProcess removes an exec process from the task and returns its exit
// information.
//
// The init process cannot be deleted this way (ErrInvalidArgument). An
// unknown id yields ErrNotFound, and an error from the process' ExitCode is
// returned as is without removing the process.
func (t *task) DeleteProcess(ctx context.Context, id string) (*runtime.Exit, error) {
	if id == t.id {
		return nil, errors.Wrapf(errdefs.ErrInvalidArgument,
			"cannot delete init process")
	}

	proc := t.getProcess(id)
	if proc == nil {
		return nil, errors.Wrapf(errdefs.ErrNotFound, "no such process %s", id)
	}

	code, exitedAt, err := proc.ExitCode()
	if err != nil {
		return nil, err
	}

	// A process that was never started has no exit time of its own: close
	// its pipes and use the deletion time instead.
	if proc.Status() == runtime.CreatedStatus {
		proc.io.Close()
		exitedAt = time.Now()
	}

	t.removeProcess(id)

	exit := &runtime.Exit{
		Pid:       proc.pid,
		Status:    code,
		Timestamp: exitedAt,
	}
	return exit, nil
}
// Update is not supported by this runtime; it always returns ErrUnavailable.
func (t *task) Update(ctx context.Context, resources *types.Any) error {
return errors.Wrap(errdefs.ErrUnavailable, "not supported")
}
// Process returns the process registered under id, or ErrNotFound when the
// task tracks no such process.
func (t *task) Process(ctx context.Context, id string) (runtime.Process, error) {
	// Keep the concrete *process type for the nil check: a nil *process
	// stored in a runtime.Process interface value compares as non-nil, which
	// would hide the "not found" case from both this function and callers.
	p := t.getProcess(id)
	if p == nil {
		return nil, errors.Wrapf(errdefs.ErrNotFound, "no such process %s", id)
	}
	return p, nil
}
// Metrics is not supported by this runtime; it always returns
// ErrUnavailable.
func (t *task) Metrics(ctx context.Context) (interface{}, error) {
return nil, errors.Wrap(errdefs.ErrUnavailable, "not supported")
}
// Wait delegates to the Wait method of the task's init process and returns
// its result. It returns ErrNotFound when no init process is registered.
func (t *task) Wait(ctx context.Context) (*runtime.Exit, error) {
	p := t.getProcess(t.id)
	if p == nil {
		// t.id is a string, so it is formatted with %s.
		return nil, errors.Wrapf(errdefs.ErrNotFound, "no such process %s", t.id)
	}
	return p.Wait(ctx)
}
// newProcess registers a new process under id in the task.
//
// The first process registered reuses the task's pid; every later one gets
// its pid from the pid pool. When the pool cannot provide a pid, the pipe
// set is closed and the pool's error is returned. An existing entry with
// the same id is overwritten.
func (t *task) newProcess(ctx context.Context, id string, conf *hcsshim.ProcessConfig, pset *pipeSet) (*process, error) {
	t.Lock()

	pid := t.pid
	if len(t.processes) != 0 {
		var err error
		if pid, err = t.pidPool.Get(); err != nil {
			t.Unlock()
			// Nothing will ever use the io, close it right now.
			pset.Close()
			return nil, err
		}
	}

	proc := &process{
		id:     id,
		pid:    pid,
		io:     pset,
		task:   t,
		exitCh: make(chan struct{}),
		conf:   conf,
	}
	t.processes[id] = proc
	t.Unlock()

	return proc, nil
}
// getProcess looks up the process registered under id while holding the
// task lock. It returns nil when there is no such process.
func (t *task) getProcess(id string) *process {
	t.Lock()
	defer t.Unlock()
	return t.processes[id]
}
// removeProcessNL drops the process registered under id: it closes the
// process' io when set, returns its pid to the pool and deletes the map
// entry. Unknown ids are ignored.
//
// It does not take the task lock itself; removeProcess and cleanup call it
// with the lock held.
func (t *task) removeProcessNL(id string) {
	proc, found := t.processes[id]
	if !found {
		return
	}

	if proc.io != nil {
		proc.io.Close()
	}
	t.pidPool.Put(proc.pid)
	delete(t.processes, id)
}
// removeProcess is the locking variant of removeProcessNL.
func (t *task) removeProcess(id string) {
	t.Lock()
	defer t.Unlock()
	t.removeProcessNL(id)
}
// getStatus reads the task-level status while holding the task lock.
func (t *task) getStatus() runtime.Status {
	t.Lock()
	defer t.Unlock()
	return t.status
}
// stop tries to shut down the task.
// It does so by first calling Shutdown on the hcsshim.Container and, if
// that fails, by resorting to calling Terminate.
//
// The container is closed once either attempt succeeds. When both attempts
// fail, the error from Terminate is returned and the container is left open.
func (t *task) stop(ctx context.Context) error {
	if err := t.hcsStop(ctx, t.hcsContainer.Shutdown); err != nil {
		if err := t.hcsStop(ctx, t.hcsContainer.Terminate); err != nil {
			return err
		}
	}
	// Close the container on both success paths, so that a successful
	// Terminate fallback does not skip it. The close is best effort: its
	// result is deliberately not reported.
	t.hcsContainer.Close()
	return nil
}
// hcsStop runs the given stop function and normalizes its result:
//   - a pending result is replaced by the result of WaitTimeout on the
//     container, called with t.terminateDuration;
//   - an "already stopped" result counts as success;
//   - anything else (including nil) is returned unchanged.
func (t *task) hcsStop(ctx context.Context, stop func() error) error {
	err := stop()
	if hcsshim.IsPending(err) {
		return t.hcsContainer.WaitTimeout(t.terminateDuration)
	}
	if hcsshim.IsAlreadyStopped(err) {
		return nil
	}
	return err
}
// cleanup removes every process tracked by the task and then calls
// removeLayer on the task's rwLayer, all while holding the task lock.
func (t *task) cleanup() {
	t.Lock()
	defer t.Unlock()

	// Deleting map entries while ranging over the map is allowed in Go.
	for _, proc := range t.processes {
		t.removeProcessNL(proc.id)
	}
	removeLayer(context.Background(), t.rwLayer)
}