To avoid importing all of gRPC when consuming events, the event types have been split into a separate package. This should reduce memory usage in cases where a package consumes events but does not use the gRPC service directly. Signed-off-by: Stephen J Day <stephen.day@docker.com>
		
			
				
	
	
		
			412 lines
		
	
	
		
			9.0 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			412 lines
		
	
	
		
			9.0 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
// +build windows
 | 
						|
 | 
						|
package windows
 | 
						|
 | 
						|
import (
 | 
						|
	"context"
 | 
						|
	"sync"
 | 
						|
	"time"
 | 
						|
 | 
						|
	"github.com/Microsoft/hcsshim"
 | 
						|
	eventstypes "github.com/containerd/containerd/api/events"
 | 
						|
	"github.com/containerd/containerd/errdefs"
 | 
						|
	"github.com/containerd/containerd/events"
 | 
						|
	"github.com/containerd/containerd/runtime"
 | 
						|
	"github.com/containerd/containerd/windows/hcsshimtypes"
 | 
						|
	"github.com/containerd/typeurl"
 | 
						|
	"github.com/gogo/protobuf/types"
 | 
						|
	specs "github.com/opencontainers/runtime-spec/specs-go"
 | 
						|
	"github.com/pkg/errors"
 | 
						|
)
 | 
						|
 | 
						|
type task struct {
 | 
						|
	sync.Mutex
 | 
						|
 | 
						|
	id        string
 | 
						|
	namespace string
 | 
						|
	pid       uint32
 | 
						|
	io        *pipeSet
 | 
						|
	status    runtime.Status
 | 
						|
	spec      *specs.Spec
 | 
						|
	processes map[string]*process
 | 
						|
	hyperV    bool
 | 
						|
 | 
						|
	publisher events.Publisher
 | 
						|
	rwLayer   string
 | 
						|
 | 
						|
	pidPool           *pidPool
 | 
						|
	hcsContainer      hcsshim.Container
 | 
						|
	terminateDuration time.Duration
 | 
						|
	servicing         bool
 | 
						|
}
 | 
						|
 | 
						|
func (t *task) ID() string {
 | 
						|
	return t.id
 | 
						|
}
 | 
						|
 | 
						|
func (t *task) State(ctx context.Context) (runtime.State, error) {
 | 
						|
	var (
 | 
						|
		status     runtime.Status
 | 
						|
		exitStatus uint32
 | 
						|
		exitedAt   time.Time
 | 
						|
	)
 | 
						|
 | 
						|
	if p := t.getProcess(t.id); p != nil {
 | 
						|
		status = p.Status()
 | 
						|
		exitStatus = p.exitCode
 | 
						|
		exitedAt = p.exitTime
 | 
						|
	} else {
 | 
						|
		status = t.getStatus()
 | 
						|
	}
 | 
						|
 | 
						|
	return runtime.State{
 | 
						|
		Status:     status,
 | 
						|
		Pid:        t.pid,
 | 
						|
		Stdin:      t.io.src.Stdin,
 | 
						|
		Stdout:     t.io.src.Stdout,
 | 
						|
		Stderr:     t.io.src.Stderr,
 | 
						|
		Terminal:   t.io.src.Terminal,
 | 
						|
		ExitStatus: exitStatus,
 | 
						|
		ExitedAt:   exitedAt,
 | 
						|
	}, nil
 | 
						|
}
 | 
						|
 | 
						|
func (t *task) Kill(ctx context.Context, signal uint32, all bool) error {
 | 
						|
	p := t.getProcess(t.id)
 | 
						|
	if p == nil {
 | 
						|
		return errors.Wrapf(errdefs.ErrFailedPrecondition, "task is not running")
 | 
						|
	}
 | 
						|
 | 
						|
	if p.Status() == runtime.StoppedStatus {
 | 
						|
		return errors.Wrapf(errdefs.ErrNotFound, "process is stopped")
 | 
						|
	}
 | 
						|
 | 
						|
	return p.Kill(ctx, signal, all)
 | 
						|
}
 | 
						|
 | 
						|
func (t *task) ResizePty(ctx context.Context, size runtime.ConsoleSize) error {
 | 
						|
	p := t.getProcess(t.id)
 | 
						|
	if p == nil {
 | 
						|
		return errors.Wrap(errdefs.ErrFailedPrecondition, "task not started")
 | 
						|
	}
 | 
						|
 | 
						|
	return p.ResizePty(ctx, size)
 | 
						|
}
 | 
						|
 | 
						|
func (t *task) CloseIO(ctx context.Context) error {
 | 
						|
	p := t.getProcess(t.id)
 | 
						|
	if p == nil {
 | 
						|
		return errors.Wrap(errdefs.ErrFailedPrecondition, "task not started")
 | 
						|
	}
 | 
						|
 | 
						|
	return p.hcs.CloseStdin()
 | 
						|
}
 | 
						|
 | 
						|
func (t *task) Info() runtime.TaskInfo {
 | 
						|
	return runtime.TaskInfo{
 | 
						|
		ID:        t.id,
 | 
						|
		Runtime:   pluginID,
 | 
						|
		Namespace: t.namespace,
 | 
						|
		// TODO(mlaventure): what about Spec? I think this could be removed from the info, the id is enough since it matches the one from the container
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (t *task) Start(ctx context.Context) error {
 | 
						|
	p := t.getProcess(t.id)
 | 
						|
	if p == nil {
 | 
						|
		panic("init process is missing")
 | 
						|
	}
 | 
						|
 | 
						|
	if p.Status() != runtime.CreatedStatus {
 | 
						|
		return errors.Wrap(errdefs.ErrFailedPrecondition, "process was already started")
 | 
						|
	}
 | 
						|
 | 
						|
	if err := p.Start(ctx); err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
	t.publisher.Publish(ctx,
 | 
						|
		runtime.TaskStartEventTopic,
 | 
						|
		&eventstypes.TaskStart{
 | 
						|
			ContainerID: t.id,
 | 
						|
			Pid:         t.pid,
 | 
						|
		})
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (t *task) Pause(ctx context.Context) error {
 | 
						|
	if t.hyperV {
 | 
						|
		err := t.hcsContainer.Pause()
 | 
						|
		if err == nil {
 | 
						|
			t.Lock()
 | 
						|
			t.status = runtime.PausedStatus
 | 
						|
			t.Unlock()
 | 
						|
 | 
						|
			t.publisher.Publish(ctx,
 | 
						|
				runtime.TaskPausedEventTopic,
 | 
						|
				&eventstypes.TaskPaused{
 | 
						|
					ContainerID: t.id,
 | 
						|
				})
 | 
						|
			return nil
 | 
						|
		}
 | 
						|
		return errors.Wrap(err, "hcsshim failed to pause task")
 | 
						|
	}
 | 
						|
 | 
						|
	return errors.Wrap(errdefs.ErrFailedPrecondition, "not an hyperV task")
 | 
						|
}
 | 
						|
 | 
						|
func (t *task) Resume(ctx context.Context) error {
 | 
						|
	if t.hyperV {
 | 
						|
		err := t.hcsContainer.Resume()
 | 
						|
		if err == nil {
 | 
						|
			t.Lock()
 | 
						|
			t.status = runtime.RunningStatus
 | 
						|
			t.Unlock()
 | 
						|
 | 
						|
			t.publisher.Publish(ctx,
 | 
						|
				runtime.TaskResumedEventTopic,
 | 
						|
				&eventstypes.TaskResumed{
 | 
						|
					ContainerID: t.id,
 | 
						|
				})
 | 
						|
			return nil
 | 
						|
		}
 | 
						|
		return errors.Wrap(err, "hcsshim failed to resume task")
 | 
						|
	}
 | 
						|
 | 
						|
	return errors.Wrap(errdefs.ErrFailedPrecondition, "not an hyperV task")
 | 
						|
}
 | 
						|
 | 
						|
func (t *task) Exec(ctx context.Context, id string, opts runtime.ExecOpts) (runtime.Process, error) {
 | 
						|
	if p := t.getProcess(t.id); p == nil {
 | 
						|
		return nil, errors.Wrap(errdefs.ErrFailedPrecondition, "task not started")
 | 
						|
	}
 | 
						|
 | 
						|
	if p := t.getProcess(id); p != nil {
 | 
						|
		return nil, errors.Wrap(errdefs.ErrAlreadyExists, "id already in use")
 | 
						|
	}
 | 
						|
 | 
						|
	s, err := typeurl.UnmarshalAny(opts.Spec)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	spec := s.(*specs.Process)
 | 
						|
	if spec.Cwd == "" {
 | 
						|
		spec.Cwd = t.spec.Process.Cwd
 | 
						|
	}
 | 
						|
 | 
						|
	var pset *pipeSet
 | 
						|
	if pset, err = newPipeSet(ctx, opts.IO); err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	conf := newWindowsProcessConfig(spec, pset)
 | 
						|
	p, err := t.newProcess(ctx, id, conf, pset)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	t.publisher.Publish(ctx,
 | 
						|
		runtime.TaskExecAddedEventTopic,
 | 
						|
		&eventstypes.TaskExecAdded{
 | 
						|
			ContainerID: t.id,
 | 
						|
			ExecID:      id,
 | 
						|
		})
 | 
						|
 | 
						|
	return p, nil
 | 
						|
}
 | 
						|
 | 
						|
func (t *task) Pids(ctx context.Context) ([]runtime.ProcessInfo, error) {
 | 
						|
	t.Lock()
 | 
						|
	defer t.Unlock()
 | 
						|
 | 
						|
	var processList []runtime.ProcessInfo
 | 
						|
	hcsProcessList, err := t.hcsContainer.ProcessList()
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	for _, hcsProcess := range hcsProcessList {
 | 
						|
		info := &hcsshimtypes.ProcessDetails{
 | 
						|
			ImageName:                    hcsProcess.ImageName,
 | 
						|
			CreatedAt:                    hcsProcess.CreateTimestamp,
 | 
						|
			KernelTime_100Ns:             hcsProcess.KernelTime100ns,
 | 
						|
			MemoryCommitBytes:            hcsProcess.MemoryCommitBytes,
 | 
						|
			MemoryWorkingSetPrivateBytes: hcsProcess.MemoryWorkingSetPrivateBytes,
 | 
						|
			MemoryWorkingSetSharedBytes:  hcsProcess.MemoryWorkingSetSharedBytes,
 | 
						|
			ProcessID:                    hcsProcess.ProcessId,
 | 
						|
			UserTime_100Ns:               hcsProcess.UserTime100ns,
 | 
						|
		}
 | 
						|
		for _, p := range t.processes {
 | 
						|
			if p.HcsPid() == hcsProcess.ProcessId {
 | 
						|
				info.ExecID = p.ID()
 | 
						|
				break
 | 
						|
			}
 | 
						|
		}
 | 
						|
		processList = append(processList, runtime.ProcessInfo{
 | 
						|
			Pid:  hcsProcess.ProcessId,
 | 
						|
			Info: info,
 | 
						|
		})
 | 
						|
	}
 | 
						|
 | 
						|
	return processList, nil
 | 
						|
}
 | 
						|
 | 
						|
func (t *task) Checkpoint(_ context.Context, _ string, _ *types.Any) error {
 | 
						|
	return errors.Wrap(errdefs.ErrUnavailable, "not supported")
 | 
						|
}
 | 
						|
 | 
						|
func (t *task) DeleteProcess(ctx context.Context, id string) (*runtime.Exit, error) {
 | 
						|
	if id == t.id {
 | 
						|
		return nil, errors.Wrapf(errdefs.ErrInvalidArgument,
 | 
						|
			"cannot delete init process")
 | 
						|
	}
 | 
						|
	if p := t.getProcess(id); p != nil {
 | 
						|
		ec, ea, err := p.ExitCode()
 | 
						|
		if err != nil {
 | 
						|
			return nil, err
 | 
						|
		}
 | 
						|
 | 
						|
		// If we never started the process close the pipes
 | 
						|
		if p.Status() == runtime.CreatedStatus {
 | 
						|
			p.io.Close()
 | 
						|
			ea = time.Now()
 | 
						|
		}
 | 
						|
 | 
						|
		t.removeProcess(id)
 | 
						|
		return &runtime.Exit{
 | 
						|
			Pid:       p.pid,
 | 
						|
			Status:    ec,
 | 
						|
			Timestamp: ea,
 | 
						|
		}, nil
 | 
						|
	}
 | 
						|
	return nil, errors.Wrapf(errdefs.ErrNotFound, "no such process %s", id)
 | 
						|
}
 | 
						|
 | 
						|
func (t *task) Update(ctx context.Context, resources *types.Any) error {
 | 
						|
	return errors.Wrap(errdefs.ErrUnavailable, "not supported")
 | 
						|
}
 | 
						|
 | 
						|
func (t *task) Process(ctx context.Context, id string) (p runtime.Process, err error) {
 | 
						|
	p = t.getProcess(id)
 | 
						|
	if p == nil {
 | 
						|
		err = errors.Wrapf(errdefs.ErrNotFound, "no such process %d", id)
 | 
						|
	}
 | 
						|
 | 
						|
	return p, err
 | 
						|
}
 | 
						|
 | 
						|
func (t *task) Metrics(ctx context.Context) (interface{}, error) {
 | 
						|
	return nil, errors.Wrap(errdefs.ErrUnavailable, "not supported")
 | 
						|
}
 | 
						|
 | 
						|
func (t *task) Wait(ctx context.Context) (*runtime.Exit, error) {
 | 
						|
	p := t.getProcess(t.id)
 | 
						|
	if p == nil {
 | 
						|
		return nil, errors.Wrapf(errdefs.ErrNotFound, "no such process %d", t.id)
 | 
						|
	}
 | 
						|
	return p.Wait(ctx)
 | 
						|
}
 | 
						|
 | 
						|
func (t *task) newProcess(ctx context.Context, id string, conf *hcsshim.ProcessConfig, pset *pipeSet) (*process, error) {
 | 
						|
	var (
 | 
						|
		err error
 | 
						|
		pid uint32
 | 
						|
	)
 | 
						|
 | 
						|
	// If we fail, close the io right now
 | 
						|
	defer func() {
 | 
						|
		if err != nil {
 | 
						|
			pset.Close()
 | 
						|
		}
 | 
						|
	}()
 | 
						|
 | 
						|
	t.Lock()
 | 
						|
	if len(t.processes) == 0 {
 | 
						|
		pid = t.pid
 | 
						|
	} else {
 | 
						|
		if pid, err = t.pidPool.Get(); err != nil {
 | 
						|
			t.Unlock()
 | 
						|
			return nil, err
 | 
						|
		}
 | 
						|
		defer func() {
 | 
						|
			if err != nil {
 | 
						|
				t.pidPool.Put(pid)
 | 
						|
			}
 | 
						|
		}()
 | 
						|
	}
 | 
						|
	wp := &process{
 | 
						|
		id:     id,
 | 
						|
		pid:    pid,
 | 
						|
		io:     pset,
 | 
						|
		task:   t,
 | 
						|
		exitCh: make(chan struct{}),
 | 
						|
		conf:   conf,
 | 
						|
	}
 | 
						|
	t.processes[id] = wp
 | 
						|
	t.Unlock()
 | 
						|
 | 
						|
	return wp, nil
 | 
						|
}
 | 
						|
 | 
						|
func (t *task) getProcess(id string) *process {
 | 
						|
	t.Lock()
 | 
						|
	p := t.processes[id]
 | 
						|
	t.Unlock()
 | 
						|
 | 
						|
	return p
 | 
						|
}
 | 
						|
 | 
						|
func (t *task) removeProcessNL(id string) {
 | 
						|
	if p, ok := t.processes[id]; ok {
 | 
						|
		if p.io != nil {
 | 
						|
			p.io.Close()
 | 
						|
		}
 | 
						|
		t.pidPool.Put(p.pid)
 | 
						|
		delete(t.processes, id)
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (t *task) removeProcess(id string) {
 | 
						|
	t.Lock()
 | 
						|
	t.removeProcessNL(id)
 | 
						|
	t.Unlock()
 | 
						|
}
 | 
						|
 | 
						|
func (t *task) getStatus() runtime.Status {
 | 
						|
	t.Lock()
 | 
						|
	status := t.status
 | 
						|
	t.Unlock()
 | 
						|
 | 
						|
	return status
 | 
						|
}
 | 
						|
 | 
						|
// stop tries to shutdown the task.
 | 
						|
// It will do so by first calling Shutdown on the hcsshim.Container and if
 | 
						|
// that fails, by resorting to caling Terminate
 | 
						|
func (t *task) stop(ctx context.Context) error {
 | 
						|
	if err := t.hcsStop(ctx, t.hcsContainer.Shutdown); err != nil {
 | 
						|
		return t.hcsStop(ctx, t.hcsContainer.Terminate)
 | 
						|
	}
 | 
						|
	t.hcsContainer.Close()
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (t *task) hcsStop(ctx context.Context, stop func() error) error {
 | 
						|
	err := stop()
 | 
						|
	switch {
 | 
						|
	case hcsshim.IsPending(err):
 | 
						|
		err = t.hcsContainer.WaitTimeout(t.terminateDuration)
 | 
						|
	case hcsshim.IsAlreadyStopped(err):
 | 
						|
		err = nil
 | 
						|
	}
 | 
						|
	return err
 | 
						|
}
 | 
						|
 | 
						|
func (t *task) cleanup() {
 | 
						|
	t.Lock()
 | 
						|
	for _, p := range t.processes {
 | 
						|
		t.removeProcessNL(p.id)
 | 
						|
	}
 | 
						|
	removeLayer(context.Background(), t.rwLayer)
 | 
						|
	t.Unlock()
 | 
						|
}
 |