Add wait API endpoint for waiting on process exit

Signed-off-by: Michael Crosby <crosbymichael@gmail.com>
This commit is contained in:
Michael Crosby
2017-09-18 15:54:29 -04:00
parent 2f5dda6229
commit d67763d922
23 changed files with 1314 additions and 355 deletions

View File

@@ -95,3 +95,16 @@ func (p *Process) Start(ctx context.Context) error {
}
return nil
}
func (p *Process) Wait(ctx context.Context) (*runtime.Exit, error) {
r, err := p.t.shim.Wait(ctx, &shim.WaitRequest{
ID: p.id,
})
if err != nil {
return nil, err
}
return &runtime.Exit{
Timestamp: r.ExitedAt,
Status: r.ExitStatus,
}, nil
}

View File

@@ -245,19 +245,22 @@ func (r *Runtime) Create(ctx context.Context, id string, opts runtime.CreateOpts
if err != nil {
return nil, errdefs.FromGRPC(err)
}
t, err := newTask(id, namespace, int(cr.Pid), s)
t, err := newTask(id, namespace, int(cr.Pid), s, r.monitor)
if err != nil {
return nil, err
}
if err := r.tasks.Add(ctx, t); err != nil {
return nil, err
}
// after the task is created, add it to the monitor
if err = r.monitor.Monitor(t); err != nil {
if _, err := r.Delete(ctx, t); err != nil {
log.G(ctx).WithError(err).Error("deleting task after failed monitor")
// after the task is created, add it to the monitor if it has a cgroup
// this can be different on a checkpoint/restore
if t.cg != nil {
if err = r.monitor.Monitor(t); err != nil {
if _, err := r.Delete(ctx, t); err != nil {
log.G(ctx).WithError(err).Error("deleting task after failed monitor")
}
return nil, err
}
return nil, err
}
return t, nil
}
@@ -274,9 +277,17 @@ func (r *Runtime) Delete(ctx context.Context, c runtime.Task) (*runtime.Exit, er
if err := r.monitor.Stop(lc); err != nil {
return nil, err
}
bundle := loadBundle(
lc.id,
filepath.Join(r.state, namespace, lc.id),
filepath.Join(r.root, namespace, lc.id),
)
rsp, err := lc.shim.Delete(ctx, empty)
if err != nil {
if cerr := r.cleanupAfterDeadShim(ctx, bundle, namespace, c.ID(), lc.pid, nil); cerr != nil {
log.G(ctx).WithError(err).Error("unable to cleanup task")
}
return nil, errdefs.FromGRPC(err)
}
r.tasks.Delete(ctx, lc)
@@ -284,11 +295,6 @@ func (r *Runtime) Delete(ctx context.Context, c runtime.Task) (*runtime.Exit, er
log.G(ctx).WithError(err).Error("failed to kill shim")
}
bundle := loadBundle(
lc.id,
filepath.Join(r.state, namespace, lc.id),
filepath.Join(r.root, namespace, lc.id),
)
if err := bundle.Delete(); err != nil {
log.G(ctx).WithError(err).Error("failed to delete bundle")
}
@@ -359,7 +365,7 @@ func (r *Runtime) loadTasks(ctx context.Context, ns string) ([]*Task, error) {
continue
}
t, err := newTask(id, ns, pid, s)
t, err := newTask(id, ns, pid, s, r.monitor)
if err != nil {
log.G(ctx).WithError(err).Error("loading task type")
continue

View File

@@ -24,7 +24,7 @@ import (
)
type execProcess struct {
sync.WaitGroup
wg sync.WaitGroup
processState
@@ -41,7 +41,8 @@ type execProcess struct {
path string
spec specs.Process
parent *initProcess
parent *initProcess
waitBlock chan struct{}
}
func newExecProcess(context context.Context, path string, r *shimapi.ExecProcessRequest, parent *initProcess, id string) (process, error) {
@@ -66,11 +67,16 @@ func newExecProcess(context context.Context, path string, r *shimapi.ExecProcess
stderr: r.Stderr,
terminal: r.Terminal,
},
waitBlock: make(chan struct{}),
}
e.processState = &execCreatedState{p: e}
return e, nil
}
func (e *execProcess) Wait() {
<-e.waitBlock
}
func (e *execProcess) ID() string {
return e.id
}
@@ -97,13 +103,18 @@ func (e *execProcess) setExited(status int) {
e.status = status
e.exited = time.Now()
e.parent.platform.shutdownConsole(context.Background(), e.console)
e.Wait()
close(e.waitBlock)
}
func (e *execProcess) delete(ctx context.Context) error {
e.wg.Wait()
if e.io != nil {
for _, c := range e.closers {
c.Close()
}
e.io.Close()
}
return nil
}
func (e *execProcess) resize(ws console.WinSize) error {
@@ -175,11 +186,11 @@ func (e *execProcess) start(ctx context.Context) (err error) {
if err != nil {
return errors.Wrap(err, "failed to retrieve console master")
}
if e.console, err = e.parent.platform.copyConsole(ctx, console, e.stdio.stdin, e.stdio.stdout, e.stdio.stderr, &e.WaitGroup, &copyWaitGroup); err != nil {
if e.console, err = e.parent.platform.copyConsole(ctx, console, e.stdio.stdin, e.stdio.stdout, e.stdio.stderr, &e.wg, &copyWaitGroup); err != nil {
return errors.Wrap(err, "failed to start console copy")
}
} else if !e.stdio.isNull() {
if err := copyPipes(ctx, e.io, e.stdio.stdin, e.stdio.stdout, e.stdio.stderr, &e.WaitGroup, &copyWaitGroup); err != nil {
if err := copyPipes(ctx, e.io, e.stdio.stdin, e.stdio.stdout, e.stdio.stderr, &e.wg, &copyWaitGroup); err != nil {
return errors.Wrap(err, "failed to start io pipe copy")
}
}

View File

@@ -46,6 +46,9 @@ func (s *execCreatedState) Start(ctx context.Context) error {
func (s *execCreatedState) Delete(ctx context.Context) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
if err := s.p.delete(ctx); err != nil {
return err
}
return s.transition("deleted")
}
@@ -151,6 +154,9 @@ func (s *execStoppedState) Start(ctx context.Context) error {
func (s *execStoppedState) Delete(ctx context.Context) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
if err := s.p.delete(ctx); err != nil {
return err
}
return s.transition("deleted")
}

View File

@@ -30,7 +30,7 @@ import (
const InitPidFile = "init.pid"
type initProcess struct {
sync.WaitGroup
wg sync.WaitGroup
initState
// mu is used to ensure that `Start()` and `Exited()` calls return in
@@ -39,6 +39,8 @@ type initProcess struct {
// the reaper interface.
mu sync.Mutex
waitBlock chan struct{}
workDir string
id string
@@ -113,8 +115,10 @@ func (s *Service) newInitProcess(context context.Context, r *shimapi.CreateTaskR
stderr: r.Stderr,
terminal: r.Terminal,
},
rootfs: rootfs,
workDir: s.config.WorkDir,
rootfs: rootfs,
workDir: s.config.WorkDir,
status: 0,
waitBlock: make(chan struct{}),
}
p.initState = &createdState{p: p}
var (
@@ -149,22 +153,24 @@ func (s *Service) newInitProcess(context context.Context, r *shimapi.CreateTaskR
Detach: true,
NoSubreaper: true,
}
if _, err := p.runtime.Restore(context, r.ID, r.Bundle, opts); err != nil {
return nil, p.runtimeError(err, "OCI runtime restore failed")
}
} else {
opts := &runc.CreateOpts{
PidFile: pidFile,
IO: p.io,
NoPivot: options.NoPivotRoot,
NoNewKeyring: options.NoNewKeyring,
}
if socket != nil {
opts.ConsoleSocket = socket
}
if err := p.runtime.Create(context, r.ID, r.Bundle, opts); err != nil {
return nil, p.runtimeError(err, "OCI runtime create failed")
p.initState = &createdCheckpointState{
p: p,
opts: opts,
}
success = true
return p, nil
}
opts := &runc.CreateOpts{
PidFile: pidFile,
IO: p.io,
NoPivot: options.NoPivotRoot,
NoNewKeyring: options.NoNewKeyring,
}
if socket != nil {
opts.ConsoleSocket = socket
}
if err := p.runtime.Create(context, r.ID, r.Bundle, opts); err != nil {
return nil, p.runtimeError(err, "OCI runtime create failed")
}
if r.Stdin != "" {
sc, err := fifo.OpenFifo(context, r.Stdin, syscall.O_WRONLY|syscall.O_NONBLOCK, 0)
@@ -180,13 +186,13 @@ func (s *Service) newInitProcess(context context.Context, r *shimapi.CreateTaskR
if err != nil {
return nil, errors.Wrap(err, "failed to retrieve console master")
}
console, err = s.platform.copyConsole(context, console, r.Stdin, r.Stdout, r.Stderr, &p.WaitGroup, &copyWaitGroup)
console, err = s.platform.copyConsole(context, console, r.Stdin, r.Stdout, r.Stderr, &p.wg, &copyWaitGroup)
if err != nil {
return nil, errors.Wrap(err, "failed to start console copy")
}
p.console = console
} else if !hasNoIO(r) {
if err := copyPipes(context, p.io, r.Stdin, r.Stdout, r.Stderr, &p.WaitGroup, &copyWaitGroup); err != nil {
if err := copyPipes(context, p.io, r.Stdin, r.Stdout, r.Stderr, &p.wg, &copyWaitGroup); err != nil {
return nil, errors.Wrap(err, "failed to start io pipe copy")
}
}
@@ -201,6 +207,10 @@ func (s *Service) newInitProcess(context context.Context, r *shimapi.CreateTaskR
return p, nil
}
func (p *initProcess) Wait() {
<-p.waitBlock
}
func (p *initProcess) ID() string {
return p.id
}
@@ -240,14 +250,15 @@ func (p *initProcess) start(context context.Context) error {
}
func (p *initProcess) setExited(status int) {
p.status = status
p.exited = time.Now()
p.status = status
p.platform.shutdownConsole(context.Background(), p.console)
close(p.waitBlock)
}
func (p *initProcess) delete(context context.Context) error {
p.killAll(context)
p.Wait()
p.wg.Wait()
err := p.runtime.Delete(context, p.id, nil)
// ignore errors if a runtime has already deleted the process
// but we still hold metadata and pipes

View File

@@ -4,10 +4,14 @@ package shim
import (
"context"
"sync"
"syscall"
"github.com/containerd/console"
"github.com/containerd/containerd/errdefs"
shimapi "github.com/containerd/containerd/linux/shim/v1"
"github.com/containerd/fifo"
runc "github.com/containerd/go-runc"
"github.com/pkg/errors"
)
@@ -109,6 +113,120 @@ func (s *createdState) SetExited(status int) {
}
}
type createdCheckpointState struct {
p *initProcess
opts *runc.RestoreOpts
}
func (s *createdCheckpointState) transition(name string) error {
switch name {
case "running":
s.p.initState = &runningState{p: s.p}
case "stopped":
s.p.initState = &stoppedState{p: s.p}
case "deleted":
s.p.initState = &deletedState{}
default:
return errors.Errorf("invalid state transition %q to %q", stateName(s), name)
}
return nil
}
func (s *createdCheckpointState) Pause(ctx context.Context) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
return errors.Errorf("cannot pause task in created state")
}
func (s *createdCheckpointState) Resume(ctx context.Context) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
return errors.Errorf("cannot resume task in created state")
}
func (s *createdCheckpointState) Update(context context.Context, r *shimapi.UpdateTaskRequest) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
return s.p.update(context, r)
}
func (s *createdCheckpointState) Checkpoint(context context.Context, r *shimapi.CheckpointTaskRequest) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
return errors.Errorf("cannot checkpoint a task in created state")
}
func (s *createdCheckpointState) Resize(ws console.WinSize) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
return s.p.resize(ws)
}
func (s *createdCheckpointState) Start(ctx context.Context) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
p := s.p
if _, err := s.p.runtime.Restore(ctx, p.id, p.bundle, s.opts); err != nil {
return p.runtimeError(err, "OCI runtime restore failed")
}
sio := p.stdio
if sio.stdin != "" {
sc, err := fifo.OpenFifo(ctx, sio.stdin, syscall.O_WRONLY|syscall.O_NONBLOCK, 0)
if err != nil {
return errors.Wrapf(err, "failed to open stdin fifo %s", sio.stdin)
}
p.stdin = sc
p.closers = append(p.closers, sc)
}
var copyWaitGroup sync.WaitGroup
if !sio.isNull() {
if err := copyPipes(ctx, p.io, sio.stdin, sio.stdout, sio.stderr, &p.wg, &copyWaitGroup); err != nil {
return errors.Wrap(err, "failed to start io pipe copy")
}
}
copyWaitGroup.Wait()
pid, err := runc.ReadPidFile(s.opts.PidFile)
if err != nil {
return errors.Wrap(err, "failed to retrieve OCI runtime container pid")
}
p.pid = pid
return s.transition("running")
}
func (s *createdCheckpointState) Delete(ctx context.Context) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
if err := s.p.delete(ctx); err != nil {
return err
}
return s.transition("deleted")
}
func (s *createdCheckpointState) Kill(ctx context.Context, sig uint32, all bool) error {
s.p.mu.Lock()
defer s.p.mu.Unlock()
return s.p.kill(ctx, sig, all)
}
func (s *createdCheckpointState) SetExited(status int) {
s.p.mu.Lock()
defer s.p.mu.Unlock()
s.p.setExited(status)
if err := s.transition("stopped"); err != nil {
panic(err)
}
}
type runningState struct {
p *initProcess
}
@@ -278,6 +396,7 @@ func (s *pausedState) SetExited(status int) {
if err := s.transition("stopped"); err != nil {
panic(err)
}
}
type stoppedState struct {

View File

@@ -86,3 +86,7 @@ func (c *local) ShimInfo(ctx context.Context, in *google_protobuf.Empty, opts ..
func (c *local) Update(ctx context.Context, in *shimapi.UpdateTaskRequest, opts ...grpc.CallOption) (*google_protobuf.Empty, error) {
return c.s.Update(ctx, in)
}
func (c *local) Wait(ctx context.Context, in *shimapi.WaitRequest, opts ...grpc.CallOption) (*shimapi.WaitResponse, error) {
return c.s.Wait(ctx, in)
}

View File

@@ -38,7 +38,9 @@ type process interface {
// Stdio returns io information for the container
Stdio() stdio
// Status returns the process status
Status(ctx context.Context) (string, error)
Status(context.Context) (string, error)
// Wait blocks until the process has exited
Wait()
}
type processState interface {
@@ -58,7 +60,7 @@ func stateName(v interface{}) string {
switch v.(type) {
case *runningState, *execRunningState:
return "running"
case *createdState, *execCreatedState:
case *createdState, *execCreatedState, *createdCheckpointState:
return "created"
case *pausedState:
return "paused"

View File

@@ -46,7 +46,7 @@ func NewService(config Config, publisher events.Publisher) (*Service, error) {
config: config,
context: context,
processes: make(map[string]process),
events: make(chan interface{}, 4096),
events: make(chan interface{}, 128),
ec: reaper.Default.Subscribe(),
}
go s.processExits()
@@ -111,7 +111,9 @@ func (s *Service) Create(ctx context.Context, r *shimapi.CreateTaskRequest) (*sh
}
func (s *Service) Start(ctx context.Context, r *shimapi.StartRequest) (*shimapi.StartResponse, error) {
p := s.getProcess(r.ID)
s.mu.Lock()
defer s.mu.Unlock()
p := s.processes[r.ID]
if p == nil {
return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "process %s not found", r.ID)
}
@@ -138,7 +140,9 @@ func (s *Service) Start(ctx context.Context, r *shimapi.StartRequest) (*shimapi.
}
func (s *Service) Delete(ctx context.Context, r *google_protobuf.Empty) (*shimapi.DeleteResponse, error) {
p := s.getProcess(s.id)
s.mu.Lock()
defer s.mu.Unlock()
p := s.processes[s.id]
if p == nil {
return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
}
@@ -146,7 +150,7 @@ func (s *Service) Delete(ctx context.Context, r *google_protobuf.Empty) (*shimap
if err := p.Delete(ctx); err != nil {
return nil, err
}
s.deleteProcess(p.ID())
delete(s.processes, s.id)
s.platform.close()
s.events <- &eventsapi.TaskDelete{
ContainerID: s.id,
@@ -162,17 +166,19 @@ func (s *Service) Delete(ctx context.Context, r *google_protobuf.Empty) (*shimap
}
func (s *Service) DeleteProcess(ctx context.Context, r *shimapi.DeleteProcessRequest) (*shimapi.DeleteResponse, error) {
s.mu.Lock()
defer s.mu.Unlock()
if r.ID == s.id {
return nil, grpc.Errorf(codes.InvalidArgument, "cannot delete init process with DeleteProcess")
}
p := s.getProcess(r.ID)
p := s.processes[r.ID]
if p == nil {
return nil, errors.Wrapf(errdefs.ErrNotFound, "process %s", r.ID)
}
if err := p.Delete(ctx); err != nil {
return nil, err
}
s.deleteProcess(r.ID)
delete(s.processes, r.ID)
return &shimapi.DeleteResponse{
ExitStatus: uint32(p.ExitStatus()),
ExitedAt: p.ExitedAt(),
@@ -207,6 +213,8 @@ func (s *Service) Exec(ctx context.Context, r *shimapi.ExecProcessRequest) (*goo
}
func (s *Service) ResizePty(ctx context.Context, r *shimapi.ResizePtyRequest) (*google_protobuf.Empty, error) {
s.mu.Lock()
defer s.mu.Unlock()
if r.ID == "" {
return nil, errdefs.ToGRPCf(errdefs.ErrInvalidArgument, "id not provided")
}
@@ -214,7 +222,7 @@ func (s *Service) ResizePty(ctx context.Context, r *shimapi.ResizePtyRequest) (*
Width: uint16(r.Width),
Height: uint16(r.Height),
}
p := s.getProcess(r.ID)
p := s.processes[r.ID]
if p == nil {
return nil, errors.Errorf("process does not exist %s", r.ID)
}
@@ -225,7 +233,9 @@ func (s *Service) ResizePty(ctx context.Context, r *shimapi.ResizePtyRequest) (*
}
func (s *Service) State(ctx context.Context, r *shimapi.StateRequest) (*shimapi.StateResponse, error) {
p := s.getProcess(r.ID)
s.mu.Lock()
defer s.mu.Unlock()
p := s.processes[r.ID]
if p == nil {
return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "process id %s not found", r.ID)
}
@@ -262,7 +272,9 @@ func (s *Service) State(ctx context.Context, r *shimapi.StateRequest) (*shimapi.
}
func (s *Service) Pause(ctx context.Context, r *google_protobuf.Empty) (*google_protobuf.Empty, error) {
p := s.getProcess(s.id)
s.mu.Lock()
defer s.mu.Unlock()
p := s.processes[s.id]
if p == nil {
return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
}
@@ -276,7 +288,9 @@ func (s *Service) Pause(ctx context.Context, r *google_protobuf.Empty) (*google_
}
func (s *Service) Resume(ctx context.Context, r *google_protobuf.Empty) (*google_protobuf.Empty, error) {
p := s.getProcess(s.id)
s.mu.Lock()
defer s.mu.Unlock()
p := s.processes[s.id]
if p == nil {
return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
}
@@ -290,8 +304,10 @@ func (s *Service) Resume(ctx context.Context, r *google_protobuf.Empty) (*google
}
func (s *Service) Kill(ctx context.Context, r *shimapi.KillRequest) (*google_protobuf.Empty, error) {
s.mu.Lock()
defer s.mu.Unlock()
if r.ID == "" {
p := s.getProcess(s.id)
p := s.processes[s.id]
if p == nil {
return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
}
@@ -301,7 +317,7 @@ func (s *Service) Kill(ctx context.Context, r *shimapi.KillRequest) (*google_pro
return empty, nil
}
p := s.getProcess(r.ID)
p := s.processes[r.ID]
if p == nil {
return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "process id %s not found", r.ID)
}
@@ -322,7 +338,9 @@ func (s *Service) ListPids(ctx context.Context, r *shimapi.ListPidsRequest) (*sh
}
func (s *Service) CloseIO(ctx context.Context, r *shimapi.CloseIORequest) (*google_protobuf.Empty, error) {
p := s.getProcess(r.ID)
s.mu.Lock()
defer s.mu.Unlock()
p := s.processes[r.ID]
if p == nil {
return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "process does not exist %s", r.ID)
}
@@ -335,7 +353,9 @@ func (s *Service) CloseIO(ctx context.Context, r *shimapi.CloseIORequest) (*goog
}
func (s *Service) Checkpoint(ctx context.Context, r *shimapi.CheckpointTaskRequest) (*google_protobuf.Empty, error) {
p := s.getProcess(s.id)
s.mu.Lock()
defer s.mu.Unlock()
p := s.processes[s.id]
if p == nil {
return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
}
@@ -355,7 +375,9 @@ func (s *Service) ShimInfo(ctx context.Context, r *google_protobuf.Empty) (*shim
}
func (s *Service) Update(ctx context.Context, r *shimapi.UpdateTaskRequest) (*google_protobuf.Empty, error) {
p := s.getProcess(s.id)
s.mu.Lock()
defer s.mu.Unlock()
p := s.processes[s.id]
if p == nil {
return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
}
@@ -365,23 +387,19 @@ func (s *Service) Update(ctx context.Context, r *shimapi.UpdateTaskRequest) (*go
return empty, nil
}
func (s *Service) addProcess(id string, p process) {
func (s *Service) Wait(ctx context.Context, r *shimapi.WaitRequest) (*shimapi.WaitResponse, error) {
s.mu.Lock()
s.processes[id] = p
p := s.processes[r.ID]
s.mu.Unlock()
}
if p == nil {
return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
}
p.Wait()
func (s *Service) getProcess(id string) process {
s.mu.Lock()
p := s.processes[id]
s.mu.Unlock()
return p
}
func (s *Service) deleteProcess(id string) {
s.mu.Lock()
delete(s.processes, id)
s.mu.Unlock()
return &shimapi.WaitResponse{
ExitStatus: uint32(p.ExitStatus()),
ExitedAt: p.ExitedAt(),
}, nil
}
func (s *Service) processExits() {
@@ -402,7 +420,6 @@ func (s *Service) checkProcesses(e runc.Exit) {
Error("failed to kill init's children")
}
}
p.SetExited(e.Status)
s.events <- &eventsapi.TaskExit{
ContainerID: s.id,
@@ -417,7 +434,9 @@ func (s *Service) checkProcesses(e runc.Exit) {
}
func (s *Service) getContainerPids(ctx context.Context, id string) ([]uint32, error) {
p := s.getProcess(s.id)
s.mu.Lock()
defer s.mu.Unlock()
p := s.processes[s.id]
if p == nil {
return nil, errors.Wrapf(errdefs.ErrFailedPrecondition, "container must be created")
}
@@ -436,7 +455,7 @@ func (s *Service) getContainerPids(ctx context.Context, id string) ([]uint32, er
func (s *Service) forward(publisher events.Publisher) {
for e := range s.events {
if err := publisher.Publish(s.context, getTopic(s.context, e), e); err != nil {
log.G(s.context).WithError(err).Error("post event")
logrus.WithError(err).Error("post event")
}
}
}
@@ -464,7 +483,7 @@ func getTopic(ctx context.Context, e interface{}) string {
case *eventsapi.TaskCheckpointed:
return runtime.TaskCheckpointedEventTopic
default:
log.G(ctx).Warnf("no topic for type %#v", e)
logrus.Warnf("no topic for type %#v", e)
}
return runtime.TaskUnknownTopic
}

View File

@@ -27,6 +27,8 @@
UpdateTaskRequest
StartRequest
StartResponse
WaitRequest
WaitResponse
*/
package shim
@@ -242,6 +244,23 @@ func (m *StartResponse) Reset() { *m = StartResponse{} }
func (*StartResponse) ProtoMessage() {}
func (*StartResponse) Descriptor() ([]byte, []int) { return fileDescriptorShim, []int{17} }
type WaitRequest struct {
ID string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"`
}
func (m *WaitRequest) Reset() { *m = WaitRequest{} }
func (*WaitRequest) ProtoMessage() {}
func (*WaitRequest) Descriptor() ([]byte, []int) { return fileDescriptorShim, []int{18} }
type WaitResponse struct {
ExitStatus uint32 `protobuf:"varint,1,opt,name=exit_status,json=exitStatus,proto3" json:"exit_status,omitempty"`
ExitedAt time.Time `protobuf:"bytes,2,opt,name=exited_at,json=exitedAt,stdtime" json:"exited_at"`
}
func (m *WaitResponse) Reset() { *m = WaitResponse{} }
func (*WaitResponse) ProtoMessage() {}
func (*WaitResponse) Descriptor() ([]byte, []int) { return fileDescriptorShim, []int{19} }
func init() {
proto.RegisterType((*CreateTaskRequest)(nil), "containerd.runtime.linux.shim.v1.CreateTaskRequest")
proto.RegisterType((*CreateTaskResponse)(nil), "containerd.runtime.linux.shim.v1.CreateTaskResponse")
@@ -261,6 +280,8 @@ func init() {
proto.RegisterType((*UpdateTaskRequest)(nil), "containerd.runtime.linux.shim.v1.UpdateTaskRequest")
proto.RegisterType((*StartRequest)(nil), "containerd.runtime.linux.shim.v1.StartRequest")
proto.RegisterType((*StartResponse)(nil), "containerd.runtime.linux.shim.v1.StartResponse")
proto.RegisterType((*WaitRequest)(nil), "containerd.runtime.linux.shim.v1.WaitRequest")
proto.RegisterType((*WaitResponse)(nil), "containerd.runtime.linux.shim.v1.WaitResponse")
}
// Reference imports to suppress errors if they are not otherwise used.
@@ -291,6 +312,7 @@ type ShimClient interface {
// ShimInfo returns information about the shim.
ShimInfo(ctx context.Context, in *google_protobuf1.Empty, opts ...grpc.CallOption) (*ShimInfoResponse, error)
Update(ctx context.Context, in *UpdateTaskRequest, opts ...grpc.CallOption) (*google_protobuf1.Empty, error)
Wait(ctx context.Context, in *WaitRequest, opts ...grpc.CallOption) (*WaitResponse, error)
}
type shimClient struct {
@@ -436,6 +458,15 @@ func (c *shimClient) Update(ctx context.Context, in *UpdateTaskRequest, opts ...
return out, nil
}
func (c *shimClient) Wait(ctx context.Context, in *WaitRequest, opts ...grpc.CallOption) (*WaitResponse, error) {
out := new(WaitResponse)
err := grpc.Invoke(ctx, "/containerd.runtime.linux.shim.v1.Shim/Wait", in, out, c.cc, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// Server API for Shim service
type ShimServer interface {
@@ -456,6 +487,7 @@ type ShimServer interface {
// ShimInfo returns information about the shim.
ShimInfo(context.Context, *google_protobuf1.Empty) (*ShimInfoResponse, error)
Update(context.Context, *UpdateTaskRequest) (*google_protobuf1.Empty, error)
Wait(context.Context, *WaitRequest) (*WaitResponse, error)
}
func RegisterShimServer(s *grpc.Server, srv ShimServer) {
@@ -732,6 +764,24 @@ func _Shim_Update_Handler(srv interface{}, ctx context.Context, dec func(interfa
return interceptor(ctx, in, info, handler)
}
func _Shim_Wait_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(WaitRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ShimServer).Wait(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/containerd.runtime.linux.shim.v1.Shim/Wait",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ShimServer).Wait(ctx, req.(*WaitRequest))
}
return interceptor(ctx, in, info, handler)
}
var _Shim_serviceDesc = grpc.ServiceDesc{
ServiceName: "containerd.runtime.linux.shim.v1.Shim",
HandlerType: (*ShimServer)(nil),
@@ -796,6 +846,10 @@ var _Shim_serviceDesc = grpc.ServiceDesc{
MethodName: "Update",
Handler: _Shim_Update_Handler,
},
{
MethodName: "Wait",
Handler: _Shim_Wait_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "github.com/containerd/containerd/linux/shim/v1/shim.proto",
@@ -1471,6 +1525,61 @@ func (m *StartResponse) MarshalTo(dAtA []byte) (int, error) {
return i, nil
}
func (m *WaitRequest) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalTo(dAtA)
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *WaitRequest) MarshalTo(dAtA []byte) (int, error) {
var i int
_ = i
var l int
_ = l
if len(m.ID) > 0 {
dAtA[i] = 0xa
i++
i = encodeVarintShim(dAtA, i, uint64(len(m.ID)))
i += copy(dAtA[i:], m.ID)
}
return i, nil
}
func (m *WaitResponse) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalTo(dAtA)
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *WaitResponse) MarshalTo(dAtA []byte) (int, error) {
var i int
_ = i
var l int
_ = l
if m.ExitStatus != 0 {
dAtA[i] = 0x8
i++
i = encodeVarintShim(dAtA, i, uint64(m.ExitStatus))
}
dAtA[i] = 0x12
i++
i = encodeVarintShim(dAtA, i, uint64(github_com_gogo_protobuf_types.SizeOfStdTime(m.ExitedAt)))
n9, err := github_com_gogo_protobuf_types.StdTimeMarshalTo(m.ExitedAt, dAtA[i:])
if err != nil {
return 0, err
}
i += n9
return i, nil
}
func encodeFixed64Shim(dAtA []byte, offset int, v uint64) int {
dAtA[offset] = uint8(v)
dAtA[offset+1] = uint8(v >> 8)
@@ -1791,6 +1900,27 @@ func (m *StartResponse) Size() (n int) {
return n
}
func (m *WaitRequest) Size() (n int) {
var l int
_ = l
l = len(m.ID)
if l > 0 {
n += 1 + l + sovShim(uint64(l))
}
return n
}
func (m *WaitResponse) Size() (n int) {
var l int
_ = l
if m.ExitStatus != 0 {
n += 1 + sovShim(uint64(m.ExitStatus))
}
l = github_com_gogo_protobuf_types.SizeOfStdTime(m.ExitedAt)
n += 1 + l + sovShim(uint64(l))
return n
}
func sovShim(x uint64) (n int) {
for {
n++
@@ -2016,6 +2146,27 @@ func (this *StartResponse) String() string {
}, "")
return s
}
func (this *WaitRequest) String() string {
if this == nil {
return "nil"
}
s := strings.Join([]string{`&WaitRequest{`,
`ID:` + fmt.Sprintf("%v", this.ID) + `,`,
`}`,
}, "")
return s
}
func (this *WaitResponse) String() string {
if this == nil {
return "nil"
}
s := strings.Join([]string{`&WaitResponse{`,
`ExitStatus:` + fmt.Sprintf("%v", this.ExitStatus) + `,`,
`ExitedAt:` + strings.Replace(strings.Replace(this.ExitedAt.String(), "Timestamp", "google_protobuf3.Timestamp", 1), `&`, ``, 1) + `,`,
`}`,
}, "")
return s
}
func valueToStringShim(v interface{}) string {
rv := reflect.ValueOf(v)
if rv.IsNil() {
@@ -4272,6 +4423,184 @@ func (m *StartResponse) Unmarshal(dAtA []byte) error {
}
return nil
}
func (m *WaitRequest) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowShim
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: WaitRequest: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: WaitRequest: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field ID", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowShim
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthShim
}
postIndex := iNdEx + intStringLen
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.ID = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipShim(dAtA[iNdEx:])
if err != nil {
return err
}
if skippy < 0 {
return ErrInvalidLengthShim
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func (m *WaitResponse) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowShim
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: WaitResponse: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: WaitResponse: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field ExitStatus", wireType)
}
m.ExitStatus = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowShim
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
m.ExitStatus |= (uint32(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
case 2:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field ExitedAt", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowShim
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthShim
}
postIndex := iNdEx + msglen
if postIndex > l {
return io.ErrUnexpectedEOF
}
if err := github_com_gogo_protobuf_types.StdTimeUnmarshal(&m.ExitedAt, dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipShim(dAtA[iNdEx:])
if err != nil {
return err
}
if skippy < 0 {
return ErrInvalidLengthShim
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func skipShim(dAtA []byte) (n int, err error) {
l := len(dAtA)
iNdEx := 0
@@ -4382,73 +4711,76 @@ func init() {
}
var fileDescriptorShim = []byte{
// 1083 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x57, 0x4f, 0x6f, 0x1b, 0x45,
0x14, 0xef, 0xfa, 0xbf, 0x9f, 0x71, 0x48, 0x86, 0x34, 0x6c, 0x5d, 0xc9, 0xb1, 0xf6, 0x10, 0x19,
0xa1, 0xae, 0x89, 0x83, 0x5a, 0x0a, 0x12, 0x52, 0x92, 0x56, 0x28, 0x82, 0xa8, 0xd1, 0x26, 0x05,
0x04, 0x42, 0xd1, 0xc6, 0x3b, 0xb1, 0x47, 0xb1, 0x77, 0xb6, 0x3b, 0xb3, 0x21, 0xe1, 0xc4, 0x89,
0x33, 0x1f, 0x87, 0x8f, 0x90, 0x03, 0x07, 0x8e, 0x9c, 0x0a, 0x8d, 0xc4, 0x91, 0xef, 0x80, 0xe6,
0x8f, 0xed, 0xb5, 0x9d, 0x65, 0xd7, 0xbd, 0xc4, 0xf3, 0x66, 0x7e, 0xef, 0xcd, 0x9b, 0xf7, 0x7b,
0x7f, 0x36, 0xf0, 0xb4, 0x4f, 0xf8, 0x20, 0x3a, 0xb3, 0x7b, 0x74, 0xd4, 0xe9, 0x51, 0x9f, 0xbb,
0xc4, 0xc7, 0xa1, 0x17, 0x5f, 0x0e, 0x89, 0x1f, 0x5d, 0x75, 0xd8, 0x80, 0x8c, 0x3a, 0x97, 0xdb,
0xf2, 0xd7, 0x0e, 0x42, 0xca, 0x29, 0x6a, 0x4d, 0x41, 0x76, 0x18, 0xf9, 0x9c, 0x8c, 0xb0, 0x2d,
0xc1, 0xb6, 0x04, 0x5d, 0x6e, 0x37, 0x1e, 0xf4, 0x29, 0xed, 0x0f, 0x71, 0x47, 0xe2, 0xcf, 0xa2,
0xf3, 0x8e, 0xeb, 0x5f, 0x2b, 0xe5, 0xc6, 0xc3, 0xf9, 0x23, 0x3c, 0x0a, 0xf8, 0xf8, 0x70, 0xbd,
0x4f, 0xfb, 0x54, 0x2e, 0x3b, 0x62, 0xa5, 0x77, 0x37, 0xe7, 0x55, 0xc4, 0x8d, 0x8c, 0xbb, 0xa3,
0x40, 0x03, 0x1e, 0xa7, 0xbe, 0xc5, 0x0d, 0x48, 0x87, 0x5f, 0x07, 0x98, 0x75, 0x46, 0x34, 0xf2,
0xb9, 0xd6, 0xfb, 0x74, 0x09, 0x3d, 0xee, 0xb2, 0x0b, 0xf9, 0x47, 0xe9, 0x5a, 0xff, 0xe6, 0x60,
0x6d, 0x3f, 0xc4, 0x2e, 0xc7, 0x27, 0x2e, 0xbb, 0x70, 0xf0, 0xab, 0x08, 0x33, 0x8e, 0x36, 0x20,
0x47, 0x3c, 0xd3, 0x68, 0x19, 0xed, 0xea, 0x5e, 0xe9, 0xf6, 0xf5, 0x66, 0xee, 0xe0, 0x99, 0x93,
0x23, 0x1e, 0xda, 0x80, 0xd2, 0x59, 0xe4, 0x7b, 0x43, 0x6c, 0xe6, 0xc4, 0x99, 0xa3, 0x25, 0x64,
0x42, 0x59, 0x47, 0xd0, 0xcc, 0xcb, 0x83, 0xb1, 0x88, 0x3a, 0x50, 0x0a, 0x29, 0xe5, 0xe7, 0xcc,
0x2c, 0xb4, 0xf2, 0xed, 0x5a, 0xf7, 0x7d, 0x3b, 0x16, 0x75, 0xe9, 0x92, 0x7d, 0x28, 0x9e, 0xe2,
0x68, 0x18, 0x6a, 0x40, 0x85, 0xe3, 0x70, 0x44, 0x7c, 0x77, 0x68, 0x16, 0x5b, 0x46, 0xbb, 0xe2,
0x4c, 0x64, 0xb4, 0x0e, 0x45, 0xc6, 0x3d, 0xe2, 0x9b, 0x25, 0x79, 0x89, 0x12, 0x84, 0x53, 0x8c,
0x7b, 0x34, 0xe2, 0x66, 0x59, 0x39, 0xa5, 0x24, 0xbd, 0x8f, 0xc3, 0xd0, 0xac, 0x4c, 0xf6, 0x71,
0x18, 0xa2, 0x26, 0x40, 0x6f, 0x80, 0x7b, 0x17, 0x01, 0x25, 0x3e, 0x37, 0xab, 0xf2, 0x2c, 0xb6,
0x83, 0x3e, 0x84, 0xb5, 0xc0, 0x0d, 0xb1, 0xcf, 0x4f, 0x63, 0x30, 0x90, 0xb0, 0x55, 0x75, 0xb0,
0x3f, 0x05, 0xdb, 0x50, 0xa6, 0x01, 0x27, 0xd4, 0x67, 0x66, 0xad, 0x65, 0xb4, 0x6b, 0xdd, 0x75,
0x5b, 0xd1, 0x6c, 0x8f, 0x69, 0xb6, 0x77, 0xfd, 0x6b, 0x67, 0x0c, 0xb2, 0xb6, 0x00, 0xc5, 0xc3,
0xcd, 0x02, 0xea, 0x33, 0x8c, 0x56, 0x21, 0x1f, 0xe8, 0x80, 0xd7, 0x1d, 0xb1, 0xb4, 0x7e, 0x31,
0x60, 0xe5, 0x19, 0x1e, 0x62, 0x8e, 0x93, 0x41, 0x68, 0x13, 0x6a, 0xf8, 0x8a, 0xf0, 0x53, 0xc6,
0x5d, 0x1e, 0x31, 0xc9, 0x49, 0xdd, 0x01, 0xb1, 0x75, 0x2c, 0x77, 0xd0, 0x2e, 0x54, 0x85, 0x84,
0xbd, 0x53, 0x97, 0x4b, 0x66, 0x6a, 0xdd, 0xc6, 0x82, 0x7f, 0x27, 0xe3, 0x34, 0xdc, 0xab, 0xdc,
0xbc, 0xde, 0xbc, 0xf7, 0xeb, 0x5f, 0x9b, 0x86, 0x53, 0x51, 0x6a, 0xbb, 0xdc, 0xb2, 0x61, 0x5d,
0xf9, 0x71, 0x14, 0xd2, 0x1e, 0x66, 0x2c, 0x25, 0x45, 0xac, 0xdf, 0x0c, 0x40, 0xcf, 0xaf, 0x70,
0x2f, 0x1b, 0x7c, 0x86, 0xee, 0x5c, 0x12, 0xdd, 0xf9, 0xbb, 0xe9, 0x2e, 0x24, 0xd0, 0x5d, 0x9c,
0xa1, 0xbb, 0x0d, 0x05, 0x16, 0xe0, 0x9e, 0xcc, 0x99, 0x24, 0x7a, 0x24, 0xc2, 0xba, 0x0f, 0xef,
0xcd, 0x78, 0xae, 0xe2, 0x6e, 0x7d, 0x0b, 0xab, 0x0e, 0x66, 0xe4, 0x27, 0x7c, 0xc4, 0xaf, 0xd3,
0x9e, 0xb3, 0x0e, 0xc5, 0x1f, 0x89, 0xc7, 0x07, 0x9a, 0x0b, 0x25, 0x08, 0xd7, 0x06, 0x98, 0xf4,
0x07, 0x8a, 0x83, 0xba, 0xa3, 0x25, 0x6b, 0x0b, 0xde, 0x11, 0x44, 0xe1, 0xb4, 0x98, 0xfe, 0x9e,
0x83, 0xba, 0x06, 0xea, 0x5c, 0x58, 0xb6, 0x40, 0x75, 0xee, 0xe4, 0xa7, 0xb9, 0xb3, 0x23, 0xc2,
0x25, 0xd3, 0x46, 0x84, 0x71, 0xa5, 0xfb, 0x30, 0x5e, 0x98, 0x97, 0xdb, 0xba, 0x36, 0x55, 0x1e,
0x39, 0x1a, 0x3a, 0x65, 0xa4, 0x78, 0x37, 0x23, 0xa5, 0x04, 0x46, 0xca, 0x33, 0x8c, 0xc4, 0x39,
0xaf, 0xcc, 0x71, 0x3e, 0x97, 0xd2, 0xd5, 0xff, 0x4f, 0x69, 0x78, 0xab, 0x94, 0x7e, 0x01, 0xb5,
0x2f, 0xc9, 0x70, 0x98, 0xa1, 0xd9, 0x31, 0xd2, 0x1f, 0x27, 0x66, 0xdd, 0xd1, 0x92, 0x88, 0xa5,
0x3b, 0x1c, 0xca, 0x58, 0x56, 0x1c, 0xb1, 0xb4, 0x3e, 0x87, 0x95, 0xfd, 0x21, 0x65, 0xf8, 0xe0,
0x45, 0x86, 0xfc, 0x50, 0x01, 0x54, 0xb9, 0xae, 0x04, 0xeb, 0x03, 0x78, 0xf7, 0x2b, 0xc2, 0xf8,
0x11, 0xf1, 0x52, 0xcb, 0x6b, 0x0b, 0x56, 0xa7, 0x50, 0x9d, 0x0c, 0x08, 0x0a, 0x01, 0xf1, 0x98,
0x69, 0xb4, 0xf2, 0xed, 0xba, 0x23, 0xd7, 0xd6, 0xf7, 0x70, 0x7f, 0xda, 0xa5, 0xe2, 0xad, 0x5d,
0x80, 0x5d, 0x3e, 0x50, 0xa6, 0x1d, 0xb9, 0x8e, 0x37, 0xb1, 0x5c, 0x96, 0x26, 0xf6, 0x08, 0x56,
0x8f, 0x07, 0x64, 0x74, 0xe0, 0x9f, 0xd3, 0x89, 0x13, 0x0f, 0xa0, 0x22, 0xc6, 0xe6, 0xe9, 0xb4,
0x45, 0x95, 0x85, 0x7c, 0x44, 0x3c, 0xeb, 0x0b, 0x58, 0x7b, 0x19, 0x78, 0x73, 0x23, 0xa6, 0x0b,
0xd5, 0x10, 0x33, 0x1a, 0x85, 0x3d, 0xcc, 0xa4, 0x42, 0xd2, 0xad, 0x53, 0x98, 0xae, 0x97, 0x90,
0xa7, 0x05, 0xe9, 0xa9, 0x2c, 0x17, 0x81, 0x4b, 0x29, 0x17, 0x5d, 0x16, 0xb9, 0x49, 0x59, 0x74,
0xff, 0x01, 0x28, 0x88, 0xb7, 0xa1, 0x01, 0x14, 0x65, 0xc9, 0x21, 0xdb, 0x4e, 0xfb, 0x4e, 0xb0,
0xe3, 0x45, 0xdc, 0xe8, 0x64, 0xc6, 0x6b, 0xe7, 0x18, 0x94, 0xd4, 0x48, 0x40, 0x3b, 0xe9, 0xaa,
0x0b, 0xb3, 0xba, 0xf1, 0xf1, 0x72, 0x4a, 0xfa, 0x52, 0xf5, 0xbc, 0x90, 0x67, 0x7c, 0xde, 0x24,
0xe6, 0x19, 0x9f, 0x17, 0x8b, 0xbd, 0x03, 0x25, 0x35, 0x40, 0xd0, 0xc6, 0x02, 0xbf, 0xcf, 0xc5,
0x47, 0x53, 0xe3, 0xa3, 0x74, 0x93, 0x73, 0xa3, 0xf0, 0x1a, 0xea, 0x33, 0x43, 0x09, 0x3d, 0xce,
0x6a, 0x62, 0x76, 0x2c, 0xbd, 0xc5, 0xd5, 0xaf, 0xa0, 0x32, 0x2e, 0x40, 0xb4, 0x9d, 0xae, 0x3d,
0x57, 0xd7, 0x8d, 0xee, 0x32, 0x2a, 0xfa, 0xca, 0x27, 0x50, 0x3c, 0x72, 0x23, 0x96, 0x1c, 0xc0,
0x84, 0x7d, 0xf4, 0x09, 0x94, 0x1c, 0xcc, 0xa2, 0xd1, 0xf2, 0x9a, 0x3f, 0x00, 0xc4, 0x3e, 0x72,
0x9e, 0x64, 0x48, 0xb1, 0xbb, 0x9a, 0x4d, 0xa2, 0xf9, 0x43, 0x28, 0x88, 0x0e, 0x8c, 0x1e, 0xa5,
0x1b, 0x8e, 0x75, 0xea, 0x44, 0x73, 0x27, 0x50, 0x10, 0x83, 0x1b, 0x65, 0x28, 0x85, 0xc5, 0x4f,
0x93, 0x44, 0xab, 0xdf, 0x40, 0x75, 0x32, 0xf7, 0x51, 0x06, 0xde, 0xe6, 0x3f, 0x12, 0x12, 0x0d,
0x1f, 0x43, 0x59, 0x8f, 0x0b, 0x94, 0x21, 0xff, 0x66, 0x27, 0x4b, 0xa2, 0xd1, 0xaf, 0xa1, 0x32,
0xee, 0xc9, 0x89, 0x6c, 0x67, 0x78, 0xc4, 0x42, 0x5f, 0x7f, 0x09, 0x25, 0xd5, 0xbc, 0xb3, 0x74,
0xa7, 0x85, 0x36, 0x9f, 0xe4, 0xee, 0xde, 0xe1, 0xcd, 0x9b, 0xe6, 0xbd, 0x3f, 0xdf, 0x34, 0xef,
0xfd, 0x7c, 0xdb, 0x34, 0x6e, 0x6e, 0x9b, 0xc6, 0x1f, 0xb7, 0x4d, 0xe3, 0xef, 0xdb, 0xa6, 0xf1,
0xdd, 0xce, 0x72, 0xff, 0xd1, 0x7d, 0x26, 0x7e, 0xcf, 0x4a, 0xd2, 0xfc, 0xce, 0x7f, 0x01, 0x00,
0x00, 0xff, 0xff, 0xa0, 0x05, 0x00, 0x28, 0x0f, 0x0e, 0x00, 0x00,
// 1123 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x57, 0xcd, 0x6e, 0xdb, 0x46,
0x10, 0x36, 0xf5, 0xaf, 0x51, 0xe4, 0xda, 0x5b, 0xc7, 0x65, 0x14, 0x40, 0x16, 0x08, 0xd4, 0x50,
0x51, 0x84, 0xaa, 0xe5, 0x22, 0x69, 0x5a, 0xa0, 0x80, 0xed, 0x04, 0x85, 0xd1, 0x1a, 0x31, 0x68,
0xa7, 0x29, 0x5a, 0x14, 0x06, 0x2d, 0xae, 0xa5, 0x85, 0x29, 0x92, 0xe1, 0x2e, 0x5d, 0xbb, 0xa7,
0x9e, 0x7a, 0xee, 0xe3, 0xf4, 0x11, 0x7c, 0xc8, 0xa1, 0xc7, 0x9e, 0xd2, 0xc6, 0xf7, 0xbe, 0x43,
0xb1, 0x3f, 0x92, 0x28, 0xc9, 0x0c, 0x29, 0x5f, 0xac, 0x9d, 0xe5, 0x37, 0xbb, 0xb3, 0xf3, 0x7d,
0x3b, 0xb3, 0x86, 0xa7, 0x7d, 0xc2, 0x06, 0xd1, 0xa9, 0xd9, 0xf3, 0x87, 0x9d, 0x9e, 0xef, 0x31,
0x9b, 0x78, 0x38, 0x74, 0xe2, 0x43, 0x97, 0x78, 0xd1, 0x65, 0x87, 0x0e, 0xc8, 0xb0, 0x73, 0xb1,
0x25, 0x7e, 0xcd, 0x20, 0xf4, 0x99, 0x8f, 0x5a, 0x13, 0x90, 0x19, 0x46, 0x1e, 0x23, 0x43, 0x6c,
0x0a, 0xb0, 0x29, 0x40, 0x17, 0x5b, 0x8d, 0x07, 0x7d, 0xdf, 0xef, 0xbb, 0xb8, 0x23, 0xf0, 0xa7,
0xd1, 0x59, 0xc7, 0xf6, 0xae, 0xa4, 0x73, 0xe3, 0xe1, 0xec, 0x27, 0x3c, 0x0c, 0xd8, 0xe8, 0xe3,
0x5a, 0xdf, 0xef, 0xfb, 0x62, 0xd8, 0xe1, 0x23, 0x35, 0xbb, 0x31, 0xeb, 0xc2, 0x77, 0xa4, 0xcc,
0x1e, 0x06, 0x0a, 0xf0, 0x38, 0xf5, 0x2c, 0x76, 0x40, 0x3a, 0xec, 0x2a, 0xc0, 0xb4, 0x33, 0xf4,
0x23, 0x8f, 0x29, 0xbf, 0x2f, 0x17, 0xf0, 0x63, 0x36, 0x3d, 0x17, 0x7f, 0xa4, 0xaf, 0xf1, 0x5f,
0x0e, 0x56, 0xf7, 0x42, 0x6c, 0x33, 0x7c, 0x6c, 0xd3, 0x73, 0x0b, 0xbf, 0x8e, 0x30, 0x65, 0x68,
0x1d, 0x72, 0xc4, 0xd1, 0xb5, 0x96, 0xd6, 0xae, 0xee, 0x96, 0x6e, 0xde, 0x6e, 0xe4, 0xf6, 0x9f,
0x59, 0x39, 0xe2, 0xa0, 0x75, 0x28, 0x9d, 0x46, 0x9e, 0xe3, 0x62, 0x3d, 0xc7, 0xbf, 0x59, 0xca,
0x42, 0x3a, 0x94, 0x55, 0x06, 0xf5, 0xbc, 0xf8, 0x30, 0x32, 0x51, 0x07, 0x4a, 0xa1, 0xef, 0xb3,
0x33, 0xaa, 0x17, 0x5a, 0xf9, 0x76, 0xad, 0xfb, 0x91, 0x19, 0xcb, 0xba, 0x08, 0xc9, 0x3c, 0xe0,
0x47, 0xb1, 0x14, 0x0c, 0x35, 0xa0, 0xc2, 0x70, 0x38, 0x24, 0x9e, 0xed, 0xea, 0xc5, 0x96, 0xd6,
0xae, 0x58, 0x63, 0x1b, 0xad, 0x41, 0x91, 0x32, 0x87, 0x78, 0x7a, 0x49, 0x6c, 0x22, 0x0d, 0x1e,
0x14, 0x65, 0x8e, 0x1f, 0x31, 0xbd, 0x2c, 0x83, 0x92, 0x96, 0x9a, 0xc7, 0x61, 0xa8, 0x57, 0xc6,
0xf3, 0x38, 0x0c, 0x51, 0x13, 0xa0, 0x37, 0xc0, 0xbd, 0xf3, 0xc0, 0x27, 0x1e, 0xd3, 0xab, 0xe2,
0x5b, 0x6c, 0x06, 0x7d, 0x0a, 0xab, 0x81, 0x1d, 0x62, 0x8f, 0x9d, 0xc4, 0x60, 0x20, 0x60, 0x2b,
0xf2, 0xc3, 0xde, 0x04, 0x6c, 0x42, 0xd9, 0x0f, 0x18, 0xf1, 0x3d, 0xaa, 0xd7, 0x5a, 0x5a, 0xbb,
0xd6, 0x5d, 0x33, 0x25, 0xcd, 0xe6, 0x88, 0x66, 0x73, 0xc7, 0xbb, 0xb2, 0x46, 0x20, 0x63, 0x13,
0x50, 0x3c, 0xdd, 0x34, 0xf0, 0x3d, 0x8a, 0xd1, 0x0a, 0xe4, 0x03, 0x95, 0xf0, 0xba, 0xc5, 0x87,
0xc6, 0xef, 0x1a, 0x2c, 0x3f, 0xc3, 0x2e, 0x66, 0x38, 0x19, 0x84, 0x36, 0xa0, 0x86, 0x2f, 0x09,
0x3b, 0xa1, 0xcc, 0x66, 0x11, 0x15, 0x9c, 0xd4, 0x2d, 0xe0, 0x53, 0x47, 0x62, 0x06, 0xed, 0x40,
0x95, 0x5b, 0xd8, 0x39, 0xb1, 0x99, 0x60, 0xa6, 0xd6, 0x6d, 0xcc, 0xc5, 0x77, 0x3c, 0x92, 0xe1,
0x6e, 0xe5, 0xfa, 0xed, 0xc6, 0xd2, 0x1f, 0xff, 0x6c, 0x68, 0x56, 0x45, 0xba, 0xed, 0x30, 0xc3,
0x84, 0x35, 0x19, 0xc7, 0x61, 0xe8, 0xf7, 0x30, 0xa5, 0x29, 0x12, 0x31, 0xfe, 0xd4, 0x00, 0x3d,
0xbf, 0xc4, 0xbd, 0x6c, 0xf0, 0x29, 0xba, 0x73, 0x49, 0x74, 0xe7, 0x6f, 0xa7, 0xbb, 0x90, 0x40,
0x77, 0x71, 0x8a, 0xee, 0x36, 0x14, 0x68, 0x80, 0x7b, 0x42, 0x33, 0x49, 0xf4, 0x08, 0x84, 0x71,
0x1f, 0x3e, 0x9c, 0x8a, 0x5c, 0xe6, 0xdd, 0xf8, 0x01, 0x56, 0x2c, 0x4c, 0xc9, 0xaf, 0xf8, 0x90,
0x5d, 0xa5, 0x1d, 0x67, 0x0d, 0x8a, 0xbf, 0x10, 0x87, 0x0d, 0x14, 0x17, 0xd2, 0xe0, 0xa1, 0x0d,
0x30, 0xe9, 0x0f, 0x24, 0x07, 0x75, 0x4b, 0x59, 0xc6, 0x26, 0xdc, 0xe3, 0x44, 0xe1, 0xb4, 0x9c,
0xbe, 0xc9, 0x41, 0x5d, 0x01, 0x95, 0x16, 0x16, 0xbd, 0xa0, 0x4a, 0x3b, 0xf9, 0x89, 0x76, 0xb6,
0x79, 0xba, 0x84, 0x6c, 0x78, 0x1a, 0x97, 0xbb, 0x0f, 0xe3, 0x17, 0xf3, 0x62, 0x4b, 0xdd, 0x4d,
0xa9, 0x23, 0x4b, 0x41, 0x27, 0x8c, 0x14, 0x6f, 0x67, 0xa4, 0x94, 0xc0, 0x48, 0x79, 0x8a, 0x91,
0x38, 0xe7, 0x95, 0x19, 0xce, 0x67, 0x24, 0x5d, 0x7d, 0xbf, 0xa4, 0xe1, 0x4e, 0x92, 0x7e, 0x01,
0xb5, 0x6f, 0x89, 0xeb, 0x66, 0x28, 0x76, 0x94, 0xf4, 0x47, 0xc2, 0xac, 0x5b, 0xca, 0xe2, 0xb9,
0xb4, 0x5d, 0x57, 0xe4, 0xb2, 0x62, 0xf1, 0xa1, 0xf1, 0x35, 0x2c, 0xef, 0xb9, 0x3e, 0xc5, 0xfb,
0x2f, 0x32, 0xe8, 0x43, 0x26, 0x50, 0x6a, 0x5d, 0x1a, 0xc6, 0x27, 0xf0, 0xc1, 0x77, 0x84, 0xb2,
0x43, 0xe2, 0xa4, 0x5e, 0xaf, 0x4d, 0x58, 0x99, 0x40, 0x95, 0x18, 0x10, 0x14, 0x02, 0xe2, 0x50,
0x5d, 0x6b, 0xe5, 0xdb, 0x75, 0x4b, 0x8c, 0x8d, 0x9f, 0xe0, 0xfe, 0xa4, 0x4a, 0xc5, 0x4b, 0x3b,
0x07, 0xdb, 0x6c, 0x20, 0x97, 0xb6, 0xc4, 0x38, 0x5e, 0xc4, 0x72, 0x59, 0x8a, 0xd8, 0x23, 0x58,
0x39, 0x1a, 0x90, 0xe1, 0xbe, 0x77, 0xe6, 0x8f, 0x83, 0x78, 0x00, 0x15, 0xde, 0x36, 0x4f, 0x26,
0x25, 0xaa, 0xcc, 0xed, 0x43, 0xe2, 0x18, 0xdf, 0xc0, 0xea, 0xcb, 0xc0, 0x99, 0x69, 0x31, 0x5d,
0xa8, 0x86, 0x98, 0xfa, 0x51, 0xd8, 0xc3, 0x54, 0x38, 0x24, 0xed, 0x3a, 0x81, 0xa9, 0xfb, 0x12,
0xb2, 0xb4, 0x24, 0x3d, 0x15, 0xd7, 0x85, 0xe3, 0x52, 0xae, 0x8b, 0xba, 0x16, 0xb9, 0x49, 0xdd,
0xfd, 0x18, 0x6a, 0xaf, 0x6c, 0x92, 0xba, 0x43, 0x08, 0xf7, 0x24, 0x4c, 0x6d, 0x30, 0x23, 0x5b,
0xed, 0xfd, 0xb2, 0xcd, 0xdd, 0x45, 0xb6, 0xdd, 0x37, 0x35, 0x28, 0xf0, 0xb4, 0xa3, 0x01, 0x14,
0x45, 0x35, 0x40, 0xa6, 0x99, 0xf6, 0x84, 0x31, 0xe3, 0xf5, 0xa5, 0xd1, 0xc9, 0x8c, 0x57, 0xc7,
0xa2, 0x50, 0x92, 0xdd, 0x0a, 0x6d, 0xa7, 0xbb, 0xce, 0x3d, 0x23, 0x1a, 0x9f, 0x2f, 0xe6, 0xa4,
0x36, 0x95, 0xc7, 0x0b, 0x59, 0xc6, 0xe3, 0x8d, 0xe5, 0x90, 0xf1, 0x78, 0x31, 0x59, 0x58, 0x50,
0x92, 0xbd, 0x0d, 0xad, 0xcf, 0x71, 0xf1, 0x9c, 0xbf, 0xe7, 0x1a, 0x9f, 0xa5, 0x2f, 0x39, 0xd3,
0xa5, 0xaf, 0xa0, 0x3e, 0xd5, 0x2f, 0xd1, 0xe3, 0xac, 0x4b, 0x4c, 0x77, 0xcc, 0x3b, 0x6c, 0xfd,
0x1a, 0x2a, 0xa3, 0xda, 0x80, 0xb6, 0xd2, 0xbd, 0x67, 0x4a, 0x4e, 0xa3, 0xbb, 0x88, 0x8b, 0xda,
0xf2, 0x09, 0x14, 0x0f, 0xed, 0x88, 0x26, 0x27, 0x30, 0x61, 0x1e, 0x7d, 0x01, 0x25, 0x0b, 0xd3,
0x68, 0xb8, 0xb8, 0xe7, 0xcf, 0x00, 0xb1, 0xf7, 0xd7, 0x93, 0x0c, 0x12, 0xbb, 0xad, 0x0e, 0x26,
0x2e, 0x7f, 0x00, 0x05, 0xde, 0x1c, 0xd0, 0xa3, 0xf4, 0x85, 0x63, 0x4d, 0x24, 0x71, 0xb9, 0x63,
0x28, 0xf0, 0x37, 0x05, 0xca, 0x70, 0x15, 0xe6, 0x5f, 0x4d, 0x89, 0xab, 0xbe, 0x82, 0xea, 0xf8,
0x49, 0x82, 0x32, 0xf0, 0x36, 0xfb, 0x7e, 0x49, 0x5c, 0xf8, 0x08, 0xca, 0xaa, 0x93, 0xa1, 0x0c,
0xfa, 0x9b, 0x6e, 0x7a, 0x89, 0x8b, 0x7e, 0x0f, 0x95, 0x51, 0xbb, 0x48, 0x64, 0x3b, 0xc3, 0x21,
0xe6, 0x5a, 0xce, 0x4b, 0x28, 0xc9, 0xbe, 0x92, 0xa5, 0x3a, 0xcd, 0x75, 0xa0, 0xc4, 0x70, 0x31,
0x14, 0x78, 0x6d, 0xcf, 0xa2, 0x80, 0x58, 0xab, 0x68, 0x98, 0x59, 0xe1, 0x32, 0xfa, 0xdd, 0x83,
0xeb, 0x77, 0xcd, 0xa5, 0xbf, 0xdf, 0x35, 0x97, 0x7e, 0xbb, 0x69, 0x6a, 0xd7, 0x37, 0x4d, 0xed,
0xaf, 0x9b, 0xa6, 0xf6, 0xef, 0x4d, 0x53, 0xfb, 0x71, 0x7b, 0xb1, 0xff, 0x69, 0xbf, 0xe2, 0xbf,
0xa7, 0x25, 0x71, 0x8a, 0xed, 0xff, 0x03, 0x00, 0x00, 0xff, 0xff, 0x70, 0x88, 0x12, 0x32, 0x11,
0x0f, 0x00, 0x00,
}

View File

@@ -47,6 +47,8 @@ service Shim {
rpc ShimInfo(google.protobuf.Empty) returns (ShimInfoResponse);
rpc Update(UpdateTaskRequest) returns (google.protobuf.Empty);
rpc Wait(WaitRequest) returns (WaitResponse);
}
message CreateTaskRequest {
@@ -152,3 +154,12 @@ message StartResponse {
string id = 1;
uint32 pid = 2;
}
message WaitRequest {
string id = 1;
}
message WaitResponse {
uint32 exit_status = 1;
google.protobuf.Timestamp exited_at = 2 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
}

View File

@@ -22,12 +22,18 @@ type Task struct {
shim *client.Client
namespace string
cg cgroups.Cgroup
monitor runtime.TaskMonitor
}
func newTask(id, namespace string, pid int, shim *client.Client) (*Task, error) {
cg, err := cgroups.Load(cgroups.V1, cgroups.PidPath(pid))
if err != nil {
return nil, err
func newTask(id, namespace string, pid int, shim *client.Client, monitor runtime.TaskMonitor) (*Task, error) {
var (
err error
cg cgroups.Cgroup
)
if pid > 0 {
if cg, err = cgroups.Load(cgroups.V1, cgroups.PidPath(pid)); err != nil {
return nil, err
}
}
return &Task{
id: id,
@@ -35,6 +41,7 @@ func newTask(id, namespace string, pid int, shim *client.Client) (*Task, error)
shim: shim,
namespace: namespace,
cg: cg,
monitor: monitor,
}, nil
}
@@ -51,12 +58,24 @@ func (t *Task) Info() runtime.TaskInfo {
}
func (t *Task) Start(ctx context.Context) error {
_, err := t.shim.Start(ctx, &shim.StartRequest{
hasCgroup := t.cg != nil
r, err := t.shim.Start(ctx, &shim.StartRequest{
ID: t.id,
})
if err != nil {
return errdefs.FromGRPC(err)
}
t.pid = int(r.Pid)
if !hasCgroup {
cg, err := cgroups.Load(cgroups.V1, cgroups.PidPath(t.pid))
if err != nil {
return err
}
t.cg = cg
if err := t.monitor.Monitor(t); err != nil {
return err
}
}
return nil
}
@@ -225,3 +244,16 @@ func (t *Task) Metrics(ctx context.Context) (interface{}, error) {
func (t *Task) Cgroup() cgroups.Cgroup {
return t.cg
}
func (t *Task) Wait(ctx context.Context) (*runtime.Exit, error) {
r, err := t.shim.Wait(ctx, &shim.WaitRequest{
ID: t.id,
})
if err != nil {
return nil, err
}
return &runtime.Exit{
Timestamp: r.ExitedAt,
Status: r.ExitStatus,
}, nil
}