Refactor windows runtime

Signed-off-by: Kenfe-Mickael Laventure <mickael.laventure@gmail.com>
This commit is contained in:
Kenfe-Mickael Laventure 2017-04-10 10:01:14 -07:00
parent 15eddd0ce2
commit 22a051c88e
8 changed files with 532 additions and 493 deletions

View File

@ -42,6 +42,19 @@ func spec(id string, config *ocispec.ImageConfig, context *cli.Context) *specs.S
cwd = `C:\` cwd = `C:\`
} }
// Some sane defaults for console
w := 80
h := 20
if tty {
con := console.Current()
size, err := con.Size()
if err == nil {
w = int(size.Width)
h = int(size.Height)
}
}
return &specs.Spec{ return &specs.Spec{
Version: specs.Version, Version: specs.Version,
Platform: specs.Platform{ Platform: specs.Platform{
@ -60,8 +73,8 @@ func spec(id string, config *ocispec.ImageConfig, context *cli.Context) *specs.S
Username: config.User, Username: config.User,
}, },
ConsoleSize: specs.Box{ ConsoleSize: specs.Box{
Height: 20, Height: uint(w),
Width: 80, Width: uint(h),
}, },
}, },
Hostname: id, Hostname: id,

View File

@ -7,146 +7,149 @@ import (
"sync" "sync"
"github.com/containerd/containerd" "github.com/containerd/containerd"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/windows/hcs" "github.com/containerd/containerd/windows/hcs"
specs "github.com/opencontainers/runtime-spec/specs-go" specs "github.com/opencontainers/runtime-spec/specs-go"
"github.com/pkg/errors" "github.com/pkg/errors"
"golang.org/x/net/context" "golang.org/x/net/context"
winsys "golang.org/x/sys/windows"
) )
var ( var (
ErrLoadedContainer = errors.New("loaded container can only be terminated") ErrLoadedContainer = errors.New("loaded container can only be terminated")
) )
// State carries a process id together with a containerd status and exposes
// both through accessor methods.
type State struct {
	pid    uint32
	status containerd.Status
}

// Pid returns the process id stored in the State.
func (st State) Pid() uint32 { return st.pid }

// Status returns the containerd status stored in the State.
func (st State) Status() containerd.Status { return st.status }
type eventCallback func(id string, evType containerd.EventType, pid, exitStatus uint32) type eventCallback func(id string, evType containerd.EventType, pid, exitStatus uint32)
func loadContainers(ctx context.Context, rootDir string) ([]*container, error) { func loadContainers(ctx context.Context, h *hcs.HCS, sendEvent eventCallback) ([]*container, error) {
hcs, err := hcs.LoadAll(ctx, owner, rootDir) hCtr, err := h.LoadContainers(ctx)
if err != nil { if err != nil {
return nil, err return nil, err
} }
containers := make([]*container, 0) containers := make([]*container, 0)
for id, h := range hcs { for _, c := range hCtr {
containers = append(containers, &container{ containers = append(containers, &container{
id: id, ctr: c,
status: containerd.RunningStatus, status: containerd.RunningStatus,
hcs: h, sendEvent: sendEvent,
}) })
} }
return containers, nil return containers, nil
} }
func newContainer(id, rootDir string, pid uint32, spec RuntimeSpec, io containerd.IO, sendEvent eventCallback) (*container, error) { func newContainer(ctx context.Context, h *hcs.HCS, id string, spec RuntimeSpec, io containerd.IO, sendEvent eventCallback) (*container, error) {
hcs, err := hcs.New(rootDir, owner, id, spec.OCISpec, spec.Configuration, io) cio, err := hcs.NewIO(io.Stdin, io.Stdout, io.Stderr, io.Terminal)
if err != nil { if err != nil {
return nil, err return nil, err
} }
hcsCtr, err := h.CreateContainer(ctx, id, spec.OCISpec, spec.Configuration, cio)
if err != nil {
return nil, err
}
sendEvent(id, containerd.CreateEvent, hcsCtr.Pid(), 0)
return &container{ return &container{
runtimePid: pid, ctr: hcsCtr,
id: id,
hcs: hcs,
status: containerd.CreatedStatus, status: containerd.CreatedStatus,
ecSync: make(chan struct{}),
sendEvent: sendEvent, sendEvent: sendEvent,
}, nil }, nil
} }
// container implements both containerd.Container and containerd.State
type container struct { type container struct {
sync.Mutex sync.Mutex
runtimePid uint32 ctr *hcs.Container
id string
hcs *hcs.HCS
status containerd.Status status containerd.Status
sendEvent eventCallback
ec uint32
ecErr error
ecSync chan struct{}
sendEvent func(id string, evType containerd.EventType, pid, exitStatus uint32)
} }
func (c *container) Info() containerd.ContainerInfo { func (c *container) Info() containerd.ContainerInfo {
return containerd.ContainerInfo{ return containerd.ContainerInfo{
ID: c.id, ID: c.ctr.ID(),
Runtime: runtimeName, Runtime: runtimeName,
} }
} }
func (c *container) Start(ctx context.Context) error { func (c *container) Start(ctx context.Context) error {
if c.runtimePid == 0 { if c.ctr.Pid() == 0 {
return ErrLoadedContainer return ErrLoadedContainer
} }
err := c.hcs.Start(ctx, false) err := c.ctr.Start(ctx)
if err != nil { if err != nil {
c.hcs.Terminate(ctx)
c.sendEvent(c.id, containerd.ExitEvent, c.runtimePid, 255)
return err return err
} }
c.setStatus(containerd.RunningStatus) c.setStatus(containerd.RunningStatus)
c.sendEvent(c.id, containerd.StartEvent, c.runtimePid, 0) c.sendEvent(c.ctr.ID(), containerd.StartEvent, c.ctr.Pid(), 0)
// Wait for our process to terminate // Wait for our process to terminate
go func() { go func() {
c.ec, c.ecErr = c.hcs.ExitCode(context.Background()) ec, err := c.ctr.ExitCode()
if err != nil {
log.G(ctx).Debug(err)
}
c.setStatus(containerd.StoppedStatus) c.setStatus(containerd.StoppedStatus)
c.sendEvent(c.id, containerd.ExitEvent, c.runtimePid, c.ec) c.sendEvent(c.ctr.ID(), containerd.ExitEvent, c.ctr.Pid(), ec)
close(c.ecSync)
}() }()
return nil return nil
} }
func (c *container) State(ctx context.Context) (containerd.State, error) { func (c *container) State(ctx context.Context) (containerd.State, error) {
return &State{ return c, nil
pid: c.runtimePid,
status: c.getStatus(),
}, nil
} }
func (c *container) Kill(ctx context.Context, signal uint32, all bool) error { func (c *container) Kill(ctx context.Context, signal uint32, all bool) error {
return c.hcs.Terminate(ctx) if winsys.Signal(signal) == winsys.SIGKILL {
return c.ctr.Kill(ctx)
}
return c.ctr.Stop(ctx)
} }
func (c *container) Exec(ctx context.Context, opts containerd.ExecOpts) (containerd.Process, error) { func (c *container) Exec(ctx context.Context, opts containerd.ExecOpts) (containerd.Process, error) {
if c.runtimePid == 0 { if c.ctr.Pid() == 0 {
return nil, ErrLoadedContainer return nil, ErrLoadedContainer
} }
pio, err := hcs.NewIO(opts.IO.Stdin, opts.IO.Stdout, opts.IO.Stderr, opts.IO.Terminal)
if err != nil {
return nil, err
}
var procSpec specs.Process var procSpec specs.Process
if err := json.Unmarshal(opts.Spec, &procSpec); err != nil { if err := json.Unmarshal(opts.Spec, &procSpec); err != nil {
return nil, errors.Wrap(err, "failed to unmarshal oci spec") return nil, errors.Wrap(err, "failed to unmarshal oci spec")
} }
p, err := c.hcs.Exec(ctx, procSpec, opts.IO) p, err := c.ctr.AddProcess(ctx, procSpec, pio)
if err != nil { if err != nil {
return nil, err return nil, err
} }
go func() { go func() {
ec, _ := p.ExitCode() ec, err := p.ExitCode()
c.sendEvent(c.id, containerd.ExitEvent, p.Pid(), ec) if err != nil {
log.G(ctx).Debug(err)
}
c.sendEvent(c.ctr.ID(), containerd.ExitEvent, p.Pid(), ec)
}() }()
return &process{p}, nil return &process{p}, nil
} }
// Status reports the container's current status, as obtained from getStatus.
func (c *container) Status() containerd.Status {
	current := c.getStatus()
	return current
}
// Pid returns the pid reported by the underlying hcs container.
func (c *container) Pid() uint32 {
	hcsPid := c.ctr.Pid()
	return hcsPid
}
func (c *container) setStatus(status containerd.Status) { func (c *container) setStatus(status containerd.Status) {
c.Lock() c.Lock()
c.status = status c.status = status
@ -158,44 +161,3 @@ func (c *container) getStatus() containerd.Status {
defer c.Unlock() defer c.Unlock()
return c.status return c.status
} }
// exitCode waits for the container's exit code to be published (signalled by
// ecSync being closed) and returns it along with any error recorded while
// collecting it.
//
// A container with a zero runtimePid yields 255 and ErrLoadedContainer
// without waiting. If ctx is cancelled before the exit code is available,
// 255 and ctx.Err() are returned instead of blocking forever.
func (c *container) exitCode(ctx context.Context) (uint32, error) {
	if c.runtimePid == 0 {
		return 255, ErrLoadedContainer
	}

	// Prefer an already-published exit code over a cancelled context so the
	// result is deterministic when both are ready.
	select {
	case <-c.ecSync:
		return c.ec, c.ecErr
	default:
	}

	select {
	case <-c.ecSync:
		return c.ec, c.ecErr
	case <-ctx.Done():
		return 255, ctx.Err()
	}
}
// remove delegates to the underlying hcs object's Remove and returns its
// error unchanged.
func (c *container) remove(ctx context.Context) error {
	removeErr := c.hcs.Remove(ctx)
	return removeErr
}
// getRuntimePid returns the runtimePid field of the container.
func (c *container) getRuntimePid() uint32 {
	runtimePid := c.runtimePid
	return runtimePid
}
// process wraps an *hcs.Process.
type process struct {
	p *hcs.Process
}

// State returns a processState view over the wrapped hcs process. It never
// fails.
func (pr *process) State(ctx context.Context) (containerd.State, error) {
	view := &processState{pr.p}
	return view, nil
}

// Kill kills the wrapped hcs process. The ctx, sig and all arguments are not
// consulted.
func (pr *process) Kill(ctx context.Context, sig uint32, all bool) error {
	killErr := pr.p.Kill()
	return killErr
}
// processState exposes the status and pid of an *hcs.Process.
type processState struct {
	p *hcs.Process
}

// Status returns the status reported by the wrapped hcs process.
func (ps *processState) Status() containerd.Status {
	current := ps.p.Status()
	return current
}

// Pid returns the pid reported by the wrapped hcs process.
func (ps *processState) Pid() uint32 {
	procPid := ps.p.Pid()
	return procPid
}

View File

@ -4,21 +4,19 @@ package hcs
import ( import (
"context" "context"
"encoding/json"
"fmt" "fmt"
"io" "io"
"io/ioutil" "io/ioutil"
"os" "os"
"path/filepath" "path/filepath"
"runtime"
"strings" "strings"
"syscall" "sync"
"time" "time"
"github.com/Microsoft/hcsshim" "github.com/Microsoft/hcsshim"
"github.com/Sirupsen/logrus" "github.com/Sirupsen/logrus"
"github.com/containerd/containerd"
"github.com/containerd/containerd/log" "github.com/containerd/containerd/log"
"github.com/containerd/containerd/windows/pid"
"github.com/opencontainers/runtime-spec/specs-go" "github.com/opencontainers/runtime-spec/specs-go"
"github.com/pkg/errors" "github.com/pkg/errors"
) )
@ -28,13 +26,13 @@ const (
defaultTerminateTimeout = 5 * time.Minute defaultTerminateTimeout = 5 * time.Minute
) )
func LoadAll(ctx context.Context, owner, rootDir string) (map[string]*HCS, error) { func (s *HCS) LoadContainers(ctx context.Context) ([]*Container, error) {
ctrProps, err := hcsshim.GetContainers(hcsshim.ComputeSystemQuery{}) ctrProps, err := hcsshim.GetContainers(hcsshim.ComputeSystemQuery{})
if err != nil { if err != nil {
return nil, errors.Wrap(err, "failed to retrieve running containers") return nil, errors.Wrap(err, "failed to retrieve running containers")
} }
containers := make(map[string]*HCS) containers := make([]*Container, 0)
for _, p := range ctrProps { for _, p := range ctrProps {
select { select {
case <-ctx.Done(): case <-ctx.Done():
@ -42,336 +40,357 @@ func LoadAll(ctx context.Context, owner, rootDir string) (map[string]*HCS, error
default: default:
} }
if p.Owner != owner || p.SystemType != "Container" { if p.Owner != s.owner || p.SystemType != "Container" {
continue continue
} }
// TODO: take context into account
container, err := hcsshim.OpenContainer(p.ID) container, err := hcsshim.OpenContainer(p.ID)
if err != nil { if err != nil {
return nil, errors.Wrapf(err, "failed open container %s", p.ID) return nil, errors.Wrapf(err, "failed open container %s", p.ID)
} }
stateDir := filepath.Join(rootDir, p.ID) stateDir := filepath.Join(s.stateDir, p.ID)
b, err := ioutil.ReadFile(filepath.Join(stateDir, layerFile)) b, err := ioutil.ReadFile(filepath.Join(stateDir, layerFile))
containers[p.ID] = &HCS{ containers = append(containers, &Container{
id: p.ID, id: p.ID,
container: container, Container: container,
stateDir: stateDir, stateDir: stateDir,
hcs: s,
io: &IO{},
layerFolderPath: string(b), layerFolderPath: string(b),
conf: Configuration{ conf: Configuration{
TerminateDuration: defaultTerminateTimeout, TerminateDuration: defaultTerminateTimeout,
}, },
} })
} }
return containers, nil return containers, nil
} }
// New creates a new container (but doesn't start) it. func New(owner, rootDir string) *HCS {
func New(rootDir, owner, containerID string, spec specs.Spec, conf Configuration, cio containerd.IO) (*HCS, error) { return &HCS{
stateDir := filepath.Join(rootDir, containerID) stateDir: rootDir,
if err := os.MkdirAll(stateDir, 0755); err != nil {
return nil, errors.Wrapf(err, "unable to create container state dir %s", stateDir)
}
if conf.TerminateDuration == 0 {
conf.TerminateDuration = defaultTerminateTimeout
}
h := &HCS{
stateDir: stateDir,
owner: owner, owner: owner,
id: containerID, pidPool: pid.NewPool(),
spec: spec,
conf: conf,
} }
sio, err := newSIO(cio)
if err != nil {
return nil, err
}
h.io = sio
runtime.SetFinalizer(sio, func(s *shimIO) {
s.Close()
})
hcsConf, err := h.newHCSConfiguration()
if err != nil {
return nil, err
}
ctr, err := hcsshim.CreateContainer(containerID, hcsConf)
if err != nil {
removeLayer(context.TODO(), hcsConf.LayerFolderPath)
return nil, err
}
h.container = ctr
h.layerFolderPath = hcsConf.LayerFolderPath
return h, nil
} }
type HCS struct { type HCS struct {
stateDir string stateDir string
owner string owner string
id string pidPool *pid.Pool
spec specs.Spec
conf Configuration
io *shimIO
container hcsshim.Container
initProcess hcsshim.Process
layerFolderPath string
} }
// Start starts the associated container and instantiate the init func (s *HCS) CreateContainer(ctx context.Context, id string, spec specs.Spec, conf Configuration, io *IO) (c *Container, err error) {
// process within it. pid, err := s.pidPool.Get()
func (s *HCS) Start(ctx context.Context, servicing bool) error {
if s.initProcess != nil {
return errors.New("init process already started")
}
if err := s.container.Start(); err != nil {
if err := s.Terminate(ctx); err != nil {
log.G(ctx).WithError(err).Errorf("failed to terminate container %s", s.id)
}
return err
}
proc, err := s.newProcess(ctx, s.io, s.spec.Process)
if err != nil {
s.Terminate(ctx)
return err
}
s.initProcess = proc
return nil
}
// Pid returns the pid of the container init process
func (s *HCS) Pid() int {
return s.initProcess.Pid()
}
// ExitCode waits for the container to exit and return the exit code
// of the init process
func (s *HCS) ExitCode(ctx context.Context) (uint32, error) {
// TODO: handle a context cancellation
if err := s.initProcess.Wait(); err != nil {
if herr, ok := err.(*hcsshim.ProcessError); ok && herr.Err != syscall.ERROR_BROKEN_PIPE {
return 255, errors.Wrapf(err, "failed to wait for container '%s' init process", s.id)
}
// container is probably dead, let's try to get its exit code
}
ec, err := s.initProcess.ExitCode()
if err != nil {
if herr, ok := err.(*hcsshim.ProcessError); ok && herr.Err != syscall.ERROR_BROKEN_PIPE {
return 255, errors.Wrapf(err, "failed to get container '%s' init process exit code", s.id)
}
// Well, unknown exit code it is
ec = 255
}
return uint32(ec), err
}
// Exec starts a new process within the container
func (s *HCS) Exec(ctx context.Context, procSpec specs.Process, io containerd.IO) (*Process, error) {
sio, err := newSIO(io)
if err != nil { if err != nil {
return nil, err return nil, err
} }
p, err := s.newProcess(ctx, sio, procSpec) defer func() {
if err != nil {
s.pidPool.Put(pid)
}
}()
stateDir := filepath.Join(s.stateDir, id)
if err := os.MkdirAll(stateDir, 0755); err != nil {
return nil, errors.Wrapf(err, "unable to create container state dir %s", stateDir)
}
defer func() {
if err != nil {
os.RemoveAll(stateDir)
}
}()
if conf.TerminateDuration == 0 {
conf.TerminateDuration = defaultTerminateTimeout
}
ctrConf, err := newContainerConfig(s.owner, id, spec, conf)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &Process{ layerPathFile := filepath.Join(stateDir, layerFile)
containerID: s.id, if err := ioutil.WriteFile(layerPathFile, []byte(ctrConf.LayerFolderPath), 0644); err != nil {
p: p, log.G(ctx).WithError(err).Warnf("failed to save active layer %s", ctrConf.LayerFolderPath)
status: containerd.RunningStatus, }
ctr, err := hcsshim.CreateContainer(id, ctrConf)
if err != nil {
removeLayer(ctx, ctrConf.LayerFolderPath)
return nil, errors.Wrapf(err, "failed to create container %s", id)
}
err = ctr.Start()
if err != nil {
ctr.Terminate()
removeLayer(ctx, ctrConf.LayerFolderPath)
return nil, errors.Wrapf(err, "failed to start container %s", id)
}
return &Container{
Container: ctr,
id: id,
pid: pid,
spec: spec,
conf: conf,
stateDir: stateDir,
io: io,
hcs: s,
layerFolderPath: ctrConf.LayerFolderPath,
processes: make([]*Process, 0),
}, nil }, nil
} }
// newProcess create a new process within a running container. This is type Container struct {
// used to create both the init process and subsequent 'exec' sync.Mutex
// processes. hcsshim.Container
func (s *HCS) newProcess(ctx context.Context, sio *shimIO, procSpec specs.Process) (hcsshim.Process, error) {
conf := hcsshim.ProcessConfig{ id string
EmulateConsole: sio.terminal, stateDir string
CreateStdInPipe: sio.stdin != nil, pid uint32
CreateStdOutPipe: sio.stdout != nil, spec specs.Spec
CreateStdErrPipe: sio.stderr != nil, conf Configuration
User: procSpec.User.Username, io *IO
CommandLine: strings.Join(procSpec.Args, " "), hcs *HCS
Environment: ociSpecEnvToHCSEnv(procSpec.Env), layerFolderPath string
WorkingDirectory: procSpec.Cwd,
processes []*Process
}
// ID returns the container's identifier.
func (c *Container) ID() string {
	ident := c.id
	return ident
}
// Pid returns the pid stored on the container.
func (c *Container) Pid() uint32 {
	ctrPid := c.pid
	return ctrPid
}
// Start launches the process described by the container spec, wired to the
// container's own IO, through addProcess. Only the error is reported; the
// created process is not returned.
func (c *Container) Start(ctx context.Context) error {
	if _, startErr := c.addProcess(ctx, c.spec.Process, c.io); startErr != nil {
		return startErr
	}
	return nil
}
// getDeathErr normalises the error returned by a shutdown/terminate request:
//   - a pending operation is waited on for up to conf.TerminateDuration and
//     the result of that wait is returned;
//   - an "already stopped" error is treated as success;
//   - anything else (including nil) is returned unchanged.
func (c *Container) getDeathErr(err error) error {
	if hcsshim.IsPending(err) {
		return c.WaitTimeout(c.conf.TerminateDuration)
	}
	if hcsshim.IsAlreadyStopped(err) {
		return nil
	}
	return err
}
// Kill terminates the container and passes the outcome through getDeathErr.
// The ctx argument is not consulted.
func (c *Container) Kill(ctx context.Context) error {
	terminateErr := c.Terminate()
	return c.getDeathErr(terminateErr)
}
// Stop first asks the container to shut down; if that fails (after
// getDeathErr normalisation) the failure is logged at debug level and the
// container is terminated instead. The terminate outcome is then returned.
func (c *Container) Stop(ctx context.Context) error {
	shutdownErr := c.getDeathErr(c.Shutdown())
	if shutdownErr == nil {
		return nil
	}

	log.G(ctx).WithError(shutdownErr).Debugf("failed to shutdown container %s, calling terminate", c.id)
	return c.getDeathErr(c.Terminate())
}
// Delete tears the container down and releases its resources.
//
// If HCS reports pending updates, a temporary "<id>_servicing" container is
// created and started from the same spec and configuration, and its first
// process is waited on before the cleanup runs. Failures in the servicing
// step are only logged.
//
// The deferred cleanup always runs last and, in order: stops the container,
// deletes every tracked process, closes the container handle and its IO, and
// removes the sandbox layer. The state directory is removed only when the
// layer removal succeeded. Cleanup failures are logged, never returned.
func (c *Container) Delete(ctx context.Context) {
	defer func() {
		if err := c.Stop(ctx); err != nil {
			log.G(ctx).WithError(err).WithField("id", c.id).
				Errorf("failed to shutdown/terminate container")
		}

		c.Lock()
		for _, p := range c.processes {
			if err := p.Delete(); err != nil {
				log.G(ctx).WithError(err).WithFields(logrus.Fields{"pid": p.Pid(), "id": c.id}).
					Errorf("failed to clean process resources")
			}
		}
		c.Unlock()

		if err := c.Close(); err != nil {
			log.G(ctx).WithError(err).WithField("id", c.id).Errorf("failed to clean container resources")
		}

		c.io.Close()

		// Cleanup folder layer
		if err := removeLayer(ctx, c.layerFolderPath); err == nil {
			os.RemoveAll(c.stateDir)
		}
	}()

	// Nothing to service: fall straight through to the deferred cleanup.
	if update, err := c.HasPendingUpdates(); err != nil || !update {
		return
	}

	serviceCtr, err := c.hcs.CreateContainer(ctx, c.id+"_servicing", c.spec, c.conf, &IO{})
	if err != nil {
		log.G(ctx).WithError(err).WithField("id", c.id).Warn("could not create servicing container")
		return
	}
	defer serviceCtr.Close()

	err = serviceCtr.Start(ctx)
	if err != nil {
		log.G(ctx).WithError(err).WithField("id", c.id).Warn("failed to start servicing container")
		serviceCtr.Terminate()
		return
	}

	// Wait for the servicing container's first process to finish, then
	// collect its exit code.
	err = serviceCtr.processes[0].Wait()
	if err == nil {
		_, err = serviceCtr.processes[0].ExitCode()
		// Only report a failure when there actually is one; previously this
		// was logged as an error even when the exit code was retrieved fine.
		if err != nil {
			log.G(ctx).WithError(err).WithField("id", c.id).Errorf("failed to retrieve servicing container exit code")
		}
	}

	// Any failure while waiting leaves the servicing container in an unknown
	// state, so force it down.
	if err != nil {
		if err := serviceCtr.Terminate(); err != nil {
			log.G(ctx).WithError(err).WithField("id", c.id).Errorf("failed to terminate servicing container")
		}
	}
}
// ExitCode returns the exit code of the container's first process
// (processes[0]), as reported by that process's ExitCode method.
//
// It returns 255 and an error when no process has been added to the
// container yet.
func (c *Container) ExitCode() (uint32, error) {
	// c.processes is mutated under the container lock elsewhere, so read it
	// under the same lock. The lock is released before calling ExitCode on
	// the process so other container operations are not held up by the wait.
	c.Lock()
	if len(c.processes) == 0 {
		c.Unlock()
		return 255, errors.New("container not started")
	}
	initProc := c.processes[0]
	c.Unlock()

	return initProc.ExitCode()
}
// AddProcess creates an additional process inside the container from spec,
// attached to io, by delegating to addProcess.
//
// It fails with "container not started" when the container has no process
// yet.
func (c *Container) AddProcess(ctx context.Context, spec specs.Process, io *IO) (*Process, error) {
	// c.processes is mutated under the container lock elsewhere, so check it
	// under the same lock. The lock must be released before addProcess, which
	// acquires it itself.
	c.Lock()
	started := len(c.processes) > 0
	c.Unlock()

	if !started {
		return nil, errors.New("container not started")
	}

	return c.addProcess(ctx, spec, io)
}
func (c *Container) addProcess(ctx context.Context, spec specs.Process, pio *IO) (*Process, error) {
// If we don't have a process yet, reuse the container pid
var pid uint32
if len(c.processes) == 0 {
pid = c.pid
} else {
pid, err := c.hcs.pidPool.Get()
if err != nil {
return nil, err
}
defer func() {
if err != nil {
c.hcs.pidPool.Put(pid)
}
}()
}
conf := hcsshim.ProcessConfig{
EmulateConsole: pio.terminal,
CreateStdInPipe: pio.stdin != nil,
CreateStdOutPipe: pio.stdout != nil,
CreateStdErrPipe: pio.stderr != nil,
User: spec.User.Username,
CommandLine: strings.Join(spec.Args, " "),
Environment: ociSpecEnvToHCSEnv(spec.Env),
WorkingDirectory: spec.Cwd,
ConsoleSize: [2]uint{spec.ConsoleSize.Height, spec.ConsoleSize.Width},
} }
conf.ConsoleSize[0] = procSpec.ConsoleSize.Height
conf.ConsoleSize[1] = procSpec.ConsoleSize.Width
if conf.WorkingDirectory == "" { if conf.WorkingDirectory == "" {
conf.WorkingDirectory = s.spec.Process.Cwd conf.WorkingDirectory = c.spec.Process.Cwd
} }
proc, err := s.container.CreateProcess(&conf) proc, err := c.CreateProcess(&conf)
if err != nil { if err != nil {
return nil, errors.Wrapf(err, "failed to create process with conf %#v", conf) return nil, errors.Wrapf(err, "failed to create process")
} }
pid := proc.Pid()
stdin, stdout, stderr, err := proc.Stdio() stdin, stdout, stderr, err := proc.Stdio()
if err != nil { if err != nil {
s.Terminate(ctx) proc.Kill()
return nil, err return nil, errors.Wrapf(err, "failed to retrieve process stdio")
} }
if sio.stdin != nil { if pio.stdin != nil {
go func() { go func() {
log.G(ctx).WithField("pid", pid).Debug("stdin: copy started") log.G(ctx).WithFields(logrus.Fields{"id": c.id, "pid": pid}).Debug("stdin: copy started")
io.Copy(stdin, sio.stdin) io.Copy(stdin, pio.stdin)
log.G(ctx).WithField("pid", pid).Debug("stdin: copy done") log.G(ctx).WithFields(logrus.Fields{"id": c.id, "pid": pid}).Debug("stdin: copy done")
stdin.Close() stdin.Close()
sio.stdin.Close() pio.stdin.Close()
}() }()
} else { } else {
proc.CloseStdin() proc.CloseStdin()
} }
if sio.stdout != nil { if pio.stdout != nil {
go func() { go func() {
log.G(ctx).WithField("pid", pid).Debug("stdout: copy started") log.G(ctx).WithFields(logrus.Fields{"id": c.id, "pid": pid}).Debug("stdout: copy started")
io.Copy(sio.stdout, stdout) io.Copy(pio.stdout, stdout)
log.G(ctx).WithField("pid", pid).Debug("stdout: copy done") log.G(ctx).WithFields(logrus.Fields{"id": c.id, "pid": pid}).Debug("stdout: copy done")
stdout.Close() stdout.Close()
sio.stdout.Close() pio.stdout.Close()
}() }()
} }
if sio.stderr != nil { if pio.stderr != nil {
go func() { go func() {
log.G(ctx).WithField("pid", pid).Debug("stderr: copy started") log.G(ctx).WithFields(logrus.Fields{"id": c.id, "pid": pid}).Debug("stderr: copy started")
io.Copy(sio.stderr, stderr) io.Copy(pio.stderr, stderr)
log.G(ctx).WithField("pid", pid).Debug("stderr: copy done") log.G(ctx).WithFields(logrus.Fields{"id": c.id, "pid": pid}).Debug("stderr: copy done")
stderr.Close() stderr.Close()
sio.stderr.Close() pio.stderr.Close()
}() }()
} }
return proc, nil p := &Process{
} Process: proc,
pid: pid,
// Terminate stop a running container. io: pio,
func (s *HCS) Terminate(ctx context.Context) error { ecSync: make(chan struct{}),
err := s.container.Terminate()
switch {
case hcsshim.IsPending(err):
// TODO: take the context into account
err = s.container.WaitTimeout(s.conf.TerminateDuration)
case hcsshim.IsAlreadyStopped(err):
err = nil
} }
return err c.Lock()
} c.processes = append(c.processes, p)
idx := len(c.processes) - 1
c.Unlock()
func (s *HCS) Shutdown(ctx context.Context) error { go func() {
err := s.container.Shutdown() p.ec, p.ecErr = processExitCode(c.ID(), p)
switch { close(p.ecSync)
case hcsshim.IsPending(err): c.Lock()
// TODO: take the context into account p.Delete()
err = s.container.WaitTimeout(s.conf.TerminateDuration) // Remove process from slice (but keep the init one around)
case hcsshim.IsAlreadyStopped(err): if idx > 0 {
err = nil c.processes[idx] = c.processes[len(c.processes)-1]
} c.processes[len(c.processes)-1] = nil
c.processes = c.processes[:len(c.processes)-1]
if err != nil {
log.G(ctx).WithError(err).Debugf("failed to shutdown container %s, calling terminate", s.id)
return s.Terminate(ctx)
}
return nil
}
// Remove start a servicing container if needed then cleanup the container
// resources
func (s *HCS) Remove(ctx context.Context) error {
defer func() {
if err := s.Shutdown(ctx); err != nil {
log.G(ctx).WithError(err).WithField("id", s.id).
Errorf("failed to shutdown/terminate container")
}
if s.initProcess != nil {
if err := s.initProcess.Close(); err != nil {
log.G(ctx).WithError(err).WithFields(logrus.Fields{"pid": s.Pid(), "id": s.id}).
Errorf("failed to clean init process resources")
}
}
if err := s.container.Close(); err != nil {
log.G(ctx).WithError(err).WithField("id", s.id).Errorf("failed to clean container resources")
}
// Cleanup folder layer
if err := removeLayer(ctx, s.layerFolderPath); err == nil {
os.RemoveAll(s.stateDir)
} }
c.Unlock()
}() }()
if update, err := s.container.HasPendingUpdates(); err != nil || !update { return p, nil
return nil
}
// TODO: take the context into account
serviceHCS, err := New(s.stateDir, s.owner, s.id+"_servicing", s.spec, s.conf, containerd.IO{})
if err != nil {
log.G(ctx).WithError(err).WithField("id", s.id).Warn("could not create servicing container")
return nil
}
defer serviceHCS.container.Close()
err = serviceHCS.Start(ctx, true)
if err != nil {
if err := serviceHCS.Terminate(ctx); err != nil {
log.G(ctx).WithError(err).WithField("id", s.id).Errorf("failed to terminate servicing container for %s")
}
log.G(ctx).WithError(err).WithField("id", s.id).Errorf("failed to start servicing container")
return nil
}
// wait for the container to exit
_, err = serviceHCS.ExitCode(ctx)
if err != nil {
if err := serviceHCS.Terminate(ctx); err != nil {
log.G(ctx).WithError(err).WithField("id", s.id).Errorf("failed to terminate servicing container for %s")
}
log.G(ctx).WithError(err).WithField("id", s.id).Errorf("failed to get servicing container exit code")
}
serviceHCS.container.WaitTimeout(s.conf.TerminateDuration)
return nil
} }
// newHCSConfiguration generates a hcsshim configuration from the instance // newHCSConfiguration generates a hcsshim configuration from the instance
// OCI Spec and hcs.Configuration. // OCI Spec and hcs.Configuration.
func (s *HCS) newHCSConfiguration() (*hcsshim.ContainerConfig, error) { func newContainerConfig(owner, id string, spec specs.Spec, conf Configuration) (*hcsshim.ContainerConfig, error) {
configuration := &hcsshim.ContainerConfig{ configuration := &hcsshim.ContainerConfig{
SystemType: "Container", SystemType: "Container",
Name: s.id, Name: id,
Owner: s.owner, Owner: owner,
HostName: s.spec.Hostname, HostName: spec.Hostname,
IgnoreFlushesDuringBoot: s.conf.IgnoreFlushesDuringBoot, IgnoreFlushesDuringBoot: conf.IgnoreFlushesDuringBoot,
HvPartition: s.conf.UseHyperV, HvPartition: conf.UseHyperV,
AllowUnqualifiedDNSQuery: s.conf.AllowUnqualifiedDNSQuery, AllowUnqualifiedDNSQuery: conf.AllowUnqualifiedDNSQuery,
EndpointList: s.conf.NetworkEndpoints, EndpointList: conf.NetworkEndpoints,
NetworkSharedContainerName: s.conf.NetworkSharedContainerID, NetworkSharedContainerName: conf.NetworkSharedContainerID,
Credentials: s.conf.Credentials, Credentials: conf.Credentials,
} }
// TODO: use the create request Mount for those // TODO: use the create request Mount for those
for _, layerPath := range s.conf.Layers { for _, layerPath := range conf.Layers {
_, filename := filepath.Split(layerPath) _, filename := filepath.Split(layerPath)
guid, err := hcsshim.NameToGuid(filename) guid, err := hcsshim.NameToGuid(filename)
if err != nil { if err != nil {
@ -383,9 +402,9 @@ func (s *HCS) newHCSConfiguration() (*hcsshim.ContainerConfig, error) {
}) })
} }
if len(s.spec.Mounts) > 0 { if len(spec.Mounts) > 0 {
mds := make([]hcsshim.MappedDir, len(s.spec.Mounts)) mds := make([]hcsshim.MappedDir, len(spec.Mounts))
for i, mount := range s.spec.Mounts { for i, mount := range spec.Mounts {
mds[i] = hcsshim.MappedDir{ mds[i] = hcsshim.MappedDir{
HostPath: mount.Source, HostPath: mount.Source,
ContainerPath: mount.Destination, ContainerPath: mount.Destination,
@ -400,12 +419,12 @@ func (s *HCS) newHCSConfiguration() (*hcsshim.ContainerConfig, error) {
configuration.MappedDirectories = mds configuration.MappedDirectories = mds
} }
if s.conf.DNSSearchList != nil { if conf.DNSSearchList != nil {
configuration.DNSSearchList = strings.Join(s.conf.DNSSearchList, ",") configuration.DNSSearchList = strings.Join(conf.DNSSearchList, ",")
} }
if configuration.HvPartition { if configuration.HvPartition {
for _, layerPath := range s.conf.Layers { for _, layerPath := range conf.Layers {
utilityVMPath := filepath.Join(layerPath, "UtilityVM") utilityVMPath := filepath.Join(layerPath, "UtilityVM")
_, err := os.Stat(utilityVMPath) _, err := os.Stat(utilityVMPath)
if err == nil { if err == nil {
@ -427,55 +446,42 @@ func (s *HCS) newHCSConfiguration() (*hcsshim.ContainerConfig, error) {
} }
if len(configuration.Layers) > 0 { if len(configuration.Layers) > 0 {
di.HomeDir = filepath.Dir(s.conf.Layers[0]) di.HomeDir = filepath.Dir(conf.Layers[0])
} }
// Windows doesn't support creating a container with a readonly // Windows doesn't support creating a container with a readonly
// filesystem, so always create a RW one // filesystem, so always create a RW one
if err := hcsshim.CreateSandboxLayer(di, s.id, s.conf.Layers[0], s.conf.Layers); err != nil { if err := hcsshim.CreateSandboxLayer(di, id, conf.Layers[0], conf.Layers); err != nil {
return nil, errors.Wrapf(err, "failed to create sandbox layer for %s: layers: %#v, driverInfo: %#v", return nil, errors.Wrapf(err, "failed to create sandbox layer for %s: layers: %#v, driverInfo: %#v",
s.id, configuration.Layers, di) id, configuration.Layers, di)
} }
configuration.LayerFolderPath = filepath.Join(di.HomeDir, id)
configuration.LayerFolderPath = filepath.Join(di.HomeDir, s.id) err := hcsshim.ActivateLayer(di, id)
if err := ioutil.WriteFile(filepath.Join(s.stateDir, layerFile), []byte(configuration.LayerFolderPath), 0644); err != nil {
log.L.WithError(err).Warnf("failed to save active layer %s", configuration.LayerFolderPath)
}
err := hcsshim.ActivateLayer(di, s.id)
if err != nil { if err != nil {
removeLayer(context.TODO(), configuration.LayerFolderPath) removeLayer(context.TODO(), configuration.LayerFolderPath)
return nil, errors.Wrapf(err, "failed to active layer %s", configuration.LayerFolderPath) return nil, errors.Wrapf(err, "failed to active layer %s", configuration.LayerFolderPath)
} }
err = hcsshim.PrepareLayer(di, s.id, s.conf.Layers) err = hcsshim.PrepareLayer(di, id, conf.Layers)
if err != nil { if err != nil {
removeLayer(context.TODO(), configuration.LayerFolderPath) removeLayer(context.TODO(), configuration.LayerFolderPath)
return nil, errors.Wrapf(err, "failed to prepare layer %s", configuration.LayerFolderPath) return nil, errors.Wrapf(err, "failed to prepare layer %s", configuration.LayerFolderPath)
} }
volumePath, err := hcsshim.GetLayerMountPath(di, s.id) volumePath, err := hcsshim.GetLayerMountPath(di, id)
if err != nil { if err != nil {
if err := hcsshim.DestroyLayer(di, s.id); err != nil { if err := hcsshim.DestroyLayer(di, id); err != nil {
log.L.Warnf("failed to DestroyLayer %s: %s", s.id, err) log.L.Warnf("failed to DestroyLayer %s: %s", id, err)
} }
return nil, errors.Wrapf(err, "failed to getmount path for layer %s: driverInfo: %#v", s.id, di) return nil, errors.Wrapf(err, "failed to getmount path for layer %s: driverInfo: %#v", id, di)
} }
configuration.VolumePath = volumePath configuration.VolumePath = volumePath
f, err := os.OpenFile(fmt.Sprintf("%s-hcs.json", s.id), os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 066)
if err != nil {
fmt.Println("failed to create file:", err)
} else {
defer f.Close()
enc := json.NewEncoder(f)
enc.Encode(configuration)
}
return configuration, nil return configuration, nil
} }
// removeLayer delete the given layer, all associated containers must have // removeLayer deletes the given layer, all associated containers must have
// been shutdown for this to succeed. // been shutdown for this to succeed.
func removeLayer(ctx context.Context, path string) error { func removeLayer(ctx context.Context, path string) error {
layerID := filepath.Base(path) layerID := filepath.Base(path)

View File

@ -11,32 +11,51 @@ import (
) )
type Process struct { type Process struct {
containerID string hcsshim.Process
p hcsshim.Process
status containerd.Status pid uint32
io *IO
ec uint32
ecErr error
ecSync chan struct{}
} }
func (h *Process) Pid() uint32 { func (p *Process) Pid() uint32 {
return uint32(h.p.Pid()) return p.pid
} }
func (h *Process) Kill() error { func (p *Process) ExitCode() (uint32, error) {
return h.p.Kill() <-p.ecSync
return p.ec, p.ecErr
} }
func (h *Process) ExitCode() (uint32, error) { func (p *Process) Status() containerd.Status {
if err := h.p.Wait(); err != nil { select {
case <-p.ecSync:
return containerd.StoppedStatus
default:
}
return containerd.RunningStatus
}
// Delete releases what this Process holds: it first closes the attached IO
// (p.io.Close has no return value here, so nothing from that step is
// reported), then calls Close on the process and returns that call's error.
// NOTE(review): Close is presumably promoted from the embedded
// hcsshim.Process — confirm against the struct definition.
func (p *Process) Delete() error {
p.io.Close()
return p.Close()
}
func processExitCode(containerID string, p *Process) (uint32, error) {
if err := p.Wait(); err != nil {
if herr, ok := err.(*hcsshim.ProcessError); ok && herr.Err != syscall.ERROR_BROKEN_PIPE { if herr, ok := err.(*hcsshim.ProcessError); ok && herr.Err != syscall.ERROR_BROKEN_PIPE {
return 255, errors.Wrapf(err, "failed to wait for container '%s' process %d", h.containerID, h.p.Pid()) return 255, errors.Wrapf(err, "failed to wait for container '%s' process %u", containerID, p.pid)
} }
// container is probably dead, let's try to get its exit code // process is probably dead, let's try to get its exit code
} }
h.status = containerd.StoppedStatus
ec, err := h.p.ExitCode() ec, err := p.Process.ExitCode()
if err != nil { if err != nil {
if herr, ok := err.(*hcsshim.ProcessError); ok && herr.Err != syscall.ERROR_BROKEN_PIPE { if herr, ok := err.(*hcsshim.ProcessError); ok && herr.Err != syscall.ERROR_BROKEN_PIPE {
return 255, errors.Wrapf(err, "failed to get container '%s' process %d exit code", h.containerID, h.p.Pid()) return 255, errors.Wrapf(err, "failed to get container '%s' process %d exit code", containerID, p.pid)
} }
// Well, unknown exit code it is // Well, unknown exit code it is
ec = 255 ec = 255
@ -44,7 +63,3 @@ func (h *Process) ExitCode() (uint32, error) {
return uint32(ec), err return uint32(ec), err
} }
// Status returns the status value currently stored on the Process; it does
// not query the underlying process.
func (h *Process) Status() containerd.Status {
return h.status
}

View File

@ -7,28 +7,27 @@ import (
"time" "time"
"github.com/Microsoft/go-winio" "github.com/Microsoft/go-winio"
"github.com/containerd/containerd"
"github.com/pkg/errors" "github.com/pkg/errors"
) )
type shimIO struct { type IO struct {
stdin net.Conn stdin net.Conn
stdout net.Conn stdout net.Conn
stderr net.Conn stderr net.Conn
terminal bool terminal bool
} }
// newSIO connects to the provided pipes // NewIO connects to the provided pipe addresses
func newSIO(io containerd.IO) (*shimIO, error) { func NewIO(stdin, stdout, stderr string, terminal bool) (*IO, error) {
var ( var (
c net.Conn c net.Conn
err error err error
sio shimIO io IO
) )
defer func() { defer func() {
if err != nil { if err != nil {
sio.Close() io.Close()
} }
}() }()
@ -38,19 +37,19 @@ func newSIO(io containerd.IO) (*shimIO, error) {
conn *net.Conn conn *net.Conn
}{ }{
{ {
name: io.Stdin, name: stdin,
open: io.Stdin != "", open: stdin != "",
conn: &sio.stdin, conn: &io.stdin,
}, },
{ {
name: io.Stdout, name: stdout,
open: io.Stdout != "", open: stdout != "",
conn: &sio.stdout, conn: &io.stdout,
}, },
{ {
name: io.Stderr, name: stderr,
open: !io.Terminal && io.Stderr != "", open: !terminal && stderr != "",
conn: &sio.stderr, conn: &io.stderr,
}, },
} { } {
if p.open { if p.open {
@ -63,12 +62,12 @@ func newSIO(io containerd.IO) (*shimIO, error) {
} }
} }
return &sio, nil return &io, nil
} }
// Close terminates all successfully dialed IO connections // Close terminates all successfully dialed IO connections
func (s *shimIO) Close() { func (i *IO) Close() {
for _, cn := range []net.Conn{s.stdin, s.stdout, s.stderr} { for _, cn := range []net.Conn{i.stdin, i.stdout, i.stderr} {
if cn != nil { if cn != nil {
cn.Close() cn.Close()
cn = nil cn = nil

46
windows/pid/pid.go Normal file
View File

@ -0,0 +1,46 @@
// +build windows
package pid
import (
"errors"
"sync"
)
// Pool hands out non-zero uint32 identifiers and remembers which ones are
// currently in use so that the same identifier is never handed out twice
// before it has been returned with Put.
//
// The embedded mutex guards both fields. A Pool must not be copied after
// first use.
type Pool struct {
	sync.Mutex
	// pool is the set of pids that have been handed out and not yet returned.
	pool map[uint32]struct{}
	// cur is the most recently allocated pid; the search in Get starts just
	// after it.
	cur uint32
}

// NewPool returns an empty, ready to use Pool.
func NewPool() *Pool {
	return &Pool{
		pool: make(map[uint32]struct{}),
	}
}

// Get reserves and returns the next free pid, scanning upward from the last
// allocated one and wrapping around past the maximum uint32 value. 0 is never
// returned on success. An error is returned when the scan comes back to its
// starting point without finding a free pid.
func (p *Pool) Get() (uint32, error) {
	p.Lock()
	defer p.Unlock()

	// Allow the zero value of Pool to be used: writing to a nil map panics.
	if p.pool == nil {
		p.pool = make(map[uint32]struct{})
	}

	pid := p.cur + 1
	for pid != p.cur {
		// 0 is reserved and invalid
		if pid == 0 {
			pid = 1
		}
		if _, ok := p.pool[pid]; !ok {
			p.cur = pid
			// Record the pid as in use; without this the set stays empty
			// and a pid still held by a caller could be handed out again.
			p.pool[pid] = struct{}{}
			return pid, nil
		}
		pid++
	}

	return 0, errors.New("pid pool exhausted")
}

// Put marks pid as free again so that a later Get may return it. Returning a
// pid that is not currently reserved is a no-op.
func (p *Pool) Put(pid uint32) {
	p.Lock()
	delete(p.pool, pid)
	p.Unlock()
}

30
windows/process.go Normal file
View File

@ -0,0 +1,30 @@
// +build windows
package windows
import (
"github.com/containerd/containerd"
"github.com/containerd/containerd/windows/hcs"
"golang.org/x/net/context"
)
// process implements containerd.Process and containerd.State by wrapping an
// *hcs.Process and delegating to it.
type process struct {
*hcs.Process
}
// State returns the process itself as its containerd.State; it never fails
// and does not use ctx.
func (p *process) State(ctx context.Context) (containerd.State, error) {
return p, nil
}
// Kill calls Kill on the wrapped hcs.Process. The ctx, sig and all arguments
// are not used by this implementation.
func (p *process) Kill(ctx context.Context, sig uint32, all bool) error {
return p.Process.Kill()
}
// Status returns the status reported by the wrapped hcs.Process.
func (p *process) Status() containerd.Status {
return p.Process.Status()
}
// Pid returns the pid reported by the wrapped hcs.Process.
func (p *process) Pid() uint32 {
return p.Process.Pid()
}

View File

@ -8,13 +8,13 @@ import (
"os" "os"
"path/filepath" "path/filepath"
"sync" "sync"
"syscall"
"time" "time"
"github.com/containerd/containerd" "github.com/containerd/containerd"
"github.com/containerd/containerd/log" "github.com/containerd/containerd/log"
"github.com/containerd/containerd/plugin" "github.com/containerd/containerd/plugin"
"github.com/containerd/containerd/windows/hcs" "github.com/containerd/containerd/windows/hcs"
"github.com/containerd/containerd/windows/pid"
specs "github.com/opencontainers/runtime-spec/specs-go" specs "github.com/opencontainers/runtime-spec/specs-go"
"github.com/pkg/errors" "github.com/pkg/errors"
@ -24,18 +24,9 @@ import (
const ( const (
runtimeName = "windows" runtimeName = "windows"
owner = "containerd" owner = "containerd"
configFilename = "config.json"
) )
// Win32 error codes that are used for various workarounds var _ = (containerd.Runtime)(&Runtime{})
// These really should be ALL_CAPS to match golangs syscall library and standard
// Win32 error conventions, but golint insists on CamelCase.
const (
CoEClassstring = syscall.Errno(0x800401F3) // Invalid class string
ErrorNoNetwork = syscall.Errno(1222) // The network is not present or not started
ErrorBadPathname = syscall.Errno(161) // The specified path is invalid
ErrorInvalidObject = syscall.Errno(0x800710D8) // The object identifier does not represent a valid object
)
func init() { func init() {
plugin.Register(runtimeName, &plugin.Registration{ plugin.Register(runtimeName, &plugin.Registration{
@ -52,16 +43,30 @@ func New(ic *plugin.InitContext) (interface{}, error) {
return nil, errors.Wrapf(err, "could not create state directory at %s", rootDir) return nil, errors.Wrapf(err, "could not create state directory at %s", rootDir)
} }
r := &Runtime{
pidPool: pid.NewPool(),
containers: make(map[string]*container),
events: make(chan *containerd.Event, 2048),
eventsContext: c,
eventsCancel: cancel,
rootDir: rootDir,
hcs: hcs.New(owner, rootDir),
}
sendEvent := func(id string, evType containerd.EventType, pid, exitStatus uint32) {
r.sendEvent(id, evType, pid, exitStatus)
}
// Terminate all previous container that we may have started. We don't // Terminate all previous container that we may have started. We don't
// support restoring containers // support restoring containers
ctrs, err := loadContainers(ic.Context, r.hcs, sendEvent)
ctrs, err := loadContainers(ic.Context, rootDir)
if err != nil { if err != nil {
return nil, err return nil, err
} }
for _, c := range ctrs { for _, c := range ctrs {
c.remove(ic.Context) c.ctr.Delete(ic.Context)
r.sendEvent(c.ctr.ID(), containerd.ExitEvent, c.ctr.Pid(), 255)
} }
// Try to delete the old state dir and recreate it // Try to delete the old state dir and recreate it
@ -72,16 +77,9 @@ func New(ic *plugin.InitContext) (interface{}, error) {
if err := os.MkdirAll(stateDir, 0755); err != nil { if err := os.MkdirAll(stateDir, 0755); err != nil {
return nil, errors.Wrapf(err, "could not create state directory at %s", stateDir) return nil, errors.Wrapf(err, "could not create state directory at %s", stateDir)
} }
r.stateDir = stateDir
return &Runtime{ return r, nil
containers: make(map[string]*container),
containersPid: make(map[uint32]struct{}),
events: make(chan *containerd.Event, 2048),
eventsContext: c,
eventsCancel: cancel,
stateDir: stateDir,
rootDir: rootDir,
}, nil
} }
type Runtime struct { type Runtime struct {
@ -89,10 +87,11 @@ type Runtime struct {
rootDir string rootDir string
stateDir string stateDir string
pidPool *pid.Pool
hcs *hcs.HCS
containers map[string]*container containers map[string]*container
containersPid map[uint32]struct{}
currentPid uint32
events chan *containerd.Event events chan *containerd.Event
eventsContext context.Context eventsContext context.Context
@ -113,26 +112,19 @@ func (r *Runtime) Create(ctx context.Context, id string, opts containerd.CreateO
return nil, errors.Wrap(err, "failed to unmarshal oci spec") return nil, errors.Wrap(err, "failed to unmarshal oci spec")
} }
pid, err := r.getPid() sendEvent := func(id string, evType containerd.EventType, pid, exitStatus uint32) {
if err != nil { r.sendEvent(id, evType, pid, exitStatus)
return nil, err
} }
ctr, err := newContainer(id, r.rootDir, pid, rtSpec, opts.IO, func(id string, evType containerd.EventType, pid, exitStatus uint32) { ctr, err := newContainer(ctx, r.hcs, id, rtSpec, opts.IO, sendEvent)
r.sendEvent(id, evType, pid, exitStatus)
})
if err != nil { if err != nil {
r.putPid(pid)
return nil, err return nil, err
} }
r.Lock() r.Lock()
r.containers[id] = ctr r.containers[id] = ctr
r.containersPid[pid] = struct{}{}
r.Unlock() r.Unlock()
r.sendEvent(id, containerd.CreateEvent, pid, 0)
return ctr, nil return ctr, nil
} }
@ -141,30 +133,32 @@ func (r *Runtime) Delete(ctx context.Context, c containerd.Container) (uint32, e
if !ok { if !ok {
return 0, fmt.Errorf("container cannot be cast as *windows.container") return 0, fmt.Errorf("container cannot be cast as *windows.container")
} }
ec, err := wc.exitCode(ctx) ec, err := wc.ctr.ExitCode()
if err != nil { if err != nil {
ec = 255 log.G(ctx).WithError(err).Errorf("failed to retrieve exit code for container %s", wc.ctr.ID())
log.G(ctx).WithError(err).Errorf("failed to retrieve exit code for container %s", c.Info().ID)
} }
if err = wc.remove(ctx); err == nil { wc.ctr.Delete(ctx)
r.Lock() r.Lock()
delete(r.containers, c.Info().ID) delete(r.containers, wc.ctr.ID())
r.Unlock() r.Unlock()
}
r.putPid(wc.getRuntimePid())
return ec, err return ec, err
} }
func (r *Runtime) Containers() ([]containerd.Container, error) { func (r *Runtime) Containers(ctx context.Context) ([]containerd.Container, error) {
r.Lock() r.Lock()
defer r.Unlock()
list := make([]containerd.Container, len(r.containers)) list := make([]containerd.Container, len(r.containers))
for _, c := range r.containers { for _, c := range r.containers {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
list = append(list, c) list = append(list, c)
} }
r.Unlock() }
return list, nil return list, nil
} }
@ -183,29 +177,3 @@ func (r *Runtime) sendEvent(id string, evType containerd.EventType, pid, exitSta
ExitStatus: exitStatus, ExitStatus: exitStatus,
} }
} }
// getPid picks the next pid that is absent from r.containersPid, scanning
// upward from r.currentPid and wrapping past the maximum uint32 value. On
// success it stores the chosen pid in r.currentPid and returns it; it does
// not add the pid to r.containersPid. 0 is never returned on success. An
// error is returned when the scan reaches r.currentPid again.
func (r *Runtime) getPid() (uint32, error) {
	r.Lock()
	defer r.Unlock()

	for candidate := r.currentPid + 1; candidate != r.currentPid; candidate++ {
		// 0 is reserved and invalid
		if candidate == 0 {
			candidate = 1
		}
		if _, taken := r.containersPid[candidate]; taken {
			continue
		}
		r.currentPid = candidate
		return candidate, nil
	}

	return 0, errors.New("pid pool exhausted")
}
// putPid removes pid from r.containersPid while holding the runtime lock.
// Removing a pid that is not present is a no-op.
func (r *Runtime) putPid(pid uint32) {
	r.Lock()
	defer r.Unlock()

	delete(r.containersPid, pid)
}