diff --git a/runtime/v2/runhcs/service.go b/runtime/v2/runhcs/service.go index 484dcff13..9f6a68f2d 100644 --- a/runtime/v2/runhcs/service.go +++ b/runtime/v2/runhcs/service.go @@ -28,6 +28,7 @@ import ( "os" "os/exec" "path" + "path/filepath" "strconv" "strings" "sync" @@ -43,9 +44,11 @@ import ( "github.com/containerd/containerd/log" "github.com/containerd/containerd/mount" "github.com/containerd/containerd/namespaces" + "github.com/containerd/containerd/runtime/v2/runhcs/options" "github.com/containerd/containerd/runtime/v2/shim" taskAPI "github.com/containerd/containerd/runtime/v2/task" "github.com/containerd/go-runc" + "github.com/containerd/typeurl" ptypes "github.com/gogo/protobuf/types" oci "github.com/opencontainers/runtime-spec/specs-go" "github.com/pkg/errors" @@ -63,18 +66,6 @@ var ( empty = &ptypes.Empty{} ) -func newRunhcs(debugLog string) *runhcs.Runhcs { - rhs := &runhcs.Runhcs{ - Debug: debugLog != "", - LogFormat: runhcs.JSON, - Owner: "containerd-runhcs-shim-v1", - } - if rhs.Debug { - rhs.Log = debugLog - } - return rhs -} - // forwardRunhcsLogs copies logs from c and writes them to the ctx logger // upstream. func forwardRunhcsLogs(ctx context.Context, c net.Conn, fields logrus.Fields) { @@ -154,16 +145,31 @@ type service struct { context context.Context - // debugLog if not "" indicates the log pipe path for runhcs.exe to write its logs to. - debugLog string + // debugLog if not "" indicates the log file or pipe path for runhcs.exe to + // write its logs to. + debugLog string + // if `shimOpts.DebugType == options.Opitons_NPIPE` will hold the listener + // for the runhcs.exe to connect to for sending logs. debugListener net.Listener + shimOpts options.Options + id string processes map[string]*process publisher events.Publisher } +func (s *service) newRunhcs() *runhcs.Runhcs { + return &runhcs.Runhcs{ + Debug: s.shimOpts.Debug, + Log: s.debugLog, + LogFormat: runhcs.JSON, + Owner: "containerd-runhcs-shim-v1", + Root: s.shimOpts.RegistryRoot, + } +} + // getProcess attempts to get a process by id. // The caller MUST NOT have locked s.mu previous to calling this function. func (s *service) getProcess(id, execID string) (*process, error) { @@ -196,7 +202,7 @@ func (s *service) Cleanup(ctx context.Context) (*taskAPI.DeleteResponse, error) return nil, err } // Forcibly shut down any container in this bundle - rhcs := newRunhcs("") + rhcs := s.newRunhcs() dopts := &runhcs.DeleteOpts{ Force: true, } @@ -290,7 +296,7 @@ func (s *service) State(ctx context.Context, r *taskAPI.StateRequest) (*taskAPI. // This is a container if p.cid == p.id { - rhcs := newRunhcs(s.debugLog) + rhcs := s.newRunhcs() cs, err := rhcs.State(ctx, p.id) if err != nil { return nil, err @@ -406,6 +412,19 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (*ta s.mu.Lock() defer s.mu.Unlock() + _, err := namespaces.NamespaceRequired(ctx) + if err != nil { + return nil, errors.Wrap(err, "create namespace") + } + + if r.Options != nil { + v, err := typeurl.UnmarshalAny(r.Options) + if err != nil { + return nil, err + } + s.shimOpts = *v.(*options.Options) + } + if p := s.processes[r.ID]; p != nil { return nil, errdefs.ToGRPCf(errdefs.ErrAlreadyExists, "process %s already exists", r.ID) } @@ -445,38 +464,41 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (*ta pr.close() } }() - // TODO: Parse the real RunHcs Opts r.Options - opts, ok := ctx.Value(shim.OptsKey{}).(shim.Opts) - if ok && opts.Debug { - if s.debugLog == "" { - logPath := safePipePrefix + fmt.Sprintf("runhcs-log-%s", r.ID) - l, err := winio.ListenPipe(logPath, nil) - if err != nil { - return nil, err - } - s.debugLog = logPath - s.debugListener = l - // Accept connections and forward all logs for each runhcs.exe - // invocation - go func() { - for { - c, err := s.debugListener.Accept() - if err != nil { - if err == errorConnectionAborted { - break - } - log.G(ctx).WithError(err).Debug("log accept failure") - // Logrus error locally? - continue - } - fields := map[string]interface{}{ - "log-source": "runhcs", - "task-id": r.ID, - } - go forwardRunhcsLogs(ctx, c, fields) + if s.shimOpts.Debug { + if s.debugLog == "" { + if s.shimOpts.DebugType == options.Options_FILE { + s.debugLog = filepath.Join(r.Bundle, fmt.Sprintf("runhcs-%s.log", r.ID)) + } else { + logPath := safePipePrefix + fmt.Sprintf("runhcs-log-%s", r.ID) + l, err := winio.ListenPipe(logPath, nil) + if err != nil { + return nil, err } - }() + s.debugLog = logPath + s.debugListener = l + + // Accept connections and forward all logs for each runhcs.exe + // invocation + go func() { + for { + c, err := s.debugListener.Accept() + if err != nil { + if err == winio.ErrPipeListenerClosed { + break + } + log.G(ctx).WithError(err).Debug("log accept failure") + // Logrus error locally? + continue + } + fields := map[string]interface{}{ + "log-source": "runhcs", + "task-id": r.ID, + } + go forwardRunhcsLogs(ctx, c, fields) + } + }() + } } } @@ -485,42 +507,47 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (*ta IO: io, PidFile: pidfilePath, } - rhcs := newRunhcs(s.debugLog) + rhcs := s.newRunhcs() if rhcs.Debug { - doForwardLogs := func(source, logPipeFmt string, opt *string) error { - pipeName := fmt.Sprintf(logPipeFmt, r.ID) - *opt = safePipePrefix + pipeName - l, err := winio.ListenPipe(*opt, nil) - if err != nil { - return err - } - go func() { - defer l.Close() - c, err := l.Accept() + if s.shimOpts.DebugType == options.Options_FILE { + copts.ShimLog = filepath.Join(r.Bundle, fmt.Sprintf("runhcs-shim-%s.log", r.ID)) + copts.VMLog = filepath.Join(r.Bundle, fmt.Sprintf("runhcs-vmshim-%s.log", r.ID)) + } else { + doForwardLogs := func(source, logPipeFmt string, opt *string) error { + pipeName := fmt.Sprintf(logPipeFmt, r.ID) + *opt = safePipePrefix + pipeName + l, err := winio.ListenPipe(*opt, nil) if err != nil { - log.G(ctx). - WithField("task-id", r.ID). - WithError(err). - Errorf("failed to accept %s", pipeName) - } else { - fields := map[string]interface{}{ - "log-source": source, - "task-id": r.ID, - } - go forwardRunhcsLogs(ctx, c, fields) + return err } - }() - return nil - } + go func() { + defer l.Close() + c, err := l.Accept() + if err != nil { + log.G(ctx). + WithField("task-id", r.ID). + WithError(err). + Errorf("failed to accept %s", pipeName) + } else { + fields := map[string]interface{}{ + "log-source": source, + "task-id": r.ID, + } + go forwardRunhcsLogs(ctx, c, fields) + } + }() + return nil + } - err = doForwardLogs("runhcs-shim", "runhcs-shim-log-%s", &copts.ShimLog) - if err != nil { - return nil, err - } + err = doForwardLogs("runhcs-shim", "runhcs-shim-log-%s", &copts.ShimLog) + if err != nil { + return nil, err + } - err = doForwardLogs("runhcs--vm-shim", "runhcs-vm-shim-log-%s", &copts.VMLog) - if err != nil { - return nil, err + err = doForwardLogs("runhcs--vm-shim", "runhcs-vm-shim-log-%s", &copts.VMLog) + if err != nil { + return nil, err + } } } err = rhcs.Create(ctx, r.ID, r.Bundle, copts) @@ -571,7 +598,7 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI. return nil, errors.New("cannot start already started container or process") } - rhcs := newRunhcs(s.debugLog) + rhcs := s.newRunhcs() // This is a start/exec if r.ExecID != "" { @@ -585,36 +612,40 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI. } if rhcs.Debug { - doForwardLogs := func(source, pipeName string, opt *string) error { - *opt = safePipePrefix + pipeName - l, err := winio.ListenPipe(*opt, nil) - if err != nil { - return err - } - go func() { - defer l.Close() - c, err := l.Accept() + if s.shimOpts.DebugType == options.Options_FILE { + eopts.ShimLog = filepath.Join(p.bundle, fmt.Sprintf("runhcs-shim-%s.log", execFmt)) + } else { + doForwardLogs := func(source, pipeName string, opt *string) error { + *opt = safePipePrefix + pipeName + l, err := winio.ListenPipe(*opt, nil) if err != nil { - log.G(ctx). - WithField("task-id", r.ID). - WithField("exec-id", r.ExecID). - WithError(err). - Errorf("failed to accept %s", pipeName) - } else { - fields := map[string]interface{}{ - "log-source": source, - "task-id": r.ID, - "exec-id": r.ExecID, - } - go forwardRunhcsLogs(ctx, c, fields) + return err } - }() - return nil - } + go func() { + defer l.Close() + c, err := l.Accept() + if err != nil { + log.G(ctx). + WithField("task-id", r.ID). + WithField("exec-id", r.ExecID). + WithError(err). + Errorf("failed to accept %s", pipeName) + } else { + fields := map[string]interface{}{ + "log-source": source, + "task-id": r.ID, + "exec-id": r.ExecID, + } + go forwardRunhcsLogs(ctx, c, fields) + } + }() + return nil + } - err = doForwardLogs("runhcs-shim-exec", "runhcs-shim-log-"+execFmt, &eopts.ShimLog) - if err != nil { - return nil, err + err = doForwardLogs("runhcs-shim-exec", "runhcs-shim-log-"+execFmt, &eopts.ShimLog) + if err != nil { + return nil, err + } } } @@ -683,7 +714,7 @@ func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAP // This is a container if p.cid == p.id { - rhs := newRunhcs(s.debugLog) + rhs := s.newRunhcs() dopts := &runhcs.DeleteOpts{ Force: true, } @@ -729,7 +760,7 @@ func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (*ptypes.E return nil, err } - rhcs := newRunhcs(s.debugLog) + rhcs := s.newRunhcs() if err = rhcs.Pause(ctx, p.id); err != nil { return nil, err } @@ -748,7 +779,7 @@ func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (*ptypes return nil, err } - rhcs := newRunhcs(s.debugLog) + rhcs := s.newRunhcs() if err = rhcs.Resume(ctx, p.id); err != nil { return nil, err } @@ -773,9 +804,8 @@ func (s *service) Kill(ctx context.Context, r *taskAPI.KillRequest) (*ptypes.Emp return nil, err } - // TODO: JTERRY75 runhcs needs r.Signal in string form // TODO: JTERRY75 runhcs support for r.All? - rhcs := newRunhcs(s.debugLog) + rhcs := s.newRunhcs() if err = rhcs.Kill(ctx, p.id, strconv.FormatUint(uint64(r.Signal), 10)); err != nil { if !strings.Contains(err.Error(), "container is stopped") { return nil, err @@ -882,7 +912,7 @@ func (s *service) ResizePty(ctx context.Context, r *taskAPI.ResizePtyRequest) (* opts := runhcs.ResizeTTYOpts{ Pid: &pid, } - rhcs := newRunhcs(s.debugLog) + rhcs := s.newRunhcs() if err = rhcs.ResizeTTY(ctx, p.cid, uint16(r.Width), uint16(r.Height), &opts); err != nil { return nil, err } diff --git a/runtime/v2/shim/shim_windows.go b/runtime/v2/shim/shim_windows.go index 6e5642620..bd3f62aea 100644 --- a/runtime/v2/shim/shim_windows.go +++ b/runtime/v2/shim/shim_windows.go @@ -27,7 +27,6 @@ import ( "os" "os/exec" "sync" - "syscall" "unsafe" winio "github.com/Microsoft/go-winio" @@ -40,10 +39,6 @@ import ( "golang.org/x/sys/windows" ) -const ( - errorConnectionAborted syscall.Errno = 1236 -) - // setupSignals creates a new signal handler for all signals func setupSignals() (chan os.Signal, error) { signals := make(chan os.Signal, 32) @@ -209,7 +204,7 @@ func (dswl *deferredShimWriteLogger) beginAccept() { dswl.mu.Unlock() c, err := dswl.l.Accept() - if err == errorConnectionAborted { + if err == winio.ErrPipeListenerClosed { dswl.mu.Lock() dswl.aborted = true dswl.l.Close()