Switch runc shim to task v3

Signed-off-by: Maksym Pavlenko <pavlenko.maksym@gmail.com>
This commit is contained in:
Maksym Pavlenko 2023-10-13 10:12:53 -07:00
parent f7af7fce8a
commit daaf67662f
No known key found for this signature in database
5 changed files with 43 additions and 28 deletions

View File

@ -65,8 +65,8 @@ func (m manager) Name() string {
return m.name return m.name
} }
func (m manager) Start(ctx context.Context, id string, opts shim.StartOpts) (string, error) { func (m manager) Start(ctx context.Context, id string, opts shim.StartOpts) (shim.BootstrapParams, error) {
return "", errdefs.ErrNotImplemented return shim.BootstrapParams{}, errdefs.ErrNotImplemented
} }
func (m manager) Stop(ctx context.Context, id string) (shim.StopStatus, error) { func (m manager) Stop(ctx context.Context, id string) (shim.StopStatus, error) {

View File

@ -30,7 +30,7 @@ import (
"github.com/containerd/cgroups/v3/cgroup1" "github.com/containerd/cgroups/v3/cgroup1"
cgroupsv2 "github.com/containerd/cgroups/v3/cgroup2" cgroupsv2 "github.com/containerd/cgroups/v3/cgroup2"
"github.com/containerd/console" "github.com/containerd/console"
"github.com/containerd/containerd/api/runtime/task/v2" "github.com/containerd/containerd/api/runtime/task/v3"
"github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/mount" "github.com/containerd/containerd/mount"
"github.com/containerd/containerd/namespaces" "github.com/containerd/containerd/namespaces"

View File

@ -118,15 +118,19 @@ func (m manager) Name() string {
return m.name return m.name
} }
func (manager) Start(ctx context.Context, id string, opts shim.StartOpts) (_ string, retErr error) { func (manager) Start(ctx context.Context, id string, opts shim.StartOpts) (_ shim.BootstrapParams, retErr error) {
var params shim.BootstrapParams
params.Version = 2
params.Protocol = "ttrpc"
cmd, err := newCommand(ctx, id, opts.Address, opts.TTRPCAddress, opts.Debug) cmd, err := newCommand(ctx, id, opts.Address, opts.TTRPCAddress, opts.Debug)
if err != nil { if err != nil {
return "", err return params, err
} }
grouping := id grouping := id
spec, err := readSpec() spec, err := readSpec()
if err != nil { if err != nil {
return "", err return params, err
} }
for _, group := range groupLabels { for _, group := range groupLabels {
if groupID, ok := spec.Annotations[group]; ok { if groupID, ok := spec.Annotations[group]; ok {
@ -136,7 +140,7 @@ func (manager) Start(ctx context.Context, id string, opts shim.StartOpts) (_ str
} }
address, err := shim.SocketAddress(ctx, opts.Address, grouping) address, err := shim.SocketAddress(ctx, opts.Address, grouping)
if err != nil { if err != nil {
return "", err return params, err
} }
socket, err := shim.NewSocket(address) socket, err := shim.NewSocket(address)
@ -146,19 +150,20 @@ func (manager) Start(ctx context.Context, id string, opts shim.StartOpts) (_ str
// grouping functionality where the new process should be run with the same // grouping functionality where the new process should be run with the same
// shim as an existing container // shim as an existing container
if !shim.SocketEaddrinuse(err) { if !shim.SocketEaddrinuse(err) {
return "", fmt.Errorf("create new shim socket: %w", err) return params, fmt.Errorf("create new shim socket: %w", err)
} }
if shim.CanConnect(address) { if shim.CanConnect(address) {
if err := shim.WriteAddress("address", address); err != nil { if err := shim.WriteAddress("address", address); err != nil {
return "", fmt.Errorf("write existing socket for shim: %w", err) return params, fmt.Errorf("write existing socket for shim: %w", err)
} }
return address, nil params.Address = address
return params, nil
} }
if err := shim.RemoveSocket(address); err != nil { if err := shim.RemoveSocket(address); err != nil {
return "", fmt.Errorf("remove pre-existing socket: %w", err) return params, fmt.Errorf("remove pre-existing socket: %w", err)
} }
if socket, err = shim.NewSocket(address); err != nil { if socket, err = shim.NewSocket(address); err != nil {
return "", fmt.Errorf("try create new shim socket 2x: %w", err) return params, fmt.Errorf("try create new shim socket 2x: %w", err)
} }
} }
defer func() { defer func() {
@ -170,12 +175,12 @@ func (manager) Start(ctx context.Context, id string, opts shim.StartOpts) (_ str
// make sure that reexec shim-v2 binary use the value if need // make sure that reexec shim-v2 binary use the value if need
if err := shim.WriteAddress("address", address); err != nil { if err := shim.WriteAddress("address", address); err != nil {
return "", err return params, err
} }
f, err := socket.File() f, err := socket.File()
if err != nil { if err != nil {
return "", err return params, err
} }
cmd.ExtraFiles = append(cmd.ExtraFiles, f) cmd.ExtraFiles = append(cmd.ExtraFiles, f)
@ -183,13 +188,13 @@ func (manager) Start(ctx context.Context, id string, opts shim.StartOpts) (_ str
goruntime.LockOSThread() goruntime.LockOSThread()
if os.Getenv("SCHED_CORE") != "" { if os.Getenv("SCHED_CORE") != "" {
if err := schedcore.Create(schedcore.ProcessGroup); err != nil { if err := schedcore.Create(schedcore.ProcessGroup); err != nil {
return "", fmt.Errorf("enable sched core support: %w", err) return params, fmt.Errorf("enable sched core support: %w", err)
} }
} }
if err := cmd.Start(); err != nil { if err := cmd.Start(); err != nil {
f.Close() f.Close()
return "", err return params, err
} }
goruntime.UnlockOSThread() goruntime.UnlockOSThread()
@ -207,27 +212,29 @@ func (manager) Start(ctx context.Context, id string, opts shim.StartOpts) (_ str
if cgroups.Mode() == cgroups.Unified { if cgroups.Mode() == cgroups.Unified {
cg, err := cgroupsv2.Load(opts.ShimCgroup) cg, err := cgroupsv2.Load(opts.ShimCgroup)
if err != nil { if err != nil {
return "", fmt.Errorf("failed to load cgroup %s: %w", opts.ShimCgroup, err) return params, fmt.Errorf("failed to load cgroup %s: %w", opts.ShimCgroup, err)
} }
if err := cg.AddProc(uint64(cmd.Process.Pid)); err != nil { if err := cg.AddProc(uint64(cmd.Process.Pid)); err != nil {
return "", fmt.Errorf("failed to join cgroup %s: %w", opts.ShimCgroup, err) return params, fmt.Errorf("failed to join cgroup %s: %w", opts.ShimCgroup, err)
} }
} else { } else {
cg, err := cgroup1.Load(cgroup1.StaticPath(opts.ShimCgroup)) cg, err := cgroup1.Load(cgroup1.StaticPath(opts.ShimCgroup))
if err != nil { if err != nil {
return "", fmt.Errorf("failed to load cgroup %s: %w", opts.ShimCgroup, err) return params, fmt.Errorf("failed to load cgroup %s: %w", opts.ShimCgroup, err)
} }
if err := cg.AddProc(uint64(cmd.Process.Pid)); err != nil { if err := cg.AddProc(uint64(cmd.Process.Pid)); err != nil {
return "", fmt.Errorf("failed to join cgroup %s: %w", opts.ShimCgroup, err) return params, fmt.Errorf("failed to join cgroup %s: %w", opts.ShimCgroup, err)
} }
} }
} }
} }
if err := shim.AdjustOOMScore(cmd.Process.Pid); err != nil { if err := shim.AdjustOOMScore(cmd.Process.Pid); err != nil {
return "", fmt.Errorf("failed to adjust OOM score for shim: %w", err) return params, fmt.Errorf("failed to adjust OOM score for shim: %w", err)
} }
return address, nil
params.Address = address
return params, nil
} }
func (manager) Stop(ctx context.Context, id string) (shim.StopStatus, error) { func (manager) Stop(ctx context.Context, id string) (shim.StopStatus, error) {

View File

@ -28,7 +28,7 @@ import (
"github.com/containerd/cgroups/v3/cgroup1" "github.com/containerd/cgroups/v3/cgroup1"
cgroupsv2 "github.com/containerd/cgroups/v3/cgroup2" cgroupsv2 "github.com/containerd/cgroups/v3/cgroup2"
eventstypes "github.com/containerd/containerd/api/events" eventstypes "github.com/containerd/containerd/api/events"
taskAPI "github.com/containerd/containerd/api/runtime/task/v2" taskAPI "github.com/containerd/containerd/api/runtime/task/v3"
"github.com/containerd/containerd/api/types/task" "github.com/containerd/containerd/api/types/task"
"github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/namespaces" "github.com/containerd/containerd/namespaces"
@ -58,7 +58,7 @@ var (
) )
// NewTaskService creates a new instance of a task service // NewTaskService creates a new instance of a task service
func NewTaskService(ctx context.Context, publisher shim.Publisher, sd shutdown.Service) (taskAPI.TaskService, error) { func NewTaskService(ctx context.Context, publisher shim.Publisher, sd shutdown.Service) (taskAPI.TTRPCTaskService, error) {
var ( var (
ep oom.Watcher ep oom.Watcher
err error err error
@ -252,7 +252,7 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *
} }
func (s *service) RegisterTTRPC(server *ttrpc.Server) error { func (s *service) RegisterTTRPC(server *ttrpc.Server) error {
taskAPI.RegisterTaskService(server, s) taskAPI.RegisterTTRPCTaskService(server, s)
return nil return nil
} }

View File

@ -18,6 +18,7 @@ package shim
import ( import (
"context" "context"
"encoding/json"
"errors" "errors"
"flag" "flag"
"fmt" "fmt"
@ -76,7 +77,7 @@ type StopStatus struct {
// Manager is the interface which manages the shim process // Manager is the interface which manages the shim process
type Manager interface { type Manager interface {
Name() string Name() string
Start(ctx context.Context, id string, opts StartOpts) (string, error) Start(ctx context.Context, id string, opts StartOpts) (BootstrapParams, error)
Stop(ctx context.Context, id string) (StopStatus, error) Stop(ctx context.Context, id string) (StopStatus, error)
} }
@ -268,13 +269,20 @@ func run(ctx context.Context, manager Manager, name string, config Config) error
Debug: debugFlag, Debug: debugFlag,
} }
address, err := manager.Start(ctx, id, opts) params, err := manager.Start(ctx, id, opts)
if err != nil { if err != nil {
return err return err
} }
if _, err := os.Stdout.WriteString(address); err != nil {
data, err := json.Marshal(&params)
if err != nil {
return fmt.Errorf("failed to marshal bootstrap params to json: %w", err)
}
if _, err := os.Stdout.Write(data); err != nil {
return err return err
} }
return nil return nil
} }