Merge pull request #3555 from kevpar/ttrpc-address

Allow explicit configuration of TTRPC address
This commit is contained in:
Michael Crosby 2019-08-22 14:37:56 -04:00 committed by GitHub
commit 1be6ee5396
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 73 additions and 49 deletions

View File

@ -148,13 +148,16 @@ func App() *cli.App {
for _, w := range warnings { for _, w := range warnings {
log.G(ctx).WithError(w).Warn("cleanup temp mount") log.G(ctx).WithError(w).Warn("cleanup temp mount")
} }
var (
address = config.GRPC.Address if config.GRPC.Address == "" {
ttrpcAddress = fmt.Sprintf("%s.ttrpc", config.GRPC.Address)
)
if address == "" {
return errors.Wrap(errdefs.ErrInvalidArgument, "grpc address cannot be empty") return errors.Wrap(errdefs.ErrInvalidArgument, "grpc address cannot be empty")
} }
if config.TTRPC.Address == "" {
// If TTRPC was not explicitly configured, use defaults based on GRPC.
config.TTRPC.Address = fmt.Sprintf("%s.ttrpc", config.GRPC.Address)
config.TTRPC.UID = config.GRPC.UID
config.TTRPC.GID = config.GRPC.GID
}
log.G(ctx).WithFields(logrus.Fields{ log.G(ctx).WithFields(logrus.Fields{
"version": version.Version, "version": version.Version,
"revision": version.Revision, "revision": version.Revision,
@ -193,7 +196,7 @@ func App() *cli.App {
serve(ctx, l, server.ServeMetrics) serve(ctx, l, server.ServeMetrics)
} }
// setup the ttrpc endpoint // setup the ttrpc endpoint
tl, err := sys.GetLocalListener(ttrpcAddress, config.GRPC.UID, config.GRPC.GID) tl, err := sys.GetLocalListener(config.TTRPC.Address, config.TTRPC.UID, config.TTRPC.GID)
if err != nil { if err != nil {
return errors.Wrapf(err, "failed to get listener for main ttrpc endpoint") return errors.Wrapf(err, "failed to get listener for main ttrpc endpoint")
} }
@ -207,7 +210,7 @@ func App() *cli.App {
serve(ctx, l, server.ServeTCP) serve(ctx, l, server.ServeTCP)
} }
// setup the main grpc endpoint // setup the main grpc endpoint
l, err := sys.GetLocalListener(address, config.GRPC.UID, config.GRPC.GID) l, err := sys.GetLocalListener(config.GRPC.Address, config.GRPC.UID, config.GRPC.GID)
if err != nil { if err != nil {
return errors.Wrapf(err, "failed to get listener for main endpoint") return errors.Wrapf(err, "failed to get listener for main endpoint")
} }

View File

@ -28,12 +28,13 @@ import (
// InitContext is used for plugin inititalization // InitContext is used for plugin inititalization
type InitContext struct { type InitContext struct {
Context context.Context Context context.Context
Root string Root string
State string State string
Config interface{} Config interface{}
Address string Address string
Events *exchange.Exchange TTRPCAddress string
Events *exchange.Exchange
Meta *Meta // plugins can fill in metadata at init. Meta *Meta // plugins can fill in metadata at init.

View File

@ -35,22 +35,24 @@ import (
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
) )
func shimBinary(ctx context.Context, bundle *Bundle, runtime, containerdAddress string, events *exchange.Exchange, rt *runtime.TaskList) *binary { func shimBinary(ctx context.Context, bundle *Bundle, runtime, containerdAddress string, containerdTTRPCAddress string, events *exchange.Exchange, rt *runtime.TaskList) *binary {
return &binary{ return &binary{
bundle: bundle, bundle: bundle,
runtime: runtime, runtime: runtime,
containerdAddress: containerdAddress, containerdAddress: containerdAddress,
events: events, containerdTTRPCAddress: containerdTTRPCAddress,
rtTasks: rt, events: events,
rtTasks: rt,
} }
} }
type binary struct { type binary struct {
runtime string runtime string
containerdAddress string containerdAddress string
bundle *Bundle containerdTTRPCAddress string
events *exchange.Exchange bundle *Bundle
rtTasks *runtime.TaskList events *exchange.Exchange
rtTasks *runtime.TaskList
} }
func (b *binary) Start(ctx context.Context, opts *types.Any, onClose func()) (_ *shim, err error) { func (b *binary) Start(ctx context.Context, opts *types.Any, onClose func()) (_ *shim, err error) {
@ -64,6 +66,7 @@ func (b *binary) Start(ctx context.Context, opts *types.Any, onClose func()) (_
ctx, ctx,
b.runtime, b.runtime,
b.containerdAddress, b.containerdAddress,
b.containerdTTRPCAddress,
b.bundle.Path, b.bundle.Path,
opts, opts,
args..., args...,
@ -124,6 +127,7 @@ func (b *binary) Delete(ctx context.Context) (*runtime.Exit, error) {
cmd, err := client.Command(ctx, cmd, err := client.Command(ctx,
b.runtime, b.runtime,
b.containerdAddress, b.containerdAddress,
b.containerdTTRPCAddress,
bundlePath, bundlePath,
nil, nil,
"-id", b.bundle.ID, "-id", b.bundle.ID,

View File

@ -44,7 +44,7 @@ type service struct {
} }
// StartShim is a binary call that executes a new shim returning the address // StartShim is a binary call that executes a new shim returning the address
func (s *service) StartShim(ctx context.Context, id, containerdBinary, containerdAddress string) (string, error) { func (s *service) StartShim(ctx context.Context, id, containerdBinary, containerdAddress, containerdTTRPCAddress string) (string, error) {
return "", nil return "", nil
} }

View File

@ -69,25 +69,26 @@ func init() {
if err != nil { if err != nil {
return nil, err return nil, err
} }
return New(ic.Context, ic.Root, ic.State, ic.Address, ic.Events, m.(*metadata.DB)) return New(ic.Context, ic.Root, ic.State, ic.Address, ic.TTRPCAddress, ic.Events, m.(*metadata.DB))
}, },
}) })
} }
// New task manager for v2 shims // New task manager for v2 shims
func New(ctx context.Context, root, state, containerdAddress string, events *exchange.Exchange, db *metadata.DB) (*TaskManager, error) { func New(ctx context.Context, root, state, containerdAddress, containerdTTRPCAddress string, events *exchange.Exchange, db *metadata.DB) (*TaskManager, error) {
for _, d := range []string{root, state} { for _, d := range []string{root, state} {
if err := os.MkdirAll(d, 0711); err != nil { if err := os.MkdirAll(d, 0711); err != nil {
return nil, err return nil, err
} }
} }
m := &TaskManager{ m := &TaskManager{
root: root, root: root,
state: state, state: state,
containerdAddress: containerdAddress, containerdAddress: containerdAddress,
tasks: runtime.NewTaskList(), containerdTTRPCAddress: containerdTTRPCAddress,
events: events, tasks: runtime.NewTaskList(),
db: db, events: events,
db: db,
} }
if err := m.loadExistingTasks(ctx); err != nil { if err := m.loadExistingTasks(ctx); err != nil {
return nil, err return nil, err
@ -97,9 +98,10 @@ func New(ctx context.Context, root, state, containerdAddress string, events *exc
// TaskManager manages v2 shim's and their tasks // TaskManager manages v2 shim's and their tasks
type TaskManager struct { type TaskManager struct {
root string root string
state string state string
containerdAddress string containerdAddress string
containerdTTRPCAddress string
tasks *runtime.TaskList tasks *runtime.TaskList
events *exchange.Exchange events *exchange.Exchange
@ -131,7 +133,7 @@ func (m *TaskManager) Create(ctx context.Context, id string, opts runtime.Create
topts = opts.RuntimeOptions topts = opts.RuntimeOptions
} }
b := shimBinary(ctx, bundle, opts.Runtime, m.containerdAddress, m.events, m.tasks) b := shimBinary(ctx, bundle, opts.Runtime, m.containerdAddress, m.containerdTTRPCAddress, m.events, m.tasks)
shim, err := b.Start(ctx, topts, func() { shim, err := b.Start(ctx, topts, func() {
log.G(ctx).WithField("id", id).Info("shim disconnected") log.G(ctx).WithField("id", id).Info("shim disconnected")
_, err := m.tasks.Get(ctx, id) _, err := m.tasks.Get(ctx, id)
@ -254,7 +256,7 @@ func (m *TaskManager) loadTasks(ctx context.Context) error {
bundle.Delete() bundle.Delete()
continue continue
} }
binaryCall := shimBinary(ctx, bundle, container.Runtime.Name, m.containerdAddress, m.events, m.tasks) binaryCall := shimBinary(ctx, bundle, container.Runtime.Name, m.containerdAddress, m.containerdTTRPCAddress, m.events, m.tasks)
shim, err := loadShim(ctx, bundle, m.events, m.tasks, func() { shim, err := loadShim(ctx, bundle, m.events, m.tasks, func() {
log.G(ctx).WithField("id", id).Info("shim disconnected") log.G(ctx).WithField("id", id).Info("shim disconnected")
_, err := m.tasks.Get(ctx, id) _, err := m.tasks.Get(ctx, id)

View File

@ -102,7 +102,7 @@ type service struct {
cancel func() cancel func()
} }
func newCommand(ctx context.Context, id, containerdBinary, containerdAddress string) (*exec.Cmd, error) { func newCommand(ctx context.Context, id, containerdBinary, containerdAddress, containerdTTRPCAddress string) (*exec.Cmd, error) {
ns, err := namespaces.NamespaceRequired(ctx) ns, err := namespaces.NamespaceRequired(ctx)
if err != nil { if err != nil {
return nil, err return nil, err
@ -119,6 +119,7 @@ func newCommand(ctx context.Context, id, containerdBinary, containerdAddress str
"-namespace", ns, "-namespace", ns,
"-id", id, "-id", id,
"-address", containerdAddress, "-address", containerdAddress,
"-ttrpc-address", containerdTTRPCAddress,
} }
cmd := exec.Command(self, args...) cmd := exec.Command(self, args...)
cmd.Dir = cwd cmd.Dir = cwd
@ -129,8 +130,8 @@ func newCommand(ctx context.Context, id, containerdBinary, containerdAddress str
return cmd, nil return cmd, nil
} }
func (s *service) StartShim(ctx context.Context, id, containerdBinary, containerdAddress string) (string, error) { func (s *service) StartShim(ctx context.Context, id, containerdBinary, containerdAddress, containerdTTRPCAddress string) (string, error) {
cmd, err := newCommand(ctx, id, containerdBinary, containerdAddress) cmd, err := newCommand(ctx, id, containerdBinary, containerdAddress, containerdTTRPCAddress)
if err != nil { if err != nil {
return "", err return "", err
} }

View File

@ -118,7 +118,7 @@ type service struct {
cancel func() cancel func()
} }
func newCommand(ctx context.Context, id, containerdBinary, containerdAddress string) (*exec.Cmd, error) { func newCommand(ctx context.Context, id, containerdBinary, containerdAddress, containerdTTRPCAddress string) (*exec.Cmd, error) {
ns, err := namespaces.NamespaceRequired(ctx) ns, err := namespaces.NamespaceRequired(ctx)
if err != nil { if err != nil {
return nil, err return nil, err
@ -135,6 +135,7 @@ func newCommand(ctx context.Context, id, containerdBinary, containerdAddress str
"-namespace", ns, "-namespace", ns,
"-id", id, "-id", id,
"-address", containerdAddress, "-address", containerdAddress,
"-ttrpc-address", containerdTTRPCAddress,
} }
cmd := exec.Command(self, args...) cmd := exec.Command(self, args...)
cmd.Dir = cwd cmd.Dir = cwd
@ -158,8 +159,8 @@ func readSpec() (*spec, error) {
return &s, nil return &s, nil
} }
func (s *service) StartShim(ctx context.Context, id, containerdBinary, containerdAddress string) (string, error) { func (s *service) StartShim(ctx context.Context, id, containerdBinary, containerdAddress, containerdTTRPCAddress string) (string, error) {
cmd, err := newCommand(ctx, id, containerdBinary, containerdAddress) cmd, err := newCommand(ctx, id, containerdBinary, containerdAddress, containerdTTRPCAddress)
if err != nil { if err != nil {
return "", err return "", err
} }

View File

@ -57,7 +57,7 @@ type Init func(context.Context, string, Publisher, func()) (Shim, error)
type Shim interface { type Shim interface {
shimapi.TaskService shimapi.TaskService
Cleanup(ctx context.Context) (*shimapi.DeleteResponse, error) Cleanup(ctx context.Context) (*shimapi.DeleteResponse, error)
StartShim(ctx context.Context, id, containerdBinary, containerdAddress string) (string, error) StartShim(ctx context.Context, id, containerdBinary, containerdAddress, containerdTTRPCAddress string) (string, error)
} }
// OptsKey is the context key for the Opts value. // OptsKey is the context key for the Opts value.
@ -89,6 +89,7 @@ var (
socketFlag string socketFlag string
bundlePath string bundlePath string
addressFlag string addressFlag string
ttrpcAddressFlag string
containerdBinaryFlag string containerdBinaryFlag string
action string action string
) )
@ -101,6 +102,7 @@ func parseFlags() {
flag.StringVar(&bundlePath, "bundle", "", "path to the bundle if not workdir") flag.StringVar(&bundlePath, "bundle", "", "path to the bundle if not workdir")
flag.StringVar(&addressFlag, "address", "", "grpc address back to main containerd") flag.StringVar(&addressFlag, "address", "", "grpc address back to main containerd")
flag.StringVar(&ttrpcAddressFlag, "ttrpc-address", "", "ttrpc address back to main containerd")
flag.StringVar(&containerdBinaryFlag, "publish-binary", "containerd", "path to publish binary (used for publishing events)") flag.StringVar(&containerdBinaryFlag, "publish-binary", "containerd", "path to publish binary (used for publishing events)")
flag.Parse() flag.Parse()
@ -163,8 +165,7 @@ func run(id string, initFunc Init, config Config) error {
} }
} }
address := fmt.Sprintf("%s.ttrpc", addressFlag) publisher, err := newPublisher(ttrpcAddressFlag)
publisher, err := newPublisher(address)
if err != nil { if err != nil {
return err return err
} }
@ -203,7 +204,7 @@ func run(id string, initFunc Init, config Config) error {
} }
return nil return nil
case "start": case "start":
address, err := service.StartShim(ctx, idFlag, containerdBinaryFlag, addressFlag) address, err := service.StartShim(ctx, idFlag, containerdBinaryFlag, addressFlag, ttrpcAddressFlag)
if err != nil { if err != nil {
return err return err
} }

View File

@ -38,7 +38,7 @@ import (
var runtimePaths sync.Map var runtimePaths sync.Map
// Command returns the shim command with the provided args and configuration // Command returns the shim command with the provided args and configuration
func Command(ctx context.Context, runtime, containerdAddress, path string, opts *types.Any, cmdArgs ...string) (*exec.Cmd, error) { func Command(ctx context.Context, runtime, containerdAddress, containerdTTRPCAddress, path string, opts *types.Any, cmdArgs ...string) (*exec.Cmd, error) {
ns, err := namespaces.NamespaceRequired(ctx) ns, err := namespaces.NamespaceRequired(ctx)
if err != nil { if err != nil {
return nil, err return nil, err
@ -50,6 +50,7 @@ func Command(ctx context.Context, runtime, containerdAddress, path string, opts
args := []string{ args := []string{
"-namespace", ns, "-namespace", ns,
"-address", containerdAddress, "-address", containerdAddress,
"-ttrpc-address", containerdTTRPCAddress,
"-publish-binary", self, "-publish-binary", self,
} }
args = append(args, cmdArgs...) args = append(args, cmdArgs...)

View File

@ -37,6 +37,8 @@ type Config struct {
PluginDir string `toml:"plugin_dir"` PluginDir string `toml:"plugin_dir"`
// GRPC configuration settings // GRPC configuration settings
GRPC GRPCConfig `toml:"grpc"` GRPC GRPCConfig `toml:"grpc"`
// TTRPC configuration settings
TTRPC TTRPCConfig `toml:"ttrpc"`
// Debug and profiling settings // Debug and profiling settings
Debug Debug `toml:"debug"` Debug Debug `toml:"debug"`
// Metrics and monitoring settings // Metrics and monitoring settings
@ -125,6 +127,13 @@ type GRPCConfig struct {
MaxSendMsgSize int `toml:"max_send_message_size"` MaxSendMsgSize int `toml:"max_send_message_size"`
} }
// TTRPCConfig provides TTRPC configuration for the socket
type TTRPCConfig struct {
Address string `toml:"address"`
UID int `toml:"uid"`
GID int `toml:"gid"`
}
// Debug provides debug configuration // Debug provides debug configuration
type Debug struct { type Debug struct {
Address string `toml:"address"` Address string `toml:"address"`

View File

@ -154,6 +154,7 @@ func New(ctx context.Context, config *srvconfig.Config) (*Server, error) {
) )
initContext.Events = s.events initContext.Events = s.events
initContext.Address = config.GRPC.Address initContext.Address = config.GRPC.Address
initContext.TTRPCAddress = config.TTRPC.Address
// load the plugin specific configuration if it is provided // load the plugin specific configuration if it is provided
if p.Config != nil { if p.Config != nil {