Add shim start for shim creation

Signed-off-by: Michael Crosby <crosbymichael@gmail.com>
This commit is contained in:
Michael Crosby 2018-07-11 11:59:28 -04:00
parent da1b5470cd
commit 7e49c601a8
13 changed files with 1317 additions and 414 deletions

106
runtime/v2/binary.go Normal file
View File

@ -0,0 +1,106 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package v2
import (
"context"
"strings"
eventstypes "github.com/containerd/containerd/api/events"
"github.com/containerd/containerd/events/exchange"
"github.com/containerd/containerd/runtime"
client "github.com/containerd/containerd/runtime/v2/shim"
"github.com/containerd/containerd/runtime/v2/task"
"github.com/containerd/ttrpc"
"github.com/pkg/errors"
)
func shimBinary(ctx context.Context, bundle *Bundle, runtime, containerdAddress string, events *exchange.Exchange, rt *runtime.TaskList) *binary {
return &binary{
bundle: bundle,
runtime: runtime,
containerdAddress: containerdAddress,
events: events,
rtTasks: rt,
}
}
type binary struct {
runtime string
containerdAddress string
bundle *Bundle
events *exchange.Exchange
rtTasks *runtime.TaskList
}
func (b *binary) Start(ctx context.Context) (*shim, error) {
cmd, err := client.Command(ctx, b.runtime, b.containerdAddress, b.bundle.Path, "-id", b.bundle.ID, "start")
if err != nil {
return nil, err
}
out, err := cmd.CombinedOutput()
if err != nil {
return nil, errors.Wrapf(err, "%s", out)
}
address := strings.TrimSpace(string(out))
conn, err := client.Connect(address, client.AnonDialer)
if err != nil {
return nil, err
}
client := ttrpc.NewClient(conn)
client.OnClose(func() { conn.Close() })
return &shim{
bundle: b.bundle,
client: client,
task: task.NewTaskClient(client),
events: b.events,
rtTasks: b.rtTasks,
}, nil
}
func (b *binary) Delete(ctx context.Context) (*runtime.Exit, error) {
cmd, err := client.Command(ctx, b.runtime, b.containerdAddress, b.bundle.Path, "-id", b.bundle.ID, "delete")
if err != nil {
return nil, err
}
out, err := cmd.CombinedOutput()
if err != nil {
return nil, errors.Wrapf(err, "%s", out)
}
var response task.DeleteResponse
if err := response.Unmarshal(out); err != nil {
return nil, err
}
if err := b.bundle.Delete(); err != nil {
return nil, err
}
// remove self from the runtime task list
// this seems dirty but it cleans up the API across runtimes, tasks, and the service
b.rtTasks.Delete(ctx, b.bundle.ID)
// shim will send the exit event
b.events.Publish(ctx, runtime.TaskDeleteEventTopic, &eventstypes.TaskDelete{
ContainerID: b.bundle.ID,
ExitStatus: response.ExitStatus,
ExitedAt: response.ExitedAt,
Pid: response.Pid,
})
return &runtime.Exit{
Status: response.ExitStatus,
Timestamp: response.ExitedAt,
Pid: response.Pid,
}, nil
}

View File

@ -33,13 +33,10 @@ import (
"github.com/containerd/containerd/platforms"
"github.com/containerd/containerd/plugin"
"github.com/containerd/containerd/runtime"
ptypes "github.com/gogo/protobuf/types"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
)
var empty = &ptypes.Empty{}
func init() {
plugin.Register(&plugin.Registration{
Type: plugin.RuntimePluginV2,
@ -112,7 +109,8 @@ func (m *TaskManager) Create(ctx context.Context, id string, opts runtime.Create
bundle.Delete()
}
}()
shim, err := newShim(ctx, bundle, opts.Runtime, m.containerdAddress, m.events, m.tasks)
b := shimBinary(ctx, bundle, opts.Runtime, m.containerdAddress, m.events, m.tasks)
shim, err := b.Start(ctx)
if err != nil {
return nil, err
}

View File

@ -40,7 +40,8 @@ func (p *process) ID() string {
func (p *process) Kill(ctx context.Context, signal uint32, _ bool) error {
_, err := p.shim.task.Kill(ctx, &task.KillRequest{
Signal: signal,
ID: p.id,
ID: p.shim.ID(),
ExecID: p.id,
})
if err != nil {
return errdefs.FromGRPC(err)
@ -50,7 +51,8 @@ func (p *process) Kill(ctx context.Context, signal uint32, _ bool) error {
func (p *process) State(ctx context.Context) (runtime.State, error) {
response, err := p.shim.task.State(ctx, &task.StateRequest{
ID: p.id,
ID: p.shim.ID(),
ExecID: p.id,
})
if err != nil {
if errors.Cause(err) != ttrpc.ErrClosed {
@ -86,7 +88,8 @@ func (p *process) State(ctx context.Context) (runtime.State, error) {
// ResizePty changes the side of the process's PTY to the provided width and height
func (p *process) ResizePty(ctx context.Context, size runtime.ConsoleSize) error {
_, err := p.shim.task.ResizePty(ctx, &task.ResizePtyRequest{
ID: p.id,
ID: p.shim.ID(),
ExecID: p.id,
Width: size.Width,
Height: size.Height,
})
@ -99,8 +102,9 @@ func (p *process) ResizePty(ctx context.Context, size runtime.ConsoleSize) error
// CloseIO closes the provided IO pipe for the process
func (p *process) CloseIO(ctx context.Context) error {
_, err := p.shim.task.CloseIO(ctx, &task.CloseIORequest{
ID: p.id,
Stdin: true,
ID: p.shim.ID(),
ExecID: p.id,
Stdin: true,
})
if err != nil {
return errdefs.FromGRPC(err)
@ -111,7 +115,8 @@ func (p *process) CloseIO(ctx context.Context) error {
// Start the process
func (p *process) Start(ctx context.Context) error {
response, err := p.shim.task.Start(ctx, &task.StartRequest{
ID: p.id,
ID: p.shim.ID(),
ExecID: p.id,
})
if err != nil {
return errdefs.FromGRPC(err)
@ -127,7 +132,8 @@ func (p *process) Start(ctx context.Context) error {
// Wait on the process to exit and return the exit status and timestamp
func (p *process) Wait(ctx context.Context) (*runtime.Exit, error) {
response, err := p.shim.task.Wait(ctx, &task.WaitRequest{
ID: p.id,
ID: p.shim.ID(),
ExecID: p.id,
})
if err != nil {
return nil, errdefs.FromGRPC(err)
@ -140,7 +146,8 @@ func (p *process) Wait(ctx context.Context) (*runtime.Exit, error) {
func (p *process) Delete(ctx context.Context) (*runtime.Exit, error) {
response, err := p.shim.task.Delete(ctx, &task.DeleteRequest{
ID: p.id,
ID: p.shim.ID(),
ExecID: p.id,
})
if err != nil {
return nil, errdefs.FromGRPC(err)

View File

@ -22,8 +22,10 @@ import (
"context"
"io/ioutil"
"os"
"os/exec"
"path/filepath"
"sync"
"syscall"
"time"
"github.com/containerd/cgroups"
@ -83,6 +85,7 @@ type service struct {
mu sync.Mutex
context context.Context
task rproc.Process
processes map[string]rproc.Process
events chan interface{}
platform rproc.Platform
@ -94,6 +97,77 @@ type service struct {
cg cgroups.Cgroup
}
func newCommand(ctx context.Context, containerdBinary, containerdAddress string) (*exec.Cmd, error) {
ns, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return nil, err
}
self, err := os.Executable()
if err != nil {
return nil, err
}
cwd, err := os.Getwd()
if err != nil {
return nil, err
}
args := []string{
"-namespace", ns,
"-address", containerdAddress,
"-publish-binary", containerdBinary,
}
cmd := exec.Command(self, args...)
cmd.Dir = cwd
cmd.Env = append(os.Environ(), "GOMAXPROCS=2")
cmd.SysProcAttr = &syscall.SysProcAttr{
Setpgid: true,
}
return cmd, nil
}
func (s *service) StartShim(ctx context.Context, id, containerdBinary, containerdAddress string) (string, error) {
cmd, err := newCommand(ctx, containerdBinary, containerdAddress)
if err != nil {
return "", err
}
address, err := shim.AbstractAddress(ctx, id)
if err != nil {
return "", err
}
socket, err := shim.NewSocket(address)
if err != nil {
return "", err
}
defer socket.Close()
f, err := socket.File()
if err != nil {
return "", err
}
defer f.Close()
cmd.ExtraFiles = append(cmd.ExtraFiles, f)
if err := cmd.Start(); err != nil {
return "", err
}
defer func() {
if err != nil {
cmd.Process.Kill()
}
}()
// make sure to wait after start
go cmd.Wait()
if err := shim.WritePidFile("shim.pid", cmd.Process.Pid); err != nil {
return "", err
}
if err := shim.WriteAddress("address", address); err != nil {
return "", err
}
if err := shim.SetScore(cmd.Process.Pid); err != nil {
return "", errors.Wrap(err, "failed to set OOM Score on shim")
}
return address, nil
}
func (s *service) Cleanup(ctx context.Context) (*taskAPI.DeleteResponse, error) {
path, err := os.Getwd()
if err != nil {
@ -221,7 +295,7 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *
}
s.cg = cg
}
s.processes[r.ID] = process
s.task = process
return &taskAPI.CreateTaskResponse{
Pid: uint32(pid),
}, nil
@ -232,9 +306,9 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *
func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.StartResponse, error) {
s.mu.Lock()
defer s.mu.Unlock()
p := s.processes[r.ID]
if p == nil {
return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "process %s", r.ID)
p, err := s.getProcess(r.ExecID)
if err != nil {
return nil, err
}
if err := p.Start(ctx); err != nil {
return nil, err
@ -247,7 +321,6 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.
s.cg = cg
}
return &taskAPI.StartResponse{
ID: p.ID(),
Pid: uint32(p.Pid()),
}, nil
}
@ -256,16 +329,21 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.
func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAPI.DeleteResponse, error) {
s.mu.Lock()
defer s.mu.Unlock()
p := s.processes[r.ID]
p, err := s.getProcess(r.ExecID)
if err != nil {
return nil, err
}
if p == nil {
return nil, errors.Wrapf(errdefs.ErrNotFound, "process %s", r.ID)
return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
}
if err := p.Delete(ctx); err != nil {
return nil, err
}
delete(s.processes, r.ID)
if r.ID == s.id && s.platform != nil {
isTask := r.ExecID == ""
if !isTask {
delete(s.processes, r.ExecID)
}
if isTask && s.platform != nil {
s.platform.Close()
}
return &taskAPI.DeleteResponse{
@ -279,18 +357,15 @@ func (s *service) Delete(ctx context.Context, r *taskAPI.DeleteRequest) (*taskAP
func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*ptypes.Empty, error) {
s.mu.Lock()
defer s.mu.Unlock()
if p := s.processes[r.ID]; p != nil {
return nil, errdefs.ToGRPCf(errdefs.ErrAlreadyExists, "id %s", r.ID)
if p := s.processes[r.ExecID]; p != nil {
return nil, errdefs.ToGRPCf(errdefs.ErrAlreadyExists, "id %s", r.ExecID)
}
p := s.processes[s.id]
p := s.task
if p == nil {
return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
}
process, err := p.(*proc.Init).Exec(ctx, s.bundle, &proc.ExecConfig{
ID: r.ID,
ID: r.ExecID,
Terminal: r.Terminal,
Stdin: r.Stdin,
Stdout: r.Stdout,
@ -300,7 +375,7 @@ func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*pty
if err != nil {
return nil, errdefs.ToGRPC(err)
}
s.processes[r.ID] = process
s.processes[r.ExecID] = process
return empty, nil
}
@ -308,17 +383,14 @@ func (s *service) Exec(ctx context.Context, r *taskAPI.ExecProcessRequest) (*pty
func (s *service) ResizePty(ctx context.Context, r *taskAPI.ResizePtyRequest) (*ptypes.Empty, error) {
s.mu.Lock()
defer s.mu.Unlock()
if r.ID == "" {
return nil, errdefs.ToGRPCf(errdefs.ErrInvalidArgument, "id not provided")
p, err := s.getProcess(r.ExecID)
if err != nil {
return nil, err
}
ws := console.WinSize{
Width: uint16(r.Width),
Height: uint16(r.Height),
}
p := s.processes[r.ID]
if p == nil {
return nil, errors.Errorf("process does not exist %s", r.ID)
}
if err := p.Resize(ws); err != nil {
return nil, errdefs.ToGRPC(err)
}
@ -329,9 +401,9 @@ func (s *service) ResizePty(ctx context.Context, r *taskAPI.ResizePtyRequest) (*
func (s *service) State(ctx context.Context, r *taskAPI.StateRequest) (*taskAPI.StateResponse, error) {
s.mu.Lock()
defer s.mu.Unlock()
p := s.processes[r.ID]
if p == nil {
return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "process id %s", r.ID)
p, err := s.getProcess(r.ExecID)
if err != nil {
return nil, err
}
st, err := p.Status(ctx)
if err != nil {
@ -366,10 +438,10 @@ func (s *service) State(ctx context.Context, r *taskAPI.StateRequest) (*taskAPI.
}
// Pause the container
func (s *service) Pause(ctx context.Context, r *ptypes.Empty) (*ptypes.Empty, error) {
func (s *service) Pause(ctx context.Context, r *taskAPI.PauseRequest) (*ptypes.Empty, error) {
s.mu.Lock()
defer s.mu.Unlock()
p := s.processes[s.id]
p := s.task
if p == nil {
return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
}
@ -380,10 +452,10 @@ func (s *service) Pause(ctx context.Context, r *ptypes.Empty) (*ptypes.Empty, er
}
// Resume the container
func (s *service) Resume(ctx context.Context, r *ptypes.Empty) (*ptypes.Empty, error) {
func (s *service) Resume(ctx context.Context, r *taskAPI.ResumeRequest) (*ptypes.Empty, error) {
s.mu.Lock()
defer s.mu.Unlock()
p := s.processes[s.id]
p := s.task
if p == nil {
return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
}
@ -397,20 +469,12 @@ func (s *service) Resume(ctx context.Context, r *ptypes.Empty) (*ptypes.Empty, e
func (s *service) Kill(ctx context.Context, r *taskAPI.KillRequest) (*ptypes.Empty, error) {
s.mu.Lock()
defer s.mu.Unlock()
if r.ID == "" {
p := s.processes[s.id]
if p == nil {
return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
}
if err := p.Kill(ctx, r.Signal, r.All); err != nil {
return nil, errdefs.ToGRPC(err)
}
return empty, nil
p, err := s.getProcess(r.ExecID)
if err != nil {
return nil, err
}
p := s.processes[r.ID]
if p == nil {
return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "process id %s not found", r.ID)
return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
}
if err := p.Kill(ctx, r.Signal, r.All); err != nil {
return nil, errdefs.ToGRPC(err)
@ -453,9 +517,9 @@ func (s *service) Pids(ctx context.Context, r *taskAPI.PidsRequest) (*taskAPI.Pi
func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (*ptypes.Empty, error) {
s.mu.Lock()
defer s.mu.Unlock()
p := s.processes[r.ID]
if p == nil {
return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "process does not exist %s", r.ID)
p, err := s.getProcess(r.ExecID)
if err != nil {
return nil, err
}
if stdin := p.Stdin(); stdin != nil {
if err := stdin.Close(); err != nil {
@ -469,7 +533,7 @@ func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (*ptyp
func (s *service) Checkpoint(ctx context.Context, r *taskAPI.CheckpointTaskRequest) (*ptypes.Empty, error) {
s.mu.Lock()
defer s.mu.Unlock()
p := s.processes[s.id]
p := s.task
if p == nil {
return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
}
@ -497,9 +561,13 @@ func (s *service) Checkpoint(ctx context.Context, r *taskAPI.CheckpointTaskReque
// Connect returns shim information such as the shim's pid
func (s *service) Connect(ctx context.Context, r *taskAPI.ConnectRequest) (*taskAPI.ConnectResponse, error) {
var pid int
if s.task != nil {
pid = s.task.Pid()
}
return &taskAPI.ConnectResponse{
ShimPid: uint32(os.Getpid()),
TaskPid: uint32(s.processes[s.id].Pid()),
TaskPid: uint32(pid),
}, nil
}
@ -532,7 +600,7 @@ func (s *service) Stats(ctx context.Context, r *taskAPI.StatsRequest) (*taskAPI.
func (s *service) Update(ctx context.Context, r *taskAPI.UpdateTaskRequest) (*ptypes.Empty, error) {
s.mu.Lock()
defer s.mu.Unlock()
p := s.processes[s.id]
p := s.task
if p == nil {
return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
}
@ -545,8 +613,11 @@ func (s *service) Update(ctx context.Context, r *taskAPI.UpdateTaskRequest) (*pt
// Wait for a process to exit
func (s *service) Wait(ctx context.Context, r *taskAPI.WaitRequest) (*taskAPI.WaitResponse, error) {
s.mu.Lock()
p := s.processes[r.ID]
p, err := s.getProcess(r.ExecID)
s.mu.Unlock()
if err != nil {
return nil, err
}
if p == nil {
return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
}
@ -567,7 +638,8 @@ func (s *service) processExits() {
func (s *service) checkProcesses(e runcC.Exit) {
s.mu.Lock()
defer s.mu.Unlock()
for _, p := range s.processes {
for _, p := range s.allProcesses() {
if p.Pid() == e.Pid {
if ip, ok := p.(*proc.Init); ok {
// Ensure all children are killed
@ -589,14 +661,23 @@ func (s *service) checkProcesses(e runcC.Exit) {
}
}
func (s *service) allProcesses() (o []rproc.Process) {
for _, p := range s.processes {
o = append(o, p)
}
if s.task != nil {
o = append(o, s.task)
}
return o
}
func (s *service) getContainerPids(ctx context.Context, id string) ([]uint32, error) {
s.mu.Lock()
defer s.mu.Unlock()
p := s.processes[s.id]
p := s.task
if p == nil {
return nil, errors.Wrapf(errdefs.ErrFailedPrecondition, "container must be created")
}
ps, err := p.(*proc.Init).Runtime().Ps(ctx, id)
if err != nil {
return nil, err
@ -616,6 +697,17 @@ func (s *service) forward(publisher events.Publisher) {
}
}
func (s *service) getProcess(execID string) (rproc.Process, error) {
if execID == "" {
return s.task, nil
}
p := s.processes[execID]
if p == nil {
return nil, errdefs.ToGRPCf(errdefs.ErrNotFound, "process does not exist %s", execID)
}
return p, nil
}
func getTopic(ctx context.Context, e interface{}) string {
switch e.(type) {
case *eventstypes.TaskCreate:

View File

@ -18,6 +18,7 @@ package v2
import (
"context"
"io/ioutil"
"path/filepath"
"time"
@ -29,129 +30,27 @@ import (
"github.com/containerd/containerd/identifiers"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/runtime"
client "github.com/containerd/containerd/runtime/v2/shim"
"github.com/containerd/containerd/runtime/v2/task"
"github.com/containerd/ttrpc"
ptypes "github.com/gogo/protobuf/types"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
type binary struct {
runtime string
containerdAddress string
bundle *Bundle
events *exchange.Exchange
rtTasks *runtime.TaskList
}
func (b *binary) Delete(ctx context.Context) (*runtime.Exit, error) {
cmd, err := shimCommand(ctx, b.runtime, b.containerdAddress, b.bundle, "-debug", "delete")
func loadAddress(path string) (string, error) {
data, err := ioutil.ReadFile(path)
if err != nil {
return nil, err
return "", err
}
out, err := cmd.CombinedOutput()
if err != nil {
return nil, errors.Wrapf(err, "%s", out)
}
var response task.DeleteResponse
if err := response.Unmarshal(out); err != nil {
return nil, err
}
if err := b.bundle.Delete(); err != nil {
return nil, err
}
// remove self from the runtime task list
// this seems dirty but it cleans up the API across runtimes, tasks, and the service
b.rtTasks.Delete(ctx, b.bundle.ID)
// shim will send the exit event
b.events.Publish(ctx, runtime.TaskDeleteEventTopic, &eventstypes.TaskDelete{
ContainerID: b.bundle.ID,
ExitStatus: response.ExitStatus,
ExitedAt: response.ExitedAt,
Pid: response.Pid,
})
return &runtime.Exit{
Status: response.ExitStatus,
Timestamp: response.ExitedAt,
Pid: response.Pid,
}, nil
}
func shimBinary(ctx context.Context, bundle *Bundle, runtime, containerdAddress string, events *exchange.Exchange, rt *runtime.TaskList) *binary {
return &binary{
bundle: bundle,
runtime: runtime,
containerdAddress: containerdAddress,
events: events,
rtTasks: rt,
}
}
// newShim starts and returns a new shim
func newShim(ctx context.Context, bundle *Bundle, runtime, containerdAddress string, events *exchange.Exchange, rt *runtime.TaskList) (_ *shim, err error) {
cmd, err := shimCommand(ctx, runtime, containerdAddress, bundle, "-debug")
if err != nil {
return nil, err
}
address, err := abstractAddress(ctx, bundle.ID)
if err != nil {
return nil, err
}
socket, err := newSocket(address)
if err != nil {
return nil, err
}
defer socket.Close()
f, err := socket.File()
if err != nil {
return nil, err
}
defer f.Close()
cmd.ExtraFiles = append(cmd.ExtraFiles, f)
if err := cmd.Start(); err != nil {
return nil, err
}
defer func() {
if err != nil {
cmd.Process.Kill()
}
}()
// make sure to wait after start
go cmd.Wait()
if err := writePidFile(filepath.Join(bundle.Path, "shim.pid"), cmd.Process.Pid); err != nil {
return nil, err
}
log.G(ctx).WithFields(logrus.Fields{
"pid": cmd.Process.Pid,
"address": address,
}).Infof("shim %s started", cmd.Args[0])
if err := setScore(cmd.Process.Pid); err != nil {
return nil, errors.Wrap(err, "failed to set OOM Score on shim")
}
conn, err := connect(address, annonDialer)
if err != nil {
return nil, err
}
client := ttrpc.NewClient(conn)
client.OnClose(func() { conn.Close() })
return &shim{
bundle: bundle,
client: client,
task: task.NewTaskClient(client),
shimPid: cmd.Process.Pid,
events: events,
rtTasks: rt,
}, nil
return string(data), nil
}
func loadShim(ctx context.Context, bundle *Bundle, events *exchange.Exchange, rt *runtime.TaskList) (_ *shim, err error) {
address, err := abstractAddress(ctx, bundle.ID)
address, err := loadAddress(filepath.Join(bundle.Path, "address"))
if err != nil {
return nil, err
}
conn, err := connect(address, annonDialer)
conn, err := client.Connect(address, client.AnonDialer)
if err != nil {
return nil, err
}
@ -174,7 +73,6 @@ type shim struct {
bundle *Bundle
client *ttrpc.Client
task task.TaskService
shimPid int
taskPid int
events *exchange.Exchange
rtTasks *runtime.TaskList
@ -185,15 +83,15 @@ func (s *shim) Connect(ctx context.Context) error {
if err != nil {
return err
}
s.shimPid = int(response.ShimPid)
s.taskPid = int(response.TaskPid)
return nil
}
func (s *shim) Shutdown(ctx context.Context) error {
_, err := s.task.Shutdown(ctx, &task.ShutdownRequest{})
if err != nil {
// handle conn closed error type
_, err := s.task.Shutdown(ctx, &task.ShutdownRequest{
ID: s.ID(),
})
if err != nil && err != ttrpc.ErrClosed {
return errdefs.FromGRPC(err)
}
return nil
@ -208,12 +106,8 @@ func (s *shim) waitShutdown(ctx context.Context) error {
close(dead)
}()
select {
case <-time.After(1 * time.Second):
if err := terminate(s.shimPid); err != nil {
log.G(ctx).WithError(err).Error("terminate shim")
}
<-dead
return nil
case <-time.After(3 * time.Second):
return errors.New("failed to shutdown shim in time")
case <-dead:
return nil
}
@ -292,7 +186,9 @@ func (s *shim) Create(ctx context.Context, opts runtime.CreateOpts) (runtime.Tas
}
func (s *shim) Pause(ctx context.Context) error {
if _, err := s.task.Pause(ctx, empty); err != nil {
if _, err := s.task.Pause(ctx, &task.PauseRequest{
ID: s.ID(),
}); err != nil {
return errdefs.FromGRPC(err)
}
s.events.Publish(ctx, runtime.TaskPausedEventTopic, &eventstypes.TaskPaused{
@ -302,7 +198,9 @@ func (s *shim) Pause(ctx context.Context) error {
}
func (s *shim) Resume(ctx context.Context) error {
if _, err := s.task.Resume(ctx, empty); err != nil {
if _, err := s.task.Resume(ctx, &task.ResumeRequest{
ID: s.ID(),
}); err != nil {
return errdefs.FromGRPC(err)
}
s.events.Publish(ctx, runtime.TaskResumedEventTopic, &eventstypes.TaskResumed{
@ -342,7 +240,8 @@ func (s *shim) Exec(ctx context.Context, id string, opts runtime.ExecOpts) (runt
return nil, errors.Wrapf(err, "invalid exec id %s", id)
}
request := &task.ExecProcessRequest{
ID: id,
ID: s.ID(),
ExecID: id,
Stdin: opts.IO.Stdin,
Stdout: opts.IO.Stdout,
Stderr: opts.IO.Stderr,
@ -428,6 +327,7 @@ func (s *shim) Checkpoint(ctx context.Context, path string, options *ptypes.Any)
func (s *shim) Update(ctx context.Context, resources *ptypes.Any) error {
if _, err := s.task.Update(ctx, &task.UpdateTaskRequest{
ID: s.ID(),
Resources: resources,
}); err != nil {
return errdefs.FromGRPC(err)
@ -436,7 +336,9 @@ func (s *shim) Update(ctx context.Context, resources *ptypes.Any) error {
}
func (s *shim) Stats(ctx context.Context) (*ptypes.Any, error) {
response, err := s.task.Stats(ctx, &task.StatsRequest{})
response, err := s.task.Stats(ctx, &task.StatsRequest{
ID: s.ID(),
})
if err != nil {
return nil, errdefs.FromGRPC(err)
}

View File

@ -57,6 +57,7 @@ type Init func(context.Context, string, events.Publisher) (Shim, error)
type Shim interface {
shimapi.TaskService
Cleanup(ctx context.Context) (*shimapi.DeleteResponse, error)
StartShim(ctx context.Context, id, containerdBinary, containerdAddress string) (string, error)
}
var (
@ -72,7 +73,7 @@ var (
func parseFlags() {
flag.BoolVar(&debugFlag, "debug", false, "enable debug output in logs")
flag.StringVar(&namespaceFlag, "namespace", "", "namespace that owns the shim")
flag.StringVar(&idFlag, "id", "", "id of the shim")
flag.StringVar(&idFlag, "id", "", "id of the task")
flag.StringVar(&socketFlag, "socket", "", "abstract socket path to serve")
flag.StringVar(&addressFlag, "address", "", "grpc address back to main containerd")
@ -92,7 +93,7 @@ func setRuntime() {
if debugFlag {
f, err := os.OpenFile("shim.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0600)
if err != nil {
fmt.Fprintf(os.Stderr, "open should log %s", err)
fmt.Fprintf(os.Stderr, "open shim log %s", err)
os.Exit(1)
}
logrus.SetLevel(logrus.DebugLevel)
@ -148,6 +149,15 @@ func Run(initFunc Init) error {
return err
}
return nil
case "start":
address, err := service.StartShim(ctx, idFlag, containerdBinaryFlag, addressFlag)
if err != nil {
return err
}
if _, err := os.Stdout.WriteString(address); err != nil {
return err
}
return nil
default:
client := NewShimClient(ctx, service, signals)
return client.Serve()

View File

@ -14,7 +14,7 @@
limitations under the License.
*/
package v2
package shim
import (
"context"
@ -32,7 +32,8 @@ import (
const shimBinaryFormat = "containerd-shim-%s-%s"
func shimCommand(ctx context.Context, runtime, containerdAddress string, bundle *Bundle, cmdArgs ...string) (*exec.Cmd, error) {
// Command returns the shim command with the provided args and configuration
func Command(ctx context.Context, runtime, containerdAddress, path string, cmdArgs ...string) (*exec.Cmd, error) {
ns, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return nil, err
@ -47,7 +48,7 @@ func shimCommand(ctx context.Context, runtime, containerdAddress string, bundle
"-publish-binary", self,
}
args = append(args, cmdArgs...)
name := getShimBinaryName(runtime)
name := BinaryName(runtime)
if _, err := exec.LookPath(name); err != nil {
if eerr, ok := err.(*exec.Error); ok {
if eerr.Err == exec.ErrNotFound {
@ -56,19 +57,21 @@ func shimCommand(ctx context.Context, runtime, containerdAddress string, bundle
}
}
cmd := exec.Command(name, args...)
cmd.Dir = bundle.Path
cmd.Dir = path
cmd.Env = append(os.Environ(), "GOMAXPROCS=2")
cmd.SysProcAttr = getSysProcAttr()
return cmd, nil
}
func getShimBinaryName(runtime string) string {
// BinaryName returns the shim binary name from the runtime name
func BinaryName(runtime string) string {
parts := strings.Split(runtime, ".")
// TODO: add validation for runtime
return fmt.Sprintf(shimBinaryFormat, parts[len(parts)-2], parts[len(parts)-1])
}
func abstractAddress(ctx context.Context, id string) (string, error) {
// AbstractAddress returns an abstract socket address
func AbstractAddress(ctx context.Context, id string) (string, error) {
ns, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return "", err
@ -76,16 +79,19 @@ func abstractAddress(ctx context.Context, id string) (string, error) {
return filepath.Join(string(filepath.Separator), "containerd-shim", ns, id, "shim.sock"), nil
}
func connect(address string, d func(string, time.Duration) (net.Conn, error)) (net.Conn, error) {
// Connect to the provided address
func Connect(address string, d func(string, time.Duration) (net.Conn, error)) (net.Conn, error) {
return d(address, 100*time.Second)
}
func annonDialer(address string, timeout time.Duration) (net.Conn, error) {
// AnonDialer returns a dialer for an abstract socket
func AnonDialer(address string, timeout time.Duration) (net.Conn, error) {
address = strings.TrimPrefix(address, "unix://")
return net.DialTimeout("unix", "\x00"+address, timeout)
}
func newSocket(address string) (*net.UnixListener, error) {
// NewSocket returns a new socket
func NewSocket(address string) (*net.UnixListener, error) {
if len(address) > 106 {
return nil, errors.Errorf("%q: unix socket path too long (> 106)", address)
}
@ -96,7 +102,8 @@ func newSocket(address string) (*net.UnixListener, error) {
return l.(*net.UnixListener), nil
}
func writePidFile(path string, pid int) error {
// WritePidFile writes a pid file atomically
func WritePidFile(path string, pid int) error {
path, err := filepath.Abs(path)
if err != nil {
return err
@ -113,3 +120,22 @@ func writePidFile(path string, pid int) error {
}
return os.Rename(tempPath, path)
}
// WriteAddress writes a address file atomically
func WriteAddress(path, address string) error {
path, err := filepath.Abs(path)
if err != nil {
return err
}
tempPath := filepath.Join(filepath.Dir(path), fmt.Sprintf(".%s", filepath.Base(path)))
f, err := os.OpenFile(tempPath, os.O_RDWR|os.O_CREATE|os.O_EXCL|os.O_SYNC, 0666)
if err != nil {
return err
}
_, err = f.WriteString(address)
f.Close()
if err != nil {
return err
}
return os.Rename(tempPath, path)
}

View File

@ -14,13 +14,12 @@
limitations under the License.
*/
package v2
package shim
import (
"syscall"
"github.com/containerd/containerd/sys"
"golang.org/x/sys/unix"
)
func getSysProcAttr() *syscall.SysProcAttr {
@ -29,10 +28,7 @@ func getSysProcAttr() *syscall.SysProcAttr {
}
}
func terminate(pid int) error {
return unix.Kill(pid, unix.SIGKILL)
}
func setScore(pid int) error {
// SetScore sets the oom score for a process
func SetScore(pid int) error {
return sys.SetOOMScore(pid, sys.OOMScoreMaxKillable)
}

View File

@ -16,12 +16,10 @@
limitations under the License.
*/
package v2
package shim
import (
"syscall"
"golang.org/x/sys/unix"
)
func getSysProcAttr() *syscall.SysProcAttr {
@ -30,10 +28,7 @@ func getSysProcAttr() *syscall.SysProcAttr {
}
}
func terminate(pid int) error {
return unix.Kill(pid, unix.SIGKILL)
}
func setScore(pid int) error {
// SetScore sets the oom score for a process
func SetScore(pid int) error {
return nil
}

View File

@ -14,10 +14,9 @@
limitations under the License.
*/
package v2
package shim
import (
"os"
"syscall"
)
@ -25,14 +24,7 @@ func getSysProcAttr() *syscall.SysProcAttr {
return nil
}
func terminate(pid int) error {
p, err := os.FindProcess(pid)
if err != nil {
return err
}
return p.Kill()
}
func setScore(pid int) error {
// SetScore sets the oom score for a process
func SetScore(pid int) error {
return nil
}

File diff suppressed because it is too large Load Diff

View File

@ -21,8 +21,8 @@ service Task {
rpc Start(StartRequest) returns (StartResponse);
rpc Delete(DeleteRequest) returns (DeleteResponse);
rpc Pids(PidsRequest) returns (PidsResponse);
rpc Pause(google.protobuf.Empty) returns (google.protobuf.Empty);
rpc Resume(google.protobuf.Empty) returns (google.protobuf.Empty);
rpc Pause(PauseRequest) returns (google.protobuf.Empty);
rpc Resume(ResumeRequest) returns (google.protobuf.Empty);
rpc Checkpoint(CheckpointTaskRequest) returns (google.protobuf.Empty);
rpc Kill(KillRequest) returns (google.protobuf.Empty);
rpc Exec(ExecProcessRequest) returns (google.protobuf.Empty);
@ -54,6 +54,7 @@ message CreateTaskResponse {
message DeleteRequest {
string id = 1;
string exec_id = 2;
}
message DeleteResponse {
@ -64,11 +65,12 @@ message DeleteResponse {
message ExecProcessRequest {
string id = 1;
bool terminal = 2;
string stdin = 3;
string stdout = 4;
string stderr = 5;
google.protobuf.Any spec = 6;
string exec_id = 2;
bool terminal = 3;
string stdin = 4;
string stdout = 5;
string stderr = 6;
google.protobuf.Any spec = 7;
}
message ExecProcessResponse {
@ -76,12 +78,14 @@ message ExecProcessResponse {
message ResizePtyRequest {
string id = 1;
uint32 width = 2;
uint32 height = 3;
string exec_id = 2;
uint32 width = 3;
uint32 height = 4;
}
message StateRequest {
string id = 1;
string exec_id = 2;
}
message StateResponse {
@ -99,13 +103,15 @@ message StateResponse {
message KillRequest {
string id = 1;
uint32 signal = 2;
bool all = 3;
string exec_id = 2;
uint32 signal = 3;
bool all = 4;
}
message CloseIORequest {
string id = 1;
bool stdin = 2;
string exec_id = 2;
bool stdin = 3;
}
message PidsRequest {
@ -117,25 +123,28 @@ message PidsResponse {
}
message CheckpointTaskRequest {
string path = 1;
google.protobuf.Any options = 2;
string id = 1;
string path = 2;
google.protobuf.Any options = 3;
}
message UpdateTaskRequest {
google.protobuf.Any resources = 1;
string id = 1;
google.protobuf.Any resources = 2;
}
message StartRequest {
string id = 1;
string exec_id = 2;
}
message StartResponse {
string id = 1;
uint32 pid = 2;
uint32 pid = 1;
}
message WaitRequest {
string id = 1;
string exec_id = 2;
}
message WaitResponse {
@ -144,7 +153,7 @@ message WaitResponse {
}
message StatsRequest {
string id = 1;
}
message StatsResponse {
@ -152,7 +161,7 @@ message StatsResponse {
}
message ConnectRequest {
string id = 1;
}
message ConnectResponse {
@ -162,5 +171,14 @@ message ConnectResponse {
}
message ShutdownRequest {
bool now = 1;
string id = 1;
bool now = 2;
}
message PauseRequest {
string id = 1;
}
message ResumeRequest {
string id = 1;
}

View File

@ -585,9 +585,9 @@ func getTasksMetrics(ctx context.Context, filter filters.Filter, tasks []runtime
case "id":
return t.ID(), true
case "namespace":
// return t.Info().Namespace, true
return t.Namespace(), true
case "runtime":
// return t.Info().Runtime, true
// return t.Info().Runtime, true
}
return "", false
})) {