Introduce containerd-shim-runhcs-v1 on Windows

Implements the containerd-shim-runhcs-v1 shim on Windows for the runtime
v2 shim API.

Signed-off-by: Justin Terry (VM) <juterry@microsoft.com>
This commit is contained in:
Justin Terry (VM)
2018-08-08 11:35:15 -07:00
parent 3f42445e38
commit 019b0c34de
101 changed files with 6735 additions and 3649 deletions

View File

@@ -0,0 +1,98 @@
// +build windows
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package runhcs
import (
"bytes"
"context"
"os/exec"
"sync"
"syscall"
"time"
)
// bytesBufferPool recycles *bytes.Buffer scratch buffers used to capture
// stdout of short-lived runhcs command invocations. Use getBuffer/putBuffer
// to borrow and return them.
var (
	bytesBufferPool = sync.Pool{
		New: func() interface{} {
			return bytes.NewBuffer(nil)
		},
	}
)
// getBuffer borrows a *bytes.Buffer from the pool. Callers must return it
// with putBuffer when finished so it can be reused.
func getBuffer() *bytes.Buffer {
	b := bytesBufferPool.Get()
	return b.(*bytes.Buffer)
}
// putBuffer resets b and hands it back to bytesBufferPool for reuse.
func putBuffer(b *bytes.Buffer) {
	defer bytesBufferPool.Put(b)
	b.Reset()
}
// processExit captures the terminal state of a process.
type processExit struct {
	// pid is the OS pid of the exited process.
	pid uint32
	// exitStatus is the exit code; 255 is used when no real code is known.
	exitStatus uint32
	// exitedAt is when the exit was observed.
	exitedAt time.Time
	// exitErr is any error returned while waiting on the process.
	exitErr error
}
// runCmd starts c, blocks until it finishes, and returns its exit details.
func runCmd(ctx context.Context, c *exec.Cmd) (*processExit, error) {
	ec, err := startCmd(ctx, c)
	if err != nil {
		return nil, err
	}
	return waitCmd(ctx, ec)
}
// startCmd launches c and returns a channel that receives exactly one
// *processExit once the command finishes; the channel is closed afterwards.
func startCmd(ctx context.Context, c *exec.Cmd) (<-chan *processExit, error) {
	if err := c.Start(); err != nil {
		return nil, err
	}
	ec := make(chan *processExit, 1)
	go func() {
		defer close(ec)
		status := 0
		werr := c.Wait()
		if werr != nil {
			// Default to 255 unless the real exit code can be recovered
			// from the wait error.
			status = 255
			if ee, ok := werr.(*exec.ExitError); ok {
				if ws, ok := ee.Sys().(syscall.WaitStatus); ok {
					status = ws.ExitStatus()
				}
			}
		}
		ec <- &processExit{
			pid:        uint32(c.Process.Pid),
			exitStatus: uint32(status),
			exitedAt:   time.Now(),
			exitErr:    werr,
		}
	}()
	return ec, nil
}
// waitCmd receives the exit details produced by startCmd. A zero exit status
// is returned without error; a non-zero status is returned together with the
// wait error (which may be nil).
func waitCmd(ctx context.Context, ec <-chan *processExit) (*processExit, error) {
	e := <-ec
	if e.exitStatus == 0 {
		return e, nil
	}
	return e, e.exitErr
}

159
runtime/v2/runhcs/io.go Normal file
View File

@@ -0,0 +1,159 @@
// +build windows
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package runhcs
import (
"context"
"io"
"net"
"sync"
"time"
"github.com/Microsoft/go-winio"
"github.com/containerd/containerd/log"
runc "github.com/containerd/go-runc"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
)
// pipeSet holds the upstream (client-side) IO connections for a process.
// Any member may be nil when the corresponding stream was not requested.
type pipeSet struct {
	stdin  net.Conn
	stdout net.Conn
	stderr net.Conn
}
// newPipeSet connects to the provided pipe addresses
//
// Empty addresses are skipped. The three dials run concurrently; on any
// failure every connection dialed so far is closed before returning.
// NOTE(review): the terminal parameter is currently unused here — confirm
// whether callers rely on it before removing.
func newPipeSet(ctx context.Context, stdin, stdout, stderr string, terminal bool) (*pipeSet, error) {
	set := &pipeSet{}
	g, _ := errgroup.WithContext(ctx)
	dial := func(address string, target *net.Conn) func() error {
		return func() error {
			if address == "" {
				return nil
			}
			timeout := 3 * time.Second
			conn, derr := winio.DialPipe(address, &timeout)
			if derr != nil {
				return errors.Wrapf(derr, "failed to connect to %s", address)
			}
			*target = conn
			return nil
		}
	}
	g.Go(dial(stdin, &set.stdin))
	g.Go(dial(stdout, &set.stdout))
	g.Go(dial(stderr, &set.stderr))
	if err := g.Wait(); err != nil {
		set.Close()
		return nil, err
	}
	return set, nil
}
// Close terminates all successfully dialed IO connections
func (p *pipeSet) Close() {
	if p.stdin != nil {
		p.stdin.Close()
	}
	if p.stdout != nil {
		p.stdout.Close()
	}
	if p.stderr != nil {
		p.stderr.Close()
	}
}
// pipeRelay shuttles data between an upstream pipeSet and a downstream
// runc.IO for the lifetime of a process.
type pipeRelay struct {
	ctx context.Context
	ps  *pipeSet
	io  runc.IO
	// wg tracks the stdout/stderr copy goroutines (stdin is not tracked —
	// see newPipeRelay).
	wg sync.WaitGroup
	// once ensures close tears the pipes down exactly one time.
	once sync.Once
}
// newPipeRelay starts goroutines that copy stdin from the upstream set into
// downstream, and stdout/stderr from downstream back to the upstream set.
// Only the stdout/stderr copiers are tracked by the WaitGroup: the stdin
// copier unblocks only when the upstream pipe closes.
func newPipeRelay(ctx context.Context, ps *pipeSet, downstream runc.IO) *pipeRelay {
	pr := &pipeRelay{
		ctx: ctx,
		ps:  ps,
		io:  downstream,
	}
	if ps.stdin != nil {
		go func() {
			_, err := io.Copy(downstream.Stdin(), ps.stdin)
			// A closed upstream pipe is the normal shutdown path.
			if err != nil && err != winio.ErrFileClosed {
				log.G(ctx).WithError(err).Error("error copying stdin to pipe")
			}
		}()
	}
	relay := func(dst net.Conn, src io.Reader, msg string) {
		defer pr.wg.Done()
		if _, err := io.Copy(dst, src); err != nil {
			log.G(ctx).WithError(err).Error(msg)
		}
	}
	if ps.stdout != nil {
		pr.wg.Add(1)
		go relay(ps.stdout, downstream.Stdout(), "error copying stdout from pipe")
	}
	if ps.stderr != nil {
		pr.wg.Add(1)
		go relay(ps.stderr, downstream.Stderr(), "error copying stderr from pipe")
	}
	return pr
}
// wait blocks until the stdout and stderr relay goroutines finish copying.
// It does not wait for stdin, whose copier exits only when the upstream
// pipe closes.
func (pr *pipeRelay) wait() {
	pr.wg.Wait()
}
// closeIO closes the upstream and downstream stdin streams to unblock any
// waiters. Only stdin is torn down here: calling pr.ps.Close() (as before)
// also closed stdout/stderr and could truncate in-flight output that the
// relay goroutines were still draining.
func (pr *pipeRelay) closeIO() {
	if pr.ps.stdin != nil {
		pr.ps.stdin.Close()
		pr.io.Stdin().Close()
	}
}
// close closes all open pipes
//
// Safe to call multiple times; teardown runs only once. The downstream IO is
// closed before the upstream pipe set.
func (pr *pipeRelay) close() {
	pr.once.Do(func() {
		pr.io.Close()
		pr.ps.Close()
	})
}

View File

@@ -0,0 +1,178 @@
// +build windows
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package runhcs
import (
"context"
"os"
"os/exec"
"sync"
"sync/atomic"
"syscall"
"time"
eventstypes "github.com/containerd/containerd/api/events"
"github.com/containerd/containerd/runtime"
)
// newProcess wraps an already-created container init process identified by
// pid and starts waiting on it in the background via waitForProcess.
func newProcess(ctx context.Context, s *service, id string, pid uint32, pr *pipeRelay, bundle, stdin, stdout, stderr string, terminal bool) (*process, error) {
	osProc, err := os.FindProcess(int(pid))
	if err != nil {
		return nil, err
	}
	pp := &process{
		cid:       id,
		id:        id,
		pid:       pid,
		bundle:    bundle,
		stdin:     stdin,
		stdout:    stdout,
		stderr:    stderr,
		terminal:  terminal,
		relay:     pr,
		waitBlock: make(chan struct{}),
	}
	// Seed the exit value so stat() reports "not exited" (status 255, zero
	// time) until waitForProcess observes the real exit.
	pp.exit.Store(&processExit{
		pid:        pid,
		exitStatus: 255,
	})
	go waitForProcess(ctx, pp, osProc, s)
	return pp, nil
}
// waitForProcess blocks until p exits, records the exit status on process,
// drains the IO relay, frees upstream waiters, and publishes a TaskExit
// event to containerd.
func waitForProcess(ctx context.Context, process *process, p *os.Process, s *service) {
	var status int
	ps, eerr := p.Wait()
	if eerr != nil {
		status = 255
		if exitErr, ok := eerr.(*exec.ExitError); ok {
			if ws, ok := exitErr.Sys().(syscall.WaitStatus); ok {
				status = ws.ExitStatus()
			}
		}
	} else if ws, ok := ps.Sys().(syscall.WaitStatus); ok {
		// BUGFIX: os.Process.Wait returns a nil error even when the process
		// exits non-zero; the real code lives in the returned ProcessState.
		// The previous code discarded it, so status was always 0 on a
		// successful wait.
		status = ws.ExitStatus()
	}
	now := time.Now()
	process.exit.Store(&processExit{
		pid:        process.pid,
		exitStatus: uint32(status),
		exitedAt:   now,
		exitErr:    eerr,
	})
	// Wait for the relay to drain stdout/stderr before signaling exit.
	process.relay.wait()
	// close the client io, and free upstream waiters
	process.close()
	// NOTE(review): the Publish error is ignored; a lost TaskExit event is
	// not recoverable here, but logging it would aid diagnosis.
	s.publisher.Publish(
		ctx,
		runtime.TaskExitEventTopic,
		&eventstypes.TaskExit{
			ContainerID: process.cid,
			ID:          process.id,
			Pid:         process.pid,
			ExitStatus:  uint32(status),
			ExitedAt:    now,
		})
}
// newExecProcess registers bookkeeping for an exec process that has been
// configured but not yet started; the pid is filled in later by Start.
func newExecProcess(ctx context.Context, s *service, cid, id string, pr *pipeRelay, bundle, stdin, stdout, stderr string, terminal bool) (*process, error) {
	ep := &process{
		cid:       cid,
		id:        id,
		bundle:    bundle,
		stdin:     stdin,
		stdout:    stdout,
		stderr:    stderr,
		terminal:  terminal,
		relay:     pr,
		waitBlock: make(chan struct{}),
	}
	// Seed the exit value so stat() reports "not exited" (status 255, zero
	// time) until the process actually runs and terminates.
	ep.exit.Store(&processExit{
		exitStatus: 255,
	})
	return ep, nil
}
// process tracks a single container init or exec process served by this shim.
type process struct {
	sync.Mutex
	// cid is the owning container's ID; equal to id for the init process.
	cid string
	// id is this process's ID (the task ID for init, the exec ID for execs).
	id  string
	pid uint32
	// bundle is the OCI bundle directory on disk.
	bundle   string
	stdin    string
	stdout   string
	stderr   string
	terminal bool
	// relay shuttles IO between the upstream pipes and the process.
	relay *pipeRelay
	// started track if the process has ever been started and will not be reset
	// for the lifetime of the process object.
	started bool
	// waitBlock is closed exactly once (by close) when the process exits.
	waitBlock chan struct{}
	// exit holds the exit value for all calls to `stat`. By default a
	// non-exited value is stored of status: 255, at: time 0.
	exit atomic.Value
	// closeOnce is responsible for closing waitBlock and any io.
	closeOnce sync.Once
}
// closeIO closes the stdin of the executing process to unblock any waiters
//
// The process lock serializes closeIO with other stateful operations such
// as Start.
func (p *process) closeIO() {
	p.Lock()
	defer p.Unlock()
	p.relay.closeIO()
}
// close closes all stdio and frees any waiters. This is safe to call multiple
// times.
func (p *process) close() {
	p.closeOnce.Do(func() {
		// Tear down the relay first so waiters observe fully drained IO.
		p.relay.close()
		// Free any waiters
		close(p.waitBlock)
	})
}
// stat is a non-blocking query of the current process state.
func (p *process) stat() *processExit {
	return p.exit.Load().(*processExit)
}
// wait waits for the container process to exit and returns the exit status. If
// the process failed post start the processExit will contain the exitErr. This
// is safe to call previous to calling start().
func (p *process) wait() *processExit {
	// waitBlock is closed by close() after the final exit value is stored,
	// so stat() below observes the terminal state.
	<-p.waitBlock
	return p.stat()
}

View File

@@ -0,0 +1,761 @@
// +build windows
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package runhcs
import (
"context"
"encoding/json"
"fmt"
"os"
"os/exec"
"path"
"path/filepath"
"strconv"
"strings"
"sync"
"time"
"github.com/Microsoft/hcsshim/cmd/go-runhcs"
containerd_types "github.com/containerd/containerd/api/types"
"github.com/containerd/containerd/api/types/task"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/mount"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/runtime/v2/shim"
taskAPI "github.com/containerd/containerd/runtime/v2/task"
"github.com/containerd/go-runc"
ptypes "github.com/gogo/protobuf/types"
oci "github.com/opencontainers/runtime-spec/specs-go"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
const (
	// runhcsBinary is the executable shelled out to for commands not yet
	// ported to the go-runhcs client.
	runhcsBinary = "runhcs"
	// runhcsVersion is the shim version reported by Connect.
	runhcsVersion     = "0.0.1"
	runhcsDebugLegacy = "--debug" // TODO: JTERRY75 remove when all cmd's are complete in go-runhcs
)
var (
	// empty is a shared Empty response returned by side-effect-only RPCs.
	empty = &ptypes.Empty{}
	// runhcsDebug mirrors whether logrus is at debug level (set in init).
	runhcsDebug = false
)
func init() {
if logrus.GetLevel() == logrus.DebugLevel {
runhcsDebug = true
}
}
// newRunhcs builds a go-runhcs client configured for this shim, directing
// its debug log (when enabled) into the bundle directory.
func newRunhcs(bundle string) *runhcs.Runhcs {
	r := &runhcs.Runhcs{
		Debug:     runhcsDebug,
		LogFormat: runhcs.JSON,
		Owner:     "containerd-runhcs-shim-v1",
	}
	if runhcsDebug {
		r.Log = filepath.Join(bundle, "runhcs-debug.log")
	}
	return r
}
// New returns a new runhcs shim service that can be used via GRPC
func New(ctx context.Context, id string, publisher events.Publisher) (shim.Shim, error) {
	s := &service{
		context:   ctx,
		id:        id,
		processes: map[string]*process{},
		publisher: publisher,
	}
	return s, nil
}
// Compile-time checks that service satisfies the task and shim interfaces.
var _ = (taskAPI.TaskService)(&service{})
var _ = (shim.Shim)(&service{})
// service is the runhcs shim implementation of the v2 TaskService over GRPC
type service struct {
	// mu guards processes.
	mu      sync.Mutex
	context context.Context
	// id is the task/container ID this shim instance serves.
	id string
	// processes maps init and exec process IDs to their state.
	processes map[string]*process
	// publisher emits runtime events (e.g. TaskExit) back to containerd.
	publisher events.Publisher
}
// getProcess attempts to get a process by id.
// The caller MUST NOT have locked s.mu previous to calling this function.
func (s *service) getProcess(id string) (*process, error) {
	s.mu.Lock()
	p, err := s.getProcessLocked(id)
	s.mu.Unlock()
	return p, err
}
// getProcessLocked attempts to get a process by id.
// The caller MUST protect s.mu previous to calling this function.
func (s *service) getProcessLocked(id string) (*process, error) {
	p, ok := s.processes[id]
	if !ok {
		return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "process %s", id)
	}
	return p, nil
}
// Cleanup removes this shim's working directory and reports a synthetic
// exit (status 255) so containerd can reap the task.
func (s *service) Cleanup(ctx context.Context) (*taskAPI.DeleteResponse, error) {
	log.G(ctx).Info("Starting Cleanup")
	if _, err := namespaces.NamespaceRequired(ctx); err != nil {
		return nil, err
	}
	cwd, err := os.Getwd()
	if err != nil {
		return nil, err
	}
	if err := os.RemoveAll(cwd); err != nil {
		return nil, err
	}
	resp := &taskAPI.DeleteResponse{
		ExitedAt:   time.Now(),
		ExitStatus: 255,
	}
	return resp, nil
}
// StartShim launches a new copy of this binary to serve the shim API for id
// and returns the socket address the child will listen on.
func (s *service) StartShim(ctx context.Context, id, containerdBinary, containerdAddress string) (string, error) {
	ns, err := namespaces.NamespaceRequired(ctx)
	if err != nil {
		return "", err
	}
	self, err := os.Executable()
	if err != nil {
		return "", err
	}
	cwd, err := os.Getwd()
	if err != nil {
		return "", err
	}
	socketAddress, err := shim.SocketAddress(ctx, id)
	if err != nil {
		return "", err
	}
	args := []string{
		"-namespace", ns,
		"-id", id,
		"-address", containerdAddress,
		"-publish-binary", containerdBinary,
		"-socket", socketAddress,
		"-debug",
	}
	cmd := exec.Command(self, args...)
	cmd.Dir = cwd
	cmd.Env = append(os.Environ(), "GOMAXPROCS=2")
	if err = cmd.Start(); err != nil {
		return "", err
	}
	// Reap the child if anything below fails. This relies on the named err
	// being assigned (never shadowed) by later failures.
	defer func() {
		if err != nil {
			cmd.Process.Kill()
		}
	}()
	// TODO: JTERRY75 - Windows does not use the ExtraFiles to pass the socket
	// because winio does not support it which exposes a race condition between
	// these two processes. For now AnonDialer will retry if the file does not
	// exist up to the timeout waiting for the shim service to create the pipe.
	go cmd.Wait()
	// BUGFIX: assign to the outer err (`=`, not `:=`) so the deferred Kill
	// above fires when writing the pid/address files fails. The previous
	// shadowed errors left an orphaned shim process running.
	if err = shim.WritePidFile("shim.pid", cmd.Process.Pid); err != nil {
		return "", err
	}
	if err = shim.WriteAddress("address", socketAddress); err != nil {
		return "", err
	}
	return socketAddress, nil
}
// State returns runtime state information for a process
//
// The service lock is held for the duration so the process table cannot
// change while the external `runhcs state` query runs.
func (s *service) State(ctx context.Context, r *taskAPI.StateRequest) (*taskAPI.StateResponse, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	proc, err := s.getProcessLocked(r.ID)
	if err != nil {
		return nil, err
	}
	// Shell out to the legacy runhcs CLI and capture stdout in a pooled buffer.
	cmd := exec.Command(runhcsBinary, runhcsDebugLegacy, "state", r.ID)
	buf := getBuffer()
	defer putBuffer(buf)
	cmd.Stdout = buf
	if _, serr := runCmd(ctx, cmd); serr != nil {
		return nil, serr
	}
	// TODO: JTERRY75 merge this with runhcs declaration
	type containerState struct {
		// Version is the OCI version for the container
		Version string `json:"ociVersion"`
		// ID is the container ID
		ID string `json:"id"`
		// InitProcessPid is the init process id in the parent namespace
		InitProcessPid int `json:"pid"`
		// Status is the current status of the container, running, paused, ...
		Status string `json:"status"`
		// Bundle is the path on the filesystem to the bundle
		Bundle string `json:"bundle"`
		// Rootfs is a path to a directory containing the container's root filesystem.
		Rootfs string `json:"rootfs"`
		// Created is the unix timestamp for the creation time of the container in UTC
		Created time.Time `json:"created"`
		// Annotations is the user defined annotations added to the config.
		Annotations map[string]string `json:"annotations,omitempty"`
		// The owner of the state directory (the owner of the container).
		Owner string `json:"owner"`
	}
	var cs containerState
	if err := json.NewDecoder(buf).Decode(&cs); err != nil {
		log.G(ctx).WithError(err).Debugf("failed to decode runhcs state output: %s", buf.Bytes())
		return nil, err
	}
	// Map the runhcs status string onto the task API enum; anything
	// unrecognized stays StatusUnknown.
	status := task.StatusUnknown
	switch cs.Status {
	case "created":
		status = task.StatusCreated
	case "running":
		status = task.StatusRunning
	case "stopped":
		status = task.StatusStopped
	case "paused":
		status = task.StatusPaused
	}
	pe := proc.stat()
	return &taskAPI.StateResponse{
		ID:         r.ID,
		Bundle:     cs.Bundle,
		Pid:        uint32(cs.InitProcessPid),
		Status:     status,
		Stdin:      proc.stdin,
		Stdout:     proc.stdout,
		Stderr:     proc.stderr,
		Terminal:   proc.terminal,
		ExitStatus: pe.exitStatus,
		ExitedAt:   pe.exitedAt,
	}, nil
}
// writeMountsToConfig rewrites the bundle's config.json in place, appending
// the mount's parent layer paths and scratch location to
// spec.Windows.LayerFolders. Exactly one windows-layer or lcow-layer mount
// is required.
func writeMountsToConfig(bundle string, mounts []*containerd_types.Mount) error {
	if len(mounts) != 1 {
		return errors.New("Rootfs does not contain exactly 1 mount for the root file system")
	}
	m := mounts[0]
	if m.Type != "windows-layer" && m.Type != "lcow-layer" {
		return errors.Errorf("unsupported mount type '%s'", m.Type)
	}
	// parentLayerPaths are passed in layerN, layerN-1, ..., layer 0
	//
	// The OCI spec expects:
	//   windows-layer order is layerN, layerN-1, ..., layer0, scratch
	//   lcow-layer order is layer0, layer1, ..., layerN, scratch
	var parentLayerPaths []string
	for _, option := range m.Options {
		if strings.HasPrefix(option, mount.ParentLayerPathsFlag) {
			err := json.Unmarshal([]byte(option[len(mount.ParentLayerPathsFlag):]), &parentLayerPaths)
			if err != nil {
				return errors.Wrap(err, "failed to unmarshal parent layer paths from mount")
			}
		}
	}
	if m.Type == "lcow-layer" {
		// Reverse the lcow-layer parents
		for i := len(parentLayerPaths)/2 - 1; i >= 0; i-- {
			opp := len(parentLayerPaths) - 1 - i
			parentLayerPaths[i], parentLayerPaths[opp] = parentLayerPaths[opp], parentLayerPaths[i]
		}
	}
	// Use filepath.Join for an on-disk path (path.Join is for slash paths).
	cf, err := os.OpenFile(filepath.Join(bundle, "config.json"), os.O_RDWR, 0)
	if err != nil {
		return errors.Wrap(err, "bundle does not contain config.json")
	}
	defer cf.Close()
	var spec oci.Spec
	if err := json.NewDecoder(cf).Decode(&spec); err != nil {
		return errors.Wrap(err, "bundle config.json is not valid oci spec")
	}
	// BUGFIX: the previous code dereferenced spec.Windows unconditionally and
	// panicked on a config.json without a Windows section.
	if spec.Windows == nil {
		spec.Windows = &oci.Windows{}
	}
	if err := cf.Truncate(0); err != nil {
		return errors.Wrap(err, "failed to truncate config.json")
	}
	if _, err := cf.Seek(0, 0); err != nil {
		return errors.Wrap(err, "failed to seek to 0 in config.json")
	}
	// Append the parents
	spec.Windows.LayerFolders = append(spec.Windows.LayerFolders, parentLayerPaths...)
	// Append the scratch
	spec.Windows.LayerFolders = append(spec.Windows.LayerFolders, m.Source)
	if err := json.NewEncoder(cf).Encode(spec); err != nil {
		return errors.Wrap(err, "failed to write Mounts into config.json")
	}
	return nil
}
// Create a new initial process and container with runhcs
//
// The deferred cleanups below key off the function-scoped `err`; later
// failures must assign to it (not shadow it) for cleanup to run.
func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (*taskAPI.CreateTaskResponse, error) {
	// Hold the lock for the entire duration to avoid duplicate process creation.
	s.mu.Lock()
	defer s.mu.Unlock()
	if p := s.processes[r.ID]; p != nil {
		return nil, errdefs.ToGRPCf(errdefs.ErrAlreadyExists, "process %s already exists", r.ID)
	}
	if r.ID != s.id {
		return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "init process id '%s' != shim id '%s'", r.ID, s.id)
	}
	// Add the mounts to the layer paths
	if err := writeMountsToConfig(r.Bundle, r.Rootfs); err != nil {
		return nil, errdefs.ToGRPC(err)
	}
	// Create the IO for the process
	// NOTE: `io` shadows the standard io package for the rest of this function.
	io, err := runc.NewPipeIO()
	if err != nil {
		return nil, errors.Wrap(err, "failed to create io pipes")
	}
	defer func() {
		if err != nil {
			io.Close()
		}
	}()
	// Create the upstream IO
	ps, err := newPipeSet(ctx, r.Stdin, r.Stdout, r.Stderr, r.Terminal)
	if err != nil {
		return nil, errors.Wrap(err, "failed to connect to upstream IO")
	}
	defer func() {
		if err != nil {
			ps.Close()
		}
	}()
	// Create the relay for them both.
	pr := newPipeRelay(ctx, ps, io)
	defer func() {
		if err != nil {
			pr.close()
		}
	}()
	// runhcs writes the container pid here once created.
	pidfilePath := path.Join(r.Bundle, "runhcs-pidfile.pid")
	copts := &runhcs.CreateOpts{
		IO:      io,
		PidFile: pidfilePath,
		ShimLog: path.Join(r.Bundle, "runhcs-shim.log"),
		VMLog:   path.Join(r.Bundle, "runhcs-vm-shim.log"),
	}
	rhcs := newRunhcs(r.Bundle)
	err = rhcs.Create(ctx, r.ID, r.Bundle, copts)
	if err != nil {
		return nil, err
	}
	// We successfully created the process. Convert from initpid to the container process.
	pid, err := runc.ReadPidFile(pidfilePath)
	if err != nil {
		return nil, errors.Wrap(err, "failed to read container pid from pidfile")
	}
	process, err := newProcess(
		ctx,
		s,
		r.ID,
		uint32(pid),
		pr,
		r.Bundle,
		r.Stdin,
		r.Stdout,
		r.Stderr,
		r.Terminal)
	if err != nil {
		return nil, err
	}
	s.processes[r.ID] = process
	return &taskAPI.CreateTaskResponse{
		Pid: uint32(pid),
	}, nil
}
// Start a process
//
// For an exec (r.ExecID set) the previously written spec is launched via
// `runhcs exec`; for the init process the container is started via
// `runhcs start`.
func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.StartResponse, error) {
	log.G(ctx).Infof("Start: %s: %s", r.ID, r.ExecID)
	var p *process
	var err error
	var id string
	// An exec is tracked under its exec ID; the init process under the task ID.
	if r.ExecID != "" {
		id = r.ExecID
	} else {
		id = r.ID
	}
	if p, err = s.getProcess(id); err != nil {
		return nil, err
	}
	// Hold the process lock so concurrent Start/closeIO calls serialize.
	p.Lock()
	defer p.Unlock()
	if p.started {
		return nil, errors.New("cannot start already started container or process")
	}
	rhcs := newRunhcs(p.bundle)
	// This is a start/exec
	if r.ExecID != "" {
		execFmt := fmt.Sprintf("exec-%s-%s", r.ID, r.ExecID)
		pidfilePath := path.Join(p.bundle, fmt.Sprintf("runhcs-%s-pidfile.pid", execFmt))
		// NOTE(review): no separator between execFmt and "config.json";
		// this matches the naming used by Exec when writing the file.
		procConfig := path.Join(p.bundle, execFmt+"config.json")
		eopts := &runhcs.ExecOpts{
			IO:      p.relay.io,
			PidFile: pidfilePath,
			ShimLog: path.Join(p.bundle, fmt.Sprintf("runhcs-%s-shim.log", execFmt)),
			Detach:  true,
		}
		err = rhcs.Exec(ctx, r.ID, procConfig, eopts)
		if err != nil {
			return nil, errors.Wrapf(err, "failed to exec process: %s", r.ExecID)
		}
		p.started = true
		// We successfully exec'd the process. Convert from initpid to the container process.
		pid, err := runc.ReadPidFile(pidfilePath)
		if err != nil {
			return nil, errors.Wrap(err, "failed to read container pid from pidfile")
		}
		proc, err := os.FindProcess(pid)
		if err != nil {
			return nil, errors.Wrap(err, "failed to find exec process pid")
		}
		p.pid = uint32(pid)
		// Begin reaping the exec in the background.
		go waitForProcess(ctx, p, proc, s)
	} else {
		if err := rhcs.Start(ctx, p.id); err != nil {
			return nil, errors.Wrapf(err, "failed to start container: %s", p.id)
		}
		p.started = true
		// TODO: JTERRY75 - This is a total hack. We cant return from this call
		// until the state has transitioned. Because runhcs start is long lived it
		// will return before the actual state is "RUNNING" which causes a failure
		// if the 'state' query comes in and it is not in the "RUNNING" state.
		stateRequest := &taskAPI.StateRequest{
			ID:     r.ID,
			ExecID: r.ExecID,
		}
		for {
			sr, err := s.State(ctx, stateRequest)
			if err != nil {
				log.G(ctx).WithError(err).Debug("failed to query state of container")
				break
			}
			// Have we transitioned states yet? Expect != created && != unknown
			if sr.Status != task.StatusCreated && sr.Status != task.StatusUnknown {
				break
			}
			time.Sleep(1 * time.Second)
		}
	}
	return &taskAPI.StartResponse{
		Pid: p.pid,
	}, nil
}
// Delete the initial process and container
//
// BUGFIX: the process is looked up by exec ID when provided, falling back to
// the task ID for the init process, and the same id is removed from the
// process table. The previous code always looked up r.ExecID (empty for
// init, so init deletes failed with NotFound) and always removed r.ID from
// the table (so exec deletes removed the init entry instead).
func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAPI.DeleteResponse, error) {
	id := r.ID
	if r.ExecID != "" {
		id = r.ExecID
	}
	p, err := s.getProcess(id)
	if err != nil {
		return nil, err
	}
	rhs := newRunhcs(p.bundle)
	dopts := &runhcs.DeleteOpts{
		Force: true,
	}
	if err := rhs.Delete(ctx, p.id, dopts); err != nil {
		return nil, errors.Wrapf(err, "failed to delete container: %s", p.id)
	}
	select {
	case <-time.After(5 * time.Second):
		// Force close the container process since it didnt shutdown in time.
		p.close()
	case <-p.waitBlock:
	}
	exit := p.stat()
	s.mu.Lock()
	delete(s.processes, id)
	s.mu.Unlock()
	return &taskAPI.DeleteResponse{
		ExitedAt:   exit.exitedAt,
		ExitStatus: exit.exitStatus,
		Pid:        p.pid,
	}, nil
}
// Pids returns all pids inside the container
//
// Not yet supported by the runhcs shim.
func (s *service) Pids(ctx context.Context, r *taskAPI.PidsRequest) (*taskAPI.PidsResponse, error) {
	return nil, errdefs.ErrNotImplemented
}
// Pause the container
func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (*ptypes.Empty, error) {
	// TODO: Validate that 'id' is actually a valid parent container ID
	if _, err := s.getProcess(r.ID); err != nil {
		return nil, err
	}
	// Shell out to the legacy runhcs CLI; go-runhcs has no pause support yet.
	cmd := exec.Command(runhcsBinary, runhcsDebugLegacy, "pause", r.ID)
	if _, err := runCmd(ctx, cmd); err != nil {
		return nil, err
	}
	return empty, nil
}
// Resume the container
func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (*ptypes.Empty, error) {
	// TODO: Validate that 'id' is actually a valid parent container ID
	if _, err := s.getProcess(r.ID); err != nil {
		return nil, err
	}
	// Shell out to the legacy runhcs CLI; go-runhcs has no resume support yet.
	cmd := exec.Command(runhcsBinary, runhcsDebugLegacy, "resume", r.ID)
	if _, err := runCmd(ctx, cmd); err != nil {
		return nil, err
	}
	return empty, nil
}
// Checkpoint the container
//
// Not yet supported by the runhcs shim.
func (s *service) Checkpoint(ctx context.Context, r *taskAPI.CheckpointTaskRequest) (*ptypes.Empty, error) {
	return nil, errdefs.ErrNotImplemented
}
// Kill a process with the provided signal
//
// BUGFIX: the lookup falls back to r.ID when r.ExecID is empty; the previous
// code always looked up r.ExecID, so killing the init process (empty exec
// ID) failed with NotFound.
func (s *service) Kill(ctx context.Context, r *taskAPI.KillRequest) (*ptypes.Empty, error) {
	id := r.ID
	if r.ExecID != "" {
		id = r.ExecID
	}
	p, err := s.getProcess(id)
	if err != nil {
		return nil, err
	}
	// TODO: JTERRY75 runhcs needs r.Signal in string form
	// TODO: JTERRY75 runhcs support for r.All?
	rhcs := newRunhcs(p.bundle)
	if err = rhcs.Kill(ctx, p.id, strconv.FormatUint(uint64(r.Signal), 10)); err != nil {
		// Racing a natural exit is not an error worth reporting.
		if !strings.Contains(err.Error(), "container is not running") {
			return nil, err
		}
	}
	return empty, nil
}
// Exec an additional process inside the container
//
// The process spec is written to "<execFmt>config.json" in the bundle; Start
// later launches it. The deferred cleanups key off the function-scoped `err`.
func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*ptypes.Empty, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	var parent *process
	var err error
	// Get the parent container
	if parent, err = s.getProcessLocked(r.ID); err != nil {
		return nil, err
	}
	if p := s.processes[r.ExecID]; p != nil {
		return nil, errdefs.ToGRPCf(errdefs.ErrAlreadyExists, "exec process %s already exists", r.ExecID)
	}
	execFmt := fmt.Sprintf("exec-%s-%s", r.ID, r.ExecID)
	var spec oci.Process
	if err := json.Unmarshal(r.Spec.Value, &spec); err != nil {
		return nil, errors.Wrap(err, "request.Spec was not oci process")
	}
	// NOTE(review): no separator between execFmt and "config.json"; Start
	// reads the file back with the same naming.
	procConfig := path.Join(parent.bundle, execFmt+"config.json")
	cf, err := os.OpenFile(procConfig, os.O_CREATE|os.O_WRONLY, 0)
	if err != nil {
		return nil, errors.Wrap(err, "bundle does not contain config.json")
	}
	if err := json.NewEncoder(cf).Encode(spec); err != nil {
		cf.Close()
		return nil, errors.Wrap(err, "failed to write Mounts into config.json")
	}
	cf.Close()
	// Create the IO for the process
	// NOTE: `io` shadows the standard io package for the rest of this function.
	io, err := runc.NewPipeIO()
	if err != nil {
		return nil, errors.Wrap(err, "failed to create io pipes")
	}
	defer func() {
		if err != nil {
			io.Close()
		}
	}()
	// Create the upstream IO
	ps, err := newPipeSet(ctx, r.Stdin, r.Stdout, r.Stderr, r.Terminal)
	if err != nil {
		return nil, errors.Wrap(err, "failed to connect to upstream IO")
	}
	defer func() {
		if err != nil {
			ps.Close()
		}
	}()
	// Create the relay for them both.
	pr := newPipeRelay(ctx, ps, io)
	defer func() {
		if err != nil {
			pr.close()
		}
	}()
	process, err := newExecProcess(
		ctx,
		s,
		r.ID,
		r.ExecID,
		pr,
		parent.bundle,
		r.Stdin,
		r.Stdout,
		r.Stderr,
		r.Terminal)
	if err != nil {
		return nil, err
	}
	s.processes[r.ExecID] = process
	return empty, nil
}
// ResizePty of a process
//
// BUGFIX: the lookup falls back to r.ID when r.ExecID is empty; the previous
// code always looked up r.ExecID, which failed with NotFound for the init
// process.
func (s *service) ResizePty(ctx context.Context, r *taskAPI.ResizePtyRequest) (*ptypes.Empty, error) {
	id := r.ID
	if r.ExecID != "" {
		id = r.ExecID
	}
	p, err := s.getProcess(id)
	if err != nil {
		return nil, err
	}
	// Shell out to the legacy runhcs CLI; the target process is addressed by
	// container id plus pid.
	cmd := exec.Command(
		runhcsBinary,
		runhcsDebugLegacy,
		"resize-tty",
		r.ID,
		"-p",
		strconv.FormatUint(uint64(p.pid), 10),
		strconv.FormatUint(uint64(r.Width), 10),
		strconv.FormatUint(uint64(r.Height), 10))
	if _, err := runCmd(ctx, cmd); err != nil {
		return nil, err
	}
	return empty, nil
}
// CloseIO of a process
//
// BUGFIX: the lookup falls back to r.ID when r.ExecID is empty; the previous
// code always looked up r.ExecID, which failed with NotFound for the init
// process.
func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (*ptypes.Empty, error) {
	id := r.ID
	if r.ExecID != "" {
		id = r.ExecID
	}
	p, err := s.getProcess(id)
	if err != nil {
		return nil, err
	}
	p.closeIO()
	return empty, nil
}
// Update a running container
//
// Not yet supported by the runhcs shim.
func (s *service) Update(ctx context.Context, r *taskAPI.UpdateTaskRequest) (*ptypes.Empty, error) {
	return nil, errdefs.ErrNotImplemented
}
// Wait for a process to exit
//
// BUGFIX: the lookup falls back to r.ID when r.ExecID is empty; the previous
// code always looked up r.ExecID, which failed with NotFound for the init
// process.
func (s *service) Wait(ctx context.Context, r *taskAPI.WaitRequest) (*taskAPI.WaitResponse, error) {
	id := r.ID
	if r.ExecID != "" {
		id = r.ExecID
	}
	p, err := s.getProcess(id)
	if err != nil {
		return nil, err
	}
	pe := p.wait()
	return &taskAPI.WaitResponse{
		ExitedAt:   pe.exitedAt,
		ExitStatus: pe.exitStatus,
	}, nil
}
// Stats returns statistics about the running container
//
// Not yet supported by the runhcs shim.
func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (*taskAPI.StatsResponse, error) {
	return nil, errdefs.ErrNotImplemented
}
// Connect returns the runhcs shim information
//
// BUGFIX: the process map is read under s.mu and the entry is nil-checked;
// the previous unguarded s.processes[s.id].pid raced with Create/Delete and
// panicked with a nil-pointer dereference when called before the init
// process existed. TaskPid is 0 in that case.
func (s *service) Connect(ctx context.Context, r *taskAPI.ConnectRequest) (*taskAPI.ConnectResponse, error) {
	var taskPid uint32
	s.mu.Lock()
	if p := s.processes[s.id]; p != nil {
		taskPid = p.pid
	}
	s.mu.Unlock()
	return &taskAPI.ConnectResponse{
		ShimPid: uint32(os.Getpid()),
		TaskPid: taskPid,
		Version: runhcsVersion,
	}, nil
}
// Shutdown stops this instance of the runhcs shim
//
// NOTE(review): os.Exit terminates the process immediately, so the returned
// Empty is never delivered to the caller.
func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (*ptypes.Empty, error) {
	os.Exit(0)
	return empty, nil
}

View File

@@ -27,6 +27,7 @@ import (
"os"
"os/exec"
"sync"
"unsafe"
winio "github.com/Microsoft/go-winio"
"github.com/containerd/containerd/events"
@@ -35,6 +36,7 @@ import (
"github.com/containerd/typeurl"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"golang.org/x/sys/windows"
)
// setupSignals creates a new signal handler for all signals
@@ -51,8 +53,43 @@ func subreaper() error {
return nil
}
// fakeSignal is a no-op os.Signal implementation used to trigger stack dumps
// on Windows, where POSIX signals such as SIGUSR1 are unavailable.
type fakeSignal struct {
}

// String implements os.Signal; the value is never displayed.
func (fs *fakeSignal) String() string {
	return ""
}

// Signal implements os.Signal as a no-op marker method.
func (fs *fakeSignal) Signal() {
}
// setupDumpStacks arranges for goroutine stack dumps to be requested when a
// named Win32 event is signaled, standing in for the SIGUSR1 convention used
// on *nix. Each signal of the event sends a fakeSignal on dump.
func setupDumpStacks(dump chan<- os.Signal) {
	// TODO: JTERRY75: Make this based on events. signal.Notify(dump, syscall.SIGUSR1)
	// Windows does not support signals like *nix systems. So instead of
	// trapping on SIGUSR1 to dump stacks, we wait on a Win32 event to be
	// signaled. ACL'd to builtin administrators and local system
	event := "Global\\containerd-shim-runhcs-v1-" + fmt.Sprint(os.Getpid())
	// Error ignored: event is a fixed ASCII string plus a pid, which always
	// converts — NOTE(review): confirm this holds if the name format changes.
	ev, _ := windows.UTF16PtrFromString(event)
	sd, err := winio.SddlToSecurityDescriptor("D:P(A;;GA;;;BA)(A;;GA;;;SY)")
	if err != nil {
		logrus.Errorf("failed to get security descriptor for debug stackdump event %s: %s", event, err.Error())
		return
	}
	var sa windows.SecurityAttributes
	sa.Length = uint32(unsafe.Sizeof(sa))
	sa.InheritHandle = 1
	sa.SecurityDescriptor = uintptr(unsafe.Pointer(&sd[0]))
	// The handle is intentionally never closed: the event must live for the
	// whole process so the watcher goroutine below can keep waiting on it.
	h, err := windows.CreateEvent(&sa, 0, 0, ev)
	if h == 0 || err != nil {
		logrus.Errorf("failed to create debug stackdump event %s: %s", event, err.Error())
		return
	}
	go func() {
		logrus.Debugf("Stackdump - waiting signal at %s", event)
		for {
			windows.WaitForSingleObject(h, windows.INFINITE)
			dump <- new(fakeSignal)
		}
	}()
}
// serve serves the ttrpc API over a unix socket at the provided path