diff --git a/runtime/task.go b/runtime/task.go index 4b3e47b99..8d3c5b5c5 100644 --- a/runtime/task.go +++ b/runtime/task.go @@ -42,7 +42,7 @@ type Task interface { Pids(context.Context) ([]uint32, error) // Checkpoint checkpoints a container to an image with live system data Checkpoint(context.Context, string, *types.Any) error - // DeleteProcess deletes a specific exec process via the pid + // DeleteProcess deletes a specific exec process via its id DeleteProcess(context.Context, string) (*Exit, error) // Update sets the provided resources to a running task Update(context.Context, *types.Any) error diff --git a/windows/container.go b/windows/container.go deleted file mode 100644 index 281184ec2..000000000 --- a/windows/container.go +++ /dev/null @@ -1,239 +0,0 @@ -// +build windows - -package windows - -import ( - "context" - "encoding/json" - "fmt" - "sync" - - "github.com/containerd/containerd/log" - "github.com/containerd/containerd/runtime" - "github.com/containerd/containerd/windows/hcs" - "github.com/containerd/containerd/windows/hcsshimopts" - "github.com/gogo/protobuf/types" - specs "github.com/opencontainers/runtime-spec/specs-go" - "github.com/pkg/errors" - winsys "golang.org/x/sys/windows" -) - -var ErrLoadedContainer = errors.New("loaded container can only be terminated") - -func loadContainers(ctx context.Context, h *hcs.HCS) ([]*container, error) { - hCtr, err := h.LoadContainers(ctx) - if err != nil { - return nil, err - } - - containers := make([]*container, 0) - for _, c := range hCtr { - containers = append(containers, &container{ - ctr: c, - status: runtime.RunningStatus, - }) - } - - return containers, nil -} - -func (r *Runtime) newContainer(ctx context.Context, id string, spec *specs.Spec, createOpts *hcsshimopts.CreateOptions, io runtime.IO) (*container, error) { - cio, err := hcs.NewIO(io.Stdin, io.Stdout, io.Stderr, io.Terminal) - if err != nil { - return nil, err - } - - ctr, err := r.hcs.CreateContainer(ctx, id, spec, 
createOpts.TerminateDuration, cio) - if err != nil { - cio.Close() - return nil, err - } - //sendEvent(id, events.RuntimeEvent_CREATE, hcsCtr.Pid(), 0, time.Time{}) - - return &container{ - ctr: ctr, - status: runtime.CreatedStatus, - }, nil -} - -type container struct { - sync.Mutex - - ctr *hcs.Container - status runtime.Status -} - -func (c *container) ID() string { - return c.ctr.ID() -} - -func (c *container) Info() runtime.TaskInfo { - return runtime.TaskInfo{ - ID: c.ctr.ID(), - Runtime: runtimeName, - } -} - -func (c *container) Start(ctx context.Context) error { - if c.ctr.Pid() == 0 { - return ErrLoadedContainer - } - - err := c.ctr.Start(ctx) - if err != nil { - return err - } - - c.setStatus(runtime.RunningStatus) - // c.sendEvent(c.ctr.ID(), events.RuntimeEvent_START, c.ctr.Pid(), 0, time.Time{}) - - // Wait for our process to terminate - go func() { - _, err := c.ctr.ExitCode() - if err != nil { - log.G(ctx).Debug(err) - } - c.setStatus(runtime.StoppedStatus) - // c.sendEvent(c.ctr.ID(), events.RuntimeEvent_EXIT, c.ctr.Pid(), ec, c.ctr.Processes()[0].ExitedAt()) - }() - - return nil -} - -func (c *container) Pause(ctx context.Context) error { - if c.ctr.IsHyperV() { - return c.ctr.Pause() - } - return errors.New("Windows non-HyperV containers do not support pause") -} - -func (c *container) Resume(ctx context.Context) error { - if c.ctr.IsHyperV() == false { - return errors.New("Windows non-HyperV containers do not support resume") - } - return c.ctr.Resume() -} - -func (c *container) State(ctx context.Context) (runtime.State, error) { - return runtime.State{ - Pid: c.Pid(), - Status: c.Status(), - }, nil -} - -func (c *container) Kill(ctx context.Context, signal uint32, all bool) error { - if winsys.Signal(signal) == winsys.SIGKILL { - return c.ctr.Kill(ctx) - } - return c.ctr.Stop(ctx) -} - -func (c *container) Process(ctx context.Context, id string) (runtime.Process, error) { - for _, p := range c.ctr.Processes() { - if p.ID() == id { - return 
&process{p}, nil - } - } - return nil, errors.Errorf("process %s not found", id) -} - -func (c *container) Exec(ctx context.Context, id string, opts runtime.ExecOpts) (runtime.Process, error) { - if c.ctr.Pid() == 0 { - return nil, ErrLoadedContainer - } - - pio, err := hcs.NewIO(opts.IO.Stdin, opts.IO.Stdout, opts.IO.Stderr, opts.IO.Terminal) - if err != nil { - return nil, err - } - - var procSpec specs.Process - if err := json.Unmarshal(opts.Spec.Value, &procSpec); err != nil { - return nil, errors.Wrap(err, "failed to unmarshal oci spec") - } - - p, err := c.ctr.AddProcess(ctx, id, &procSpec, pio) - if err != nil { - return nil, err - } - - go func() { - _, err := p.ExitCode() - if err != nil { - log.G(ctx).Debug(err) - } - //c.sendEvent(c.ctr.ID(), events.RuntimeEvent_EXEC_ADDED, p.Pid(), ec, p.ExitedAt()) - }() - - return &process{p}, nil -} - -func (c *container) CloseIO(ctx context.Context) error { - return c.ctr.CloseIO(ctx) -} - -func (c *container) ResizePty(ctx context.Context, size runtime.ConsoleSize) error { - return c.ctr.ResizePty(ctx, size) -} - -func (c *container) Status() runtime.Status { - return c.getStatus() -} - -func (c *container) Pid() uint32 { - return c.ctr.Pid() -} - -func (c *container) Pids(ctx context.Context) ([]uint32, error) { - pl, err := c.ctr.ProcessList() - if err != nil { - return nil, err - } - pids := make([]uint32, 0, len(pl)) - for _, p := range pl { - pids = append(pids, p.ProcessId) - } - return pids, nil -} - -func (c *container) Checkpoint(ctx context.Context, _ string, _ *types.Any) error { - return fmt.Errorf("Windows containers do not support checkpoint") -} - -func (c *container) DeleteProcess(ctx context.Context, id string) (*runtime.Exit, error) { - var process *hcs.Process - for _, p := range c.ctr.Processes() { - if p.ID() == id { - process = p - break - } - } - if process == nil { - return nil, fmt.Errorf("process %s not found", id) - } - ec, err := process.ExitCode() - if err != nil { - return nil, err - } 
- process.Delete() - return &runtime.Exit{ - Status: ec, - Timestamp: process.ExitedAt(), - }, nil -} - -func (c *container) Update(ctx context.Context, spec *types.Any) error { - return fmt.Errorf("Windows containers do not support update") -} - -func (c *container) setStatus(status runtime.Status) { - c.Lock() - c.status = status - c.Unlock() -} - -func (c *container) getStatus() runtime.Status { - c.Lock() - defer c.Unlock() - return c.status -} diff --git a/windows/hcs/hcs.go b/windows/hcs/hcs.go deleted file mode 100644 index 664a15f21..000000000 --- a/windows/hcs/hcs.go +++ /dev/null @@ -1,572 +0,0 @@ -// +build windows - -package hcs - -import ( - "context" - "fmt" - "io" - "io/ioutil" - "os" - "path/filepath" - "strings" - "sync" - "time" - - "github.com/Microsoft/hcsshim" - "github.com/Sirupsen/logrus" - "github.com/containerd/containerd/log" - "github.com/containerd/containerd/runtime" - "github.com/containerd/containerd/windows/pid" - "github.com/opencontainers/runtime-spec/specs-go" - "github.com/pkg/errors" -) - -const ( - layerFile = "layer" - defaultTerminateTimeout = 5 * time.Minute -) - -func (s *HCS) LoadContainers(ctx context.Context) ([]*Container, error) { - ctrProps, err := hcsshim.GetContainers(hcsshim.ComputeSystemQuery{}) - if err != nil { - return nil, errors.Wrap(err, "failed to retrieve running containers") - } - - containers := make([]*Container, 0) - for _, p := range ctrProps { - select { - case <-ctx.Done(): - return nil, ctx.Err() - default: - } - - if p.Owner != s.owner || p.SystemType != "Container" { - continue - } - - container, err := hcsshim.OpenContainer(p.ID) - if err != nil { - return nil, errors.Wrapf(err, "failed open container %s", p.ID) - } - stateDir := filepath.Join(s.stateDir, p.ID) - b, err := ioutil.ReadFile(filepath.Join(stateDir, layerFile)) - containers = append(containers, &Container{ - id: p.ID, - Container: container, - stateDir: stateDir, - hcs: s, - io: &IO{}, - layerFolderPath: string(b), - 
terminateDuration: defaultTerminateTimeout, - }) - } - - return containers, nil -} - -func New(owner, root string) *HCS { - return &HCS{ - stateDir: root, - owner: owner, - pidPool: pid.NewPool(), - } -} - -type HCS struct { - stateDir string - owner string - pidPool *pid.Pool -} - -func (s *HCS) CreateContainer(ctx context.Context, id string, spec *specs.Spec, terminateDuration time.Duration, io *IO) (c *Container, err error) { - pid, err := s.pidPool.Get() - if err != nil { - return nil, err - } - defer func() { - if err != nil { - s.pidPool.Put(pid) - } - }() - - stateDir := filepath.Join(s.stateDir, id) - if err := os.MkdirAll(stateDir, 0755); err != nil { - return nil, errors.Wrapf(err, "unable to create container state dir %s", stateDir) - } - defer func() { - if err != nil { - os.RemoveAll(stateDir) - } - }() - - if terminateDuration == 0 { - terminateDuration = defaultTerminateTimeout - } - - conf, err := newContainerConfig(ctx, s.owner, id, spec) - if err != nil { - return nil, err - } - - layerPathFile := filepath.Join(stateDir, layerFile) - if err := ioutil.WriteFile(layerPathFile, []byte(conf.LayerFolderPath), 0644); err != nil { - log.G(ctx).WithError(err).Warnf("failed to save active layer %s", conf.LayerFolderPath) - } - - ctr, err := hcsshim.CreateContainer(id, conf) - if err != nil { - removeLayer(ctx, conf.LayerFolderPath) - return nil, errors.Wrapf(err, "failed to create container %s", id) - } - - err = ctr.Start() - if err != nil { - ctr.Terminate() - removeLayer(ctx, conf.LayerFolderPath) - return nil, errors.Wrapf(err, "failed to start container %s", id) - } - - return &Container{ - Container: ctr, - id: id, - pid: pid, - spec: spec, - stateDir: stateDir, - io: io, - hcs: s, - conf: conf, - terminateDuration: terminateDuration, - processes: make([]*Process, 0), - }, nil -} - -type Container struct { - sync.Mutex - hcsshim.Container - - conf *hcsshim.ContainerConfig - terminateDuration time.Duration - - id string - stateDir string - pid uint32 
- spec *specs.Spec - io *IO - hcs *HCS - layerFolderPath string - - processes []*Process -} - -func (c *Container) ID() string { - return c.id -} - -func (c *Container) Pid() uint32 { - return c.pid -} - -func (c *Container) Processes() []*Process { - return c.processes -} - -func (c *Container) Start(ctx context.Context) error { - _, err := c.addProcess(ctx, c.id, c.spec.Process, c.io) - return err -} - -func (c *Container) getDeathErr(err error) error { - switch { - case hcsshim.IsPending(err): - err = c.WaitTimeout(c.terminateDuration) - case hcsshim.IsAlreadyStopped(err): - err = nil - } - return err -} - -func (c *Container) Kill(ctx context.Context) error { - return c.getDeathErr(c.Terminate()) -} - -func (c *Container) Stop(ctx context.Context) error { - err := c.getDeathErr(c.Shutdown()) - if err != nil { - log.G(ctx).WithError(err).Debugf("failed to shutdown container %s, calling terminate", c.id) - return c.getDeathErr(c.Terminate()) - } - return nil -} - -func (c *Container) CloseIO(ctx context.Context) error { - var proc *Process - c.Lock() - for _, p := range c.processes { - if p.id == c.id { - proc = p - break - } - } - c.Unlock() - if proc == nil { - return errors.Errorf("no such process %s", c.id) - } - - return proc.CloseStdin() -} - -func (c *Container) ResizePty(ctx context.Context, size runtime.ConsoleSize) error { - var proc *Process - c.Lock() - for _, p := range c.processes { - if p.id == c.id { - proc = p - break - } - } - c.Unlock() - if proc == nil { - return errors.Errorf("no such process %s", c.id) - } - - return proc.ResizeConsole(uint16(size.Width), uint16(size.Height)) -} - -func (c *Container) Delete(ctx context.Context) { - defer func() { - if err := c.Stop(ctx); err != nil { - log.G(ctx).WithError(err).WithField("id", c.id). 
- Errorf("failed to shutdown/terminate container") - } - - c.Lock() - for _, p := range c.processes { - if err := p.Delete(); err != nil { - log.G(ctx).WithError(err).WithFields(logrus.Fields{"pid": p.Pid(), "id": c.id}). - Errorf("failed to clean process resources") - } - } - c.Unlock() - - if err := c.Close(); err != nil { - log.G(ctx).WithError(err).WithField("id", c.id).Errorf("failed to clean container resources") - } - - c.io.Close() - - // Cleanup folder layer - if err := removeLayer(ctx, c.layerFolderPath); err == nil { - os.RemoveAll(c.stateDir) - } - }() - - if update, err := c.HasPendingUpdates(); err != nil || !update { - return - } - - serviceCtr, err := c.hcs.CreateContainer(ctx, c.id+"_servicing", c.spec, c.terminateDuration, &IO{}) - if err != nil { - log.G(ctx).WithError(err).WithField("id", c.id).Warn("could not create servicing container") - return - } - defer serviceCtr.Close() - - err = serviceCtr.Start(ctx) - if err != nil { - log.G(ctx).WithError(err).WithField("id", c.id).Warn("failed to start servicing container") - serviceCtr.Terminate() - return - } - - err = serviceCtr.processes[0].Wait() - if err == nil { - _, err = serviceCtr.processes[0].ExitCode() - log.G(ctx).WithError(err).WithField("id", c.id).Errorf("failed to retrieve servicing container exit code") - } - - if err != nil { - if err := serviceCtr.Terminate(); err != nil { - log.G(ctx).WithError(err).WithField("id", c.id).Errorf("failed to terminate servicing container") - } - } -} - -func (c *Container) ExitCode() (uint32, error) { - if len(c.processes) == 0 { - return 255, errors.New("container not started") - } - return c.processes[0].ExitCode() -} - -func (c *Container) IsHyperV() bool { - return c.conf.HvPartition -} - -func (c *Container) AddProcess(ctx context.Context, id string, spec *specs.Process, io *IO) (*Process, error) { - if len(c.processes) == 0 { - return nil, errors.New("container not started") - } - return c.addProcess(ctx, id, spec, io) -} - -func (c 
*Container) addProcess(ctx context.Context, id string, spec *specs.Process, pio *IO) (*Process, error) { - // If we don't have a process yet, reused the container pid - var pid uint32 - if len(c.processes) == 0 { - pid = c.pid - } else { - pid, err := c.hcs.pidPool.Get() - if err != nil { - return nil, err - } - defer func() { - if err != nil { - c.hcs.pidPool.Put(pid) - } - }() - } - - conf := hcsshim.ProcessConfig{ - EmulateConsole: pio.terminal, - CreateStdInPipe: pio.stdin != nil, - CreateStdOutPipe: pio.stdout != nil, - CreateStdErrPipe: pio.stderr != nil, - User: spec.User.Username, - CommandLine: strings.Join(spec.Args, " "), - Environment: ociSpecEnvToHCSEnv(spec.Env), - WorkingDirectory: spec.Cwd, - ConsoleSize: [2]uint{spec.ConsoleSize.Height, spec.ConsoleSize.Width}, - } - - if conf.WorkingDirectory == "" { - conf.WorkingDirectory = c.spec.Process.Cwd - } - - proc, err := c.CreateProcess(&conf) - if err != nil { - return nil, errors.Wrapf(err, "failed to create process") - } - - stdin, stdout, stderr, err := proc.Stdio() - if err != nil { - proc.Kill() - return nil, errors.Wrapf(err, "failed to retrieve process stdio") - } - - if pio.stdin != nil { - go func() { - log.G(ctx).WithFields(logrus.Fields{"id": c.id, "pid": pid}).Debug("stdin: copy started") - io.Copy(stdin, pio.stdin) - log.G(ctx).WithFields(logrus.Fields{"id": c.id, "pid": pid}).Debug("stdin: copy done") - stdin.Close() - pio.stdin.Close() - }() - } else { - proc.CloseStdin() - } - - if pio.stdout != nil { - go func() { - log.G(ctx).WithFields(logrus.Fields{"id": c.id, "pid": pid}).Debug("stdout: copy started") - io.Copy(pio.stdout, stdout) - log.G(ctx).WithFields(logrus.Fields{"id": c.id, "pid": pid}).Debug("stdout: copy done") - stdout.Close() - pio.stdout.Close() - }() - } - - if pio.stderr != nil { - go func() { - log.G(ctx).WithFields(logrus.Fields{"id": c.id, "pid": pid}).Debug("stderr: copy started") - io.Copy(pio.stderr, stderr) - log.G(ctx).WithFields(logrus.Fields{"id": c.id, 
"pid": pid}).Debug("stderr: copy done") - stderr.Close() - pio.stderr.Close() - }() - } - - p := &Process{ - id: id, - Process: proc, - pid: pid, - io: pio, - ecSync: make(chan struct{}), - } - - c.Lock() - c.processes = append(c.processes, p) - idx := len(c.processes) - 1 - c.Unlock() - - go func() { - p.ec, p.ecErr = processExitCode(c.ID(), p) - close(p.ecSync) - c.Lock() - p.Delete() - // Remove process from slice (but keep the init one around) - if idx > 0 { - c.processes[idx] = c.processes[len(c.processes)-1] - c.processes[len(c.processes)-1] = nil - c.processes = c.processes[:len(c.processes)-1] - } - c.Unlock() - }() - - return p, nil -} - -// newContainerConfig generates a hcsshim container configuration from the -// provided OCI Spec -func newContainerConfig(ctx context.Context, owner, id string, spec *specs.Spec) (*hcsshim.ContainerConfig, error) { - if len(spec.Windows.LayerFolders) == 0 { - return nil, errors.New("LayerFolders cannot be empty") - } - - var ( - layerFolders = spec.Windows.LayerFolders - conf = &hcsshim.ContainerConfig{ - SystemType: "Container", - Name: id, - Owner: owner, - HostName: spec.Hostname, - IgnoreFlushesDuringBoot: spec.Windows.IgnoreFlushesDuringBoot, - AllowUnqualifiedDNSQuery: spec.Windows.Network.AllowUnqualifiedDNSQuery, - EndpointList: spec.Windows.Network.EndpointList, - NetworkSharedContainerName: spec.Windows.Network.NetworkSharedContainerName, - Credentials: spec.Windows.CredentialSpec.(string), - } - ) - - // TODO: use the create request Mount for those - for _, layerPath := range layerFolders { - _, filename := filepath.Split(layerPath) - guid, err := hcsshim.NameToGuid(filename) - if err != nil { - return nil, err - } - conf.Layers = append(conf.Layers, hcsshim.Layer{ - ID: guid.ToString(), - Path: layerPath, - }) - } - - if len(spec.Mounts) > 0 { - mds := make([]hcsshim.MappedDir, len(spec.Mounts)) - for i, mount := range spec.Mounts { - mds[i] = hcsshim.MappedDir{ - HostPath: mount.Source, - ContainerPath: 
mount.Destination, - ReadOnly: false, - } - for _, o := range mount.Options { - if strings.ToLower(o) == "ro" { - mds[i].ReadOnly = true - } - } - } - conf.MappedDirectories = mds - } - - if spec.Windows.Network.DNSSearchList != nil { - conf.DNSSearchList = strings.Join(spec.Windows.Network.DNSSearchList, ",") - } - - if spec.Windows.HyperV != nil { - conf.HvPartition = true - for _, layerPath := range layerFolders { - utilityVMPath := spec.Windows.HyperV.UtilityVMPath - _, err := os.Stat(utilityVMPath) - if err == nil { - conf.HvRuntime = &hcsshim.HvRuntime{ImagePath: utilityVMPath} - break - } else if !os.IsNotExist(err) { - return nil, errors.Wrapf(err, "failed to access layer %s", layerPath) - } - } - } - - var ( - err error - di = hcsshim.DriverInfo{ - Flavour: 1, // filter driver - HomeDir: filepath.Dir(layerFolders[0]), - } - ) - - // Windows doesn't support creating a container with a readonly - // filesystem, so always create a RW one - if err = hcsshim.CreateSandboxLayer(di, id, layerFolders[0], layerFolders); err != nil { - return nil, errors.Wrapf(err, "failed to create sandbox layer for %s: layers: %#v, driverInfo: %#v", - id, layerFolders, di) - } - conf.LayerFolderPath = filepath.Join(di.HomeDir, id) - defer func() { - if err != nil { - removeLayer(ctx, conf.LayerFolderPath) - } - }() - - if err = hcsshim.ActivateLayer(di, id); err != nil { - return nil, errors.Wrapf(err, "failed to activate layer %s", conf.LayerFolderPath) - } - - if err = hcsshim.PrepareLayer(di, id, layerFolders); err != nil { - return nil, errors.Wrapf(err, "failed to prepare layer %s", conf.LayerFolderPath) - } - - conf.VolumePath, err = hcsshim.GetLayerMountPath(di, id) - if err != nil { - return nil, errors.Wrapf(err, "failed to getmount path for layer %s: driverInfo: %#v", id, di) - } - - return conf, nil -} - -// removeLayer deletes the given layer, all associated containers must have -// been shutdown for this to succeed. 
-func removeLayer(ctx context.Context, path string) error { - var ( - err error - layerID = filepath.Base(path) - parentPath = filepath.Dir(path) - di = hcsshim.DriverInfo{ - Flavour: 1, // filter driver - HomeDir: parentPath, - } - ) - - if err = hcsshim.UnprepareLayer(di, layerID); err != nil { - log.G(ctx).WithError(err).Warnf("failed to unprepare layer %s for removal", path) - } - - if err = hcsshim.DeactivateLayer(di, layerID); err != nil { - log.G(ctx).WithError(err).Warnf("failed to deactivate layer %s for removal", path) - } - - removePath := filepath.Join(parentPath, fmt.Sprintf("%s-removing", layerID)) - if err = os.Rename(path, removePath); err != nil { - log.G(ctx).WithError(err).Warnf("failed to rename container layer %s for removal", path) - removePath = path - } - - if err = hcsshim.DestroyLayer(di, removePath); err != nil { - log.G(ctx).WithError(err).Errorf("failed to remove container layer %s", removePath) - return err - } - - return nil -} - -// ociSpecEnvToHCSEnv converts from the OCI Spec ENV format to the one -// expected by HCS. 
-func ociSpecEnvToHCSEnv(a []string) map[string]string { - env := make(map[string]string) - for _, s := range a { - arr := strings.SplitN(s, "=", 2) - if len(arr) == 2 { - env[arr[0]] = arr[1] - } - } - return env -} diff --git a/windows/hcs/process.go b/windows/hcs/process.go deleted file mode 100644 index 5b70a261a..000000000 --- a/windows/hcs/process.go +++ /dev/null @@ -1,76 +0,0 @@ -// +build windows - -package hcs - -import ( - "syscall" - "time" - - "github.com/Microsoft/hcsshim" - "github.com/containerd/containerd/runtime" - "github.com/pkg/errors" -) - -type Process struct { - hcsshim.Process - - id string - pid uint32 - io *IO - ec uint32 - exitedAt time.Time - ecErr error - ecSync chan struct{} -} - -func (p *Process) ID() string { - return p.id -} - -func (p *Process) Pid() uint32 { - return p.pid -} - -func (p *Process) ExitCode() (uint32, error) { - <-p.ecSync - return p.ec, p.ecErr -} - -func (p *Process) ExitedAt() time.Time { - return p.exitedAt -} - -func (p *Process) Status() runtime.Status { - select { - case <-p.ecSync: - return runtime.StoppedStatus - default: - } - - return runtime.RunningStatus -} - -func (p *Process) Delete() error { - p.io.Close() - return p.Close() -} - -func processExitCode(containerID string, p *Process) (uint32, error) { - if err := p.Wait(); err != nil { - if herr, ok := err.(*hcsshim.ProcessError); ok && herr.Err != syscall.ERROR_BROKEN_PIPE { - return 255, errors.Wrapf(err, "failed to wait for container '%s' process %u", containerID, p.pid) - } - // process is probably dead, let's try to get its exit code - } - - ec, err := p.Process.ExitCode() - if err != nil { - if herr, ok := err.(*hcsshim.ProcessError); ok && herr.Err != syscall.ERROR_BROKEN_PIPE { - return 255, errors.Wrapf(err, "failed to get container '%s' process %d exit code", containerID, p.pid) - } - // Well, unknown exit code it is - ec = 255 - } - p.exitedAt = time.Now() - return uint32(ec), err -} diff --git a/windows/hcs/shimio.go 
b/windows/hcs/shimio.go deleted file mode 100644 index 76e1d57fd..000000000 --- a/windows/hcs/shimio.go +++ /dev/null @@ -1,76 +0,0 @@ -// +build windows - -package hcs - -import ( - "net" - "time" - - "github.com/Microsoft/go-winio" - "github.com/pkg/errors" -) - -type IO struct { - stdin net.Conn - stdout net.Conn - stderr net.Conn - terminal bool -} - -// NewIO connects to the provided pipe addresses -func NewIO(stdin, stdout, stderr string, terminal bool) (*IO, error) { - var ( - c net.Conn - err error - io IO - ) - - defer func() { - if err != nil { - io.Close() - } - }() - - for _, p := range []struct { - name string - open bool - conn *net.Conn - }{ - { - name: stdin, - open: stdin != "", - conn: &io.stdin, - }, - { - name: stdout, - open: stdout != "", - conn: &io.stdout, - }, - { - name: stderr, - open: !terminal && stderr != "", - conn: &io.stderr, - }, - } { - if p.open { - dialTimeout := 3 * time.Second - c, err = winio.DialPipe(p.name, &dialTimeout) - if err != nil { - return nil, errors.Wrapf(err, "failed to connect to %s", p.name) - } - *p.conn = c - } - } - - return &io, nil -} - -// Close terminates all successfully dialed IO connections -func (i *IO) Close() { - for _, cn := range []net.Conn{i.stdin, i.stdout, i.stderr} { - if cn != nil { - cn.Close() - cn = nil - } - } -} diff --git a/windows/hcs/types.go b/windows/hcs/types.go deleted file mode 100644 index e9cf981e7..000000000 --- a/windows/hcs/types.go +++ /dev/null @@ -1,9 +0,0 @@ -// +build windows - -package hcs - -import "time" - -type Configuration struct { - TerminateDuration time.Duration `json:"terminateDuration,omitempty"` -} diff --git a/windows/hcsshim.go b/windows/hcsshim.go new file mode 100644 index 000000000..031a6359f --- /dev/null +++ b/windows/hcsshim.go @@ -0,0 +1,190 @@ +//+build windows + +package windows + +import ( + "context" + "fmt" + "os" + "path/filepath" + "strings" + + "github.com/Microsoft/hcsshim" + "github.com/containerd/containerd/errdefs" + 
"github.com/containerd/containerd/log" + specs "github.com/opencontainers/runtime-spec/specs-go" + "github.com/pkg/errors" +) + +// newContainerConfig generates a hcsshim container configuration from the +// provided OCI Spec +func newContainerConfig(ctx context.Context, owner, id string, spec *specs.Spec) (*hcsshim.ContainerConfig, error) { + if len(spec.Windows.LayerFolders) == 0 { + return nil, errors.Wrap(errdefs.ErrInvalidArgument, + "spec.Windows.LayerFolders cannot be empty") + } + + var ( + layerFolders = spec.Windows.LayerFolders + conf = &hcsshim.ContainerConfig{ + SystemType: "Container", + Name: id, + Owner: owner, + HostName: spec.Hostname, + IgnoreFlushesDuringBoot: spec.Windows.IgnoreFlushesDuringBoot, + AllowUnqualifiedDNSQuery: spec.Windows.Network.AllowUnqualifiedDNSQuery, + EndpointList: spec.Windows.Network.EndpointList, + NetworkSharedContainerName: spec.Windows.Network.NetworkSharedContainerName, + } + ) + + if spec.Windows.CredentialSpec != nil { + conf.Credentials = spec.Windows.CredentialSpec.(string) + } + + // TODO: use the create request Mount for those + for _, layerPath := range layerFolders { + _, filename := filepath.Split(layerPath) + guid, err := hcsshim.NameToGuid(filename) + if err != nil { + return nil, errors.Wrapf(err, "unable to get GUID for %s", filename) + } + conf.Layers = append(conf.Layers, hcsshim.Layer{ + ID: guid.ToString(), + Path: layerPath, + }) + } + + if len(spec.Mounts) > 0 { + mds := make([]hcsshim.MappedDir, len(spec.Mounts)) + for i, mount := range spec.Mounts { + mds[i] = hcsshim.MappedDir{ + HostPath: mount.Source, + ContainerPath: mount.Destination, + ReadOnly: false, + } + for _, o := range mount.Options { + if strings.ToLower(o) == "ro" { + mds[i].ReadOnly = true + } + } + } + conf.MappedDirectories = mds + } + + if spec.Windows.Network.DNSSearchList != nil { + conf.DNSSearchList = strings.Join(spec.Windows.Network.DNSSearchList, ",") + } + + if spec.Windows.HyperV != nil { + conf.HvPartition = true + 
for _, layerPath := range layerFolders { + utilityVMPath := spec.Windows.HyperV.UtilityVMPath + _, err := os.Stat(utilityVMPath) + if err == nil { + conf.HvRuntime = &hcsshim.HvRuntime{ImagePath: utilityVMPath} + break + } else if !os.IsNotExist(err) { + return nil, errors.Wrapf(err, "failed to access layer %s", layerPath) + } + } + } + + var ( + err error + di = hcsshim.DriverInfo{ + Flavour: 1, // filter driver + HomeDir: filepath.Dir(layerFolders[0]), + } + ) + + // TODO: Once there is a snapshotter for windows, this can be deleted. + // The R/W Layer should come from the Rootfs Mounts provided + // + // Windows doesn't support creating a container with a readonly + // filesystem, so always create a RW one + if err = hcsshim.CreateSandboxLayer(di, id, layerFolders[0], layerFolders); err != nil { + return nil, errors.Wrapf(err, "failed to create sandbox layer for %s: layers: %#v, driverInfo: %#v", + id, layerFolders, di) + } + conf.LayerFolderPath = filepath.Join(di.HomeDir, id) + defer func() { + if err != nil { + removeLayer(ctx, conf.LayerFolderPath) + } + }() + + if err = hcsshim.ActivateLayer(di, id); err != nil { + return nil, errors.Wrapf(err, "failed to activate layer %s", conf.LayerFolderPath) + } + + if err = hcsshim.PrepareLayer(di, id, layerFolders); err != nil { + return nil, errors.Wrapf(err, "failed to prepare layer %s", conf.LayerFolderPath) + } + + conf.VolumePath, err = hcsshim.GetLayerMountPath(di, id) + if err != nil { + return nil, errors.Wrapf(err, "failed to getmount path for layer %s: driverInfo: %#v", id, di) + } + + return conf, nil +} + +// removeLayer deletes the given layer, all associated containers must have +// been shutdown for this to succeed. 
+func removeLayer(ctx context.Context, path string) error { + var ( + err error + layerID = filepath.Base(path) + parentPath = filepath.Dir(path) + di = hcsshim.DriverInfo{ + Flavour: 1, // filter driver + HomeDir: parentPath, + } + ) + + if err = hcsshim.UnprepareLayer(di, layerID); err != nil { + log.G(ctx).WithError(err).Warnf("failed to unprepare layer %s for removal", path) + } + + if err = hcsshim.DeactivateLayer(di, layerID); err != nil { + log.G(ctx).WithError(err).Warnf("failed to deactivate layer %s for removal", path) + } + + removePath := filepath.Join(parentPath, fmt.Sprintf("%s-removing", layerID)) + if err = os.Rename(path, removePath); err != nil { + log.G(ctx).WithError(err).Warnf("failed to rename container layer %s for removal", path) + removePath = path + } + + if err = hcsshim.DestroyLayer(di, removePath); err != nil { + log.G(ctx).WithError(err).Errorf("failed to remove container layer %s", removePath) + return err + } + + return nil +} + +func newProcessConfig(spec *specs.Process, pset *pipeSet) *hcsshim.ProcessConfig { + conf := &hcsshim.ProcessConfig{ + EmulateConsole: pset.src.Terminal, + CreateStdInPipe: pset.stdin != nil, + CreateStdOutPipe: pset.stdout != nil, + CreateStdErrPipe: pset.stderr != nil, + User: spec.User.Username, + CommandLine: strings.Join(spec.Args, " "), + Environment: make(map[string]string), + WorkingDirectory: spec.Cwd, + ConsoleSize: [2]uint{spec.ConsoleSize.Height, spec.ConsoleSize.Width}, + } + + // Convert OCI Env format to HCS's + for _, s := range spec.Env { + arr := strings.SplitN(s, "=", 2) + if len(arr) == 2 { + conf.Environment[arr[0]] = arr[1] + } + } + + return conf +} diff --git a/windows/io.go b/windows/io.go new file mode 100644 index 000000000..53e76ee9f --- /dev/null +++ b/windows/io.go @@ -0,0 +1,110 @@ +// +build windows + +package windows + +import ( + "context" + "net" + "sync" + "time" + + "github.com/Microsoft/go-winio" + "github.com/containerd/containerd/runtime" + "github.com/pkg/errors" +) + 
+// pipeSet holds the connections dialed for the stdio pipes described by src.
+type pipeSet struct {
+	src    runtime.IO
+	stdin  net.Conn
+	stdout net.Conn
+	stderr net.Conn
+}
+
+// newPipeSet concurrently dials every pipe named in io; stderr is skipped when
+// io.Terminal is set. If a dial fails, or ctx is done before all dialers have
+// reported, it returns a nil set and closes any dialed pipe in the background.
+func newPipeSet(ctx context.Context, io runtime.IO) (*pipeSet, error) {
+	var (
+		err    error
+		wg     sync.WaitGroup
+		set    = &pipeSet{src: io}
+		ch     = make(chan error, 3) // buffered: a dialer can always report and exit
+		opened = 0
+	)
+
+	abort := func(cause error) (*pipeSet, error) {
+		go func() {
+			wg.Wait() // dialers write into set; let them finish before closing
+			set.Close()
+		}()
+		return nil, cause
+	}
+
+	for _, p := range [3]struct {
+		name string
+		open bool
+		conn *net.Conn
+	}{
+		{
+			name: io.Stdin,
+			open: io.Stdin != "",
+			conn: &set.stdin,
+		},
+		{
+			name: io.Stdout,
+			open: io.Stdout != "",
+			conn: &set.stdout,
+		},
+		{
+			name: io.Stderr,
+			open: !io.Terminal && io.Stderr != "",
+			conn: &set.stderr,
+		},
+	} {
+		if p.open {
+			wg.Add(1)
+			opened++
+			go func(name string, conn *net.Conn) {
+				defer wg.Done()
+				dialTimeout := 3 * time.Second
+				c, dialErr := winio.DialPipe(name, &dialTimeout) // dialer-local: nothing shared
+				if dialErr != nil {
+					ch <- errors.Wrapf(dialErr, "failed to connect to %s", name)
+					return
+				}
+				*conn = c
+				ch <- nil
+			}(p.name, p.conn)
+		}
+	}
+
+	for i := 0; i < opened; i++ {
+		select {
+		case <-ctx.Done():
+			return abort(ctx.Err())
+		case e := <-ch:
+			if e != nil && err == nil {
+				err = e
+			} else if e != nil {
+				err = errors.Wrap(err, e.Error())
+			}
+		}
+	}
+	if err != nil {
+		return abort(err)
+	}
+	return set, nil
+}
+
+// Close terminates all successfully dialed IO connections; no-op on a nil set
+func (p *pipeSet) Close() {
+	if p == nil {
+		return
+	}
+	for _, cn := range []net.Conn{p.stdin, p.stdout, p.stderr} {
+		if cn != nil {
+			cn.Close()
+		}
+	}
+}
diff --git a/windows/meta.go b/windows/meta.go
new file mode 100644
index 000000000..81ab9245e
--- /dev/null
+++ b/windows/meta.go
@@ -0,0 +1,54 @@
+// +build windows
+
+package windows
+
+// TODO: remove this file (i.e. 
meta.go) once we have a snapshotter + +import ( + "github.com/boltdb/bolt" + "github.com/containerd/containerd/errdefs" + "github.com/pkg/errors" +) + +func newLayerFolderStore(tx *bolt.Tx) *layerFolderStore { + return &layerFolderStore{tx} +} + +type layerFolderStore struct { + tx *bolt.Tx +} + +func (s *layerFolderStore) Create(id, layer string) error { + bkt, err := s.tx.CreateBucketIfNotExists([]byte(pluginID)) + if err != nil { + return errors.Wrapf(err, "failed to create bucket %s", pluginID) + } + err = bkt.Put([]byte(id), []byte(layer)) + if err != nil { + return errors.Wrapf(err, "failed to store entry %s:%s", id, layer) + } + + return nil +} + +func (s *layerFolderStore) Get(id string) (string, error) { + bkt := s.tx.Bucket([]byte(pluginID)) + if bkt == nil { + return "", errors.Wrapf(errdefs.ErrNotFound, "bucket %s", pluginID) + } + + return string(bkt.Get([]byte(id))), nil +} + +func (s *layerFolderStore) Delete(id string) error { + bkt := s.tx.Bucket([]byte(pluginID)) + if bkt == nil { + return errors.Wrapf(errdefs.ErrNotFound, "bucket %s", pluginID) + } + + if err := bkt.Delete([]byte(id)); err != nil { + return errors.Wrapf(err, "failed to delete entry %s", id) + } + + return nil +} diff --git a/windows/pid/pid.go b/windows/pid_pool.go similarity index 74% rename from windows/pid/pid.go rename to windows/pid_pool.go index 844e78eb4..004ae9c64 100644 --- a/windows/pid/pid.go +++ b/windows/pid_pool.go @@ -1,25 +1,25 @@ // +build windows -package pid +package windows import ( "errors" "sync" ) -type Pool struct { +type pidPool struct { sync.Mutex pool map[uint32]struct{} cur uint32 } -func NewPool() *Pool { - return &Pool{ +func newPidPool() *pidPool { + return &pidPool{ pool: make(map[uint32]struct{}), } } -func (p *Pool) Get() (uint32, error) { +func (p *pidPool) Get() (uint32, error) { p.Lock() defer p.Unlock() @@ -40,7 +40,7 @@ func (p *Pool) Get() (uint32, error) { return 0, errors.New("pid pool exhausted") } -func (p *Pool) Put(pid uint32) { +func 
(p *pidPool) Put(pid uint32) { p.Lock() delete(p.pool, pid) p.Unlock() diff --git a/windows/process.go b/windows/process.go index 57c67141b..878339fe9 100644 --- a/windows/process.go +++ b/windows/process.go @@ -4,39 +4,80 @@ package windows import ( "context" + "time" + "github.com/Microsoft/hcsshim" + "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/runtime" - "github.com/containerd/containerd/windows/hcs" + "github.com/pkg/errors" ) // process implements containerd.Process and containerd.State type process struct { - *hcs.Process + hcs hcsshim.Process + + id string + pid uint32 + io *pipeSet + status runtime.Status + task *task + + exitCh chan struct{} + exitCode uint32 + exitTime time.Time +} + +func (p *process) ID() string { + return p.id } func (p *process) State(ctx context.Context) (runtime.State, error) { return runtime.State{ - Pid: p.Pid(), - Status: p.Status(), + Status: p.Status(), + Pid: p.pid, + Stdin: p.io.src.Stdin, + Stdout: p.io.src.Stdout, + Stderr: p.io.src.Stderr, + Terminal: p.io.src.Terminal, }, nil } -func (p *process) Kill(ctx context.Context, sig uint32, all bool) error { - return p.Process.Kill() -} - func (p *process) Status() runtime.Status { - return p.Process.Status() + if p.task.getStatus() == runtime.PausedStatus { + return runtime.PausedStatus + } + + var status runtime.Status + select { + case <-p.exitCh: + status = runtime.StoppedStatus + default: + status = runtime.RunningStatus + } + return status } -func (p *process) Pid() uint32 { - return p.Process.Pid() -} - -func (p *process) CloseIO(ctx context.Context) error { - return p.Process.CloseStdin() +func (p *process) Kill(ctx context.Context, sig uint32, all bool) error { + // On windows all signals kill the process + return errors.Wrap(p.hcs.Kill(), "failed to kill process") } func (p *process) ResizePty(ctx context.Context, size runtime.ConsoleSize) error { - return p.Process.ResizeConsole(uint16(size.Width), uint16(size.Height)) + err := 
p.hcs.ResizeConsole(uint16(size.Width), uint16(size.Height)) + return errors.Wrap(err, "failed to resize process console") +} + +func (p *process) CloseIO(ctx context.Context) error { + return errors.Wrap(p.hcs.CloseStdin(), "failed to close stdin") +} + +func (p *process) Pid() uint32 { + return p.pid +} + +func (p *process) ExitCode() (uint32, time.Time, error) { + if p.Status() != runtime.StoppedStatus { + return 255, time.Time{}, errors.Wrap(errdefs.ErrFailedPrecondition, "process is not stopped") + } + return p.exitCode, p.exitTime, nil } diff --git a/windows/runtime.go b/windows/runtime.go index a18045803..c48f7e6f6 100644 --- a/windows/runtime.go +++ b/windows/runtime.go @@ -6,173 +6,351 @@ import ( "context" "fmt" "os" - "path/filepath" "sync" + "time" + "github.com/Microsoft/hcsshim" + "github.com/boltdb/bolt" + eventsapi "github.com/containerd/containerd/api/services/events/v1" + containerdtypes "github.com/containerd/containerd/api/types" + "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/events" "github.com/containerd/containerd/log" + "github.com/containerd/containerd/namespaces" "github.com/containerd/containerd/plugin" "github.com/containerd/containerd/runtime" "github.com/containerd/containerd/typeurl" - "github.com/containerd/containerd/windows/hcs" "github.com/containerd/containerd/windows/hcsshimopts" - "github.com/containerd/containerd/windows/pid" specs "github.com/opencontainers/runtime-spec/specs-go" "github.com/pkg/errors" ) const ( - runtimeName = "windows" - owner = "containerd" + runtimeName = "windows" + hcsshimOwner = "containerd" + defaultTerminateDuration = 5 * time.Minute ) -var _ = (runtime.Runtime)(&Runtime{}) +var ( + pluginID = fmt.Sprintf("%s.%s", plugin.RuntimePlugin, runtimeName) +) + +var _ = (runtime.Runtime)(&windowsRuntime{}) func init() { plugin.Register(&plugin.Registration{ - ID: "windows", + ID: runtimeName, Type: plugin.RuntimePlugin, Init: New, + Requires: []plugin.PluginType{ + 
plugin.MetadataPlugin, + }, }) - typeurl.Register(&RuntimeSpec{}, "windows/Spec") } func New(ic *plugin.InitContext) (interface{}, error) { - rootDir := filepath.Join(ic.Root) - if err := os.MkdirAll(rootDir, 0755); err != nil { - return nil, errors.Wrapf(err, "could not create state directory at %s", rootDir) + if err := os.MkdirAll(ic.Root, 0700); err != nil { + return nil, errors.Wrapf(err, "could not create state directory at %s", ic.Root) } - c, cancel := context.WithCancel(ic.Context) - r := &Runtime{ - pidPool: pid.NewPool(), - containers: make(map[string]*container), - events: make(chan interface{}, 2048), - eventsContext: c, - eventsCancel: cancel, - rootDir: rootDir, - hcs: hcs.New(owner, rootDir), - } - - // Terminate all previous container that we may have started. We don't - // support restoring containers - ctrs, err := loadContainers(ic.Context, r.hcs) + m, err := ic.Get(plugin.MetadataPlugin) if err != nil { return nil, err } - for _, c := range ctrs { - c.ctr.Delete(ic.Context) - //r.sendEvent(c.ctr.ID(), events.RuntimeEvent_EXIT, c.ctr.Pid(), 255, time.Time{}) + r := &windowsRuntime{ + root: ic.Root, + pidPool: newPidPool(), + + events: make(chan interface{}, 4096), + emitter: ic.Emitter, + // TODO(mlaventure): windows needs a stat monitor + monitor: nil, + tasks: runtime.NewTaskList(), + db: m.(*bolt.DB), } - // Try to delete the old state dir and recreate it - stateDir := filepath.Join(ic.Root, "state") - if err := os.RemoveAll(stateDir); err != nil { - log.G(c).WithError(err).Warnf("failed to cleanup old state directory at %s", stateDir) - } - if err := os.MkdirAll(stateDir, 0755); err != nil { - return nil, errors.Wrapf(err, "could not create state directory at %s", stateDir) - } - r.stateDir = stateDir + // Load our existing containers and kill/delete them. 
We don't support + // reattaching to them + r.cleanup(ic.Context) return r, nil } -type Runtime struct { +type windowsRuntime struct { sync.Mutex - rootDir string - stateDir string - pidPool *pid.Pool + root string + pidPool *pidPool - hcs *hcs.HCS + emitter *events.Emitter + events chan interface{} - containers map[string]*container - - events chan interface{} - eventsContext context.Context - eventsCancel func() + monitor runtime.TaskMonitor + tasks *runtime.TaskList + db *bolt.DB } -type RuntimeSpec struct { - // Spec is the OCI spec - OCISpec specs.Spec - - // HCS specific options - hcs.Configuration +func (r *windowsRuntime) ID() string { + return pluginID } -func (r *Runtime) ID() string { - return fmt.Sprintf("%s.%s", plugin.RuntimePlugin, runtimeName) -} +func (r *windowsRuntime) Create(ctx context.Context, id string, opts runtime.CreateOpts) (runtime.Task, error) { + namespace, err := namespaces.NamespaceRequired(ctx) + if err != nil { + return nil, err + } -func (r *Runtime) Create(ctx context.Context, id string, opts runtime.CreateOpts) (runtime.Task, error) { s, err := typeurl.UnmarshalAny(opts.Spec) if err != nil { return nil, err } spec := s.(*specs.Spec) - o, err := typeurl.UnmarshalAny(opts.Options) - if err != nil { - return nil, err + var createOpts *hcsshimopts.CreateOptions + if opts.Options != nil { + o, err := typeurl.UnmarshalAny(opts.Options) + if err != nil { + return nil, err + } + createOpts = o.(*hcsshimopts.CreateOptions) + } else { + createOpts = &hcsshimopts.CreateOptions{} } - createOpts := o.(*hcsshimopts.CreateOptions) - ctr, err := r.newContainer(ctx, id, spec, createOpts, opts.IO) + + if createOpts.TerminateDuration == 0 { + createOpts.TerminateDuration = defaultTerminateDuration + } + + t, err := r.newTask(ctx, r.emitter, namespace, id, spec, opts.IO, createOpts) if err != nil { return nil, err } - r.Lock() - r.containers[id] = ctr - r.Unlock() + r.tasks.Add(ctx, t) - return ctr, nil + r.emitter.Post(events.WithTopic(ctx, 
"/tasks/create"), &eventsapi.TaskCreate{ + ContainerID: id, + IO: &eventsapi.TaskIO{ + Stdin: opts.IO.Stdin, + Stdout: opts.IO.Stdout, + Stderr: opts.IO.Stderr, + Terminal: opts.IO.Terminal, + }, + Pid: t.pid, + Rootfs: t.rootfs, + // TODO: what should be in Bundle for windows? + }) + + return t, nil } -func (r *Runtime) Delete(ctx context.Context, c runtime.Task) (*runtime.Exit, error) { - wc, ok := c.(*container) +func (r *windowsRuntime) Get(ctx context.Context, id string) (runtime.Task, error) { + return r.tasks.Get(ctx, id) +} + +func (r *windowsRuntime) Tasks(ctx context.Context) ([]runtime.Task, error) { + return r.tasks.GetAll(ctx) +} + +func (r *windowsRuntime) Delete(ctx context.Context, t runtime.Task) (*runtime.Exit, error) { + wt, ok := t.(*task) if !ok { - return nil, fmt.Errorf("container cannot be cast as *windows.container") + return nil, errors.Wrap(errdefs.ErrInvalidArgument, "no a windows task") } - ec, err := wc.ctr.ExitCode() + + // TODO(mlaventure): stop monitor on this task + + state, _ := wt.State(ctx) + switch state.Status { + case runtime.StoppedStatus, runtime.CreatedStatus: + // if it's stopped or in created state, we need to shutdown the + // container before removing it + if err := wt.stop(ctx); err != nil { + return nil, err + } + default: + return nil, errors.Wrap(errdefs.ErrFailedPrecondition, + "cannot delete a non-stopped task") + } + + var rtExit *runtime.Exit + if p := wt.getProcess(t.ID()); p != nil { + ec, ea, err := p.ExitCode() + if err != nil { + return nil, err + } + rtExit = &runtime.Exit{ + Pid: wt.pid, + Status: ec, + Timestamp: ea, + } + } else { + rtExit = &runtime.Exit{ + Pid: wt.pid, + Status: 255, + Timestamp: time.Now(), + } + } + + wt.cleanup() + r.tasks.Delete(ctx, t) + + r.emitter.Post(events.WithTopic(ctx, "/tasks/delete"), &eventsapi.TaskDelete{ + ContainerID: wt.id, + Pid: wt.pid, + ExitStatus: rtExit.Status, + ExitedAt: rtExit.Timestamp, + }) + + // We were never started, return failure + return rtExit, 
nil +} + +func (r *windowsRuntime) newTask(ctx context.Context, emitter *events.Emitter, namespace, id string, spec *specs.Spec, io runtime.IO, createOpts *hcsshimopts.CreateOptions) (*task, error) { + var ( + err error + pset *pipeSet + ) + + if pset, err = newPipeSet(ctx, io); err != nil { + return nil, err + } + defer func() { + if err != nil { + pset.Close() + } + }() + + var pid uint32 + if pid, err = r.pidPool.Get(); err != nil { + return nil, err + } + defer func() { + if err != nil { + r.pidPool.Put(pid) + } + }() + + var ( + conf *hcsshim.ContainerConfig + nsid = namespace + "-" + id + ) + if conf, err = newContainerConfig(ctx, hcsshimOwner, nsid, spec); err != nil { + return nil, err + } + defer func() { + if err != nil { + removeLayer(ctx, conf.LayerFolderPath) + } + }() + + // TODO: remove this once we have a windows snapshotter + // Store the LayerFolder in the db so we can clean it if we die + if err = r.db.Update(func(tx *bolt.Tx) error { + s := newLayerFolderStore(tx) + return s.Create(nsid, conf.LayerFolderPath) + }); err != nil { + return nil, err + } + defer func() { + if err != nil { + if dbErr := r.db.Update(func(tx *bolt.Tx) error { + s := newLayerFolderStore(tx) + return s.Delete(nsid) + }); dbErr != nil { + log.G(ctx).WithField("id", id). + Error("failed to remove key from metadata") + } + } + }() + + ctr, err := hcsshim.CreateContainer(nsid, conf) if err != nil { - log.G(ctx).WithError(err).Errorf("failed to retrieve exit code for container %s", wc.ctr.ID()) + return nil, errors.Wrapf(err, "hcsshim failed to create task") + } + defer func() { + if err != nil { + ctr.Terminate() + ctr.Wait() + ctr.Close() + } + }() + + if err = ctr.Start(); err != nil { + return nil, errors.Wrap(err, "hcsshim failed to spawn task") } - wc.ctr.Delete(ctx) + var rootfs []*containerdtypes.Mount + for _, l := range append([]string{conf.LayerFolderPath}, spec.Windows.LayerFolders...) 
{ + rootfs = append(rootfs, &containerdtypes.Mount{ + Type: "windows-layer", + Source: l, + }) + } - r.Lock() - delete(r.containers, wc.ctr.ID()) - r.Unlock() - - return &runtime.Exit{ - Status: ec, - Timestamp: wc.ctr.Processes()[0].ExitedAt(), + return &task{ + id: id, + namespace: namespace, + pid: pid, + io: pset, + status: runtime.CreatedStatus, + initSpec: spec.Process, + processes: make(map[string]*process), + hyperV: spec.Windows.HyperV != nil, + rootfs: rootfs, + emitter: emitter, + pidPool: r.pidPool, + hcsContainer: ctr, + terminateDuration: createOpts.TerminateDuration, }, nil } -func (r *Runtime) Tasks(ctx context.Context) ([]runtime.Task, error) { - r.Lock() - defer r.Unlock() - list := make([]runtime.Task, len(r.containers)) - for _, c := range r.containers { - select { - case <-ctx.Done(): - return nil, ctx.Err() - default: - list = append(list, c) - } +func (r *windowsRuntime) cleanup(ctx context.Context) { + cp, err := hcsshim.GetContainers(hcsshim.ComputeSystemQuery{ + Types: []string{"Container"}, + Owners: []string{hcsshimOwner}, + }) + if err != nil { + log.G(ctx).Warn("failed to retrieve running containers") + return } - return list, nil -} -func (r *Runtime) Get(ctx context.Context, id string) (runtime.Task, error) { - r.Lock() - defer r.Unlock() - c, ok := r.containers[id] - if !ok { - return nil, fmt.Errorf("container %s does not exit", id) + for _, p := range cp { + container, err := hcsshim.OpenContainer(p.ID) + if err != nil { + log.G(ctx).Warnf("failed open container %s", p.ID) + continue + } + + err = container.Terminate() + if err == nil || hcsshim.IsPending(err) || hcsshim.IsAlreadyStopped(err) { + container.Wait() + } + container.Close() + + // TODO: remove this once we have a windows snapshotter + var layerFolderPath string + if err := r.db.View(func(tx *bolt.Tx) error { + s := newLayerFolderStore(tx) + l, e := s.Get(p.ID) + if err == nil { + layerFolderPath = l + } + return e + }); err == nil && layerFolderPath != "" { + 
removeLayer(ctx, layerFolderPath) + if dbErr := r.db.Update(func(tx *bolt.Tx) error { + s := newLayerFolderStore(tx) + return s.Delete(p.ID) + }); dbErr != nil { + log.G(ctx).WithField("id", p.ID). + Error("failed to remove key from metadata") + } + } else { + log.G(ctx).WithField("id", p.ID). + Debug("key not found in metadata, R/W layer may be leaked") + } + } - return c, nil } diff --git a/windows/task.go b/windows/task.go new file mode 100644 index 000000000..9a3fdbe65 --- /dev/null +++ b/windows/task.go @@ -0,0 +1,436 @@ +// +build windows + +package windows + +import ( + "context" + "io" + "sync" + "syscall" + "time" + + "github.com/Microsoft/hcsshim" + "github.com/Sirupsen/logrus" + eventsapi "github.com/containerd/containerd/api/services/events/v1" + containerdtypes "github.com/containerd/containerd/api/types" + "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/events" + "github.com/containerd/containerd/log" + "github.com/containerd/containerd/runtime" + "github.com/containerd/containerd/typeurl" + "github.com/gogo/protobuf/types" + specs "github.com/opencontainers/runtime-spec/specs-go" + "github.com/pkg/errors" +) + +type task struct { + sync.Mutex + + id string + namespace string + pid uint32 + io *pipeSet + status runtime.Status + initSpec *specs.Process + processes map[string]*process + hyperV bool + + rootfs []*containerdtypes.Mount + emitter *events.Emitter + + pidPool *pidPool + hcsContainer hcsshim.Container + terminateDuration time.Duration +} + +func (t *task) ID() string { + return t.id +} + +func (t *task) State(ctx context.Context) (runtime.State, error) { + var status runtime.Status + + if p := t.getProcess(t.id); p != nil { + status = p.Status() + } else { + status = t.getStatus() + } + + return runtime.State{ + Status: status, + Pid: t.pid, + Stdin: t.io.src.Stdin, + Stdout: t.io.src.Stdout, + Stderr: t.io.src.Stderr, + Terminal: t.io.src.Terminal, + }, nil +} + +func (t *task) Kill(ctx context.Context, signal 
uint32, all bool) error { + p := t.getProcess(t.id) + if p == nil { + return errors.Wrapf(errdefs.ErrFailedPrecondition, "task is not running") + } + + if p.Status() == runtime.StoppedStatus { + return errors.Wrapf(errdefs.ErrNotFound, "process is stopped") + } + + return p.Kill(ctx, signal, all) +} + +func (t *task) ResizePty(ctx context.Context, size runtime.ConsoleSize) error { + p := t.getProcess(t.id) + if p == nil { + return errors.Wrap(errdefs.ErrFailedPrecondition, "task not started") + } + + return p.ResizePty(ctx, size) +} + +func (t *task) CloseIO(ctx context.Context) error { + p := t.getProcess(t.id) + if p == nil { + return errors.Wrap(errdefs.ErrFailedPrecondition, "task not started") + } + + return p.hcs.CloseStdin() +} + +func (t *task) Info() runtime.TaskInfo { + return runtime.TaskInfo{ + ID: t.id, + Runtime: pluginID, + Namespace: t.namespace, + // TODO(mlaventure): what about Spec? I think this could be removed from the info, the id is enough since it matches the one from the container + } +} + +func (t *task) Start(ctx context.Context) error { + conf := newProcessConfig(t.initSpec, t.io) + if _, err := t.newProcess(ctx, t.id, conf, t.io); err != nil { + return err + } + + t.emitter.Post(events.WithTopic(ctx, "/tasks/start"), &eventsapi.TaskStart{ + ContainerID: t.id, + Pid: t.pid, + }) + + return nil +} + +func (t *task) Pause(ctx context.Context) error { + if t.hyperV { + err := t.hcsContainer.Pause() + if err == nil { + t.Lock() + t.status = runtime.PausedStatus + t.Unlock() + } + if err == nil { + t.emitter.Post(events.WithTopic(ctx, "/tasks/paused"), &eventsapi.TaskPaused{ + ContainerID: t.id, + }) + } + return errors.Wrap(err, "hcsshim failed to pause task") + } + + return errors.Wrap(errdefs.ErrFailedPrecondition, "not an hyperV task") +} + +func (t *task) Resume(ctx context.Context) error { + if t.hyperV { + err := t.hcsContainer.Resume() + if err == nil { + t.Lock() + t.status = runtime.RunningStatus + t.Unlock() + } + if err == nil { + 
t.emitter.Post(events.WithTopic(ctx, "/tasks/resumed"), &eventsapi.TaskResumed{ + ContainerID: t.id, + }) + } + return errors.Wrap(err, "hcsshim failed to resume task") + } + + return errors.Wrap(errdefs.ErrFailedPrecondition, "not an hyperV task") +} + +func (t *task) Exec(ctx context.Context, id string, opts runtime.ExecOpts) (runtime.Process, error) { + if p := t.getProcess(t.id); p == nil { + return nil, errors.Wrap(errdefs.ErrFailedPrecondition, "task not started") + } + + if p := t.getProcess(id); p != nil { + return nil, errors.Wrap(errdefs.ErrAlreadyExists, "id already in use") + } + + s, err := typeurl.UnmarshalAny(opts.Spec) + if err != nil { + return nil, err + } + spec := s.(*specs.Process) + if spec.Cwd == "" { + spec.Cwd = t.initSpec.Cwd + } + + var pset *pipeSet + if pset, err = newPipeSet(ctx, opts.IO); err != nil { + return nil, err + } + defer func() { + if err != nil { + pset.Close() + } + }() + + conf := newProcessConfig(spec, pset) + p, err := t.newProcess(ctx, id, conf, pset) + if err != nil { + return nil, err + } + + t.emitter.Post(events.WithTopic(ctx, "/tasks/exec-added"), &eventsapi.TaskExecAdded{ + ContainerID: t.id, + ExecID: id, + Pid: p.Pid(), + }) + + return p, nil +} + +func (t *task) Pids(ctx context.Context) ([]uint32, error) { + t.Lock() + defer t.Unlock() + + var ( + pids = make([]uint32, len(t.processes)) + idx = 0 + ) + for _, p := range t.processes { + pids[idx] = p.Pid() + idx++ + } + + return pids, nil +} + +func (t *task) Checkpoint(_ context.Context, _ string, _ *types.Any) error { + return errors.Wrap(errdefs.ErrUnavailable, "not supported") +} + +func (t *task) DeleteProcess(ctx context.Context, id string) (*runtime.Exit, error) { + if id == t.id { + return nil, errors.Wrapf(errdefs.ErrInvalidArgument, + "cannot delete init process") + } + if p := t.getProcess(id); p != nil { + ec, ea, err := p.ExitCode() + if err != nil { + return nil, err + } + t.removeProcess(id) + return &runtime.Exit{ + Pid: p.pid, + Status: ec, + 
Timestamp: ea, + }, nil + } + return nil, errors.Wrapf(errdefs.ErrNotFound, "no such process %s", id) +} + +func (t *task) Update(ctx context.Context, resources *types.Any) error { + return errors.Wrap(errdefs.ErrUnavailable, "not supported") +} + +func (t *task) Process(ctx context.Context, id string) (p runtime.Process, err error) { + p = t.getProcess(id) + if p == nil { + err = errors.Wrapf(errdefs.ErrNotFound, "no such process %d", id) + } + + return p, err +} + +func (t *task) newProcess(ctx context.Context, id string, conf *hcsshim.ProcessConfig, pset *pipeSet) (*process, error) { + var ( + err error + pid uint32 + ) + + // If we fail, close the io right now + defer func() { + if err != nil { + pset.Close() + } + }() + + t.Lock() + if len(t.processes) == 0 { + pid = t.pid + } else { + if pid, err = t.pidPool.Get(); err != nil { + t.Unlock() + return nil, err + } + defer func() { + if err != nil { + t.pidPool.Put(pid) + } + }() + } + t.Unlock() + + var p hcsshim.Process + if p, err = t.hcsContainer.CreateProcess(conf); err != nil { + return nil, errors.Wrapf(err, "failed to create process") + } + + stdin, stdout, stderr, err := p.Stdio() + if err != nil { + p.Kill() + return nil, errors.Wrapf(err, "failed to retrieve init process stdio") + } + + ioCopy := func(name string, dst io.WriteCloser, src io.ReadCloser) { + log.G(ctx).WithFields(logrus.Fields{"id": id, "pid": pid}). + Debugf("%s: copy started", name) + io.Copy(dst, src) + log.G(ctx).WithFields(logrus.Fields{"id": id, "pid": pid}). 
+ Debugf("%s: copy done", name) + dst.Close() + src.Close() + } + + if pset.stdin != nil { + go ioCopy("stdin", stdin, pset.stdin) + } + + if pset.stdout != nil { + go ioCopy("stdout", pset.stdout, stdout) + } + + if pset.stderr != nil { + go ioCopy("stderr", pset.stderr, stderr) + } + + t.Lock() + wp := &process{ + id: id, + pid: pid, + io: pset, + status: runtime.RunningStatus, + task: t, + hcs: p, + exitCh: make(chan struct{}), + } + t.processes[id] = wp + t.Unlock() + + // Wait for the process to exit to get the exit status + go func() { + if err := p.Wait(); err != nil { + herr, ok := err.(*hcsshim.ProcessError) + if ok && herr.Err != syscall.ERROR_BROKEN_PIPE { + log.G(ctx). + WithError(err). + WithFields(logrus.Fields{"id": id, "pid": pid}). + Warnf("hcsshim wait failed (process may have been killed)") + } + // Try to get the exit code nonetheless + } + wp.exitTime = time.Now() + + ec, err := p.ExitCode() + if err != nil { + log.G(ctx). + WithError(err). + WithFields(logrus.Fields{"id": id, "pid": pid}). + Warnf("hcsshim could not retrieve exit code") + // Use the unknown exit code + ec = 255 + } + wp.exitCode = uint32(ec) + + t.emitter.Post(events.WithTopic(ctx, "/tasks/exit"), &eventsapi.TaskExit{ + ContainerID: t.id, + ID: id, + Pid: pid, + ExitStatus: wp.exitCode, + ExitedAt: wp.exitTime, + }) + + close(wp.exitCh) + // Ensure io's are closed + pset.Close() + // Cleanup HCS resources + p.Close() + }() + + return wp, nil +} + +func (t *task) getProcess(id string) *process { + t.Lock() + p := t.processes[id] + t.Unlock() + + return p +} + +func (t *task) removeProcessNL(id string) { + if p, ok := t.processes[id]; ok { + if p.io != nil { + p.io.Close() + } + t.pidPool.Put(p.pid) + delete(t.processes, id) + } +} + +func (t *task) removeProcess(id string) { + t.Lock() + t.removeProcessNL(id) + t.Unlock() +} + +func (t *task) getStatus() runtime.Status { + t.Lock() + status := t.status + t.Unlock() + + return status +} + +// stop tries to shutdown the task. 
+// It will do so by first calling Shutdown on the hcsshim.Container and if +// that fails, by resorting to caling Terminate +func (t *task) stop(ctx context.Context) error { + if err := t.hcsStop(ctx, t.hcsContainer.Shutdown); err != nil { + return t.hcsStop(ctx, t.hcsContainer.Terminate) + } + t.hcsContainer.Close() + return nil +} + +func (t *task) hcsStop(ctx context.Context, stop func() error) error { + err := stop() + switch { + case hcsshim.IsPending(err): + err = t.hcsContainer.WaitTimeout(t.terminateDuration) + case hcsshim.IsAlreadyStopped(err): + err = nil + } + return err +} + +func (t *task) cleanup() { + t.Lock() + for _, p := range t.processes { + t.removeProcessNL(p.id) + } + removeLayer(context.Background(), t.rootfs[len(t.rootfs)-1].Source) + t.Unlock() +}