Allow explicit configuration of TTRPC address
Previously the TTRPC address was generated as "<GRPC address>.ttrpc". This change now allows explicit configuration of the TTRPC address, with the default still being the old format if no value is specified. As part of this change, a new configuration section is added for TTRPC listener options. Signed-off-by: Kevin Parsons <kevpar@microsoft.com>
This commit is contained in:
parent
bd46ea5191
commit
d7e1b25384
@ -148,13 +148,16 @@ func App() *cli.App {
|
||||
for _, w := range warnings {
|
||||
log.G(ctx).WithError(w).Warn("cleanup temp mount")
|
||||
}
|
||||
var (
|
||||
address = config.GRPC.Address
|
||||
ttrpcAddress = fmt.Sprintf("%s.ttrpc", config.GRPC.Address)
|
||||
)
|
||||
if address == "" {
|
||||
|
||||
if config.GRPC.Address == "" {
|
||||
return errors.Wrap(errdefs.ErrInvalidArgument, "grpc address cannot be empty")
|
||||
}
|
||||
if config.TTRPC.Address == "" {
|
||||
// If TTRPC was not explicitly configured, use defaults based on GRPC.
|
||||
config.TTRPC.Address = fmt.Sprintf("%s.ttrpc", config.GRPC.Address)
|
||||
config.TTRPC.UID = config.GRPC.UID
|
||||
config.TTRPC.GID = config.GRPC.GID
|
||||
}
|
||||
log.G(ctx).WithFields(logrus.Fields{
|
||||
"version": version.Version,
|
||||
"revision": version.Revision,
|
||||
@ -193,7 +196,7 @@ func App() *cli.App {
|
||||
serve(ctx, l, server.ServeMetrics)
|
||||
}
|
||||
// setup the ttrpc endpoint
|
||||
tl, err := sys.GetLocalListener(ttrpcAddress, config.GRPC.UID, config.GRPC.GID)
|
||||
tl, err := sys.GetLocalListener(config.TTRPC.Address, config.TTRPC.UID, config.TTRPC.GID)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "failed to get listener for main ttrpc endpoint")
|
||||
}
|
||||
@ -207,7 +210,7 @@ func App() *cli.App {
|
||||
serve(ctx, l, server.ServeTCP)
|
||||
}
|
||||
// setup the main grpc endpoint
|
||||
l, err := sys.GetLocalListener(address, config.GRPC.UID, config.GRPC.GID)
|
||||
l, err := sys.GetLocalListener(config.GRPC.Address, config.GRPC.UID, config.GRPC.GID)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "failed to get listener for main endpoint")
|
||||
}
|
||||
|
@ -28,12 +28,13 @@ import (
|
||||
|
||||
// InitContext is used for plugin inititalization
|
||||
type InitContext struct {
|
||||
Context context.Context
|
||||
Root string
|
||||
State string
|
||||
Config interface{}
|
||||
Address string
|
||||
Events *exchange.Exchange
|
||||
Context context.Context
|
||||
Root string
|
||||
State string
|
||||
Config interface{}
|
||||
Address string
|
||||
TTRPCAddress string
|
||||
Events *exchange.Exchange
|
||||
|
||||
Meta *Meta // plugins can fill in metadata at init.
|
||||
|
||||
|
@ -35,22 +35,24 @@ import (
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
func shimBinary(ctx context.Context, bundle *Bundle, runtime, containerdAddress string, events *exchange.Exchange, rt *runtime.TaskList) *binary {
|
||||
func shimBinary(ctx context.Context, bundle *Bundle, runtime, containerdAddress string, containerdTTRPCAddress string, events *exchange.Exchange, rt *runtime.TaskList) *binary {
|
||||
return &binary{
|
||||
bundle: bundle,
|
||||
runtime: runtime,
|
||||
containerdAddress: containerdAddress,
|
||||
events: events,
|
||||
rtTasks: rt,
|
||||
bundle: bundle,
|
||||
runtime: runtime,
|
||||
containerdAddress: containerdAddress,
|
||||
containerdTTRPCAddress: containerdTTRPCAddress,
|
||||
events: events,
|
||||
rtTasks: rt,
|
||||
}
|
||||
}
|
||||
|
||||
type binary struct {
|
||||
runtime string
|
||||
containerdAddress string
|
||||
bundle *Bundle
|
||||
events *exchange.Exchange
|
||||
rtTasks *runtime.TaskList
|
||||
runtime string
|
||||
containerdAddress string
|
||||
containerdTTRPCAddress string
|
||||
bundle *Bundle
|
||||
events *exchange.Exchange
|
||||
rtTasks *runtime.TaskList
|
||||
}
|
||||
|
||||
func (b *binary) Start(ctx context.Context, opts *types.Any, onClose func()) (_ *shim, err error) {
|
||||
@ -64,6 +66,7 @@ func (b *binary) Start(ctx context.Context, opts *types.Any, onClose func()) (_
|
||||
ctx,
|
||||
b.runtime,
|
||||
b.containerdAddress,
|
||||
b.containerdTTRPCAddress,
|
||||
b.bundle.Path,
|
||||
opts,
|
||||
args...,
|
||||
@ -124,6 +127,7 @@ func (b *binary) Delete(ctx context.Context) (*runtime.Exit, error) {
|
||||
cmd, err := client.Command(ctx,
|
||||
b.runtime,
|
||||
b.containerdAddress,
|
||||
b.containerdTTRPCAddress,
|
||||
bundlePath,
|
||||
nil,
|
||||
"-id", b.bundle.ID,
|
||||
|
@ -44,7 +44,7 @@ type service struct {
|
||||
}
|
||||
|
||||
// StartShim is a binary call that executes a new shim returning the address
|
||||
func (s *service) StartShim(ctx context.Context, id, containerdBinary, containerdAddress string) (string, error) {
|
||||
func (s *service) StartShim(ctx context.Context, id, containerdBinary, containerdAddress, containerdTTRPCAddress string) (string, error) {
|
||||
return "", nil
|
||||
}
|
||||
|
||||
|
@ -69,25 +69,26 @@ func init() {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return New(ic.Context, ic.Root, ic.State, ic.Address, ic.Events, m.(*metadata.DB))
|
||||
return New(ic.Context, ic.Root, ic.State, ic.Address, ic.TTRPCAddress, ic.Events, m.(*metadata.DB))
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
// New task manager for v2 shims
|
||||
func New(ctx context.Context, root, state, containerdAddress string, events *exchange.Exchange, db *metadata.DB) (*TaskManager, error) {
|
||||
func New(ctx context.Context, root, state, containerdAddress, containerdTTRPCAddress string, events *exchange.Exchange, db *metadata.DB) (*TaskManager, error) {
|
||||
for _, d := range []string{root, state} {
|
||||
if err := os.MkdirAll(d, 0711); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
m := &TaskManager{
|
||||
root: root,
|
||||
state: state,
|
||||
containerdAddress: containerdAddress,
|
||||
tasks: runtime.NewTaskList(),
|
||||
events: events,
|
||||
db: db,
|
||||
root: root,
|
||||
state: state,
|
||||
containerdAddress: containerdAddress,
|
||||
containerdTTRPCAddress: containerdTTRPCAddress,
|
||||
tasks: runtime.NewTaskList(),
|
||||
events: events,
|
||||
db: db,
|
||||
}
|
||||
if err := m.loadExistingTasks(ctx); err != nil {
|
||||
return nil, err
|
||||
@ -97,9 +98,10 @@ func New(ctx context.Context, root, state, containerdAddress string, events *exc
|
||||
|
||||
// TaskManager manages v2 shim's and their tasks
|
||||
type TaskManager struct {
|
||||
root string
|
||||
state string
|
||||
containerdAddress string
|
||||
root string
|
||||
state string
|
||||
containerdAddress string
|
||||
containerdTTRPCAddress string
|
||||
|
||||
tasks *runtime.TaskList
|
||||
events *exchange.Exchange
|
||||
@ -131,7 +133,7 @@ func (m *TaskManager) Create(ctx context.Context, id string, opts runtime.Create
|
||||
topts = opts.RuntimeOptions
|
||||
}
|
||||
|
||||
b := shimBinary(ctx, bundle, opts.Runtime, m.containerdAddress, m.events, m.tasks)
|
||||
b := shimBinary(ctx, bundle, opts.Runtime, m.containerdAddress, m.containerdTTRPCAddress, m.events, m.tasks)
|
||||
shim, err := b.Start(ctx, topts, func() {
|
||||
log.G(ctx).WithField("id", id).Info("shim disconnected")
|
||||
_, err := m.tasks.Get(ctx, id)
|
||||
@ -254,7 +256,7 @@ func (m *TaskManager) loadTasks(ctx context.Context) error {
|
||||
bundle.Delete()
|
||||
continue
|
||||
}
|
||||
binaryCall := shimBinary(ctx, bundle, container.Runtime.Name, m.containerdAddress, m.events, m.tasks)
|
||||
binaryCall := shimBinary(ctx, bundle, container.Runtime.Name, m.containerdAddress, m.containerdTTRPCAddress, m.events, m.tasks)
|
||||
shim, err := loadShim(ctx, bundle, m.events, m.tasks, func() {
|
||||
log.G(ctx).WithField("id", id).Info("shim disconnected")
|
||||
_, err := m.tasks.Get(ctx, id)
|
||||
|
@ -102,7 +102,7 @@ type service struct {
|
||||
cancel func()
|
||||
}
|
||||
|
||||
func newCommand(ctx context.Context, id, containerdBinary, containerdAddress string) (*exec.Cmd, error) {
|
||||
func newCommand(ctx context.Context, id, containerdBinary, containerdAddress, containerdTTRPCAddress string) (*exec.Cmd, error) {
|
||||
ns, err := namespaces.NamespaceRequired(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -119,6 +119,7 @@ func newCommand(ctx context.Context, id, containerdBinary, containerdAddress str
|
||||
"-namespace", ns,
|
||||
"-id", id,
|
||||
"-address", containerdAddress,
|
||||
"-ttrpc-address", containerdTTRPCAddress,
|
||||
}
|
||||
cmd := exec.Command(self, args...)
|
||||
cmd.Dir = cwd
|
||||
@ -129,8 +130,8 @@ func newCommand(ctx context.Context, id, containerdBinary, containerdAddress str
|
||||
return cmd, nil
|
||||
}
|
||||
|
||||
func (s *service) StartShim(ctx context.Context, id, containerdBinary, containerdAddress string) (string, error) {
|
||||
cmd, err := newCommand(ctx, id, containerdBinary, containerdAddress)
|
||||
func (s *service) StartShim(ctx context.Context, id, containerdBinary, containerdAddress, containerdTTRPCAddress string) (string, error) {
|
||||
cmd, err := newCommand(ctx, id, containerdBinary, containerdAddress, containerdTTRPCAddress)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
@ -118,7 +118,7 @@ type service struct {
|
||||
cancel func()
|
||||
}
|
||||
|
||||
func newCommand(ctx context.Context, id, containerdBinary, containerdAddress string) (*exec.Cmd, error) {
|
||||
func newCommand(ctx context.Context, id, containerdBinary, containerdAddress, containerdTTRPCAddress string) (*exec.Cmd, error) {
|
||||
ns, err := namespaces.NamespaceRequired(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -135,6 +135,7 @@ func newCommand(ctx context.Context, id, containerdBinary, containerdAddress str
|
||||
"-namespace", ns,
|
||||
"-id", id,
|
||||
"-address", containerdAddress,
|
||||
"-ttrpc-address", containerdTTRPCAddress,
|
||||
}
|
||||
cmd := exec.Command(self, args...)
|
||||
cmd.Dir = cwd
|
||||
@ -158,8 +159,8 @@ func readSpec() (*spec, error) {
|
||||
return &s, nil
|
||||
}
|
||||
|
||||
func (s *service) StartShim(ctx context.Context, id, containerdBinary, containerdAddress string) (string, error) {
|
||||
cmd, err := newCommand(ctx, id, containerdBinary, containerdAddress)
|
||||
func (s *service) StartShim(ctx context.Context, id, containerdBinary, containerdAddress, containerdTTRPCAddress string) (string, error) {
|
||||
cmd, err := newCommand(ctx, id, containerdBinary, containerdAddress, containerdTTRPCAddress)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
@ -57,7 +57,7 @@ type Init func(context.Context, string, Publisher, func()) (Shim, error)
|
||||
type Shim interface {
|
||||
shimapi.TaskService
|
||||
Cleanup(ctx context.Context) (*shimapi.DeleteResponse, error)
|
||||
StartShim(ctx context.Context, id, containerdBinary, containerdAddress string) (string, error)
|
||||
StartShim(ctx context.Context, id, containerdBinary, containerdAddress, containerdTTRPCAddress string) (string, error)
|
||||
}
|
||||
|
||||
// OptsKey is the context key for the Opts value.
|
||||
@ -89,6 +89,7 @@ var (
|
||||
socketFlag string
|
||||
bundlePath string
|
||||
addressFlag string
|
||||
ttrpcAddressFlag string
|
||||
containerdBinaryFlag string
|
||||
action string
|
||||
)
|
||||
@ -101,6 +102,7 @@ func parseFlags() {
|
||||
flag.StringVar(&bundlePath, "bundle", "", "path to the bundle if not workdir")
|
||||
|
||||
flag.StringVar(&addressFlag, "address", "", "grpc address back to main containerd")
|
||||
flag.StringVar(&ttrpcAddressFlag, "ttrpc-address", "", "ttrpc address back to main containerd")
|
||||
flag.StringVar(&containerdBinaryFlag, "publish-binary", "containerd", "path to publish binary (used for publishing events)")
|
||||
|
||||
flag.Parse()
|
||||
@ -163,8 +165,7 @@ func run(id string, initFunc Init, config Config) error {
|
||||
}
|
||||
}
|
||||
|
||||
address := fmt.Sprintf("%s.ttrpc", addressFlag)
|
||||
publisher, err := newPublisher(address)
|
||||
publisher, err := newPublisher(ttrpcAddressFlag)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -203,7 +204,7 @@ func run(id string, initFunc Init, config Config) error {
|
||||
}
|
||||
return nil
|
||||
case "start":
|
||||
address, err := service.StartShim(ctx, idFlag, containerdBinaryFlag, addressFlag)
|
||||
address, err := service.StartShim(ctx, idFlag, containerdBinaryFlag, addressFlag, ttrpcAddressFlag)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -38,7 +38,7 @@ import (
|
||||
var runtimePaths sync.Map
|
||||
|
||||
// Command returns the shim command with the provided args and configuration
|
||||
func Command(ctx context.Context, runtime, containerdAddress, path string, opts *types.Any, cmdArgs ...string) (*exec.Cmd, error) {
|
||||
func Command(ctx context.Context, runtime, containerdAddress, containerdTTRPCAddress, path string, opts *types.Any, cmdArgs ...string) (*exec.Cmd, error) {
|
||||
ns, err := namespaces.NamespaceRequired(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -50,6 +50,7 @@ func Command(ctx context.Context, runtime, containerdAddress, path string, opts
|
||||
args := []string{
|
||||
"-namespace", ns,
|
||||
"-address", containerdAddress,
|
||||
"-ttrpc-address", containerdTTRPCAddress,
|
||||
"-publish-binary", self,
|
||||
}
|
||||
args = append(args, cmdArgs...)
|
||||
|
@ -37,6 +37,8 @@ type Config struct {
|
||||
PluginDir string `toml:"plugin_dir"`
|
||||
// GRPC configuration settings
|
||||
GRPC GRPCConfig `toml:"grpc"`
|
||||
// TTRPC configuration settings
|
||||
TTRPC TTRPCConfig `toml:"ttrpc"`
|
||||
// Debug and profiling settings
|
||||
Debug Debug `toml:"debug"`
|
||||
// Metrics and monitoring settings
|
||||
@ -125,6 +127,13 @@ type GRPCConfig struct {
|
||||
MaxSendMsgSize int `toml:"max_send_message_size"`
|
||||
}
|
||||
|
||||
// TTRPCConfig provides TTRPC configuration for the socket
|
||||
type TTRPCConfig struct {
|
||||
Address string `toml:"address"`
|
||||
UID int `toml:"uid"`
|
||||
GID int `toml:"gid"`
|
||||
}
|
||||
|
||||
// Debug provides debug configuration
|
||||
type Debug struct {
|
||||
Address string `toml:"address"`
|
||||
|
@ -154,6 +154,7 @@ func New(ctx context.Context, config *srvconfig.Config) (*Server, error) {
|
||||
)
|
||||
initContext.Events = s.events
|
||||
initContext.Address = config.GRPC.Address
|
||||
initContext.TTRPCAddress = config.TTRPC.Address
|
||||
|
||||
// load the plugin specific configuration if it is provided
|
||||
if p.Config != nil {
|
||||
|
Loading…
Reference in New Issue
Block a user