Implement io.containerd.runhcs.v1 shim log opts

Signed-off-by: Justin Terry (VM) <juterry@microsoft.com>
This commit is contained in:
Justin Terry (VM) 2018-11-01 14:09:33 -07:00
parent ec3dbd155e
commit a33ad40245
2 changed files with 142 additions and 117 deletions

View File

@ -28,6 +28,7 @@ import (
"os" "os"
"os/exec" "os/exec"
"path" "path"
"path/filepath"
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
@ -43,9 +44,11 @@ import (
"github.com/containerd/containerd/log" "github.com/containerd/containerd/log"
"github.com/containerd/containerd/mount" "github.com/containerd/containerd/mount"
"github.com/containerd/containerd/namespaces" "github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/runtime/v2/runhcs/options"
"github.com/containerd/containerd/runtime/v2/shim" "github.com/containerd/containerd/runtime/v2/shim"
taskAPI "github.com/containerd/containerd/runtime/v2/task" taskAPI "github.com/containerd/containerd/runtime/v2/task"
"github.com/containerd/go-runc" "github.com/containerd/go-runc"
"github.com/containerd/typeurl"
ptypes "github.com/gogo/protobuf/types" ptypes "github.com/gogo/protobuf/types"
oci "github.com/opencontainers/runtime-spec/specs-go" oci "github.com/opencontainers/runtime-spec/specs-go"
"github.com/pkg/errors" "github.com/pkg/errors"
@ -63,18 +66,6 @@ var (
empty = &ptypes.Empty{} empty = &ptypes.Empty{}
) )
func newRunhcs(debugLog string) *runhcs.Runhcs {
rhs := &runhcs.Runhcs{
Debug: debugLog != "",
LogFormat: runhcs.JSON,
Owner: "containerd-runhcs-shim-v1",
}
if rhs.Debug {
rhs.Log = debugLog
}
return rhs
}
// forwardRunhcsLogs copies logs from c and writes them to the ctx logger // forwardRunhcsLogs copies logs from c and writes them to the ctx logger
// upstream. // upstream.
func forwardRunhcsLogs(ctx context.Context, c net.Conn, fields logrus.Fields) { func forwardRunhcsLogs(ctx context.Context, c net.Conn, fields logrus.Fields) {
@ -154,16 +145,31 @@ type service struct {
context context.Context context context.Context
// debugLog if not "" indicates the log pipe path for runhcs.exe to write its logs to. // debugLog if not "" indicates the log file or pipe path for runhcs.exe to
debugLog string // write its logs to.
debugLog string
// if `shimOpts.DebugType == options.Opitons_NPIPE` will hold the listener
// for the runhcs.exe to connect to for sending logs.
debugListener net.Listener debugListener net.Listener
shimOpts options.Options
id string id string
processes map[string]*process processes map[string]*process
publisher events.Publisher publisher events.Publisher
} }
func (s *service) newRunhcs() *runhcs.Runhcs {
return &runhcs.Runhcs{
Debug: s.shimOpts.Debug,
Log: s.debugLog,
LogFormat: runhcs.JSON,
Owner: "containerd-runhcs-shim-v1",
Root: s.shimOpts.RegistryRoot,
}
}
// getProcess attempts to get a process by id. // getProcess attempts to get a process by id.
// The caller MUST NOT have locked s.mu previous to calling this function. // The caller MUST NOT have locked s.mu previous to calling this function.
func (s *service) getProcess(id, execID string) (*process, error) { func (s *service) getProcess(id, execID string) (*process, error) {
@ -196,7 +202,7 @@ func (s *service) Cleanup(ctx context.Context) (*taskAPI.DeleteResponse, error)
return nil, err return nil, err
} }
// Forcibly shut down any container in this bundle // Forcibly shut down any container in this bundle
rhcs := newRunhcs("") rhcs := s.newRunhcs()
dopts := &runhcs.DeleteOpts{ dopts := &runhcs.DeleteOpts{
Force: true, Force: true,
} }
@ -290,7 +296,7 @@ func (s *service) State(ctx context.Context, r *taskAPI.StateRequest) (*taskAPI.
// This is a container // This is a container
if p.cid == p.id { if p.cid == p.id {
rhcs := newRunhcs(s.debugLog) rhcs := s.newRunhcs()
cs, err := rhcs.State(ctx, p.id) cs, err := rhcs.State(ctx, p.id)
if err != nil { if err != nil {
return nil, err return nil, err
@ -406,6 +412,19 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (*ta
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() defer s.mu.Unlock()
_, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return nil, errors.Wrap(err, "create namespace")
}
if r.Options != nil {
v, err := typeurl.UnmarshalAny(r.Options)
if err != nil {
return nil, err
}
s.shimOpts = *v.(*options.Options)
}
if p := s.processes[r.ID]; p != nil { if p := s.processes[r.ID]; p != nil {
return nil, errdefs.ToGRPCf(errdefs.ErrAlreadyExists, "process %s already exists", r.ID) return nil, errdefs.ToGRPCf(errdefs.ErrAlreadyExists, "process %s already exists", r.ID)
} }
@ -445,38 +464,41 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (*ta
pr.close() pr.close()
} }
}() }()
// TODO: Parse the real RunHcs Opts r.Options
opts, ok := ctx.Value(shim.OptsKey{}).(shim.Opts)
if ok && opts.Debug {
if s.debugLog == "" {
logPath := safePipePrefix + fmt.Sprintf("runhcs-log-%s", r.ID)
l, err := winio.ListenPipe(logPath, nil)
if err != nil {
return nil, err
}
s.debugLog = logPath
s.debugListener = l
// Accept connections and forward all logs for each runhcs.exe if s.shimOpts.Debug {
// invocation if s.debugLog == "" {
go func() { if s.shimOpts.DebugType == options.Options_FILE {
for { s.debugLog = filepath.Join(r.Bundle, fmt.Sprintf("runhcs-%s.log", r.ID))
c, err := s.debugListener.Accept() } else {
if err != nil { logPath := safePipePrefix + fmt.Sprintf("runhcs-log-%s", r.ID)
if err == errorConnectionAborted { l, err := winio.ListenPipe(logPath, nil)
break if err != nil {
} return nil, err
log.G(ctx).WithError(err).Debug("log accept failure")
// Logrus error locally?
continue
}
fields := map[string]interface{}{
"log-source": "runhcs",
"task-id": r.ID,
}
go forwardRunhcsLogs(ctx, c, fields)
} }
}() s.debugLog = logPath
s.debugListener = l
// Accept connections and forward all logs for each runhcs.exe
// invocation
go func() {
for {
c, err := s.debugListener.Accept()
if err != nil {
if err == winio.ErrPipeListenerClosed {
break
}
log.G(ctx).WithError(err).Debug("log accept failure")
// Logrus error locally?
continue
}
fields := map[string]interface{}{
"log-source": "runhcs",
"task-id": r.ID,
}
go forwardRunhcsLogs(ctx, c, fields)
}
}()
}
} }
} }
@ -485,42 +507,47 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (*ta
IO: io, IO: io,
PidFile: pidfilePath, PidFile: pidfilePath,
} }
rhcs := newRunhcs(s.debugLog) rhcs := s.newRunhcs()
if rhcs.Debug { if rhcs.Debug {
doForwardLogs := func(source, logPipeFmt string, opt *string) error { if s.shimOpts.DebugType == options.Options_FILE {
pipeName := fmt.Sprintf(logPipeFmt, r.ID) copts.ShimLog = filepath.Join(r.Bundle, fmt.Sprintf("runhcs-shim-%s.log", r.ID))
*opt = safePipePrefix + pipeName copts.VMLog = filepath.Join(r.Bundle, fmt.Sprintf("runhcs-vmshim-%s.log", r.ID))
l, err := winio.ListenPipe(*opt, nil) } else {
if err != nil { doForwardLogs := func(source, logPipeFmt string, opt *string) error {
return err pipeName := fmt.Sprintf(logPipeFmt, r.ID)
} *opt = safePipePrefix + pipeName
go func() { l, err := winio.ListenPipe(*opt, nil)
defer l.Close()
c, err := l.Accept()
if err != nil { if err != nil {
log.G(ctx). return err
WithField("task-id", r.ID).
WithError(err).
Errorf("failed to accept %s", pipeName)
} else {
fields := map[string]interface{}{
"log-source": source,
"task-id": r.ID,
}
go forwardRunhcsLogs(ctx, c, fields)
} }
}() go func() {
return nil defer l.Close()
} c, err := l.Accept()
if err != nil {
log.G(ctx).
WithField("task-id", r.ID).
WithError(err).
Errorf("failed to accept %s", pipeName)
} else {
fields := map[string]interface{}{
"log-source": source,
"task-id": r.ID,
}
go forwardRunhcsLogs(ctx, c, fields)
}
}()
return nil
}
err = doForwardLogs("runhcs-shim", "runhcs-shim-log-%s", &copts.ShimLog) err = doForwardLogs("runhcs-shim", "runhcs-shim-log-%s", &copts.ShimLog)
if err != nil { if err != nil {
return nil, err return nil, err
} }
err = doForwardLogs("runhcs--vm-shim", "runhcs-vm-shim-log-%s", &copts.VMLog) err = doForwardLogs("runhcs--vm-shim", "runhcs-vm-shim-log-%s", &copts.VMLog)
if err != nil { if err != nil {
return nil, err return nil, err
}
} }
} }
err = rhcs.Create(ctx, r.ID, r.Bundle, copts) err = rhcs.Create(ctx, r.ID, r.Bundle, copts)
@ -571,7 +598,7 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.
return nil, errors.New("cannot start already started container or process") return nil, errors.New("cannot start already started container or process")
} }
rhcs := newRunhcs(s.debugLog) rhcs := s.newRunhcs()
// This is a start/exec // This is a start/exec
if r.ExecID != "" { if r.ExecID != "" {
@ -585,36 +612,40 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.
} }
if rhcs.Debug { if rhcs.Debug {
doForwardLogs := func(source, pipeName string, opt *string) error { if s.shimOpts.DebugType == options.Options_FILE {
*opt = safePipePrefix + pipeName eopts.ShimLog = filepath.Join(p.bundle, fmt.Sprintf("runhcs-shim-%s.log", execFmt))
l, err := winio.ListenPipe(*opt, nil) } else {
if err != nil { doForwardLogs := func(source, pipeName string, opt *string) error {
return err *opt = safePipePrefix + pipeName
} l, err := winio.ListenPipe(*opt, nil)
go func() {
defer l.Close()
c, err := l.Accept()
if err != nil { if err != nil {
log.G(ctx). return err
WithField("task-id", r.ID).
WithField("exec-id", r.ExecID).
WithError(err).
Errorf("failed to accept %s", pipeName)
} else {
fields := map[string]interface{}{
"log-source": source,
"task-id": r.ID,
"exec-id": r.ExecID,
}
go forwardRunhcsLogs(ctx, c, fields)
} }
}() go func() {
return nil defer l.Close()
} c, err := l.Accept()
if err != nil {
log.G(ctx).
WithField("task-id", r.ID).
WithField("exec-id", r.ExecID).
WithError(err).
Errorf("failed to accept %s", pipeName)
} else {
fields := map[string]interface{}{
"log-source": source,
"task-id": r.ID,
"exec-id": r.ExecID,
}
go forwardRunhcsLogs(ctx, c, fields)
}
}()
return nil
}
err = doForwardLogs("runhcs-shim-exec", "runhcs-shim-log-"+execFmt, &eopts.ShimLog) err = doForwardLogs("runhcs-shim-exec", "runhcs-shim-log-"+execFmt, &eopts.ShimLog)
if err != nil { if err != nil {
return nil, err return nil, err
}
} }
} }
@ -683,7 +714,7 @@ func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAP
// This is a container // This is a container
if p.cid == p.id { if p.cid == p.id {
rhs := newRunhcs(s.debugLog) rhs := s.newRunhcs()
dopts := &runhcs.DeleteOpts{ dopts := &runhcs.DeleteOpts{
Force: true, Force: true,
} }
@ -729,7 +760,7 @@ func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (*ptypes.E
return nil, err return nil, err
} }
rhcs := newRunhcs(s.debugLog) rhcs := s.newRunhcs()
if err = rhcs.Pause(ctx, p.id); err != nil { if err = rhcs.Pause(ctx, p.id); err != nil {
return nil, err return nil, err
} }
@ -748,7 +779,7 @@ func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (*ptypes
return nil, err return nil, err
} }
rhcs := newRunhcs(s.debugLog) rhcs := s.newRunhcs()
if err = rhcs.Resume(ctx, p.id); err != nil { if err = rhcs.Resume(ctx, p.id); err != nil {
return nil, err return nil, err
} }
@ -773,9 +804,8 @@ func (s *service) Kill(ctx context.Context, r *taskAPI.KillRequest) (*ptypes.Emp
return nil, err return nil, err
} }
// TODO: JTERRY75 runhcs needs r.Signal in string form
// TODO: JTERRY75 runhcs support for r.All? // TODO: JTERRY75 runhcs support for r.All?
rhcs := newRunhcs(s.debugLog) rhcs := s.newRunhcs()
if err = rhcs.Kill(ctx, p.id, strconv.FormatUint(uint64(r.Signal), 10)); err != nil { if err = rhcs.Kill(ctx, p.id, strconv.FormatUint(uint64(r.Signal), 10)); err != nil {
if !strings.Contains(err.Error(), "container is stopped") { if !strings.Contains(err.Error(), "container is stopped") {
return nil, err return nil, err
@ -882,7 +912,7 @@ func (s *service) ResizePty(ctx context.Context, r *taskAPI.ResizePtyRequest) (*
opts := runhcs.ResizeTTYOpts{ opts := runhcs.ResizeTTYOpts{
Pid: &pid, Pid: &pid,
} }
rhcs := newRunhcs(s.debugLog) rhcs := s.newRunhcs()
if err = rhcs.ResizeTTY(ctx, p.cid, uint16(r.Width), uint16(r.Height), &opts); err != nil { if err = rhcs.ResizeTTY(ctx, p.cid, uint16(r.Width), uint16(r.Height), &opts); err != nil {
return nil, err return nil, err
} }

View File

@ -27,7 +27,6 @@ import (
"os" "os"
"os/exec" "os/exec"
"sync" "sync"
"syscall"
"unsafe" "unsafe"
winio "github.com/Microsoft/go-winio" winio "github.com/Microsoft/go-winio"
@ -40,10 +39,6 @@ import (
"golang.org/x/sys/windows" "golang.org/x/sys/windows"
) )
const (
errorConnectionAborted syscall.Errno = 1236
)
// setupSignals creates a new signal handler for all signals // setupSignals creates a new signal handler for all signals
func setupSignals() (chan os.Signal, error) { func setupSignals() (chan os.Signal, error) {
signals := make(chan os.Signal, 32) signals := make(chan os.Signal, 32)
@ -209,7 +204,7 @@ func (dswl *deferredShimWriteLogger) beginAccept() {
dswl.mu.Unlock() dswl.mu.Unlock()
c, err := dswl.l.Accept() c, err := dswl.l.Accept()
if err == errorConnectionAborted { if err == winio.ErrPipeListenerClosed {
dswl.mu.Lock() dswl.mu.Lock()
dswl.aborted = true dswl.aborted = true
dswl.l.Close() dswl.l.Close()