Add exec support to client
This also fixes a deadlock in the shim's reaper where execs would lockup and/or miss a quick exiting exec process's exit status. Signed-off-by: Michael Crosby <crosbymichael@gmail.com>
This commit is contained in:
parent
f55f40eeec
commit
ebf935d990
@ -3,9 +3,14 @@ package containerd
|
|||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
|
"syscall"
|
||||||
"testing"
|
"testing"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func empty() IOCreation {
|
||||||
|
return BufferedIO(bytes.NewBuffer(nil), bytes.NewBuffer(nil), bytes.NewBuffer(nil))
|
||||||
|
}
|
||||||
|
|
||||||
func TestContainerList(t *testing.T) {
|
func TestContainerList(t *testing.T) {
|
||||||
if testing.Short() {
|
if testing.Short() {
|
||||||
t.Skip()
|
t.Skip()
|
||||||
@ -64,7 +69,6 @@ func TestNewContainer(t *testing.T) {
|
|||||||
func TestContainerStart(t *testing.T) {
|
func TestContainerStart(t *testing.T) {
|
||||||
if testing.Short() {
|
if testing.Short() {
|
||||||
t.Skip()
|
t.Skip()
|
||||||
return
|
|
||||||
}
|
}
|
||||||
client, err := New(address)
|
client, err := New(address)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -133,7 +137,6 @@ func TestContainerStart(t *testing.T) {
|
|||||||
func TestContainerOutput(t *testing.T) {
|
func TestContainerOutput(t *testing.T) {
|
||||||
if testing.Short() {
|
if testing.Short() {
|
||||||
t.Skip()
|
t.Skip()
|
||||||
return
|
|
||||||
}
|
}
|
||||||
client, err := New(address)
|
client, err := New(address)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -201,3 +204,88 @@ func TestContainerOutput(t *testing.T) {
|
|||||||
t.Errorf("expected output %q but received %q", expected, actual)
|
t.Errorf("expected output %q but received %q", expected, actual)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestContainerExec(t *testing.T) {
|
||||||
|
if testing.Short() {
|
||||||
|
t.Skip()
|
||||||
|
}
|
||||||
|
client, err := New(address)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
defer client.Close()
|
||||||
|
|
||||||
|
var (
|
||||||
|
ctx = context.Background()
|
||||||
|
id = "ContainerExec"
|
||||||
|
)
|
||||||
|
image, err := client.GetImage(ctx, testImage)
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
spec, err := GenerateSpec(WithImageConfig(ctx, image), WithProcessArgs("sleep", "100"))
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
container, err := client.NewContainer(ctx, id, spec, WithImage(image), WithNewRootFS(id, image))
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer container.Delete(ctx)
|
||||||
|
|
||||||
|
task, err := container.NewTask(ctx, empty())
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer task.Delete(ctx)
|
||||||
|
|
||||||
|
finished := make(chan struct{}, 1)
|
||||||
|
go func() {
|
||||||
|
if _, err := task.Wait(ctx); err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
close(finished)
|
||||||
|
}()
|
||||||
|
|
||||||
|
// start an exec process without running the original container process info
|
||||||
|
processSpec := spec.Process
|
||||||
|
processSpec.Args = []string{
|
||||||
|
"sh", "-c",
|
||||||
|
"exit 6",
|
||||||
|
}
|
||||||
|
|
||||||
|
process, err := task.Exec(ctx, &processSpec, empty())
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
processStatusC := make(chan uint32, 1)
|
||||||
|
go func() {
|
||||||
|
status, err := process.Wait(ctx)
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
processStatusC <- status
|
||||||
|
}()
|
||||||
|
|
||||||
|
if err := process.Start(ctx); err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// wait for the exec to return
|
||||||
|
status := <-processStatusC
|
||||||
|
|
||||||
|
if status != 6 {
|
||||||
|
t.Errorf("expected exec exit code 6 but received %d", status)
|
||||||
|
}
|
||||||
|
if err := task.Kill(ctx, syscall.SIGKILL); err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
<-finished
|
||||||
|
}
|
||||||
|
@ -101,19 +101,18 @@ func (s *Service) Exec(ctx context.Context, r *shimapi.ExecRequest) (*shimapi.Ex
|
|||||||
s.mu.Lock()
|
s.mu.Lock()
|
||||||
defer s.mu.Unlock()
|
defer s.mu.Unlock()
|
||||||
s.execID++
|
s.execID++
|
||||||
reaper.Default.Lock()
|
|
||||||
process, err := newExecProcess(ctx, s.path, r, s.initProcess, s.execID)
|
process, err := newExecProcess(ctx, s.path, r, s.initProcess, s.execID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
reaper.Default.Unlock()
|
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
pid := process.Pid()
|
pid := process.Pid()
|
||||||
s.processes[pid] = process
|
|
||||||
cmd := &reaper.Cmd{
|
cmd := &reaper.Cmd{
|
||||||
ExitCh: make(chan int, 1),
|
ExitCh: make(chan int, 1),
|
||||||
}
|
}
|
||||||
reaper.Default.RegisterNL(pid, cmd)
|
reaper.Default.Register(pid, cmd)
|
||||||
reaper.Default.Unlock()
|
s.processes[pid] = process
|
||||||
|
|
||||||
s.events <- &task.Event{
|
s.events <- &task.Event{
|
||||||
Type: task.Event_EXEC_ADDED,
|
Type: task.Event_EXEC_ADDED,
|
||||||
ID: s.id,
|
ID: s.id,
|
||||||
@ -290,6 +289,8 @@ func (s *Service) Checkpoint(ctx context.Context, r *shimapi.CheckpointRequest)
|
|||||||
func (s *Service) waitExit(p process, pid int, cmd *reaper.Cmd) {
|
func (s *Service) waitExit(p process, pid int, cmd *reaper.Cmd) {
|
||||||
status := <-cmd.ExitCh
|
status := <-cmd.ExitCh
|
||||||
p.Exited(status)
|
p.Exited(status)
|
||||||
|
|
||||||
|
reaper.Default.Delete(pid)
|
||||||
s.events <- &task.Event{
|
s.events <- &task.Event{
|
||||||
Type: task.Event_EXIT,
|
Type: task.Event_EXIT,
|
||||||
ID: s.id,
|
ID: s.id,
|
||||||
|
87
process.go
Normal file
87
process.go
Normal file
@ -0,0 +1,87 @@
|
|||||||
|
package containerd
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"syscall"
|
||||||
|
|
||||||
|
"github.com/containerd/containerd/api/services/execution"
|
||||||
|
taskapi "github.com/containerd/containerd/api/types/task"
|
||||||
|
protobuf "github.com/gogo/protobuf/types"
|
||||||
|
specs "github.com/opencontainers/runtime-spec/specs-go"
|
||||||
|
)
|
||||||
|
|
||||||
|
type process struct {
|
||||||
|
task *task
|
||||||
|
|
||||||
|
// this is a hack to make a blocking Wait work
|
||||||
|
// exec does not have a create/start split so if a quick exiting process like `exit 1`
|
||||||
|
// run, the wait does not have enough time to get the pid catch the event. So we need
|
||||||
|
// to lock this on process struct create and only unlock it after the pid is set
|
||||||
|
// this allow the wait to be called before calling process start and not race with the exit event
|
||||||
|
pidSync chan struct{}
|
||||||
|
|
||||||
|
io *IO
|
||||||
|
pid uint32
|
||||||
|
spec *specs.Process
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *process) Pid() uint32 {
|
||||||
|
return p.pid
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *process) Start(ctx context.Context) error {
|
||||||
|
data, err := json.Marshal(p.spec)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
request := &execution.ExecRequest{
|
||||||
|
ContainerID: p.task.containerID,
|
||||||
|
Terminal: p.io.Terminal,
|
||||||
|
Stdin: p.io.Stdin,
|
||||||
|
Stdout: p.io.Stdout,
|
||||||
|
Stderr: p.io.Stderr,
|
||||||
|
Spec: &protobuf.Any{
|
||||||
|
TypeUrl: specs.Version,
|
||||||
|
Value: data,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
response, err := p.task.client.TaskService().Exec(ctx, request)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
p.pid = response.Pid
|
||||||
|
close(p.pidSync)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *process) Kill(ctx context.Context, s syscall.Signal) error {
|
||||||
|
_, err := p.task.client.TaskService().Kill(ctx, &execution.KillRequest{
|
||||||
|
Signal: uint32(s),
|
||||||
|
ContainerID: p.task.containerID,
|
||||||
|
PidOrAll: &execution.KillRequest_Pid{
|
||||||
|
Pid: p.pid,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *process) Wait(ctx context.Context) (uint32, error) {
|
||||||
|
events, err := p.task.client.TaskService().Events(ctx, &execution.EventsRequest{})
|
||||||
|
if err != nil {
|
||||||
|
return UnknownExitStatus, err
|
||||||
|
}
|
||||||
|
<-p.pidSync
|
||||||
|
for {
|
||||||
|
e, err := events.Recv()
|
||||||
|
if err != nil {
|
||||||
|
return UnknownExitStatus, err
|
||||||
|
}
|
||||||
|
if e.Type != taskapi.Event_EXIT {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if e.ID == p.task.containerID && e.Pid == p.pid {
|
||||||
|
return e.ExitStatus, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -19,30 +19,32 @@ func Reap() error {
|
|||||||
for _, e := range exits {
|
for _, e := range exits {
|
||||||
Default.Lock()
|
Default.Lock()
|
||||||
c, ok := Default.cmds[e.Pid]
|
c, ok := Default.cmds[e.Pid]
|
||||||
Default.Unlock()
|
|
||||||
if !ok {
|
if !ok {
|
||||||
|
Default.unknown[e.Pid] = e.Status
|
||||||
|
Default.Unlock()
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
Default.Unlock()
|
||||||
if c.c != nil {
|
if c.c != nil {
|
||||||
// after we get an exit, call wait on the go process to make sure all
|
// after we get an exit, call wait on the go process to make sure all
|
||||||
// pipes are closed and finalizers are run on the process
|
// pipes are closed and finalizers are run on the process
|
||||||
c.c.Wait()
|
c.c.Wait()
|
||||||
}
|
}
|
||||||
c.ExitCh <- e.Status
|
c.ExitCh <- e.Status
|
||||||
Default.Lock()
|
|
||||||
delete(Default.cmds, e.Pid)
|
|
||||||
Default.Unlock()
|
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
var Default = &Monitor{
|
var Default = &Monitor{
|
||||||
cmds: make(map[int]*Cmd),
|
cmds: make(map[int]*Cmd),
|
||||||
|
unknown: make(map[int]int),
|
||||||
}
|
}
|
||||||
|
|
||||||
type Monitor struct {
|
type Monitor struct {
|
||||||
sync.Mutex
|
sync.Mutex
|
||||||
cmds map[int]*Cmd
|
|
||||||
|
cmds map[int]*Cmd
|
||||||
|
unknown map[int]int
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Monitor) Output(c *exec.Cmd) ([]byte, error) {
|
func (m *Monitor) Output(c *exec.Cmd) ([]byte, error) {
|
||||||
@ -93,6 +95,13 @@ func (m *Monitor) Wait(c *exec.Cmd) (int, error) {
|
|||||||
|
|
||||||
func (m *Monitor) Register(pid int, c *Cmd) {
|
func (m *Monitor) Register(pid int, c *Cmd) {
|
||||||
m.Lock()
|
m.Lock()
|
||||||
|
if status, ok := m.unknown[pid]; ok {
|
||||||
|
delete(m.unknown, pid)
|
||||||
|
m.cmds[pid] = c
|
||||||
|
m.Unlock()
|
||||||
|
c.ExitCh <- status
|
||||||
|
return
|
||||||
|
}
|
||||||
m.cmds[pid] = c
|
m.cmds[pid] = c
|
||||||
m.Unlock()
|
m.Unlock()
|
||||||
}
|
}
|
||||||
@ -117,6 +126,19 @@ func (m *Monitor) WaitPid(pid int) (int, error) {
|
|||||||
return ec, nil
|
return ec, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Command returns the registered pid for the command created
|
||||||
|
func (m *Monitor) Command(pid int) *Cmd {
|
||||||
|
m.Lock()
|
||||||
|
defer m.Unlock()
|
||||||
|
return m.cmds[pid]
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Monitor) Delete(pid int) {
|
||||||
|
m.Lock()
|
||||||
|
delete(m.cmds, pid)
|
||||||
|
m.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
type Cmd struct {
|
type Cmd struct {
|
||||||
c *exec.Cmd
|
c *exec.Cmd
|
||||||
ExitCh chan int
|
ExitCh chan int
|
||||||
|
24
task.go
24
task.go
@ -6,6 +6,7 @@ import (
|
|||||||
|
|
||||||
"github.com/containerd/containerd/api/services/execution"
|
"github.com/containerd/containerd/api/services/execution"
|
||||||
taskapi "github.com/containerd/containerd/api/types/task"
|
taskapi "github.com/containerd/containerd/api/types/task"
|
||||||
|
specs "github.com/opencontainers/runtime-spec/specs-go"
|
||||||
)
|
)
|
||||||
|
|
||||||
const UnknownExitStatus = 255
|
const UnknownExitStatus = 255
|
||||||
@ -21,14 +22,22 @@ const (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type Task interface {
|
type Task interface {
|
||||||
|
Pid() uint32
|
||||||
Delete(context.Context) (uint32, error)
|
Delete(context.Context) (uint32, error)
|
||||||
Kill(context.Context, syscall.Signal) error
|
Kill(context.Context, syscall.Signal) error
|
||||||
Pause(context.Context) error
|
Pause(context.Context) error
|
||||||
Resume(context.Context) error
|
Resume(context.Context) error
|
||||||
Pid() uint32
|
|
||||||
Start(context.Context) error
|
Start(context.Context) error
|
||||||
Status(context.Context) (TaskStatus, error)
|
Status(context.Context) (TaskStatus, error)
|
||||||
Wait(context.Context) (uint32, error)
|
Wait(context.Context) (uint32, error)
|
||||||
|
Exec(context.Context, *specs.Process, IOCreation) (Process, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
type Process interface {
|
||||||
|
Pid() uint32
|
||||||
|
Start(context.Context) error
|
||||||
|
Kill(context.Context, syscall.Signal) error
|
||||||
|
Wait(context.Context) (uint32, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ = (Task)(&task{})
|
var _ = (Task)(&task{})
|
||||||
@ -121,3 +130,16 @@ func (t *task) Delete(ctx context.Context) (uint32, error) {
|
|||||||
}
|
}
|
||||||
return r.ExitStatus, cerr
|
return r.ExitStatus, cerr
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (t *task) Exec(ctx context.Context, spec *specs.Process, ioCreate IOCreation) (Process, error) {
|
||||||
|
i, err := ioCreate()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &process{
|
||||||
|
task: t,
|
||||||
|
io: i,
|
||||||
|
spec: spec,
|
||||||
|
pidSync: make(chan struct{}),
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user