Save bootstrap.json instead of address file
Signed-off-by: Maksym Pavlenko <pavlenko.maksym@gmail.com>
This commit is contained in:
parent
e03bf32b86
commit
8061cb0237
@ -23,7 +23,7 @@ import (
|
||||
"path/filepath"
|
||||
"strings"
|
||||
|
||||
taskapi "github.com/containerd/containerd/api/runtime/task/v2"
|
||||
taskapi "github.com/containerd/containerd/api/runtime/task/v3"
|
||||
"github.com/containerd/containerd/oci"
|
||||
"github.com/containerd/containerd/pkg/failpoint"
|
||||
"github.com/containerd/containerd/pkg/shutdown"
|
||||
@ -79,11 +79,11 @@ var (
|
||||
|
||||
type taskServiceWithFp struct {
|
||||
fps map[string]*failpoint.Failpoint
|
||||
local taskapi.TaskService
|
||||
local taskapi.TTRPCTaskService
|
||||
}
|
||||
|
||||
func (s *taskServiceWithFp) RegisterTTRPC(server *ttrpc.Server) error {
|
||||
taskapi.RegisterTaskService(server, s.local)
|
||||
taskapi.RegisterTTRPCTaskService(server, s.local)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -129,7 +129,7 @@ func (b *binary) Start(ctx context.Context, opts *types.Any, onClose func()) (_
|
||||
return nil, err
|
||||
}
|
||||
|
||||
params, err := parseStartResponse(ctx, response)
|
||||
params, err := parseStartResponse(response)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -139,6 +139,11 @@ func (b *binary) Start(ctx context.Context, opts *types.Any, onClose func()) (_
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Save bootstrap configuration (so containerd can restore shims after restart).
|
||||
if err := writeBootstrapParams(filepath.Join(b.bundle.Path, "bootstrap.json"), params); err != nil {
|
||||
return nil, fmt.Errorf("failed to write bootstrap.json: %w", err)
|
||||
}
|
||||
|
||||
return &shim{
|
||||
bundle: b.bundle,
|
||||
client: conn,
|
||||
|
@ -205,14 +205,13 @@ func (m *ShimManager) Start(ctx context.Context, id string, opts runtime.CreateO
|
||||
return nil, err
|
||||
}
|
||||
|
||||
address, err := shimbinary.ReadAddress(filepath.Join(m.state, process.Namespace(), opts.SandboxID, "address"))
|
||||
params, err := restoreBootstrapParams(filepath.Join(m.state, process.Namespace(), opts.SandboxID, "bootstrap.json"))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get socket address for sandbox %q: %w", opts.SandboxID, err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Use sandbox's socket address to handle task requests for this container.
|
||||
if err := shimbinary.WriteAddress(filepath.Join(bundle.Path, "address"), address); err != nil {
|
||||
return nil, err
|
||||
if err := writeBootstrapParams(filepath.Join(bundle.Path, "bootstrap.json"), params); err != nil {
|
||||
return nil, fmt.Errorf("failed to write bootstrap.json for bundle %s: %w", bundle.Path, err)
|
||||
}
|
||||
|
||||
shim, err := loadShim(ctx, bundle, func() {})
|
||||
@ -284,6 +283,40 @@ func (m *ShimManager) startShim(ctx context.Context, bundle *Bundle, id string,
|
||||
return shim, nil
|
||||
}
|
||||
|
||||
// restoreBootstrapParams reads bootstrap.json to restore shim configuration.
|
||||
// If its an old shim, this will perform migration - read address file and write default bootstrap
|
||||
// configuration (version = 2, protocol = ttrpc, and address).
|
||||
func restoreBootstrapParams(bundlePath string) (shimbinary.BootstrapParams, error) {
|
||||
filePath := filepath.Join(bundlePath, "bootstrap.json")
|
||||
params, err := readBootstrapParams(filePath)
|
||||
if err == nil {
|
||||
return params, nil
|
||||
}
|
||||
|
||||
if !os.IsNotExist(err) {
|
||||
return shimbinary.BootstrapParams{}, fmt.Errorf("failed to read bootstrap.json for bundle %s: %w", bundlePath, err)
|
||||
}
|
||||
|
||||
// File not found, likely its an older shim. Try migrate.
|
||||
|
||||
address, err := shimbinary.ReadAddress(filepath.Join(bundlePath, "address"))
|
||||
if err != nil {
|
||||
return shimbinary.BootstrapParams{}, fmt.Errorf("unable to migrate shim: failed to get socket address for bundle %s: %w", bundlePath, err)
|
||||
}
|
||||
|
||||
params = shimbinary.BootstrapParams{
|
||||
Version: 2,
|
||||
Address: address,
|
||||
Protocol: "ttrpc",
|
||||
}
|
||||
|
||||
if err := writeBootstrapParams(filePath, params); err != nil {
|
||||
return shimbinary.BootstrapParams{}, fmt.Errorf("unable to migrate: failed to write bootstrap.json file: %w", err)
|
||||
}
|
||||
|
||||
return params, nil
|
||||
}
|
||||
|
||||
func (m *ShimManager) resolveRuntimePath(runtime string) (string, error) {
|
||||
if runtime == "" {
|
||||
return "", fmt.Errorf("no runtime name")
|
||||
|
@ -153,9 +153,6 @@ func (manager) Start(ctx context.Context, id string, opts shim.StartOpts) (_ shi
|
||||
return params, fmt.Errorf("create new shim socket: %w", err)
|
||||
}
|
||||
if shim.CanConnect(address) {
|
||||
if err := shim.WriteAddress("address", address); err != nil {
|
||||
return params, fmt.Errorf("write existing socket for shim: %w", err)
|
||||
}
|
||||
params.Address = address
|
||||
return params, nil
|
||||
}
|
||||
@ -173,11 +170,6 @@ func (manager) Start(ctx context.Context, id string, opts shim.StartOpts) (_ shi
|
||||
}
|
||||
}()
|
||||
|
||||
// make sure that reexec shim-v2 binary use the value if need
|
||||
if err := shim.WriteAddress("address", address); err != nil {
|
||||
return params, err
|
||||
}
|
||||
|
||||
f, err := socket.File()
|
||||
if err != nil {
|
||||
return params, err
|
||||
|
@ -27,6 +27,7 @@ import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/containerd/containerd/pkg/atomicfile"
|
||||
"github.com/containerd/ttrpc"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/connectivity"
|
||||
@ -59,20 +60,7 @@ func init() {
|
||||
timeout.Set(shutdownTimeout, 3*time.Second)
|
||||
}
|
||||
|
||||
func loadAddress(path string) ([]byte, error) {
|
||||
data, err := os.ReadFile(path)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return data, nil
|
||||
}
|
||||
|
||||
func loadShim(ctx context.Context, bundle *Bundle, onClose func()) (_ ShimInstance, retErr error) {
|
||||
address, err := loadAddress(filepath.Join(bundle.Path, "address"))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
shimCtx, cancelShimLog := context.WithCancel(ctx)
|
||||
defer func() {
|
||||
if retErr != nil {
|
||||
@ -108,9 +96,9 @@ func loadShim(ctx context.Context, bundle *Bundle, onClose func()) (_ ShimInstan
|
||||
f.Close()
|
||||
}
|
||||
|
||||
params, err := parseStartResponse(ctx, address)
|
||||
params, err := restoreBootstrapParams(filepath.Join(bundle.Path, "bootstrap.json"))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, fmt.Errorf("failed to read boostrap.json when restoring bundle %q: %w", bundle.ID, err)
|
||||
}
|
||||
|
||||
conn, err := makeConnection(ctx, params, onCloseWithShimLog)
|
||||
@ -127,6 +115,7 @@ func loadShim(ctx context.Context, bundle *Bundle, onClose func()) (_ ShimInstan
|
||||
shim := &shim{
|
||||
bundle: bundle,
|
||||
client: conn,
|
||||
version: params.Version,
|
||||
}
|
||||
|
||||
return shim, nil
|
||||
@ -199,7 +188,7 @@ type ShimInstance interface {
|
||||
Version() int
|
||||
}
|
||||
|
||||
func parseStartResponse(ctx context.Context, response []byte) (client.BootstrapParams, error) {
|
||||
func parseStartResponse(response []byte) (client.BootstrapParams, error) {
|
||||
var params client.BootstrapParams
|
||||
|
||||
if err := json.Unmarshal(response, ¶ms); err != nil || params.Version < 2 {
|
||||
@ -216,6 +205,45 @@ func parseStartResponse(ctx context.Context, response []byte) (client.BootstrapP
|
||||
return params, nil
|
||||
}
|
||||
|
||||
// writeBootstrapParams writes shim's bootstrap configuration (e.g. how to connect, version, etc).
|
||||
func writeBootstrapParams(path string, params client.BootstrapParams) error {
|
||||
path, err := filepath.Abs(path)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
f, err := atomicfile.New(path, 0o666)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
data, err := json.Marshal(¶ms)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = f.Write(data)
|
||||
if err != nil {
|
||||
f.Cancel()
|
||||
return err
|
||||
}
|
||||
|
||||
return f.Close()
|
||||
}
|
||||
|
||||
func readBootstrapParams(path string) (client.BootstrapParams, error) {
|
||||
path, err := filepath.Abs(path)
|
||||
if err != nil {
|
||||
return client.BootstrapParams{}, err
|
||||
}
|
||||
|
||||
data, err := os.ReadFile(path)
|
||||
if err != nil {
|
||||
return client.BootstrapParams{}, err
|
||||
}
|
||||
|
||||
return parseStartResponse(data)
|
||||
}
|
||||
|
||||
// makeConnection creates a new TTRPC or GRPC connection object from address.
|
||||
// address can be either a socket path for TTRPC or JSON serialized BootstrapParams.
|
||||
func makeConnection(ctx context.Context, params client.BootstrapParams, onClose func()) (_ io.Closer, retErr error) {
|
||||
|
@ -138,24 +138,6 @@ func WritePidFile(path string, pid int) error {
|
||||
return f.Close()
|
||||
}
|
||||
|
||||
// WriteAddress writes a address file atomically
|
||||
func WriteAddress(path, address string) error {
|
||||
path, err := filepath.Abs(path)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
f, err := atomicfile.New(path, 0o666)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = f.Write([]byte(address))
|
||||
if err != nil {
|
||||
f.Cancel()
|
||||
return err
|
||||
}
|
||||
return f.Close()
|
||||
}
|
||||
|
||||
// ErrNoAddress is returned when the address file has no content
|
||||
var ErrNoAddress = errors.New("no shim address")
|
||||
|
||||
|
@ -17,12 +17,14 @@
|
||||
package v2
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
|
||||
"github.com/containerd/containerd/errdefs"
|
||||
client "github.com/containerd/containerd/runtime/v2/shim"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestParseStartResponse(t *testing.T) {
|
||||
@ -36,7 +38,7 @@ func TestParseStartResponse(t *testing.T) {
|
||||
Name: "v2 shim",
|
||||
Response: "/somedirectory/somesocket",
|
||||
Expected: client.BootstrapParams{
|
||||
Version: 0,
|
||||
Version: 2,
|
||||
Address: "/somedirectory/somesocket",
|
||||
Protocol: "ttrpc",
|
||||
},
|
||||
@ -63,20 +65,20 @@ func TestParseStartResponse(t *testing.T) {
|
||||
Name: "invalid shim v2 response",
|
||||
Response: `{"address":"/somedirectory/somesocket","protocol":"ttrpc"}`,
|
||||
Expected: client.BootstrapParams{
|
||||
Version: 0,
|
||||
Version: 2,
|
||||
Address: `{"address":"/somedirectory/somesocket","protocol":"ttrpc"}`,
|
||||
Protocol: "ttrpc",
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "later unsupported shim",
|
||||
Response: `{"Version": 3,"Address":"/somedirectory/somesocket","Protocol":"ttrpc"}`,
|
||||
Response: `{"Version": 4,"Address":"/somedirectory/somesocket","Protocol":"ttrpc"}`,
|
||||
Expected: client.BootstrapParams{},
|
||||
Err: errdefs.ErrNotImplemented,
|
||||
},
|
||||
} {
|
||||
t.Run(tc.Name, func(t *testing.T) {
|
||||
params, err := parseStartResponse(context.Background(), []byte(tc.Response))
|
||||
params, err := parseStartResponse([]byte(tc.Response))
|
||||
if err != nil {
|
||||
if !errors.Is(err, tc.Err) {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
@ -96,5 +98,27 @@ func TestParseStartResponse(t *testing.T) {
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestRestoreBootstrapParams(t *testing.T) {
|
||||
bundlePath := t.TempDir()
|
||||
|
||||
err := os.WriteFile(filepath.Join(bundlePath, "address"), []byte("unix://123"), 0o666)
|
||||
require.NoError(t, err)
|
||||
|
||||
restored, err := restoreBootstrapParams(bundlePath)
|
||||
require.NoError(t, err)
|
||||
|
||||
expected := client.BootstrapParams{
|
||||
Version: 2,
|
||||
Address: "unix://123",
|
||||
Protocol: "ttrpc",
|
||||
}
|
||||
|
||||
require.EqualValues(t, expected, restored)
|
||||
|
||||
loaded, err := readBootstrapParams(filepath.Join(bundlePath, "bootstrap.json"))
|
||||
|
||||
require.NoError(t, err)
|
||||
require.EqualValues(t, expected, loaded)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user