Add version to shim protocol
Document environment variables and test shim start response parsing. Signed-off-by: Derek McGowan <derek@mcg.dev>
This commit is contained in:
parent
a18709442b
commit
dba6f9db18
@ -42,17 +42,26 @@ This command will launch new shims.
|
||||
The start command MUST accept the following flags:
|
||||
|
||||
* `-namespace` the namespace for the container
|
||||
* `-address` the address of the containerd's main socket
|
||||
* `-address` the address of the containerd's main grpc socket
|
||||
* `-publish-binary` the binary path to publish events back to containerd
|
||||
* `-id` the id of the container
|
||||
|
||||
The start command, as well as all binary calls to the shim, has the bundle for the container set as the `cwd`.
|
||||
|
||||
The start command may have the following containerd specific environment variables set:
|
||||
|
||||
* `TTRPC_ADDRESS` the address of containerd's ttrpc API socket
|
||||
* `GRPC_ADDRESS` the address of containerd's grpc API socket (1.7+)
|
||||
* `MAX_SHIM_VERSION` the maximum shim version supported by the client, always `2` for shim v2 (1.7+)
|
||||
* `SCHED_CORE` enable core scheduling if available (1.6+)
|
||||
* `NAMESPACE` an optional namespace the shim is operating in or inheriting (1.7+)
|
||||
|
||||
The start command MUST write to stdout either the ttrpc address that the shim is serving its API on, or _(experimental)_
|
||||
a JSON structure in the following format (where protocol can be either "ttrpc" or "grpc"):
|
||||
|
||||
```json
|
||||
{
|
||||
"version": 2,
|
||||
"address": "/address/of/task/service",
|
||||
"protocol": "grpc"
|
||||
}
|
||||
|
@ -24,7 +24,6 @@ import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
gruntime "runtime"
|
||||
"strings"
|
||||
|
||||
"github.com/sirupsen/logrus"
|
||||
|
||||
@ -120,7 +119,7 @@ func (b *binary) Start(ctx context.Context, opts *types.Any, onClose func()) (_
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("%s: %w", out, err)
|
||||
}
|
||||
address := strings.TrimSpace(string(out))
|
||||
response := bytes.TrimSpace(out)
|
||||
|
||||
onCloseWithShimLog := func() {
|
||||
onClose()
|
||||
@ -132,7 +131,12 @@ func (b *binary) Start(ctx context.Context, opts *types.Any, onClose func()) (_
|
||||
return nil, err
|
||||
}
|
||||
|
||||
conn, err := makeConnection(ctx, address, onCloseWithShimLog)
|
||||
params, err := parseStartResponse(ctx, response)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
conn, err := makeConnection(ctx, params, onCloseWithShimLog)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -60,12 +60,12 @@ func init() {
|
||||
timeout.Set(shutdownTimeout, 3*time.Second)
|
||||
}
|
||||
|
||||
func loadAddress(path string) (string, error) {
|
||||
func loadAddress(path string) ([]byte, error) {
|
||||
data, err := os.ReadFile(path)
|
||||
if err != nil {
|
||||
return "", err
|
||||
return nil, err
|
||||
}
|
||||
return string(data), nil
|
||||
return data, nil
|
||||
}
|
||||
|
||||
func loadShim(ctx context.Context, bundle *Bundle, onClose func()) (_ ShimInstance, retErr error) {
|
||||
@ -109,7 +109,12 @@ func loadShim(ctx context.Context, bundle *Bundle, onClose func()) (_ ShimInstan
|
||||
f.Close()
|
||||
}
|
||||
|
||||
conn, err := makeConnection(ctx, address, onCloseWithShimLog)
|
||||
params, err := parseStartResponse(ctx, address)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
conn, err := makeConnection(ctx, params, onCloseWithShimLog)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -203,24 +208,25 @@ type ShimInstance interface {
|
||||
Delete(ctx context.Context) error
|
||||
}
|
||||
|
||||
// makeConnection creates a new TTRPC or GRPC connection object from address.
|
||||
// address can be either a socket path for TTRPC or JSON serialized BootstrapParams.
|
||||
func makeConnection(ctx context.Context, address string, onClose func()) (_ io.Closer, retErr error) {
|
||||
var (
|
||||
payload = []byte(address)
|
||||
params client.BootstrapParams
|
||||
)
|
||||
func parseStartResponse(ctx context.Context, response []byte) (client.BootstrapParams, error) {
|
||||
var params client.BootstrapParams
|
||||
|
||||
if json.Valid(payload) {
|
||||
if err := json.Unmarshal([]byte(address), ¶ms); err != nil {
|
||||
return nil, fmt.Errorf("unable to unmarshal bootstrap params: %w", err)
|
||||
}
|
||||
} else {
|
||||
if err := json.Unmarshal(response, ¶ms); err != nil || params.Version < 2 {
|
||||
// Use TTRPC for legacy shims
|
||||
params.Address = address
|
||||
params.Address = string(response)
|
||||
params.Protocol = "ttrpc"
|
||||
}
|
||||
|
||||
if params.Version > 2 {
|
||||
return client.BootstrapParams{}, fmt.Errorf("unsupported shim version (%d): %w", params.Version, errdefs.ErrNotImplemented)
|
||||
}
|
||||
|
||||
return params, nil
|
||||
}
|
||||
|
||||
// makeConnection creates a new TTRPC or GRPC connection object from address.
|
||||
// address can be either a socket path for TTRPC or JSON serialized BootstrapParams.
|
||||
func makeConnection(ctx context.Context, params client.BootstrapParams, onClose func()) (_ io.Closer, retErr error) {
|
||||
log.G(ctx).WithFields(log.Fields{
|
||||
"address": params.Address,
|
||||
"protocol": params.Protocol,
|
||||
|
@ -59,12 +59,12 @@ type StartOpts struct {
|
||||
|
||||
// BootstrapParams is a JSON payload returned in stdout from shim.Start call.
|
||||
type BootstrapParams struct {
|
||||
// Version is the version of shim parameters (expected 2 for shim v2)
|
||||
Version int `json:"version"`
|
||||
// Address is a address containerd should use to connect to shim.
|
||||
Address string `json:"address"`
|
||||
// Protocol is either TTRPC or GRPC.
|
||||
Protocol string `json:"protocol"`
|
||||
// Caps is a list of capabilities supported by shim implementation (reserved for future)
|
||||
//Caps []string `json:"caps"`
|
||||
}
|
||||
|
||||
type StopStatus struct {
|
||||
@ -147,6 +147,8 @@ var (
|
||||
|
||||
const (
|
||||
ttrpcAddressEnv = "TTRPC_ADDRESS"
|
||||
grpcAddressEnv = "GRPC_ADDRESS"
|
||||
namespaceEnv = "NAMESPACE"
|
||||
)
|
||||
|
||||
func parseFlags() {
|
||||
|
@ -69,6 +69,8 @@ func Command(ctx context.Context, config *CommandConfig) (*exec.Cmd, error) {
|
||||
os.Environ(),
|
||||
"GOMAXPROCS=2",
|
||||
fmt.Sprintf("%s=%s", ttrpcAddressEnv, config.TTRPCAddress),
|
||||
fmt.Sprintf("%s=%s", grpcAddressEnv, config.Address),
|
||||
fmt.Sprintf("%s=%s", namespaceEnv, ns),
|
||||
)
|
||||
if config.SchedCore {
|
||||
cmd.Env = append(cmd.Env, "SCHED_CORE=1")
|
||||
|
100
runtime/v2/shim_test.go
Normal file
100
runtime/v2/shim_test.go
Normal file
@ -0,0 +1,100 @@
|
||||
/*
|
||||
Copyright The containerd Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package v2
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"testing"
|
||||
|
||||
"github.com/containerd/containerd/errdefs"
|
||||
client "github.com/containerd/containerd/runtime/v2/shim"
|
||||
)
|
||||
|
||||
func TestParseStartResponse(t *testing.T) {
|
||||
for _, tc := range []struct {
|
||||
Name string
|
||||
Response string
|
||||
Expected client.BootstrapParams
|
||||
Err error
|
||||
}{
|
||||
{
|
||||
Name: "v2 shim",
|
||||
Response: "/somedirectory/somesocket",
|
||||
Expected: client.BootstrapParams{
|
||||
Version: 0,
|
||||
Address: "/somedirectory/somesocket",
|
||||
Protocol: "ttrpc",
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "v2 shim using grpc",
|
||||
Response: `{"version":2,"address":"/somedirectory/somesocket","protocol":"grpc"}`,
|
||||
Expected: client.BootstrapParams{
|
||||
Version: 2,
|
||||
Address: "/somedirectory/somesocket",
|
||||
Protocol: "grpc",
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "v2 shim using ttrpc",
|
||||
Response: `{"version":2,"address":"/somedirectory/somesocket","protocol":"ttrpc"}`,
|
||||
Expected: client.BootstrapParams{
|
||||
Version: 2,
|
||||
Address: "/somedirectory/somesocket",
|
||||
Protocol: "ttrpc",
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "invalid shim v2 response",
|
||||
Response: `{"address":"/somedirectory/somesocket","protocol":"ttrpc"}`,
|
||||
Expected: client.BootstrapParams{
|
||||
Version: 0,
|
||||
Address: `{"address":"/somedirectory/somesocket","protocol":"ttrpc"}`,
|
||||
Protocol: "ttrpc",
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "later unsupported shim",
|
||||
Response: `{"Version": 3,"Address":"/somedirectory/somesocket","Protocol":"ttrpc"}`,
|
||||
Expected: client.BootstrapParams{},
|
||||
Err: errdefs.ErrNotImplemented,
|
||||
},
|
||||
} {
|
||||
t.Run(tc.Name, func(t *testing.T) {
|
||||
params, err := parseStartResponse(context.Background(), []byte(tc.Response))
|
||||
if err != nil {
|
||||
if !errors.Is(err, tc.Err) {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
return
|
||||
} else if tc.Err != nil {
|
||||
t.Fatal("expected error")
|
||||
}
|
||||
if params.Version != tc.Expected.Version {
|
||||
t.Errorf("unexpected version %d, expected %d", params.Version, tc.Expected.Version)
|
||||
}
|
||||
if params.Protocol != tc.Expected.Protocol {
|
||||
t.Errorf("unexpected protocol %q, expected %q", params.Protocol, tc.Expected.Protocol)
|
||||
}
|
||||
if params.Address != tc.Expected.Address {
|
||||
t.Errorf("unexpected address %q, expected %q", params.Address, tc.Expected.Address)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue
Block a user