Adds containerd-shim-runhcs verbose logging support
Revendors Microsoft/hcsshim to v0.7.5, which added support for logging all runhcs.exe commands via Windows named pipes. The shim now forwards the debug logging of every runhcs.exe command it launches to the containerd-shim-runhcs log when run with --debug. Signed-off-by: Justin Terry (VM) <juterry@microsoft.com>
This commit is contained in:
@@ -22,15 +22,19 @@ import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"net"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
winio "github.com/Microsoft/go-winio"
|
||||
"github.com/Microsoft/hcsshim/cmd/go-runhcs"
|
||||
containerd_types "github.com/containerd/containerd/api/types"
|
||||
"github.com/containerd/containerd/api/types/task"
|
||||
@@ -50,24 +54,87 @@ import (
|
||||
|
||||
const (
|
||||
// runhcsShimVersion is the version string of this shim (its consumer is
// not visible in this view).
runhcsShimVersion = "0.0.1"
|
||||
// safePipePrefix is prepended to every log pipe name before it is handed
// to winio.ListenPipe.
// NOTE(review): the ProtectedPrefix\Administrators namespace presumably
// restricts who may create the pipe - confirm against Windows docs.
safePipePrefix = `\\.\pipe\ProtectedPrefix\Administrators\`
|
||||
|
||||
// errorConnectionAborted is the errno that the log decoder loop and the
// debug listener's Accept loop compare against to detect that the pipe
// was closed. NOTE(review): 1236 looks like Windows
// ERROR_CONNECTION_ABORTED - confirm.
errorConnectionAborted syscall.Errno = 1236
|
||||
)
|
||||
|
||||
var (
|
||||
empty = &ptypes.Empty{}
|
||||
)
|
||||
|
||||
func newRunhcs(bundle string) *runhcs.Runhcs {
|
||||
func newRunhcs(debugLog string) *runhcs.Runhcs {
|
||||
rhs := &runhcs.Runhcs{
|
||||
Debug: logrus.GetLevel() == logrus.DebugLevel,
|
||||
Debug: debugLog != "",
|
||||
LogFormat: runhcs.JSON,
|
||||
Owner: "containerd-runhcs-shim-v1",
|
||||
}
|
||||
if rhs.Debug {
|
||||
rhs.Log = filepath.Join(bundle, "runhcs-debug.log")
|
||||
rhs.Log = debugLog
|
||||
}
|
||||
return rhs
|
||||
}
|
||||
|
||||
// forwardRunhcsLogs copies logs from c and writes them to the ctx logger
|
||||
// upstream.
|
||||
func forwardRunhcsLogs(ctx context.Context, c net.Conn, fields logrus.Fields) {
|
||||
defer c.Close()
|
||||
j := json.NewDecoder(c)
|
||||
|
||||
for {
|
||||
e := logrus.Entry{}
|
||||
err := j.Decode(&e.Data)
|
||||
if err == io.EOF || err == errorConnectionAborted {
|
||||
break
|
||||
}
|
||||
if err != nil {
|
||||
// Likely the last message wasn't complete at closure. Just read all
|
||||
// data and forward as error.
|
||||
data, _ := ioutil.ReadAll(io.MultiReader(j.Buffered(), c))
|
||||
if len(data) != 0 {
|
||||
log.G(ctx).WithFields(fields).Error(string(data))
|
||||
}
|
||||
break
|
||||
}
|
||||
|
||||
msg := e.Data[logrus.FieldKeyMsg]
|
||||
delete(e.Data, logrus.FieldKeyMsg)
|
||||
|
||||
level, err := logrus.ParseLevel(e.Data[logrus.FieldKeyLevel].(string))
|
||||
if err != nil {
|
||||
log.G(ctx).WithFields(fields).WithError(err).Debug("invalid log level")
|
||||
level = logrus.DebugLevel
|
||||
}
|
||||
delete(e.Data, logrus.FieldKeyLevel)
|
||||
|
||||
// TODO: JTERRY75 maybe we need to make this configurable so we know
|
||||
// that runhcs is using the same one we are deserializing.
|
||||
ti, err := time.Parse(logrus.DefaultTimestampFormat, e.Data[logrus.FieldKeyTime].(string))
|
||||
if err != nil {
|
||||
log.G(ctx).WithFields(fields).WithError(err).Debug("invalid time stamp format")
|
||||
ti = time.Time{}
|
||||
}
|
||||
delete(e.Data, logrus.FieldKeyTime)
|
||||
|
||||
etr := log.G(ctx).WithFields(fields).WithFields(e.Data)
|
||||
etr.Time = ti
|
||||
switch level {
|
||||
case logrus.PanicLevel:
|
||||
etr.Panic(msg)
|
||||
case logrus.FatalLevel:
|
||||
etr.Fatal(msg)
|
||||
case logrus.ErrorLevel:
|
||||
etr.Error(msg)
|
||||
case logrus.WarnLevel:
|
||||
etr.Warn(msg)
|
||||
case logrus.InfoLevel:
|
||||
etr.Info(msg)
|
||||
case logrus.DebugLevel:
|
||||
etr.Debug(msg)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// New returns a new runhcs shim service that can be used via GRPC
|
||||
func New(ctx context.Context, id string, publisher events.Publisher) (shim.Shim, error) {
|
||||
return &service{
|
||||
@@ -87,6 +154,10 @@ type service struct {
|
||||
|
||||
context context.Context
|
||||
|
||||
// debugLog if not "" indicates the log pipe path for runhcs.exe to write its logs to.
|
||||
debugLog string
|
||||
debugListener net.Listener
|
||||
|
||||
id string
|
||||
processes map[string]*process
|
||||
|
||||
@@ -195,6 +266,8 @@ func (s *service) StartShim(ctx context.Context, id, containerdBinary, container
|
||||
|
||||
// State returns runtime state information for a process
|
||||
func (s *service) State(ctx context.Context, r *taskAPI.StateRequest) (*taskAPI.StateResponse, error) {
|
||||
log.G(ctx).Debugf("State: %s: %s", r.ID, r.ExecID)
|
||||
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
@@ -208,7 +281,7 @@ func (s *service) State(ctx context.Context, r *taskAPI.StateRequest) (*taskAPI.
|
||||
|
||||
// This is a container
|
||||
if p.cid == p.id {
|
||||
rhcs := newRunhcs(p.bundle)
|
||||
rhcs := newRunhcs(s.debugLog)
|
||||
cs, err := rhcs.State(ctx, p.id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -318,7 +391,7 @@ func writeMountsToConfig(bundle string, mounts []*containerd_types.Mount) error
|
||||
|
||||
// Create a new initial process and container with runhcs
|
||||
func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (*taskAPI.CreateTaskResponse, error) {
|
||||
log.G(ctx).Infof("Create: %s", r.ID)
|
||||
log.G(ctx).Debugf("Create: %s", r.ID)
|
||||
|
||||
// Hold the lock for the entire duration to avoid duplicate process creation.
|
||||
s.mu.Lock()
|
||||
@@ -363,16 +436,84 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (*ta
|
||||
pr.close()
|
||||
}
|
||||
}()
|
||||
// TODO: Parse the real RunHcs Opts r.Options
|
||||
opts, ok := ctx.Value(shim.OptsKey{}).(shim.Opts)
|
||||
if ok && opts.Debug {
|
||||
if s.debugLog == "" {
|
||||
logPath := safePipePrefix + fmt.Sprintf("runhcs-log-%s", r.ID)
|
||||
l, err := winio.ListenPipe(logPath, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
s.debugLog = logPath
|
||||
s.debugListener = l
|
||||
|
||||
// Accept connections and forward all logs for each runhcs.exe
|
||||
// invocation
|
||||
go func() {
|
||||
for {
|
||||
c, err := s.debugListener.Accept()
|
||||
if err != nil {
|
||||
if err == errorConnectionAborted {
|
||||
break
|
||||
}
|
||||
log.G(ctx).WithError(err).Debug("log accept failure")
|
||||
// Logrus error locally?
|
||||
continue
|
||||
}
|
||||
fields := map[string]interface{}{
|
||||
"log-source": "runhcs",
|
||||
"task-id": r.ID,
|
||||
}
|
||||
go forwardRunhcsLogs(ctx, c, fields)
|
||||
}
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
pidfilePath := path.Join(r.Bundle, "runhcs-pidfile.pid")
|
||||
copts := &runhcs.CreateOpts{
|
||||
IO: io,
|
||||
PidFile: pidfilePath,
|
||||
ShimLog: path.Join(r.Bundle, "runhcs-shim.log"),
|
||||
VMLog: path.Join(r.Bundle, "runhcs-vm-shim.log"),
|
||||
}
|
||||
rhcs := newRunhcs(s.debugLog)
|
||||
if rhcs.Debug {
|
||||
doForwardLogs := func(source, logPipeFmt string, opt *string) error {
|
||||
pipeName := fmt.Sprintf(logPipeFmt, r.ID)
|
||||
*opt = safePipePrefix + pipeName
|
||||
l, err := winio.ListenPipe(*opt, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
go func() {
|
||||
defer l.Close()
|
||||
c, err := l.Accept()
|
||||
if err != nil {
|
||||
log.G(ctx).
|
||||
WithField("task-id", r.ID).
|
||||
WithError(err).
|
||||
Errorf("failed to accept %s", pipeName)
|
||||
} else {
|
||||
fields := map[string]interface{}{
|
||||
"log-source": source,
|
||||
"task-id": r.ID,
|
||||
}
|
||||
go forwardRunhcsLogs(ctx, c, fields)
|
||||
}
|
||||
}()
|
||||
return nil
|
||||
}
|
||||
|
||||
rhcs := newRunhcs(r.Bundle)
|
||||
err = doForwardLogs("runhcs-shim", "runhcs-shim-log-%s", &copts.ShimLog)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = doForwardLogs("runhcs--vm-shim", "runhcs-vm-shim-log-%s", &copts.VMLog)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
err = rhcs.Create(ctx, r.ID, r.Bundle, copts)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -408,7 +549,7 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (*ta
|
||||
|
||||
// Start a process
|
||||
func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.StartResponse, error) {
|
||||
log.G(ctx).Infof("Start: %s: %s", r.ID, r.ExecID)
|
||||
log.G(ctx).Debugf("Start: %s: %s", r.ID, r.ExecID)
|
||||
var p *process
|
||||
var err error
|
||||
if p, err = s.getProcess(r.ID, r.ExecID); err != nil {
|
||||
@@ -421,7 +562,7 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.
|
||||
return nil, errors.New("cannot start already started container or process")
|
||||
}
|
||||
|
||||
rhcs := newRunhcs(p.bundle)
|
||||
rhcs := newRunhcs(s.debugLog)
|
||||
|
||||
// This is a start/exec
|
||||
if r.ExecID != "" {
|
||||
@@ -431,10 +572,43 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.
|
||||
eopts := &runhcs.ExecOpts{
|
||||
IO: p.relay.io,
|
||||
PidFile: pidfilePath,
|
||||
ShimLog: path.Join(p.bundle, fmt.Sprintf("runhcs-%s-shim.log", execFmt)),
|
||||
Detach: true,
|
||||
}
|
||||
|
||||
if rhcs.Debug {
|
||||
doForwardLogs := func(source, pipeName string, opt *string) error {
|
||||
*opt = safePipePrefix + pipeName
|
||||
l, err := winio.ListenPipe(*opt, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
go func() {
|
||||
defer l.Close()
|
||||
c, err := l.Accept()
|
||||
if err != nil {
|
||||
log.G(ctx).
|
||||
WithField("task-id", r.ID).
|
||||
WithField("exec-id", r.ExecID).
|
||||
WithError(err).
|
||||
Errorf("failed to accept %s", pipeName)
|
||||
} else {
|
||||
fields := map[string]interface{}{
|
||||
"log-source": source,
|
||||
"task-id": r.ID,
|
||||
"exec-id": r.ExecID,
|
||||
}
|
||||
go forwardRunhcsLogs(ctx, c, fields)
|
||||
}
|
||||
}()
|
||||
return nil
|
||||
}
|
||||
|
||||
err = doForwardLogs("runhcs-shim-exec", "runhcs-shim-log-"+execFmt, &eopts.ShimLog)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
// ID here is the containerID to exec the process in.
|
||||
err = rhcs.Exec(ctx, r.ID, procConfig, eopts)
|
||||
if err != nil {
|
||||
@@ -490,7 +664,7 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.
|
||||
|
||||
// Delete the initial process and container
|
||||
func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAPI.DeleteResponse, error) {
|
||||
log.G(ctx).Infof("Delete: %s: %s", r.ID, r.ExecID)
|
||||
log.G(ctx).Debugf("Delete: %s: %s", r.ID, r.ExecID)
|
||||
|
||||
var p *process
|
||||
var err error
|
||||
@@ -500,7 +674,7 @@ func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAP
|
||||
|
||||
// This is a container
|
||||
if p.cid == p.id {
|
||||
rhs := newRunhcs(p.bundle)
|
||||
rhs := newRunhcs(s.debugLog)
|
||||
dopts := &runhcs.DeleteOpts{
|
||||
Force: true,
|
||||
}
|
||||
@@ -530,11 +704,15 @@ func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAP
|
||||
|
||||
// Pids is meant to return all pids inside the container but is not
// implemented: it logs the request's task ID at debug level and returns
// errdefs.ErrNotImplemented.
|
||||
func (s *service) Pids(ctx context.Context, r *taskAPI.PidsRequest) (*taskAPI.PidsResponse, error) {
|
||||
log.G(ctx).Debugf("Pids: %s", r.ID)
|
||||
|
||||
return nil, errdefs.ErrNotImplemented
|
||||
}
|
||||
|
||||
// Pause the container
|
||||
func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (*ptypes.Empty, error) {
|
||||
log.G(ctx).Debugf("Pause: %s", r.ID)
|
||||
|
||||
// TODO: Validate that 'id' is actually a valid parent container ID
|
||||
var p *process
|
||||
var err error
|
||||
@@ -542,7 +720,7 @@ func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (*ptypes.E
|
||||
return nil, err
|
||||
}
|
||||
|
||||
rhcs := newRunhcs(p.bundle)
|
||||
rhcs := newRunhcs(s.debugLog)
|
||||
if err = rhcs.Pause(ctx, p.id); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -552,6 +730,8 @@ func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (*ptypes.E
|
||||
|
||||
// Resume the container
|
||||
func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (*ptypes.Empty, error) {
|
||||
log.G(ctx).Debugf("Resume: %s", r.ID)
|
||||
|
||||
// TODO: Validate that 'id' is actually a valid parent container ID
|
||||
var p *process
|
||||
var err error
|
||||
@@ -559,7 +739,7 @@ func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (*ptypes
|
||||
return nil, err
|
||||
}
|
||||
|
||||
rhcs := newRunhcs(p.bundle)
|
||||
rhcs := newRunhcs(s.debugLog)
|
||||
if err = rhcs.Resume(ctx, p.id); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -569,12 +749,14 @@ func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (*ptypes
|
||||
|
||||
// Checkpoint is meant to checkpoint the container but is not implemented:
// it logs the request's task ID at debug level and returns
// errdefs.ErrNotImplemented.
|
||||
func (s *service) Checkpoint(ctx context.Context, r *taskAPI.CheckpointTaskRequest) (*ptypes.Empty, error) {
|
||||
log.G(ctx).Debugf("Checkpoint: %s", r.ID)
|
||||
|
||||
return nil, errdefs.ErrNotImplemented
|
||||
}
|
||||
|
||||
// Kill a process with the provided signal
|
||||
func (s *service) Kill(ctx context.Context, r *taskAPI.KillRequest) (*ptypes.Empty, error) {
|
||||
log.G(ctx).Infof("Kill: %s: %s", r.ID, r.ExecID)
|
||||
log.G(ctx).Debugf("Kill: %s: %s", r.ID, r.ExecID)
|
||||
|
||||
var p *process
|
||||
var err error
|
||||
@@ -584,7 +766,7 @@ func (s *service) Kill(ctx context.Context, r *taskAPI.KillRequest) (*ptypes.Emp
|
||||
|
||||
// TODO: JTERRY75 runhcs needs r.Signal in string form
|
||||
// TODO: JTERRY75 runhcs support for r.All?
|
||||
rhcs := newRunhcs(p.bundle)
|
||||
rhcs := newRunhcs(s.debugLog)
|
||||
if err = rhcs.Kill(ctx, p.id, strconv.FormatUint(uint64(r.Signal), 10)); err != nil {
|
||||
if !strings.Contains(err.Error(), "container is stopped") {
|
||||
return nil, err
|
||||
@@ -596,7 +778,7 @@ func (s *service) Kill(ctx context.Context, r *taskAPI.KillRequest) (*ptypes.Emp
|
||||
|
||||
// Exec an additional process inside the container
|
||||
func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*ptypes.Empty, error) {
|
||||
log.G(ctx).Infof("Exec: %s: %s", r.ID, r.ExecID)
|
||||
log.G(ctx).Debugf("Exec: %s: %s", r.ID, r.ExecID)
|
||||
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
@@ -679,6 +861,8 @@ func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*pty
|
||||
|
||||
// ResizePty of a process
|
||||
func (s *service) ResizePty(ctx context.Context, r *taskAPI.ResizePtyRequest) (*ptypes.Empty, error) {
|
||||
log.G(ctx).Debugf("ResizePty: %s: %s", r.ID, r.ExecID)
|
||||
|
||||
var p *process
|
||||
var err error
|
||||
if p, err = s.getProcess(r.ID, r.ExecID); err != nil {
|
||||
@@ -689,7 +873,7 @@ func (s *service) ResizePty(ctx context.Context, r *taskAPI.ResizePtyRequest) (*
|
||||
opts := runhcs.ResizeTTYOpts{
|
||||
Pid: &pid,
|
||||
}
|
||||
rhcs := newRunhcs(p.bundle)
|
||||
rhcs := newRunhcs(s.debugLog)
|
||||
if err = rhcs.ResizeTTY(ctx, p.cid, uint16(r.Width), uint16(r.Height), &opts); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -699,6 +883,8 @@ func (s *service) ResizePty(ctx context.Context, r *taskAPI.ResizePtyRequest) (*
|
||||
|
||||
// CloseIO of a process
|
||||
func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (*ptypes.Empty, error) {
|
||||
log.G(ctx).Debugf("CloseIO: %s: %s", r.ID, r.ExecID)
|
||||
|
||||
var p *process
|
||||
var err error
|
||||
if p, err = s.getProcess(r.ID, r.ExecID); err != nil {
|
||||
@@ -710,11 +896,15 @@ func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (*ptyp
|
||||
|
||||
// Update is meant to update a running container but is not implemented:
// it logs the request's task ID at debug level and returns
// errdefs.ErrNotImplemented.
|
||||
func (s *service) Update(ctx context.Context, r *taskAPI.UpdateTaskRequest) (*ptypes.Empty, error) {
|
||||
log.G(ctx).Debugf("Update: %s", r.ID)
|
||||
|
||||
return nil, errdefs.ErrNotImplemented
|
||||
}
|
||||
|
||||
// Wait for a process to exit
|
||||
func (s *service) Wait(ctx context.Context, r *taskAPI.WaitRequest) (*taskAPI.WaitResponse, error) {
|
||||
log.G(ctx).Debugf("Wait: %s: %s", r.ID, r.ExecID)
|
||||
|
||||
var p *process
|
||||
var err error
|
||||
if p, err = s.getProcess(r.ID, r.ExecID); err != nil {
|
||||
@@ -729,11 +919,15 @@ func (s *service) Wait(ctx context.Context, r *taskAPI.WaitRequest) (*taskAPI.Wa
|
||||
|
||||
// Stats is meant to return statistics about the running container but is
// not implemented: it logs the request's task ID at debug level and returns
// errdefs.ErrNotImplemented.
|
||||
func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (*taskAPI.StatsResponse, error) {
|
||||
log.G(ctx).Debugf("Stats: %s", r.ID)
|
||||
|
||||
return nil, errdefs.ErrNotImplemented
|
||||
}
|
||||
|
||||
// Connect returns the runhcs shim information
|
||||
func (s *service) Connect(ctx context.Context, r *taskAPI.ConnectRequest) (*taskAPI.ConnectResponse, error) {
|
||||
log.G(ctx).Debugf("Connect: %s", r.ID)
|
||||
|
||||
return &taskAPI.ConnectResponse{
|
||||
ShimPid: uint32(os.Getpid()),
|
||||
TaskPid: s.processes[s.id].pid,
|
||||
@@ -743,7 +937,11 @@ func (s *service) Connect(ctx context.Context, r *taskAPI.ConnectRequest) (*task
|
||||
|
||||
// Shutdown stops this instance of the runhcs shim
|
||||
func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (*ptypes.Empty, error) {
|
||||
log.G(ctx).Infof("Shutdown: %s", r.ID)
|
||||
log.G(ctx).Debugf("Shutdown: %s", r.ID)
|
||||
|
||||
if s.debugListener != nil {
|
||||
s.debugListener.Close()
|
||||
}
|
||||
|
||||
os.Exit(0)
|
||||
return empty, nil
|
||||
|
||||
Reference in New Issue
Block a user