diff --git a/container_test.go b/container_test.go index db79aa0bd..d7254a2d0 100644 --- a/container_test.go +++ b/container_test.go @@ -3,9 +3,14 @@ package containerd import ( "bytes" "context" + "syscall" "testing" ) +func empty() IOCreation { + return BufferedIO(bytes.NewBuffer(nil), bytes.NewBuffer(nil), bytes.NewBuffer(nil)) +} + func TestContainerList(t *testing.T) { if testing.Short() { t.Skip() @@ -64,7 +69,6 @@ func TestNewContainer(t *testing.T) { func TestContainerStart(t *testing.T) { if testing.Short() { t.Skip() - return } client, err := New(address) if err != nil { @@ -133,7 +137,6 @@ func TestContainerStart(t *testing.T) { func TestContainerOutput(t *testing.T) { if testing.Short() { t.Skip() - return } client, err := New(address) if err != nil { @@ -201,3 +204,88 @@ func TestContainerOutput(t *testing.T) { t.Errorf("expected output %q but received %q", expected, actual) } } + +func TestContainerExec(t *testing.T) { + if testing.Short() { + t.Skip() + } + client, err := New(address) + if err != nil { + t.Fatal(err) + } + defer client.Close() + + var ( + ctx = context.Background() + id = "ContainerExec" + ) + image, err := client.GetImage(ctx, testImage) + if err != nil { + t.Error(err) + return + } + spec, err := GenerateSpec(WithImageConfig(ctx, image), WithProcessArgs("sleep", "100")) + if err != nil { + t.Error(err) + return + } + container, err := client.NewContainer(ctx, id, spec, WithImage(image), WithNewRootFS(id, image)) + if err != nil { + t.Error(err) + return + } + defer container.Delete(ctx) + + task, err := container.NewTask(ctx, empty()) + if err != nil { + t.Error(err) + return + } + defer task.Delete(ctx) + + finished := make(chan struct{}, 1) + go func() { + if _, err := task.Wait(ctx); err != nil { + t.Error(err) + } + close(finished) + }() + + // start an exec process without running the original container process info + processSpec := spec.Process + processSpec.Args = []string{ + "sh", "-c", + "exit 6", + } + + process, err := task.Exec(ctx, &processSpec, empty()) + if err != nil { + t.Error(err) + return + } + processStatusC := make(chan uint32, 1) + go func() { + status, err := process.Wait(ctx) + if err != nil { + t.Error(err) + return + } + processStatusC <- status + }() + + if err := process.Start(ctx); err != nil { + t.Error(err) + return + } + + // wait for the exec to return + status := <-processStatusC + + if status != 6 { + t.Errorf("expected exec exit code 6 but received %d", status) + } + if err := task.Kill(ctx, syscall.SIGKILL); err != nil { + t.Error(err) + } + <-finished +} diff --git a/linux/shim/service.go b/linux/shim/service.go index fb460cb3b..59add659d 100644 --- a/linux/shim/service.go +++ b/linux/shim/service.go @@ -101,19 +101,18 @@ func (s *Service) Exec(ctx context.Context, r *shimapi.ExecRequest) (*shimapi.Ex s.mu.Lock() defer s.mu.Unlock() s.execID++ - reaper.Default.Lock() + process, err := newExecProcess(ctx, s.path, r, s.initProcess, s.execID) if err != nil { - reaper.Default.Unlock() return nil, err } pid := process.Pid() - s.processes[pid] = process cmd := &reaper.Cmd{ ExitCh: make(chan int, 1), } - reaper.Default.RegisterNL(pid, cmd) - reaper.Default.Unlock() + reaper.Default.Register(pid, cmd) + s.processes[pid] = process + s.events <- &task.Event{ Type: task.Event_EXEC_ADDED, ID: s.id, @@ -290,6 +289,8 @@ func (s *Service) Checkpoint(ctx context.Context, r *shimapi.CheckpointRequest) func (s *Service) waitExit(p process, pid int, cmd *reaper.Cmd) { status := <-cmd.ExitCh p.Exited(status) + + reaper.Default.Delete(pid) s.events <- &task.Event{ Type: task.Event_EXIT, ID: s.id, diff --git a/process.go b/process.go new file mode 100644 index 000000000..8aa31585e --- /dev/null +++ b/process.go @@ -0,0 +1,87 @@ +package containerd + +import ( + "context" + "encoding/json" + "syscall" + + "github.com/containerd/containerd/api/services/execution" + taskapi "github.com/containerd/containerd/api/types/task" + protobuf "github.com/gogo/protobuf/types" + specs "github.com/opencontainers/runtime-spec/specs-go" +) + +type process struct { + task *task + + // this is a hack to make a blocking Wait work + // exec does not have a create/start split so if a quick exiting process like `exit 1` + // run, the wait does not have enough time to get the pid catch the event. So we need + // to lock this on process struct create and only unlock it after the pid is set + // this allow the wait to be called before calling process start and not race with the exit event + pidSync chan struct{} + + io *IO + pid uint32 + spec *specs.Process +} + +func (p *process) Pid() uint32 { + return p.pid +} + +func (p *process) Start(ctx context.Context) error { + data, err := json.Marshal(p.spec) + if err != nil { + return err + } + request := &execution.ExecRequest{ + ContainerID: p.task.containerID, + Terminal: p.io.Terminal, + Stdin: p.io.Stdin, + Stdout: p.io.Stdout, + Stderr: p.io.Stderr, + Spec: &protobuf.Any{ + TypeUrl: specs.Version, + Value: data, + }, + } + response, err := p.task.client.TaskService().Exec(ctx, request) + if err != nil { + return err + } + p.pid = response.Pid + close(p.pidSync) + return nil +} + +func (p *process) Kill(ctx context.Context, s syscall.Signal) error { + _, err := p.task.client.TaskService().Kill(ctx, &execution.KillRequest{ + Signal: uint32(s), + ContainerID: p.task.containerID, + PidOrAll: &execution.KillRequest_Pid{ + Pid: p.pid, + }, + }) + return err +} + +func (p *process) Wait(ctx context.Context) (uint32, error) { + events, err := p.task.client.TaskService().Events(ctx, &execution.EventsRequest{}) + if err != nil { + return UnknownExitStatus, err + } + <-p.pidSync + for { + e, err := events.Recv() + if err != nil { + return UnknownExitStatus, err + } + if e.Type != taskapi.Event_EXIT { + continue + } + if e.ID == p.task.containerID && e.Pid == p.pid { + return e.ExitStatus, nil + } + } +} diff --git a/reaper/reaper.go b/reaper/reaper.go index 599bafaff..7fceac740 100644 --- a/reaper/reaper.go +++ b/reaper/reaper.go @@ -19,30 +19,32 @@ func Reap() error { for _, e := range exits { Default.Lock() c, ok := Default.cmds[e.Pid] - Default.Unlock() if !ok { + Default.unknown[e.Pid] = e.Status + Default.Unlock() continue } + Default.Unlock() if c.c != nil { // after we get an exit, call wait on the go process to make sure all // pipes are closed and finalizers are run on the process c.c.Wait() } c.ExitCh <- e.Status - Default.Lock() - delete(Default.cmds, e.Pid) - Default.Unlock() } return err } var Default = &Monitor{ - cmds: make(map[int]*Cmd), + cmds: make(map[int]*Cmd), + unknown: make(map[int]int), } type Monitor struct { sync.Mutex - cmds map[int]*Cmd + + cmds map[int]*Cmd + unknown map[int]int } func (m *Monitor) Output(c *exec.Cmd) ([]byte, error) { @@ -93,6 +95,13 @@ func (m *Monitor) Wait(c *exec.Cmd) (int, error) { func (m *Monitor) Register(pid int, c *Cmd) { m.Lock() + if status, ok := m.unknown[pid]; ok { + delete(m.unknown, pid) + m.cmds[pid] = c + m.Unlock() + c.ExitCh <- status + return + } m.cmds[pid] = c m.Unlock() } @@ -117,6 +126,19 @@ func (m *Monitor) WaitPid(pid int) (int, error) { return ec, nil } +// Command returns the registered pid for the command created +func (m *Monitor) Command(pid int) *Cmd { + m.Lock() + defer m.Unlock() + return m.cmds[pid] +} + +func (m *Monitor) Delete(pid int) { + m.Lock() + delete(m.cmds, pid) + m.Unlock() +} + type Cmd struct { c *exec.Cmd ExitCh chan int diff --git a/task.go b/task.go index ab37ba4ca..5172da99d 100644 --- a/task.go +++ b/task.go @@ -6,6 +6,7 @@ import ( "github.com/containerd/containerd/api/services/execution" taskapi "github.com/containerd/containerd/api/types/task" + specs "github.com/opencontainers/runtime-spec/specs-go" ) const UnknownExitStatus = 255 @@ -21,14 +22,22 @@ const ( ) type Task interface { + Pid() uint32 Delete(context.Context) (uint32, error) Kill(context.Context, syscall.Signal) error Pause(context.Context) error Resume(context.Context) error - Pid() uint32 Start(context.Context) error Status(context.Context) (TaskStatus, error) Wait(context.Context) (uint32, error) + Exec(context.Context, *specs.Process, IOCreation) (Process, error) +} + +type Process interface { + Pid() uint32 + Start(context.Context) error + Kill(context.Context, syscall.Signal) error + Wait(context.Context) (uint32, error) } var _ = (Task)(&task{}) @@ -121,3 +130,16 @@ func (t *task) Delete(ctx context.Context) (uint32, error) { } return r.ExitStatus, cerr } + +func (t *task) Exec(ctx context.Context, spec *specs.Process, ioCreate IOCreation) (Process, error) { + i, err := ioCreate() + if err != nil { + return nil, err + } + return &process{ + task: t, + io: i, + spec: spec, + pidSync: make(chan struct{}), + }, nil +}