add task api endpoint in task create options

Signed-off-by: Abel Feng <fshb1988@gmail.com>
This commit is contained in:
Abel Feng 2024-03-27 11:21:12 +08:00
parent 72fe47b2a2
commit c3b306240e
5 changed files with 78 additions and 24 deletions

View File

@ -50,6 +50,24 @@ func WithRuntimePath(absRuntimePath string) NewTaskOpts {
} }
} }
// WithTaskAPIEndpoint allow task service to manage a task through a given endpoint,
// usually it is served inside a sandbox, and we can get it from sandbox status.
func WithTaskAPIEndpoint(address, protocol string, version uint32) NewTaskOpts {
return func(ctx context.Context, client *Client, info *TaskInfo) error {
if info.Options == nil {
info.Options = &options.Options{}
}
opts, ok := info.Options.(*options.Options)
if !ok {
return errors.New("invalid runtime v2 options format")
}
opts.TaskApiAddress = address
opts.TaskApiProtocol = protocol
opts.TaskApiVersion = version
return nil
}
}
// WithTaskCheckpoint allows a task to be created with live runtime and memory data from a // WithTaskCheckpoint allows a task to be created with live runtime and memory data from a
// previous checkpoint. Additional software such as CRIU may be required to // previous checkpoint. Additional software such as CRIU may be required to
// restore a task from a checkpoint // restore a task from a checkpoint

View File

@ -51,6 +51,12 @@ type CreateOpts struct {
Runtime string Runtime string
// SandboxID is an optional ID of sandbox this container belongs to // SandboxID is an optional ID of sandbox this container belongs to
SandboxID string SandboxID string
// Address is an optional Address for Task API server
Address string
// Protocol is an optional Protocol for Task API connection
Protocol string
// Version is an optional Version of the Task API
Version uint32
} }
// Exit information for a process // Exit information for a process

View File

@ -163,9 +163,22 @@ func (m *ShimManager) ID() string {
func (m *ShimManager) Start(ctx context.Context, id string, bundle *Bundle, opts runtime.CreateOpts) (_ ShimInstance, retErr error) { func (m *ShimManager) Start(ctx context.Context, id string, bundle *Bundle, opts runtime.CreateOpts) (_ ShimInstance, retErr error) {
// This container belongs to sandbox which supposed to be already started via sandbox API. // This container belongs to sandbox which supposed to be already started via sandbox API.
if opts.SandboxID != "" { if opts.SandboxID != "" {
process, err := m.Get(ctx, opts.SandboxID) var params shimbinary.BootstrapParams
if err != nil { if opts.Address != "" && opts.Protocol != "" {
return nil, fmt.Errorf("can't find sandbox %s", opts.SandboxID) params = shimbinary.BootstrapParams{
Version: int(opts.Version),
Address: opts.Address,
Protocol: opts.Protocol,
}
} else {
// For those sandbox we can not get endpoint,
// fallback to legacy implementation
p, restoreErr := m.restoreBootstrapParams(ctx, opts.SandboxID)
if restoreErr != nil {
return nil, fmt.Errorf("failed to get bootstrap "+
"params of sandbox %s, %v, legacy restore error %v", opts.SandboxID, err, restoreErr)
}
params = p
} }
// Write sandbox ID this task belongs to. // Write sandbox ID this task belongs to.
@ -173,11 +186,6 @@ func (m *ShimManager) Start(ctx context.Context, id string, bundle *Bundle, opts
return nil, err return nil, err
} }
params, err := restoreBootstrapParams(process.Bundle())
if err != nil {
return nil, err
}
if err := writeBootstrapParams(filepath.Join(bundle.Path, "bootstrap.json"), params); err != nil { if err := writeBootstrapParams(filepath.Join(bundle.Path, "bootstrap.json"), params); err != nil {
return nil, fmt.Errorf("failed to write bootstrap.json for bundle %s: %w", bundle.Path, err) return nil, fmt.Errorf("failed to write bootstrap.json for bundle %s: %w", bundle.Path, err)
} }
@ -251,6 +259,18 @@ func (m *ShimManager) startShim(ctx context.Context, bundle *Bundle, id string,
return shim, nil return shim, nil
} }
func (m *ShimManager) restoreBootstrapParams(ctx context.Context, sandboxID string) (shimbinary.BootstrapParams, error) {
process, err := m.Get(ctx, sandboxID)
if err != nil {
return shimbinary.BootstrapParams{}, fmt.Errorf("can't find sandbox %s", sandboxID)
}
params, err := restoreBootstrapParams(filepath.Join(m.state, process.Namespace(), sandboxID))
if err != nil {
return shimbinary.BootstrapParams{}, err
}
return params, nil
}
// restoreBootstrapParams reads bootstrap.json to restore shim configuration. // restoreBootstrapParams reads bootstrap.json to restore shim configuration.
// If its an old shim, this will perform migration - read address file and write default bootstrap // If its an old shim, this will perform migration - read address file and write default bootstrap
// configuration (version = 2, protocol = ttrpc, and address). // configuration (version = 2, protocol = ttrpc, and address).

View File

@ -123,7 +123,7 @@ func (c *criService) StartContainer(ctx context.Context, r *runtime.StartContain
endpoint := sandbox.Endpoint endpoint := sandbox.Endpoint
if endpoint.IsValid() { if endpoint.IsValid() {
taskOpts = append(taskOpts, taskOpts = append(taskOpts,
containerd.WithTaskApiEndpoint(endpoint.Address, endpoint.Protocol, endpoint.Version)) containerd.WithTaskAPIEndpoint(endpoint.Address, endpoint.Protocol, endpoint.Version))
} }
task, err := container.NewTask(ctx, ioCreation, taskOpts...) task, err := container.NewTask(ctx, ioCreation, taskOpts...)

View File

@ -155,10 +155,25 @@ func (l *local) Create(ctx context.Context, r *api.CreateTaskRequest, _ ...grpc.
if err != nil { if err != nil {
return nil, errdefs.ToGRPC(err) return nil, errdefs.ToGRPC(err)
} }
checkpointPath, err := getRestorePath(container.Runtime.Name, r.Options)
var (
checkpointPath string
taskAPIAddress string
taskAPIProtocol string
taskAPIVersion uint32
)
if r.Options != nil {
taskOptions, err := formatOptions(container.Runtime.Name, r.Options)
if err != nil { if err != nil {
return nil, err return nil, err
} }
checkpointPath = taskOptions.CriuImagePath
taskAPIAddress = taskOptions.TaskApiAddress
taskAPIProtocol = taskOptions.TaskApiProtocol
taskAPIVersion = taskOptions.TaskApiVersion
}
// jump get checkpointPath from checkpoint image // jump get checkpointPath from checkpoint image
if checkpointPath == "" && r.Checkpoint != nil { if checkpointPath == "" && r.Checkpoint != nil {
checkpointPath, err = os.MkdirTemp(os.Getenv("XDG_RUNTIME_DIR"), "ctrd-checkpoint") checkpointPath, err = os.MkdirTemp(os.Getenv("XDG_RUNTIME_DIR"), "ctrd-checkpoint")
@ -196,6 +211,9 @@ func (l *local) Create(ctx context.Context, r *api.CreateTaskRequest, _ ...grpc.
RuntimeOptions: container.Runtime.Options, RuntimeOptions: container.Runtime.Options,
TaskOptions: r.Options, TaskOptions: r.Options,
SandboxID: container.SandboxID, SandboxID: container.SandboxID,
Address: taskAPIAddress,
Protocol: taskAPIProtocol,
Version: taskAPIVersion,
} }
if r.RuntimePath != "" { if r.RuntimePath != "" {
opts.Runtime = r.RuntimePath opts.Runtime = r.RuntimePath
@ -723,22 +741,14 @@ func getCheckpointPath(runtime string, option *ptypes.Any) (string, error) {
return checkpointPath, nil return checkpointPath, nil
} }
// getRestorePath only suitable for runc runtime now func formatOptions(runtime string, option *ptypes.Any) (*options.Options, error) {
func getRestorePath(runtime string, option *ptypes.Any) (string, error) {
if option == nil {
return "", nil
}
var restorePath string
v, err := typeurl.UnmarshalAny(option) v, err := typeurl.UnmarshalAny(option)
if err != nil { if err != nil {
return "", err return nil, err
} }
opts, ok := v.(*options.Options) opts, ok := v.(*options.Options)
if !ok { if !ok {
return "", fmt.Errorf("invalid task create option for %s", runtime) return nil, fmt.Errorf("invalid task create option for %s", runtime)
} }
restorePath = opts.CriuImagePath return opts, nil
return restorePath, nil
} }