Move local sandbox controller under plugins package

Add options to sandbox controller interface.
Update sandbox controller interface to fully utilize sandbox controller
interface.
Move grpc error conversion to service.

Signed-off-by: Derek McGowan <derek@mcg.dev>
This commit is contained in:
Derek McGowan 2023-01-19 00:05:47 -08:00
parent 2717685dad
commit a788f6c799
No known key found for this signature in database
GPG Key ID: F58C5D0A4405ACDB
9 changed files with 384 additions and 269 deletions

View File

@ -269,7 +269,7 @@ func (c *Controller) Start(ctx context.Context, id string) (cin sandbox.Controll
return return
} }
func (c *Controller) Create(ctx context.Context, _id string) error { func (c *Controller) Create(ctx context.Context, _id string, _ ...sandbox.CreateOpt) error {
// Not used by pod-sandbox implementation as there is no need to split pause containers logic. // Not used by pod-sandbox implementation as there is no need to split pause containers logic.
return nil return nil
} }

View File

@ -29,9 +29,10 @@ import (
sandboxstore "github.com/containerd/containerd/pkg/cri/store/sandbox" sandboxstore "github.com/containerd/containerd/pkg/cri/store/sandbox"
ctrdutil "github.com/containerd/containerd/pkg/cri/util" ctrdutil "github.com/containerd/containerd/pkg/cri/util"
"github.com/containerd/containerd/protobuf" "github.com/containerd/containerd/protobuf"
"github.com/containerd/containerd/sandbox"
) )
func (c *Controller) Stop(ctx context.Context, sandboxID string) error { func (c *Controller) Stop(ctx context.Context, sandboxID string, _ ...sandbox.StopOpt) error {
sandbox, err := c.sandboxStore.Get(sandboxID) sandbox, err := c.sandboxStore.Get(sandboxID)
if err != nil { if err != nil {
return fmt.Errorf("an error occurred when try to find sandbox %q: %w", return fmt.Errorf("an error occurred when try to find sandbox %q: %w",

View File

@ -88,6 +88,8 @@ const (
TransferPlugin Type = "io.containerd.transfer.v1" TransferPlugin Type = "io.containerd.transfer.v1"
// SandboxStorePlugin implements a sandbox store // SandboxStorePlugin implements a sandbox store
SandboxStorePlugin Type = "io.containerd.sandbox.store.v1" SandboxStorePlugin Type = "io.containerd.sandbox.store.v1"
// SandboxControllerPlugin implements a sandbox controller
SandboxControllerPlugin Type = "io.containerd.sandbox.controller.v1"
) )
const ( const (

View File

@ -0,0 +1,263 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package sandbox
import (
"context"
"fmt"
runtimeAPI "github.com/containerd/containerd/api/runtime/sandbox/v1"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/events/exchange"
"github.com/containerd/containerd/platforms"
"github.com/containerd/containerd/plugin"
"github.com/containerd/containerd/runtime"
v2 "github.com/containerd/containerd/runtime/v2"
"github.com/containerd/containerd/sandbox"
"google.golang.org/protobuf/types/known/anypb"
)
func init() {
plugin.Register(&plugin.Registration{
Type: plugin.SandboxControllerPlugin,
ID: "local",
Requires: []plugin.Type{
plugin.RuntimePluginV2,
plugin.EventPlugin,
plugin.SandboxStorePlugin,
},
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
shimPlugin, err := ic.GetByID(plugin.RuntimePluginV2, "shim")
if err != nil {
return nil, err
}
exchangePlugin, err := ic.GetByID(plugin.EventPlugin, "exchange")
if err != nil {
return nil, err
}
sbPlugin, err := ic.GetByID(plugin.SandboxStorePlugin, "local")
if err != nil {
return nil, err
}
var (
shims = shimPlugin.(*v2.ShimManager)
publisher = exchangePlugin.(*exchange.Exchange)
store = sbPlugin.(sandbox.Store)
)
return &controllerLocal{
shims: shims,
store: store,
publisher: publisher,
}, nil
},
})
}
type controllerLocal struct {
shims *v2.ShimManager
store sandbox.Store
publisher events.Publisher
}
var _ sandbox.Controller = (*controllerLocal)(nil)
func (c *controllerLocal) Create(ctx context.Context, sandboxID string, opts ...sandbox.CreateOpt) error {
var coptions sandbox.CreateOptions
for _, opt := range opts {
opt(&coptions)
}
if _, err := c.shims.Get(ctx, sandboxID); err == nil {
return fmt.Errorf("sandbox %s already running: %w", sandboxID, errdefs.ErrAlreadyExists)
}
info, err := c.store.Get(ctx, sandboxID)
if err != nil {
return fmt.Errorf("failed to query sandbox metadata from store: %w", err)
}
shim, err := c.shims.Start(ctx, sandboxID, runtime.CreateOpts{
Spec: info.Spec,
RuntimeOptions: info.Runtime.Options,
Runtime: info.Runtime.Name,
TaskOptions: nil,
})
if err != nil {
return fmt.Errorf("failed to start new sandbox: %w", err)
}
svc := runtimeAPI.NewTTRPCSandboxClient(shim.Client())
var options *anypb.Any
if coptions.Options != nil {
options = &anypb.Any{
TypeUrl: coptions.Options.GetTypeUrl(),
Value: coptions.Options.GetValue(),
}
}
if _, err := svc.CreateSandbox(ctx, &runtimeAPI.CreateSandboxRequest{
SandboxID: sandboxID,
BundlePath: shim.Bundle(),
Rootfs: coptions.Rootfs,
Options: options,
}); err != nil {
// TODO: Delete sandbox shim here.
return fmt.Errorf("failed to start sandbox %s: %w", sandboxID, errdefs.FromGRPC(err))
}
return nil
}
func (c *controllerLocal) Start(ctx context.Context, sandboxID string) (sandbox.ControllerInstance, error) {
shim, err := c.shims.Get(ctx, sandboxID)
if err != nil {
return sandbox.ControllerInstance{}, fmt.Errorf("unable to find sandbox %q", sandboxID)
}
svc := runtimeAPI.NewTTRPCSandboxClient(shim.Client())
resp, err := svc.StartSandbox(ctx, &runtimeAPI.StartSandboxRequest{SandboxID: sandboxID})
if err != nil {
return sandbox.ControllerInstance{}, fmt.Errorf("failed to start sandbox %s: %w", sandboxID, errdefs.FromGRPC(err))
}
return sandbox.ControllerInstance{
SandboxID: sandboxID,
Pid: resp.GetPid(),
CreatedAt: resp.GetCreatedAt().AsTime(),
}, nil
}
func (c *controllerLocal) Platform(ctx context.Context, sandboxID string) (platforms.Platform, error) {
svc, err := c.getSandbox(ctx, sandboxID)
if err != nil {
return platforms.Platform{}, err
}
response, err := svc.Platform(ctx, &runtimeAPI.PlatformRequest{SandboxID: sandboxID})
if err != nil {
return platforms.Platform{}, fmt.Errorf("failed to get sandbox platform: %w", errdefs.FromGRPC(err))
}
var platform platforms.Platform
if p := response.GetPlatform(); p != nil {
platform.OS = p.OS
platform.Architecture = p.Architecture
platform.Variant = p.Variant
}
return platform, nil
}
func (c *controllerLocal) Stop(ctx context.Context, sandboxID string, opts ...sandbox.StopOpt) error {
var soptions sandbox.StopOptions
for _, opt := range opts {
opt(&soptions)
}
req := &runtimeAPI.StopSandboxRequest{SandboxID: sandboxID}
if soptions.Timeout != nil {
req.TimeoutSecs = uint32(soptions.Timeout.Seconds())
}
svc, err := c.getSandbox(ctx, sandboxID)
if err != nil {
return err
}
if _, err := svc.StopSandbox(ctx, req); err != nil {
return fmt.Errorf("failed to stop sandbox: %w", errdefs.FromGRPC(err))
}
return nil
}
func (c *controllerLocal) Shutdown(ctx context.Context, sandboxID string) error {
svc, err := c.getSandbox(ctx, sandboxID)
if err != nil {
return err
}
_, err = svc.ShutdownSandbox(ctx, &runtimeAPI.ShutdownSandboxRequest{SandboxID: sandboxID})
if err != nil {
return fmt.Errorf("failed to shutdown sandbox: %w", errdefs.FromGRPC(err))
}
if err := c.shims.Delete(ctx, sandboxID); err != nil {
return fmt.Errorf("failed to delete sandbox shim: %w", err)
}
return nil
}
func (c *controllerLocal) Wait(ctx context.Context, sandboxID string) (sandbox.ExitStatus, error) {
svc, err := c.getSandbox(ctx, sandboxID)
if err != nil {
return sandbox.ExitStatus{}, err
}
resp, err := svc.WaitSandbox(ctx, &runtimeAPI.WaitSandboxRequest{
SandboxID: sandboxID,
})
if err != nil {
return sandbox.ExitStatus{}, fmt.Errorf("failed to wait sandbox %s: %w", sandboxID, errdefs.FromGRPC(err))
}
return sandbox.ExitStatus{
ExitStatus: resp.GetExitStatus(),
ExitedAt: resp.GetExitedAt().AsTime(),
}, nil
}
func (c *controllerLocal) Status(ctx context.Context, sandboxID string, verbose bool) (sandbox.ControllerStatus, error) {
svc, err := c.getSandbox(ctx, sandboxID)
if err != nil {
return sandbox.ControllerStatus{}, err
}
resp, err := svc.SandboxStatus(ctx, &runtimeAPI.SandboxStatusRequest{
SandboxID: sandboxID,
Verbose: verbose,
})
if err != nil {
return sandbox.ControllerStatus{}, fmt.Errorf("failed to query sandbox %s status: %w", sandboxID, err)
}
return sandbox.ControllerStatus{
SandboxID: resp.GetSandboxID(),
Pid: resp.GetPid(),
State: resp.GetState(),
ExitedAt: resp.GetCreatedAt().AsTime(),
Extra: resp.GetExtra(),
}, nil
}
func (c *controllerLocal) getSandbox(ctx context.Context, id string) (runtimeAPI.TTRPCSandboxService, error) {
shim, err := c.shims.Get(ctx, id)
if err != nil {
return nil, errdefs.ErrNotFound
}
svc := runtimeAPI.NewTTRPCSandboxClient(shim.Client())
return svc, nil
}

View File

@ -20,23 +20,57 @@ import (
"context" "context"
"time" "time"
"github.com/containerd/containerd/api/types"
"github.com/containerd/containerd/platforms" "github.com/containerd/containerd/platforms"
"github.com/containerd/typeurl" "github.com/containerd/typeurl"
) )
type CreateOptions struct {
Rootfs []*types.Mount
Options typeurl.Any
}
type CreateOpt func(*CreateOptions)
// WithRootFS is used to create a sandbox with the provided rootfs mount
// TODO: Switch to mount.Mount once target added
func WithRootFS(m []*types.Mount) CreateOpt {
return func(co *CreateOptions) {
co.Rootfs = m
}
}
func WithOptions(a typeurl.Any) CreateOpt {
return func(co *CreateOptions) {
co.Options = a
}
}
type StopOptions struct {
Timeout *time.Duration
}
type StopOpt func(*StopOptions)
func WithTimeout(timeout time.Duration) StopOpt {
return func(so *StopOptions) {
so.Timeout = &timeout
}
}
// Controller is an interface to manage sandboxes at runtime. // Controller is an interface to manage sandboxes at runtime.
// When running in sandbox mode, shim expected to implement `SandboxService`. // When running in sandbox mode, shim expected to implement `SandboxService`.
// Shim lifetimes are now managed manually via sandbox API by the containerd's client. // Shim lifetimes are now managed manually via sandbox API by the containerd's client.
type Controller interface { type Controller interface {
// Create is used to initialize sandbox environment. // Create is used to initialize sandbox environment. (mounts, any)
Create(ctx context.Context, sandboxID string) error Create(ctx context.Context, sandboxID string, opts ...CreateOpt) error
// Start will start previously created sandbox. // Start will start previously created sandbox.
Start(ctx context.Context, sandboxID string) (ControllerInstance, error) Start(ctx context.Context, sandboxID string) (ControllerInstance, error)
// Platform returns target sandbox OS that will be used by Controller. // Platform returns target sandbox OS that will be used by Controller.
// containerd will rely on this to generate proper OCI spec. // containerd will rely on this to generate proper OCI spec.
Platform(_ctx context.Context, _sandboxID string) (platforms.Platform, error) Platform(_ctx context.Context, _sandboxID string) (platforms.Platform, error)
// Stop will stop sandbox instance // Stop will stop sandbox instance
Stop(ctx context.Context, sandboxID string) error Stop(ctx context.Context, sandboxID string, opts ...StopOpt) error
// Wait blocks until sandbox process exits. // Wait blocks until sandbox process exits.
Wait(ctx context.Context, sandboxID string) (ExitStatus, error) Wait(ctx context.Context, sandboxID string) (ExitStatus, error)
// Status will query sandbox process status. It is heavier than Ping call and must be used whenever you need to // Status will query sandbox process status. It is heavier than Ping call and must be used whenever you need to

View File

@ -22,7 +22,9 @@ import (
api "github.com/containerd/containerd/api/services/sandbox/v1" api "github.com/containerd/containerd/api/services/sandbox/v1"
"github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/platforms" "github.com/containerd/containerd/platforms"
"github.com/containerd/containerd/sandbox"
sb "github.com/containerd/containerd/sandbox" sb "github.com/containerd/containerd/sandbox"
"google.golang.org/protobuf/types/known/anypb"
) )
// remoteSandboxController is a low level GRPC client for containerd's sandbox controller service // remoteSandboxController is a low level GRPC client for containerd's sandbox controller service
@ -37,8 +39,19 @@ func NewSandboxController(client api.ControllerClient) sb.Controller {
return &remoteSandboxController{client: client} return &remoteSandboxController{client: client}
} }
func (s *remoteSandboxController) Create(ctx context.Context, sandboxID string) error { func (s *remoteSandboxController) Create(ctx context.Context, sandboxID string, opts ...sandbox.CreateOpt) error {
_, err := s.client.Create(ctx, &api.ControllerCreateRequest{SandboxID: sandboxID}) var options sandbox.CreateOptions
for _, opt := range opts {
opt(&options)
}
_, err := s.client.Create(ctx, &api.ControllerCreateRequest{
SandboxID: sandboxID,
Rootfs: options.Rootfs,
Options: &anypb.Any{
TypeUrl: options.Options.GetTypeUrl(),
Value: options.Options.GetValue(),
},
})
if err != nil { if err != nil {
return errdefs.FromGRPC(err) return errdefs.FromGRPC(err)
} }
@ -74,8 +87,16 @@ func (s *remoteSandboxController) Platform(ctx context.Context, sandboxID string
}, nil }, nil
} }
func (s *remoteSandboxController) Stop(ctx context.Context, sandboxID string) error { func (s *remoteSandboxController) Stop(ctx context.Context, sandboxID string, opts ...sandbox.StopOpt) error {
_, err := s.client.Stop(ctx, &api.ControllerStopRequest{SandboxID: sandboxID}) var soptions sandbox.StopOptions
for _, opt := range opts {
opt(&soptions)
}
req := &api.ControllerStopRequest{SandboxID: sandboxID}
if soptions.Timeout != nil {
req.TimeoutSecs = uint32(soptions.Timeout.Seconds())
}
_, err := s.client.Stop(ctx, req)
if err != nil { if err != nil {
return errdefs.FromGRPC(err) return errdefs.FromGRPC(err)
} }

View File

@ -1,236 +0,0 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package sandbox
import (
"context"
"fmt"
runtimeAPI "github.com/containerd/containerd/api/runtime/sandbox/v1"
api "github.com/containerd/containerd/api/services/sandbox/v1"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/events/exchange"
"github.com/containerd/containerd/plugin"
"github.com/containerd/containerd/runtime"
v2 "github.com/containerd/containerd/runtime/v2"
"github.com/containerd/containerd/sandbox"
"github.com/containerd/containerd/services"
"google.golang.org/grpc"
)
func init() {
plugin.Register(&plugin.Registration{
Type: plugin.ServicePlugin,
ID: services.SandboxControllerService,
Requires: []plugin.Type{
plugin.RuntimePluginV2,
plugin.EventPlugin,
plugin.SandboxStorePlugin,
},
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
shimPlugin, err := ic.GetByID(plugin.RuntimePluginV2, "shim")
if err != nil {
return nil, err
}
exchangePlugin, err := ic.GetByID(plugin.EventPlugin, "exchange")
if err != nil {
return nil, err
}
storePlugin, err := ic.GetByID(plugin.SandboxStorePlugin, "local")
if err != nil {
return nil, err
}
var (
shims = shimPlugin.(*v2.ShimManager)
publisher = exchangePlugin.(*exchange.Exchange)
store = storePlugin.(sandbox.Store)
)
return &controllerLocal{
shims: shims,
store: store,
publisher: publisher,
}, nil
},
})
}
type controllerLocal struct {
shims *v2.ShimManager
store sandbox.Store
publisher events.Publisher
}
var _ api.ControllerClient = (*controllerLocal)(nil)
func (c *controllerLocal) Create(ctx context.Context, in *api.ControllerCreateRequest, opts ...grpc.CallOption) (*api.ControllerCreateResponse, error) {
if _, err := c.shims.Get(ctx, in.SandboxID); err == nil {
return nil, fmt.Errorf("sandbox %s already running: %w", in.SandboxID, errdefs.ErrAlreadyExists)
}
info, err := c.store.Get(ctx, in.SandboxID)
if err != nil {
return nil, fmt.Errorf("failed to query sandbox metadata from store: %w", err)
}
shim, err := c.shims.Start(ctx, in.SandboxID, runtime.CreateOpts{
Spec: info.Spec,
RuntimeOptions: info.Runtime.Options,
Runtime: info.Runtime.Name,
TaskOptions: nil,
})
if err != nil {
return nil, fmt.Errorf("failed to start new sandbox: %w", err)
}
svc := runtimeAPI.NewTTRPCSandboxClient(shim.Client())
if _, err := svc.CreateSandbox(ctx, &runtimeAPI.CreateSandboxRequest{
SandboxID: in.SandboxID,
BundlePath: shim.Bundle(),
Rootfs: in.Rootfs,
Options: in.Options,
}); err != nil {
// TODO: Delete sandbox shim here.
return nil, fmt.Errorf("failed to start sandbox %s: %w", in.SandboxID, err)
}
return &api.ControllerCreateResponse{SandboxID: in.SandboxID}, nil
}
func (c *controllerLocal) Start(ctx context.Context, in *api.ControllerStartRequest, opts ...grpc.CallOption) (*api.ControllerStartResponse, error) {
shim, err := c.shims.Get(ctx, in.GetSandboxID())
if err != nil {
return nil, fmt.Errorf("unable to find sandbox %q", in.GetSandboxID())
}
svc := runtimeAPI.NewTTRPCSandboxClient(shim.Client())
resp, err := svc.StartSandbox(ctx, &runtimeAPI.StartSandboxRequest{SandboxID: in.SandboxID})
if err != nil {
return nil, fmt.Errorf("failed to start sandbox %s: %w", in.SandboxID, err)
}
return &api.ControllerStartResponse{
SandboxID: in.SandboxID,
Pid: resp.Pid,
CreatedAt: resp.CreatedAt,
}, nil
}
func (c *controllerLocal) Platform(ctx context.Context, in *api.ControllerPlatformRequest, opts ...grpc.CallOption) (*api.ControllerPlatformResponse, error) {
svc, err := c.getSandbox(ctx, in.SandboxID)
if err != nil {
return nil, err
}
response, err := svc.Platform(ctx, &runtimeAPI.PlatformRequest{SandboxID: in.GetSandboxID()})
if err != nil {
return nil, fmt.Errorf("failed to get sandbox platform: %w", err)
}
return &api.ControllerPlatformResponse{
Platform: response.GetPlatform(),
}, nil
}
func (c *controllerLocal) Stop(ctx context.Context, in *api.ControllerStopRequest, opts ...grpc.CallOption) (*api.ControllerStopResponse, error) {
svc, err := c.getSandbox(ctx, in.SandboxID)
if err != nil {
return nil, err
}
if _, err := svc.StopSandbox(ctx, &runtimeAPI.StopSandboxRequest{
SandboxID: in.SandboxID,
TimeoutSecs: in.TimeoutSecs,
}); err != nil {
return nil, fmt.Errorf("failed to stop sandbox: %w", err)
}
return &api.ControllerStopResponse{}, nil
}
func (c *controllerLocal) Shutdown(ctx context.Context, in *api.ControllerShutdownRequest, opts ...grpc.CallOption) (*api.ControllerShutdownResponse, error) {
svc, err := c.getSandbox(ctx, in.SandboxID)
if err != nil {
return nil, err
}
_, err = svc.ShutdownSandbox(ctx, &runtimeAPI.ShutdownSandboxRequest{SandboxID: in.SandboxID})
if err != nil {
return nil, errdefs.ToGRPC(fmt.Errorf("failed to shutdown sandbox: %w", err))
}
if err := c.shims.Delete(ctx, in.SandboxID); err != nil {
return nil, errdefs.ToGRPC(fmt.Errorf("failed to delete sandbox shim: %w", err))
}
return &api.ControllerShutdownResponse{}, nil
}
func (c *controllerLocal) Wait(ctx context.Context, in *api.ControllerWaitRequest, opts ...grpc.CallOption) (*api.ControllerWaitResponse, error) {
svc, err := c.getSandbox(ctx, in.SandboxID)
if err != nil {
return nil, err
}
resp, err := svc.WaitSandbox(ctx, &runtimeAPI.WaitSandboxRequest{
SandboxID: in.SandboxID,
})
if err != nil {
return nil, fmt.Errorf("failed to wait sandbox %s: %w", in.SandboxID, err)
}
return &api.ControllerWaitResponse{
ExitStatus: resp.ExitStatus,
ExitedAt: resp.ExitedAt,
}, nil
}
func (c *controllerLocal) Status(ctx context.Context, in *api.ControllerStatusRequest, opts ...grpc.CallOption) (*api.ControllerStatusResponse, error) {
svc, err := c.getSandbox(ctx, in.SandboxID)
if err != nil {
return nil, errdefs.ToGRPC(err)
}
resp, err := svc.SandboxStatus(ctx, &runtimeAPI.SandboxStatusRequest{SandboxID: in.SandboxID})
if err != nil {
return nil, fmt.Errorf("failed to query sandbox %s status: %w", in.SandboxID, err)
}
return &api.ControllerStatusResponse{
SandboxID: resp.SandboxID,
Pid: resp.Pid,
State: resp.State,
ExitedAt: resp.ExitedAt,
Extra: resp.Extra,
}, nil
}
func (c *controllerLocal) getSandbox(ctx context.Context, id string) (runtimeAPI.TTRPCSandboxService, error) {
shim, err := c.shims.Get(ctx, id)
if err != nil {
return nil, errdefs.ErrNotFound
}
svc := runtimeAPI.NewTTRPCSandboxClient(shim.Client())
return svc, nil
}

View File

@ -18,13 +18,15 @@ package sandbox
import ( import (
"context" "context"
"errors"
api "github.com/containerd/containerd/api/services/sandbox/v1" api "github.com/containerd/containerd/api/services/sandbox/v1"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/log" "github.com/containerd/containerd/log"
"github.com/containerd/containerd/plugin" "github.com/containerd/containerd/plugin"
"github.com/containerd/containerd/services" "github.com/containerd/containerd/protobuf"
"github.com/containerd/containerd/sandbox"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/protobuf/types/known/anypb"
) )
func init() { func init() {
@ -32,33 +34,23 @@ func init() {
Type: plugin.GRPCPlugin, Type: plugin.GRPCPlugin,
ID: "sandbox-controllers", ID: "sandbox-controllers",
Requires: []plugin.Type{ Requires: []plugin.Type{
plugin.ServicePlugin, plugin.SandboxControllerPlugin,
}, },
InitFn: func(ic *plugin.InitContext) (interface{}, error) { InitFn: func(ic *plugin.InitContext) (interface{}, error) {
plugins, err := ic.GetByType(plugin.ServicePlugin) sc, err := ic.GetByID(plugin.SandboxControllerPlugin, "local")
if err != nil {
return nil, err
}
p, ok := plugins[services.SandboxControllerService]
if !ok {
return nil, errors.New("sandbox service not found")
}
i, err := p.Instance()
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &controllerService{ return &controllerService{
local: i.(api.ControllerClient), local: sc.(sandbox.Controller),
}, nil }, nil
}, },
}) })
} }
type controllerService struct { type controllerService struct {
local api.ControllerClient local sandbox.Controller
api.UnimplementedControllerServer api.UnimplementedControllerServer
} }
@ -71,30 +63,68 @@ func (s *controllerService) Register(server *grpc.Server) error {
func (s *controllerService) Create(ctx context.Context, req *api.ControllerCreateRequest) (*api.ControllerCreateResponse, error) { func (s *controllerService) Create(ctx context.Context, req *api.ControllerCreateRequest) (*api.ControllerCreateResponse, error) {
log.G(ctx).WithField("req", req).Debug("create sandbox") log.G(ctx).WithField("req", req).Debug("create sandbox")
return s.local.Create(ctx, req) // TODO: Rootfs, any
err := s.local.Create(ctx, req.GetSandboxID())
if err != nil {
return &api.ControllerCreateResponse{}, errdefs.ToGRPC(err)
}
return &api.ControllerCreateResponse{
SandboxID: req.GetSandboxID(),
}, nil
} }
func (s *controllerService) Start(ctx context.Context, req *api.ControllerStartRequest) (*api.ControllerStartResponse, error) { func (s *controllerService) Start(ctx context.Context, req *api.ControllerStartRequest) (*api.ControllerStartResponse, error) {
log.G(ctx).WithField("req", req).Debug("start sandbox") log.G(ctx).WithField("req", req).Debug("start sandbox")
return s.local.Start(ctx, req) inst, err := s.local.Start(ctx, req.GetSandboxID())
if err != nil {
return &api.ControllerStartResponse{}, errdefs.ToGRPC(err)
}
return &api.ControllerStartResponse{
SandboxID: inst.SandboxID,
Pid: inst.Pid,
CreatedAt: protobuf.ToTimestamp(inst.CreatedAt),
Labels: inst.Labels,
}, nil
} }
func (s *controllerService) Stop(ctx context.Context, req *api.ControllerStopRequest) (*api.ControllerStopResponse, error) { func (s *controllerService) Stop(ctx context.Context, req *api.ControllerStopRequest) (*api.ControllerStopResponse, error) {
log.G(ctx).WithField("req", req).Debug("delete sandbox") log.G(ctx).WithField("req", req).Debug("delete sandbox")
return s.local.Stop(ctx, req) return &api.ControllerStopResponse{}, errdefs.ToGRPC(s.local.Stop(ctx, req.GetSandboxID()))
} }
func (s *controllerService) Wait(ctx context.Context, req *api.ControllerWaitRequest) (*api.ControllerWaitResponse, error) { func (s *controllerService) Wait(ctx context.Context, req *api.ControllerWaitRequest) (*api.ControllerWaitResponse, error) {
log.G(ctx).WithField("req", req).Debug("wait sandbox") log.G(ctx).WithField("req", req).Debug("wait sandbox")
return s.local.Wait(ctx, req) exitStatus, err := s.local.Wait(ctx, req.GetSandboxID())
if err != nil {
return &api.ControllerWaitResponse{}, errdefs.ToGRPC(err)
}
return &api.ControllerWaitResponse{
ExitStatus: exitStatus.ExitStatus,
ExitedAt: protobuf.ToTimestamp(exitStatus.ExitedAt),
}, nil
} }
func (s *controllerService) Status(ctx context.Context, req *api.ControllerStatusRequest) (*api.ControllerStatusResponse, error) { func (s *controllerService) Status(ctx context.Context, req *api.ControllerStatusRequest) (*api.ControllerStatusResponse, error) {
log.G(ctx).WithField("req", req).Debug("sandbox status") log.G(ctx).WithField("req", req).Debug("sandbox status")
return s.local.Status(ctx, req) cstatus, err := s.local.Status(ctx, req.GetSandboxID(), req.GetVerbose())
if err != nil {
return &api.ControllerStatusResponse{}, errdefs.ToGRPC(err)
}
return &api.ControllerStatusResponse{
SandboxID: cstatus.SandboxID,
Pid: cstatus.Pid,
State: cstatus.State,
Info: cstatus.Info,
CreatedAt: protobuf.ToTimestamp(cstatus.CreatedAt),
ExitedAt: protobuf.ToTimestamp(cstatus.ExitedAt),
Extra: &anypb.Any{
TypeUrl: cstatus.Extra.GetTypeUrl(),
Value: cstatus.Extra.GetValue(),
},
}, nil
} }
func (s *controllerService) Shutdown(ctx context.Context, req *api.ControllerShutdownRequest) (*api.ControllerShutdownResponse, error) { func (s *controllerService) Shutdown(ctx context.Context, req *api.ControllerShutdownRequest) (*api.ControllerShutdownResponse, error) {
log.G(ctx).WithField("req", req).Debug("shutdown sandbox") log.G(ctx).WithField("req", req).Debug("shutdown sandbox")
return s.local.Shutdown(ctx, req) return &api.ControllerShutdownResponse{}, errdefs.ToGRPC(s.local.Shutdown(ctx, req.GetSandboxID()))
} }

View File

@ -33,10 +33,10 @@ const (
DiffService = "diff-service" DiffService = "diff-service"
// IntrospectionService is the id of introspection service // IntrospectionService is the id of introspection service
IntrospectionService = "introspection-service" IntrospectionService = "introspection-service"
// SandboxStoreService is the id of Sandbox's store service
SandboxStoreService = "sandbox-store-service"
// SandboxControllerService is the id of Sandbox's controller service // SandboxControllerService is the id of Sandbox's controller service
SandboxControllerService = "sandbox-controller-service" SandboxControllerService = "sandbox-controller-service"
// SandboxStoreService is the id of Sandbox's store service
SandboxStoreService = "sandbox-store-service"
// Streaming service is the id of the streaming service // Streaming service is the id of the streaming service
StreamingService = "streaming-service" StreamingService = "streaming-service"
) )