Add Exec IDs

Signed-off-by: Michael Crosby <crosbymichael@gmail.com>
This commit is contained in:
Michael Crosby
2017-06-28 12:54:10 -07:00
parent e283b3802d
commit f93bfb6233
36 changed files with 1441 additions and 2055 deletions

View File

@@ -6,7 +6,9 @@ import (
"fmt"
"os"
"sync"
"syscall"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"github.com/containerd/console"
events "github.com/containerd/containerd/api/services/events/v1"
@@ -16,7 +18,6 @@ import (
google_protobuf "github.com/golang/protobuf/ptypes/empty"
"github.com/pkg/errors"
"golang.org/x/net/context"
"golang.org/x/sys/unix"
)
const (
@@ -34,7 +35,7 @@ func NewService(path, namespace string) (*Service, error) {
}
return &Service{
path: path,
processes: make(map[int]process),
processes: make(map[string]process),
events: make(chan *events.RuntimeEvent, 4096),
namespace: namespace,
}, nil
@@ -46,34 +47,38 @@ type Service struct {
id string
bundle string
mu sync.Mutex
processes map[int]process
processes map[string]process
events chan *events.RuntimeEvent
eventsMu sync.Mutex
deferredEvent *events.RuntimeEvent
execID int
namespace string
}
func (s *Service) Create(ctx context.Context, r *shimapi.CreateTaskRequest) (*shimapi.CreateTaskResponse, error) {
if r.ID == "" {
return nil, grpc.Errorf(codes.InvalidArgument, "task id cannot be empty")
}
process, err := newInitProcess(ctx, s.path, s.namespace, r)
if err != nil {
return nil, err
}
s.mu.Lock()
// save the main task id and bundle to the shim for additional requests
s.id = r.ID
s.bundle = r.Bundle
s.initProcess = process
pid := process.Pid()
s.processes[pid] = process
s.processes[r.ID] = process
s.mu.Unlock()
cmd := &reaper.Cmd{
ExitCh: make(chan int, 1),
}
reaper.Default.Register(pid, cmd)
s.events <- &events.RuntimeEvent{
Type: events.RuntimeEvent_CREATE,
ID: r.ID,
Pid: uint32(pid),
Type: events.RuntimeEvent_CREATE,
ID: r.ID,
ContainerID: s.id,
Pid: uint32(pid),
}
go s.waitExit(process, pid, cmd)
return &shimapi.CreateTaskResponse{
@@ -89,9 +94,10 @@ func (s *Service) Start(ctx context.Context, r *google_protobuf.Empty) (*google_
return nil, err
}
s.events <- &events.RuntimeEvent{
Type: events.RuntimeEvent_START,
ID: s.id,
Pid: uint32(s.initProcess.Pid()),
Type: events.RuntimeEvent_START,
ID: s.id,
ContainerID: s.id,
Pid: uint32(s.initProcess.Pid()),
}
return empty, nil
}
@@ -104,7 +110,7 @@ func (s *Service) Delete(ctx context.Context, r *google_protobuf.Empty) (*shimap
// TODO (@crosbymichael): how to handle errors here
p.Delete(ctx)
s.mu.Lock()
delete(s.processes, p.Pid())
delete(s.processes, p.ID())
s.mu.Unlock()
return &shimapi.DeleteResponse{
ExitStatus: uint32(p.Status()),
@@ -117,19 +123,19 @@ func (s *Service) DeleteProcess(ctx context.Context, r *shimapi.DeleteProcessReq
if s.initProcess == nil {
return nil, errors.New(ErrContainerNotCreated)
}
if int(r.Pid) == s.initProcess.pid {
return nil, fmt.Errorf("cannot delete init process with DeleteProcess")
if r.ID == s.initProcess.id {
return nil, grpc.Errorf(codes.InvalidArgument, "cannot delete init process with DeleteProcess")
}
s.mu.Lock()
p, ok := s.processes[int(r.Pid)]
p, ok := s.processes[r.ID]
s.mu.Unlock()
if !ok {
return nil, fmt.Errorf("process %d not found", r.Pid)
return nil, fmt.Errorf("process %s not found", r.ID)
}
// TODO (@crosbymichael): how to handle errors here
p.Delete(ctx)
s.mu.Lock()
delete(s.processes, p.Pid())
delete(s.processes, p.ID())
s.mu.Unlock()
return &shimapi.DeleteResponse{
ExitStatus: uint32(p.Status()),
@@ -144,9 +150,8 @@ func (s *Service) Exec(ctx context.Context, r *shimapi.ExecProcessRequest) (*shi
}
s.mu.Lock()
defer s.mu.Unlock()
s.execID++
process, err := newExecProcess(ctx, s.path, r, s.initProcess, s.execID)
process, err := newExecProcess(ctx, s.path, r, s.initProcess, r.ID)
if err != nil {
return nil, err
}
@@ -155,12 +160,13 @@ func (s *Service) Exec(ctx context.Context, r *shimapi.ExecProcessRequest) (*shi
ExitCh: make(chan int, 1),
}
reaper.Default.Register(pid, cmd)
s.processes[pid] = process
s.processes[r.ID] = process
s.events <- &events.RuntimeEvent{
Type: events.RuntimeEvent_EXEC_ADDED,
ID: s.id,
Pid: uint32(pid),
Type: events.RuntimeEvent_EXEC_ADDED,
ID: r.ID,
ContainerID: s.id,
Pid: uint32(pid),
}
go s.waitExit(process, pid, cmd)
return &shimapi.ExecProcessResponse{
@@ -169,18 +175,18 @@ func (s *Service) Exec(ctx context.Context, r *shimapi.ExecProcessRequest) (*shi
}
func (s *Service) ResizePty(ctx context.Context, r *shimapi.ResizePtyRequest) (*google_protobuf.Empty, error) {
if r.Pid == 0 {
return nil, errors.Errorf("pid not provided in request")
if r.ID == "" {
return nil, grpc.Errorf(codes.InvalidArgument, "id not provided")
}
ws := console.WinSize{
Width: uint16(r.Width),
Height: uint16(r.Height),
}
s.mu.Lock()
p, ok := s.processes[int(r.Pid)]
p, ok := s.processes[r.ID]
s.mu.Unlock()
if !ok {
return nil, errors.Errorf("process does not exist %d", r.Pid)
return nil, errors.Errorf("process does not exist %s", r.ID)
}
if err := p.Resize(ws); err != nil {
return nil, err
@@ -212,10 +218,14 @@ func (s *Service) Stream(r *shimapi.StreamEventsRequest, stream shimapi.Shim_Str
}
}
func (s *Service) State(ctx context.Context, r *google_protobuf.Empty) (*shimapi.StateResponse, error) {
func (s *Service) State(ctx context.Context, r *shimapi.StateRequest) (*shimapi.StateResponse, error) {
if s.initProcess == nil {
return nil, errors.New(ErrContainerNotCreated)
}
p, ok := s.processes[r.ID]
if !ok {
return nil, grpc.Errorf(codes.NotFound, "process id %s not found", r.ID)
}
st, err := s.initProcess.ContainerStatus(ctx)
if err != nil {
return nil, err
@@ -231,38 +241,17 @@ func (s *Service) State(ctx context.Context, r *google_protobuf.Empty) (*shimapi
case "paused":
status = task.StatusPaused
}
o := &shimapi.StateResponse{
ID: s.id,
Bundle: s.bundle,
Pid: uint32(s.initProcess.Pid()),
Status: status,
Processes: []*task.Process{},
Stdin: s.initProcess.stdinPath,
Stdout: s.initProcess.stdoutPath,
Stderr: s.initProcess.stderrPath,
}
s.mu.Lock()
defer s.mu.Unlock()
for _, p := range s.processes {
status := task.StatusRunning
if err := unix.Kill(p.Pid(), 0); err != nil {
if err != syscall.ESRCH {
return nil, err
}
status = task.StatusStopped
}
pp := &task.Process{
Pid: uint32(p.Pid()),
Status: status,
}
if ep, ok := p.(*execProcess); ok {
pp.Stdin = ep.stdinPath
pp.Stdout = ep.stdoutPath
pp.Stderr = ep.stderrPath
}
o.Processes = append(o.Processes, pp)
}
return o, nil
sio := p.Stdio()
return &shimapi.StateResponse{
ID: p.ID(),
Bundle: s.bundle,
Pid: uint32(p.Pid()),
Status: status,
Stdin: sio.stdin,
Stdout: sio.stdout,
Stderr: sio.stderr,
Terminal: sio.terminal,
}, nil
}
func (s *Service) Pause(ctx context.Context, r *google_protobuf.Empty) (*google_protobuf.Empty, error) {
@@ -289,35 +278,19 @@ func (s *Service) Kill(ctx context.Context, r *shimapi.KillRequest) (*google_pro
if s.initProcess == nil {
return nil, errors.New(ErrContainerNotCreated)
}
if r.Pid == 0 {
if r.ID == "" {
if err := s.initProcess.Kill(ctx, r.Signal, r.All); err != nil {
return nil, err
}
return empty, nil
}
if int(r.Pid) == s.initProcess.pid {
if err := s.initProcess.Kill(ctx, r.Signal, r.All); err != nil {
return nil, err
}
return empty, nil
p, ok := s.processes[r.ID]
if !ok {
return nil, grpc.Errorf(codes.NotFound, "process id %s not found", r.ID)
}
pids, err := s.getContainerPids(ctx, s.initProcess.id)
if err != nil {
if err := p.Kill(ctx, r.Signal, r.All); err != nil {
return nil, err
}
valid := false
for _, p := range pids {
if r.Pid == p {
valid = true
break
}
}
if !valid {
return nil, errors.Errorf("process %d does not exist in container", r.Pid)
}
if err := unix.Kill(int(r.Pid), syscall.Signal(r.Signal)); err != nil {
return nil, checkKillError(err)
}
return empty, nil
}
@@ -332,9 +305,9 @@ func (s *Service) ListPids(ctx context.Context, r *shimapi.ListPidsRequest) (*sh
}
func (s *Service) CloseIO(ctx context.Context, r *shimapi.CloseIORequest) (*google_protobuf.Empty, error) {
p, ok := s.processes[int(r.Pid)]
p, ok := s.processes[r.ID]
if !ok {
return nil, fmt.Errorf("process does not exist %d", r.Pid)
return nil, grpc.Errorf(codes.NotFound, "process does not exist %s", r.ID)
}
if err := p.Stdin().Close(); err != nil {
return nil, err
@@ -374,11 +347,12 @@ func (s *Service) waitExit(p process, pid int, cmd *reaper.Cmd) {
reaper.Default.Delete(pid)
s.events <- &events.RuntimeEvent{
Type: events.RuntimeEvent_EXIT,
ID: s.id,
Pid: uint32(pid),
ExitStatus: uint32(status),
ExitedAt: p.ExitedAt(),
Type: events.RuntimeEvent_EXIT,
ID: p.ID(),
ContainerID: s.id,
Pid: uint32(pid),
ExitStatus: uint32(status),
ExitedAt: p.ExitedAt(),
}
}
@@ -387,11 +361,9 @@ func (s *Service) getContainerPids(ctx context.Context, id string) ([]uint32, er
if err != nil {
return nil, err
}
pids := make([]uint32, 0, len(p))
for _, pid := range p {
pids = append(pids, uint32(pid))
}
return pids, nil
}