Merge pull request #1729 from crosbymichael/shim-size
Reduce shim size by reducing imports
This commit is contained in:
commit
844e957834
@ -22,6 +22,7 @@ import (
|
|||||||
versionservice "github.com/containerd/containerd/api/services/version/v1"
|
versionservice "github.com/containerd/containerd/api/services/version/v1"
|
||||||
"github.com/containerd/containerd/containers"
|
"github.com/containerd/containerd/containers"
|
||||||
"github.com/containerd/containerd/content"
|
"github.com/containerd/containerd/content"
|
||||||
|
"github.com/containerd/containerd/dialer"
|
||||||
"github.com/containerd/containerd/diff"
|
"github.com/containerd/containerd/diff"
|
||||||
"github.com/containerd/containerd/errdefs"
|
"github.com/containerd/containerd/errdefs"
|
||||||
"github.com/containerd/containerd/images"
|
"github.com/containerd/containerd/images"
|
||||||
@ -72,7 +73,7 @@ func New(address string, opts ...ClientOpt) (*Client, error) {
|
|||||||
grpc.WithTimeout(60 * time.Second),
|
grpc.WithTimeout(60 * time.Second),
|
||||||
grpc.FailOnNonTempDialError(true),
|
grpc.FailOnNonTempDialError(true),
|
||||||
grpc.WithBackoffMaxDelay(3 * time.Second),
|
grpc.WithBackoffMaxDelay(3 * time.Second),
|
||||||
grpc.WithDialer(Dialer),
|
grpc.WithDialer(dialer.Dialer),
|
||||||
}
|
}
|
||||||
if len(copts.dialOptions) > 0 {
|
if len(copts.dialOptions) > 0 {
|
||||||
gopts = copts.dialOptions
|
gopts = copts.dialOptions
|
||||||
@ -84,7 +85,7 @@ func New(address string, opts ...ClientOpt) (*Client, error) {
|
|||||||
grpc.WithStreamInterceptor(stream),
|
grpc.WithStreamInterceptor(stream),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
conn, err := grpc.Dial(DialAddress(address), gopts...)
|
conn, err := grpc.Dial(dialer.DialAddress(address), gopts...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errors.Wrapf(err, "failed to dial %q", address)
|
return nil, errors.Wrapf(err, "failed to dial %q", address)
|
||||||
}
|
}
|
||||||
|
@ -4,6 +4,7 @@ package main
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"flag"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net"
|
"net"
|
||||||
"os"
|
"os"
|
||||||
@ -13,19 +14,17 @@ import (
|
|||||||
"syscall"
|
"syscall"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/containerd/containerd"
|
|
||||||
eventsapi "github.com/containerd/containerd/api/services/events/v1"
|
eventsapi "github.com/containerd/containerd/api/services/events/v1"
|
||||||
|
"github.com/containerd/containerd/dialer"
|
||||||
"github.com/containerd/containerd/errdefs"
|
"github.com/containerd/containerd/errdefs"
|
||||||
"github.com/containerd/containerd/events"
|
"github.com/containerd/containerd/events"
|
||||||
"github.com/containerd/containerd/linux/shim"
|
"github.com/containerd/containerd/linux/shim"
|
||||||
shimapi "github.com/containerd/containerd/linux/shim/v1"
|
shimapi "github.com/containerd/containerd/linux/shim/v1"
|
||||||
"github.com/containerd/containerd/reaper"
|
"github.com/containerd/containerd/reaper"
|
||||||
"github.com/containerd/containerd/version"
|
|
||||||
"github.com/containerd/typeurl"
|
"github.com/containerd/typeurl"
|
||||||
google_protobuf "github.com/golang/protobuf/ptypes/empty"
|
google_protobuf "github.com/golang/protobuf/ptypes/empty"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
"github.com/urfave/cli"
|
|
||||||
"golang.org/x/sys/unix"
|
"golang.org/x/sys/unix"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
)
|
)
|
||||||
@ -40,101 +39,83 @@ const usage = `
|
|||||||
shim for container lifecycle and reconnection
|
shim for container lifecycle and reconnection
|
||||||
`
|
`
|
||||||
|
|
||||||
|
var (
|
||||||
|
debugFlag bool
|
||||||
|
namespaceFlag string
|
||||||
|
socketFlag string
|
||||||
|
addressFlag string
|
||||||
|
workdirFlag string
|
||||||
|
runtimeRootFlag string
|
||||||
|
criuFlag string
|
||||||
|
systemdCgroupFlag bool
|
||||||
|
)
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
flag.BoolVar(&debugFlag, "debug", false, "enable debug output in logs")
|
||||||
|
flag.StringVar(&namespaceFlag, "namespace", "", "namespace that owns the shim")
|
||||||
|
flag.StringVar(&socketFlag, "socket", "", "abstract socket path to serve")
|
||||||
|
flag.StringVar(&addressFlag, "address", "", "grpc address back to main containerd")
|
||||||
|
flag.StringVar(&workdirFlag, "workdir", "", "path used to storge large temporary data")
|
||||||
|
flag.StringVar(&runtimeRootFlag, "runtime-root", shim.RuncRoot, "root directory for the runtime")
|
||||||
|
flag.StringVar(&criuFlag, "criu", "", "path to criu binary")
|
||||||
|
flag.BoolVar(&systemdCgroupFlag, "systemd-cgroup", false, "set runtime to use systemd-cgroup")
|
||||||
|
flag.Parse()
|
||||||
|
}
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
app := cli.NewApp()
|
if debugFlag {
|
||||||
app.Name = "containerd-shim"
|
logrus.SetLevel(logrus.DebugLevel)
|
||||||
app.Version = version.Version
|
|
||||||
app.Usage = usage
|
|
||||||
app.Flags = []cli.Flag{
|
|
||||||
cli.BoolFlag{
|
|
||||||
Name: "debug",
|
|
||||||
Usage: "enable debug output in logs",
|
|
||||||
},
|
|
||||||
cli.StringFlag{
|
|
||||||
Name: "namespace,n",
|
|
||||||
Usage: "namespace that owns the task",
|
|
||||||
},
|
|
||||||
cli.StringFlag{
|
|
||||||
Name: "socket,s",
|
|
||||||
Usage: "abstract socket path to serve on",
|
|
||||||
},
|
|
||||||
cli.StringFlag{
|
|
||||||
Name: "address,a",
|
|
||||||
Usage: "grpc address back to containerd",
|
|
||||||
},
|
|
||||||
cli.StringFlag{
|
|
||||||
Name: "workdir,w",
|
|
||||||
Usage: "path used to store large temporary data",
|
|
||||||
},
|
|
||||||
cli.StringFlag{
|
|
||||||
Name: "runtime-root",
|
|
||||||
Usage: "root directory for the runtime",
|
|
||||||
Value: shim.RuncRoot,
|
|
||||||
},
|
|
||||||
cli.StringFlag{
|
|
||||||
Name: "criu,c",
|
|
||||||
Usage: "path to criu",
|
|
||||||
},
|
|
||||||
cli.BoolFlag{
|
|
||||||
Name: "systemd-cgroup",
|
|
||||||
Usage: "set runtime to use systemd-cgroup",
|
|
||||||
},
|
|
||||||
}
|
}
|
||||||
app.Before = func(context *cli.Context) error {
|
if err := executeShim(); err != nil {
|
||||||
if context.GlobalBool("debug") {
|
|
||||||
logrus.SetLevel(logrus.DebugLevel)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
app.Action = func(context *cli.Context) error {
|
|
||||||
// start handling signals as soon as possible so that things are properly reaped
|
|
||||||
// or if runtime exits before we hit the handler
|
|
||||||
signals, err := setupSignals()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
path, err := os.Getwd()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
server := newServer()
|
|
||||||
e, err := connectEvents(context.GlobalString("address"))
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
sv, err := shim.NewService(
|
|
||||||
shim.Config{
|
|
||||||
Path: path,
|
|
||||||
Namespace: context.GlobalString("namespace"),
|
|
||||||
WorkDir: context.GlobalString("workdir"),
|
|
||||||
Criu: context.GlobalString("criu"),
|
|
||||||
SystemdCgroup: context.GlobalBool("systemd-cgroup"),
|
|
||||||
RuntimeRoot: context.GlobalString("runtime-root"),
|
|
||||||
},
|
|
||||||
&remoteEventsPublisher{client: e},
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
logrus.Debug("registering grpc server")
|
|
||||||
shimapi.RegisterShimServer(server, sv)
|
|
||||||
socket := context.GlobalString("socket")
|
|
||||||
if err := serve(server, socket); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
logger := logrus.WithFields(logrus.Fields{
|
|
||||||
"pid": os.Getpid(),
|
|
||||||
"path": path,
|
|
||||||
"namespace": context.GlobalString("namespace"),
|
|
||||||
})
|
|
||||||
return handleSignals(logger, signals, server, sv)
|
|
||||||
}
|
|
||||||
if err := app.Run(os.Args); err != nil {
|
|
||||||
fmt.Fprintf(os.Stderr, "containerd-shim: %s\n", err)
|
fmt.Fprintf(os.Stderr, "containerd-shim: %s\n", err)
|
||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func executeShim() error {
|
||||||
|
// start handling signals as soon as possible so that things are properly reaped
|
||||||
|
// or if runtime exits before we hit the handler
|
||||||
|
signals, err := setupSignals()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
path, err := os.Getwd()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
server := newServer()
|
||||||
|
e, err := connectEvents(addressFlag)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
sv, err := shim.NewService(
|
||||||
|
shim.Config{
|
||||||
|
Path: path,
|
||||||
|
Namespace: namespaceFlag,
|
||||||
|
WorkDir: workdirFlag,
|
||||||
|
Criu: criuFlag,
|
||||||
|
SystemdCgroup: systemdCgroupFlag,
|
||||||
|
RuntimeRoot: runtimeRootFlag,
|
||||||
|
},
|
||||||
|
&remoteEventsPublisher{client: e},
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
logrus.Debug("registering grpc server")
|
||||||
|
shimapi.RegisterShimServer(server, sv)
|
||||||
|
socket := socketFlag
|
||||||
|
if err := serve(server, socket); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
logger := logrus.WithFields(logrus.Fields{
|
||||||
|
"pid": os.Getpid(),
|
||||||
|
"path": path,
|
||||||
|
"namespace": namespaceFlag,
|
||||||
|
})
|
||||||
|
return handleSignals(logger, signals, server, sv)
|
||||||
|
}
|
||||||
|
|
||||||
// serve serves the grpc API over a unix socket at the provided path
|
// serve serves the grpc API over a unix socket at the provided path
|
||||||
// this function does not block
|
// this function does not block
|
||||||
func serve(server *grpc.Server, path string) error {
|
func serve(server *grpc.Server, path string) error {
|
||||||
@ -212,7 +193,7 @@ func dumpStacks(logger *logrus.Entry) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func connectEvents(address string) (eventsapi.EventsClient, error) {
|
func connectEvents(address string) (eventsapi.EventsClient, error) {
|
||||||
conn, err := connect(address, containerd.Dialer)
|
conn, err := connect(address, dialer.Dialer)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errors.Wrapf(err, "failed to dial %q", address)
|
return nil, errors.Wrapf(err, "failed to dial %q", address)
|
||||||
}
|
}
|
||||||
@ -228,7 +209,7 @@ func connect(address string, d func(string, time.Duration) (net.Conn, error)) (*
|
|||||||
grpc.FailOnNonTempDialError(true),
|
grpc.FailOnNonTempDialError(true),
|
||||||
grpc.WithBackoffMaxDelay(3 * time.Second),
|
grpc.WithBackoffMaxDelay(3 * time.Second),
|
||||||
}
|
}
|
||||||
conn, err := grpc.Dial(containerd.DialAddress(address), gopts...)
|
conn, err := grpc.Dial(dialer.DialAddress(address), gopts...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errors.Wrapf(err, "failed to dial %q", address)
|
return nil, errors.Wrapf(err, "failed to dial %q", address)
|
||||||
}
|
}
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
package containerd
|
package dialer
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"net"
|
"net"
|
@ -1,6 +1,6 @@
|
|||||||
// +build !windows
|
// +build !windows
|
||||||
|
|
||||||
package containerd
|
package dialer
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
@ -11,6 +11,12 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// DialAddress returns the address with unix:// prepended to the
|
||||||
|
// provided address
|
||||||
|
func DialAddress(address string) string {
|
||||||
|
return fmt.Sprintf("unix://%s", address)
|
||||||
|
}
|
||||||
|
|
||||||
func isNoent(err error) bool {
|
func isNoent(err error) bool {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if nerr, ok := err.(*net.OpError); ok {
|
if nerr, ok := err.(*net.OpError); ok {
|
||||||
@ -28,9 +34,3 @@ func dialer(address string, timeout time.Duration) (net.Conn, error) {
|
|||||||
address = strings.TrimPrefix(address, "unix://")
|
address = strings.TrimPrefix(address, "unix://")
|
||||||
return net.DialTimeout("unix", address, timeout)
|
return net.DialTimeout("unix", address, timeout)
|
||||||
}
|
}
|
||||||
|
|
||||||
// DialAddress returns the address with unix:// prepended to the
|
|
||||||
// provided address
|
|
||||||
func DialAddress(address string) string {
|
|
||||||
return fmt.Sprintf("unix://%s", address)
|
|
||||||
}
|
|
@ -1,4 +1,4 @@
|
|||||||
package containerd
|
package dialer
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"net"
|
"net"
|
@ -1,12 +1,13 @@
|
|||||||
package events
|
package exchange
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
events "github.com/containerd/containerd/api/services/events/v1"
|
v1 "github.com/containerd/containerd/api/services/events/v1"
|
||||||
"github.com/containerd/containerd/errdefs"
|
"github.com/containerd/containerd/errdefs"
|
||||||
|
"github.com/containerd/containerd/events"
|
||||||
"github.com/containerd/containerd/filters"
|
"github.com/containerd/containerd/filters"
|
||||||
"github.com/containerd/containerd/identifiers"
|
"github.com/containerd/containerd/identifiers"
|
||||||
"github.com/containerd/containerd/log"
|
"github.com/containerd/containerd/log"
|
||||||
@ -34,7 +35,7 @@ func NewExchange() *Exchange {
|
|||||||
//
|
//
|
||||||
// This is useful when an event is forwaded on behalf of another namespace or
|
// This is useful when an event is forwaded on behalf of another namespace or
|
||||||
// when the event is propagated on behalf of another publisher.
|
// when the event is propagated on behalf of another publisher.
|
||||||
func (e *Exchange) Forward(ctx context.Context, envelope *events.Envelope) (err error) {
|
func (e *Exchange) Forward(ctx context.Context, envelope *v1.Envelope) (err error) {
|
||||||
if err := validateEnvelope(envelope); err != nil {
|
if err := validateEnvelope(envelope); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -59,11 +60,11 @@ func (e *Exchange) Forward(ctx context.Context, envelope *events.Envelope) (err
|
|||||||
// Publish packages and sends an event. The caller will be considered the
|
// Publish packages and sends an event. The caller will be considered the
|
||||||
// initial publisher of the event. This means the timestamp will be calculated
|
// initial publisher of the event. This means the timestamp will be calculated
|
||||||
// at this point and this method may read from the calling context.
|
// at this point and this method may read from the calling context.
|
||||||
func (e *Exchange) Publish(ctx context.Context, topic string, event Event) (err error) {
|
func (e *Exchange) Publish(ctx context.Context, topic string, event events.Event) (err error) {
|
||||||
var (
|
var (
|
||||||
namespace string
|
namespace string
|
||||||
encoded *types.Any
|
encoded *types.Any
|
||||||
envelope events.Envelope
|
envelope v1.Envelope
|
||||||
)
|
)
|
||||||
|
|
||||||
namespace, err = namespaces.NamespaceRequired(ctx)
|
namespace, err = namespaces.NamespaceRequired(ctx)
|
||||||
@ -108,9 +109,9 @@ func (e *Exchange) Publish(ctx context.Context, topic string, event Event) (err
|
|||||||
// Zero or more filters may be provided as strings. Only events that match
|
// Zero or more filters may be provided as strings. Only events that match
|
||||||
// *any* of the provided filters will be sent on the channel. The filters use
|
// *any* of the provided filters will be sent on the channel. The filters use
|
||||||
// the standard containerd filters package syntax.
|
// the standard containerd filters package syntax.
|
||||||
func (e *Exchange) Subscribe(ctx context.Context, fs ...string) (ch <-chan *events.Envelope, errs <-chan error) {
|
func (e *Exchange) Subscribe(ctx context.Context, fs ...string) (ch <-chan *v1.Envelope, errs <-chan error) {
|
||||||
var (
|
var (
|
||||||
evch = make(chan *events.Envelope)
|
evch = make(chan *v1.Envelope)
|
||||||
errq = make(chan error, 1)
|
errq = make(chan error, 1)
|
||||||
channel = goevents.NewChannel(0)
|
channel = goevents.NewChannel(0)
|
||||||
queue = goevents.NewQueue(channel)
|
queue = goevents.NewQueue(channel)
|
||||||
@ -150,7 +151,7 @@ func (e *Exchange) Subscribe(ctx context.Context, fs ...string) (ch <-chan *even
|
|||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case ev := <-channel.C:
|
case ev := <-channel.C:
|
||||||
env, ok := ev.(*events.Envelope)
|
env, ok := ev.(*v1.Envelope)
|
||||||
if !ok {
|
if !ok {
|
||||||
// TODO(stevvooe): For the most part, we are well protected
|
// TODO(stevvooe): For the most part, we are well protected
|
||||||
// from this condition. Both Forward and Publish protect
|
// from this condition. Both Forward and Publish protect
|
||||||
@ -204,7 +205,7 @@ func validateTopic(topic string) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func validateEnvelope(envelope *events.Envelope) error {
|
func validateEnvelope(envelope *v1.Envelope) error {
|
||||||
if err := namespaces.Validate(envelope.Namespace); err != nil {
|
if err := namespaces.Validate(envelope.Namespace); err != nil {
|
||||||
return errors.Wrapf(err, "event envelope has invalid namespace")
|
return errors.Wrapf(err, "event envelope has invalid namespace")
|
||||||
}
|
}
|
@ -1,4 +1,4 @@
|
|||||||
package events
|
package exchange
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
@ -8,8 +8,9 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
events "github.com/containerd/containerd/api/services/events/v1"
|
v1 "github.com/containerd/containerd/api/services/events/v1"
|
||||||
"github.com/containerd/containerd/errdefs"
|
"github.com/containerd/containerd/errdefs"
|
||||||
|
"github.com/containerd/containerd/events"
|
||||||
"github.com/containerd/containerd/namespaces"
|
"github.com/containerd/containerd/namespaces"
|
||||||
"github.com/containerd/typeurl"
|
"github.com/containerd/typeurl"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
@ -17,10 +18,10 @@ import (
|
|||||||
|
|
||||||
func TestExchangeBasic(t *testing.T) {
|
func TestExchangeBasic(t *testing.T) {
|
||||||
ctx := namespaces.WithNamespace(context.Background(), t.Name())
|
ctx := namespaces.WithNamespace(context.Background(), t.Name())
|
||||||
testevents := []Event{
|
testevents := []events.Event{
|
||||||
&events.ContainerCreate{ID: "asdf"},
|
&v1.ContainerCreate{ID: "asdf"},
|
||||||
&events.ContainerCreate{ID: "qwer"},
|
&v1.ContainerCreate{ID: "qwer"},
|
||||||
&events.ContainerCreate{ID: "zxcv"},
|
&v1.ContainerCreate{ID: "zxcv"},
|
||||||
}
|
}
|
||||||
exchange := NewExchange()
|
exchange := NewExchange()
|
||||||
|
|
||||||
@ -55,7 +56,7 @@ func TestExchangeBasic(t *testing.T) {
|
|||||||
wg.Wait()
|
wg.Wait()
|
||||||
|
|
||||||
for _, subscriber := range []struct {
|
for _, subscriber := range []struct {
|
||||||
eventq <-chan *events.Envelope
|
eventq <-chan *v1.Envelope
|
||||||
errq <-chan error
|
errq <-chan error
|
||||||
cancel func()
|
cancel func()
|
||||||
}{
|
}{
|
||||||
@ -70,7 +71,7 @@ func TestExchangeBasic(t *testing.T) {
|
|||||||
cancel: cancel2,
|
cancel: cancel2,
|
||||||
},
|
},
|
||||||
} {
|
} {
|
||||||
var received []Event
|
var received []events.Event
|
||||||
subscribercheck:
|
subscribercheck:
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
@ -79,7 +80,7 @@ func TestExchangeBasic(t *testing.T) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
received = append(received, ev.(*events.ContainerCreate))
|
received = append(received, ev.(*v1.ContainerCreate))
|
||||||
case err := <-subscriber.errq:
|
case err := <-subscriber.errq:
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
@ -117,7 +118,7 @@ func TestExchangeValidateTopic(t *testing.T) {
|
|||||||
},
|
},
|
||||||
} {
|
} {
|
||||||
t.Run(testcase.input, func(t *testing.T) {
|
t.Run(testcase.input, func(t *testing.T) {
|
||||||
event := &events.ContainerCreate{ID: t.Name()}
|
event := &v1.ContainerCreate{ID: t.Name()}
|
||||||
if err := exchange.Publish(ctx, testcase.input, event); errors.Cause(err) != testcase.err {
|
if err := exchange.Publish(ctx, testcase.input, event); errors.Cause(err) != testcase.err {
|
||||||
if err == nil {
|
if err == nil {
|
||||||
t.Fatalf("expected error %v, received nil", testcase.err)
|
t.Fatalf("expected error %v, received nil", testcase.err)
|
||||||
@ -131,7 +132,7 @@ func TestExchangeValidateTopic(t *testing.T) {
|
|||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
envelope := events.Envelope{
|
envelope := v1.Envelope{
|
||||||
Timestamp: time.Now().UTC(),
|
Timestamp: time.Now().UTC(),
|
||||||
Namespace: namespace,
|
Namespace: namespace,
|
||||||
Topic: testcase.input,
|
Topic: testcase.input,
|
@ -9,9 +9,10 @@ import (
|
|||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
|
||||||
"github.com/containerd/containerd/events"
|
"github.com/containerd/containerd/events/exchange"
|
||||||
"github.com/containerd/containerd/linux/runcopts"
|
"github.com/containerd/containerd/linux/runcopts"
|
||||||
client "github.com/containerd/containerd/linux/shim"
|
"github.com/containerd/containerd/linux/shim"
|
||||||
|
"github.com/containerd/containerd/linux/shim/client"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -71,26 +72,26 @@ type bundle struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// ShimOpt specifies shim options for initialization and connection
|
// ShimOpt specifies shim options for initialization and connection
|
||||||
type ShimOpt func(*bundle, string, *runcopts.RuncOptions) (client.Config, client.ClientOpt)
|
type ShimOpt func(*bundle, string, *runcopts.RuncOptions) (shim.Config, client.Opt)
|
||||||
|
|
||||||
// ShimRemote is a ShimOpt for connecting and starting a remote shim
|
// ShimRemote is a ShimOpt for connecting and starting a remote shim
|
||||||
func ShimRemote(shim, daemonAddress, cgroup string, nonewns, debug bool, exitHandler func()) ShimOpt {
|
func ShimRemote(shimBinary, daemonAddress, cgroup string, nonewns, debug bool, exitHandler func()) ShimOpt {
|
||||||
return func(b *bundle, ns string, ropts *runcopts.RuncOptions) (client.Config, client.ClientOpt) {
|
return func(b *bundle, ns string, ropts *runcopts.RuncOptions) (shim.Config, client.Opt) {
|
||||||
return b.shimConfig(ns, ropts),
|
return b.shimConfig(ns, ropts),
|
||||||
client.WithStart(shim, b.shimAddress(ns), daemonAddress, cgroup, nonewns, debug, exitHandler)
|
client.WithStart(shimBinary, b.shimAddress(ns), daemonAddress, cgroup, nonewns, debug, exitHandler)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ShimLocal is a ShimOpt for using an in process shim implementation
|
// ShimLocal is a ShimOpt for using an in process shim implementation
|
||||||
func ShimLocal(exchange *events.Exchange) ShimOpt {
|
func ShimLocal(exchange *exchange.Exchange) ShimOpt {
|
||||||
return func(b *bundle, ns string, ropts *runcopts.RuncOptions) (client.Config, client.ClientOpt) {
|
return func(b *bundle, ns string, ropts *runcopts.RuncOptions) (shim.Config, client.Opt) {
|
||||||
return b.shimConfig(ns, ropts), client.WithLocal(exchange)
|
return b.shimConfig(ns, ropts), client.WithLocal(exchange)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ShimConnect is a ShimOpt for connecting to an existing remote shim
|
// ShimConnect is a ShimOpt for connecting to an existing remote shim
|
||||||
func ShimConnect() ShimOpt {
|
func ShimConnect() ShimOpt {
|
||||||
return func(b *bundle, ns string, ropts *runcopts.RuncOptions) (client.Config, client.ClientOpt) {
|
return func(b *bundle, ns string, ropts *runcopts.RuncOptions) (shim.Config, client.Opt) {
|
||||||
return b.shimConfig(ns, ropts), client.WithConnect(b.shimAddress(ns))
|
return b.shimConfig(ns, ropts), client.WithConnect(b.shimAddress(ns))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -119,7 +120,7 @@ func (b *bundle) shimAddress(namespace string) string {
|
|||||||
return filepath.Join(string(filepath.Separator), "containerd-shim", namespace, b.id, "shim.sock")
|
return filepath.Join(string(filepath.Separator), "containerd-shim", namespace, b.id, "shim.sock")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *bundle) shimConfig(namespace string, runcOptions *runcopts.RuncOptions) client.Config {
|
func (b *bundle) shimConfig(namespace string, runcOptions *runcopts.RuncOptions) shim.Config {
|
||||||
var (
|
var (
|
||||||
criuPath string
|
criuPath string
|
||||||
runtimeRoot string
|
runtimeRoot string
|
||||||
@ -130,7 +131,7 @@ func (b *bundle) shimConfig(namespace string, runcOptions *runcopts.RuncOptions)
|
|||||||
systemdCgroup = runcOptions.SystemdCgroup
|
systemdCgroup = runcOptions.SystemdCgroup
|
||||||
runtimeRoot = runcOptions.RuntimeRoot
|
runtimeRoot = runcOptions.RuntimeRoot
|
||||||
}
|
}
|
||||||
return client.Config{
|
return shim.Config{
|
||||||
Path: b.path,
|
Path: b.path,
|
||||||
WorkDir: b.workDir,
|
WorkDir: b.workDir,
|
||||||
Namespace: namespace,
|
Namespace: namespace,
|
||||||
|
@ -15,7 +15,7 @@ import (
|
|||||||
"github.com/containerd/containerd/api/types"
|
"github.com/containerd/containerd/api/types"
|
||||||
"github.com/containerd/containerd/containers"
|
"github.com/containerd/containerd/containers"
|
||||||
"github.com/containerd/containerd/errdefs"
|
"github.com/containerd/containerd/errdefs"
|
||||||
"github.com/containerd/containerd/events"
|
"github.com/containerd/containerd/events/exchange"
|
||||||
"github.com/containerd/containerd/identifiers"
|
"github.com/containerd/containerd/identifiers"
|
||||||
"github.com/containerd/containerd/linux/runcopts"
|
"github.com/containerd/containerd/linux/runcopts"
|
||||||
client "github.com/containerd/containerd/linux/shim"
|
client "github.com/containerd/containerd/linux/shim"
|
||||||
@ -143,7 +143,7 @@ type Runtime struct {
|
|||||||
monitor runtime.TaskMonitor
|
monitor runtime.TaskMonitor
|
||||||
tasks *runtime.TaskList
|
tasks *runtime.TaskList
|
||||||
db *metadata.DB
|
db *metadata.DB
|
||||||
events *events.Exchange
|
events *exchange.Exchange
|
||||||
|
|
||||||
config *Config
|
config *Config
|
||||||
}
|
}
|
||||||
|
@ -1,6 +1,6 @@
|
|||||||
// +build !windows
|
// +build !windows
|
||||||
|
|
||||||
package shim
|
package client
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
@ -20,19 +20,23 @@ import (
|
|||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
|
|
||||||
"github.com/containerd/containerd/events"
|
"github.com/containerd/containerd/events"
|
||||||
shim "github.com/containerd/containerd/linux/shim/v1"
|
"github.com/containerd/containerd/linux/shim"
|
||||||
|
shimapi "github.com/containerd/containerd/linux/shim/v1"
|
||||||
"github.com/containerd/containerd/log"
|
"github.com/containerd/containerd/log"
|
||||||
"github.com/containerd/containerd/reaper"
|
"github.com/containerd/containerd/reaper"
|
||||||
"github.com/containerd/containerd/sys"
|
"github.com/containerd/containerd/sys"
|
||||||
|
google_protobuf "github.com/golang/protobuf/ptypes/empty"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
)
|
)
|
||||||
|
|
||||||
// ClientOpt is an option for a shim client configuration
|
var empty = &google_protobuf.Empty{}
|
||||||
type ClientOpt func(context.Context, Config) (shim.ShimClient, io.Closer, error)
|
|
||||||
|
// Opt is an option for a shim client configuration
|
||||||
|
type Opt func(context.Context, shim.Config) (shimapi.ShimClient, io.Closer, error)
|
||||||
|
|
||||||
// WithStart executes a new shim process
|
// WithStart executes a new shim process
|
||||||
func WithStart(binary, address, daemonAddress, cgroup string, nonewns, debug bool, exitHandler func()) ClientOpt {
|
func WithStart(binary, address, daemonAddress, cgroup string, nonewns, debug bool, exitHandler func()) Opt {
|
||||||
return func(ctx context.Context, config Config) (_ shim.ShimClient, _ io.Closer, err error) {
|
return func(ctx context.Context, config shim.Config) (_ shimapi.ShimClient, _ io.Closer, err error) {
|
||||||
socket, err := newSocket(address)
|
socket, err := newSocket(address)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
@ -84,24 +88,24 @@ func WithStart(binary, address, daemonAddress, cgroup string, nonewns, debug boo
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func newCommand(binary, daemonAddress string, nonewns, debug bool, config Config, socket *os.File) *exec.Cmd {
|
func newCommand(binary, daemonAddress string, nonewns, debug bool, config shim.Config, socket *os.File) *exec.Cmd {
|
||||||
args := []string{
|
args := []string{
|
||||||
"--namespace", config.Namespace,
|
"-namespace", config.Namespace,
|
||||||
"--workdir", config.WorkDir,
|
"-workdir", config.WorkDir,
|
||||||
"--address", daemonAddress,
|
"-address", daemonAddress,
|
||||||
}
|
}
|
||||||
|
|
||||||
if config.Criu != "" {
|
if config.Criu != "" {
|
||||||
args = append(args, "--criu-path", config.Criu)
|
args = append(args, "-criu-path", config.Criu)
|
||||||
}
|
}
|
||||||
if config.RuntimeRoot != "" {
|
if config.RuntimeRoot != "" {
|
||||||
args = append(args, "--runtime-root", config.RuntimeRoot)
|
args = append(args, "-runtime-root", config.RuntimeRoot)
|
||||||
}
|
}
|
||||||
if config.SystemdCgroup {
|
if config.SystemdCgroup {
|
||||||
args = append(args, "--systemd-cgroup")
|
args = append(args, "-systemd-cgroup")
|
||||||
}
|
}
|
||||||
if debug {
|
if debug {
|
||||||
args = append(args, "--debug")
|
args = append(args, "-debug")
|
||||||
}
|
}
|
||||||
|
|
||||||
cmd := exec.Command(binary, args...)
|
cmd := exec.Command(binary, args...)
|
||||||
@ -160,39 +164,29 @@ func dialAddress(address string) string {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// WithConnect connects to an existing shim
|
// WithConnect connects to an existing shim
|
||||||
func WithConnect(address string) ClientOpt {
|
func WithConnect(address string) Opt {
|
||||||
return func(ctx context.Context, config Config) (shim.ShimClient, io.Closer, error) {
|
return func(ctx context.Context, config shim.Config) (shimapi.ShimClient, io.Closer, error) {
|
||||||
conn, err := connect(address, annonDialer)
|
conn, err := connect(address, annonDialer)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
return shim.NewShimClient(conn), conn, nil
|
return shimapi.NewShimClient(conn), conn, nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// WithLocal uses an in process shim
|
// WithLocal uses an in process shim
|
||||||
func WithLocal(publisher events.Publisher) func(context.Context, Config) (shim.ShimClient, io.Closer, error) {
|
func WithLocal(publisher events.Publisher) func(context.Context, shim.Config) (shimapi.ShimClient, io.Closer, error) {
|
||||||
return func(ctx context.Context, config Config) (shim.ShimClient, io.Closer, error) {
|
return func(ctx context.Context, config shim.Config) (shimapi.ShimClient, io.Closer, error) {
|
||||||
service, err := NewService(config, publisher)
|
service, err := shim.NewService(config, publisher)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
return NewLocal(service), nil, nil
|
return shim.NewLocal(service), nil, nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Config contains shim specific configuration
|
|
||||||
type Config struct {
|
|
||||||
Path string
|
|
||||||
Namespace string
|
|
||||||
WorkDir string
|
|
||||||
Criu string
|
|
||||||
RuntimeRoot string
|
|
||||||
SystemdCgroup bool
|
|
||||||
}
|
|
||||||
|
|
||||||
// New returns a new shim client
|
// New returns a new shim client
|
||||||
func New(ctx context.Context, config Config, opt ClientOpt) (*Client, error) {
|
func New(ctx context.Context, config shim.Config, opt Opt) (*Client, error) {
|
||||||
s, c, err := opt(ctx, config)
|
s, c, err := opt(ctx, config)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -206,7 +200,7 @@ func New(ctx context.Context, config Config, opt ClientOpt) (*Client, error) {
|
|||||||
|
|
||||||
// Client is a shim client containing the connection to a shim
|
// Client is a shim client containing the connection to a shim
|
||||||
type Client struct {
|
type Client struct {
|
||||||
shim.ShimClient
|
shimapi.ShimClient
|
||||||
|
|
||||||
c io.Closer
|
c io.Closer
|
||||||
exitCh chan struct{}
|
exitCh chan struct{}
|
@ -1,6 +1,6 @@
|
|||||||
// +build linux
|
// +build linux
|
||||||
|
|
||||||
package shim
|
package client
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"os/exec"
|
"os/exec"
|
@ -1,6 +1,6 @@
|
|||||||
// +build !linux,!windows
|
// +build !linux,!windows
|
||||||
|
|
||||||
package shim
|
package client
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"os/exec"
|
"os/exec"
|
@ -98,12 +98,16 @@ func (s *Service) newInitProcess(context context.Context, r *shimapi.CreateTaskR
|
|||||||
return nil, errors.Wrapf(err, "failed to mount rootfs component %v", m)
|
return nil, errors.Wrapf(err, "failed to mount rootfs component %v", m)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
root := s.config.RuntimeRoot
|
||||||
|
if root == "" {
|
||||||
|
root = RuncRoot
|
||||||
|
}
|
||||||
runtime := &runc.Runc{
|
runtime := &runc.Runc{
|
||||||
Command: r.Runtime,
|
Command: r.Runtime,
|
||||||
Log: filepath.Join(s.config.Path, "log.json"),
|
Log: filepath.Join(s.config.Path, "log.json"),
|
||||||
LogFormat: runc.JSON,
|
LogFormat: runc.JSON,
|
||||||
PdeathSignal: syscall.SIGKILL,
|
PdeathSignal: syscall.SIGKILL,
|
||||||
Root: filepath.Join(s.config.RuntimeRoot, s.config.Namespace),
|
Root: filepath.Join(root, s.config.Namespace),
|
||||||
Criu: s.config.Criu,
|
Criu: s.config.Criu,
|
||||||
SystemdCgroup: s.config.SystemdCgroup,
|
SystemdCgroup: s.config.SystemdCgroup,
|
||||||
}
|
}
|
||||||
|
@ -32,6 +32,16 @@ var empty = &google_protobuf.Empty{}
|
|||||||
// RuncRoot is the path to the root runc state directory
|
// RuncRoot is the path to the root runc state directory
|
||||||
const RuncRoot = "/run/containerd/runc"
|
const RuncRoot = "/run/containerd/runc"
|
||||||
|
|
||||||
|
// Config contains shim specific configuration
|
||||||
|
type Config struct {
|
||||||
|
Path string
|
||||||
|
Namespace string
|
||||||
|
WorkDir string
|
||||||
|
Criu string
|
||||||
|
RuntimeRoot string
|
||||||
|
SystemdCgroup bool
|
||||||
|
}
|
||||||
|
|
||||||
// NewService returns a new shim service that can be used via GRPC
|
// NewService returns a new shim service that can be used via GRPC
|
||||||
func NewService(config Config, publisher events.Publisher) (*Service, error) {
|
func NewService(config Config, publisher events.Publisher) (*Service, error) {
|
||||||
if config.Namespace == "" {
|
if config.Namespace == "" {
|
||||||
|
@ -11,7 +11,7 @@ import (
|
|||||||
"github.com/containerd/cgroups"
|
"github.com/containerd/cgroups"
|
||||||
"github.com/containerd/containerd/api/types/task"
|
"github.com/containerd/containerd/api/types/task"
|
||||||
"github.com/containerd/containerd/errdefs"
|
"github.com/containerd/containerd/errdefs"
|
||||||
client "github.com/containerd/containerd/linux/shim"
|
"github.com/containerd/containerd/linux/shim/client"
|
||||||
shim "github.com/containerd/containerd/linux/shim/v1"
|
shim "github.com/containerd/containerd/linux/shim/v1"
|
||||||
"github.com/containerd/containerd/runtime"
|
"github.com/containerd/containerd/runtime"
|
||||||
"github.com/gogo/protobuf/types"
|
"github.com/gogo/protobuf/types"
|
||||||
|
@ -5,7 +5,7 @@ import (
|
|||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
|
||||||
"github.com/containerd/containerd/errdefs"
|
"github.com/containerd/containerd/errdefs"
|
||||||
"github.com/containerd/containerd/events"
|
"github.com/containerd/containerd/events/exchange"
|
||||||
"github.com/containerd/containerd/log"
|
"github.com/containerd/containerd/log"
|
||||||
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
|
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
@ -18,7 +18,7 @@ type InitContext struct {
|
|||||||
State string
|
State string
|
||||||
Config interface{}
|
Config interface{}
|
||||||
Address string
|
Address string
|
||||||
Events *events.Exchange
|
Events *exchange.Exchange
|
||||||
|
|
||||||
Meta *Meta // plugins can fill in metadata at init.
|
Meta *Meta // plugins can fill in metadata at init.
|
||||||
|
|
||||||
|
@ -22,7 +22,7 @@ import (
|
|||||||
version "github.com/containerd/containerd/api/services/version/v1"
|
version "github.com/containerd/containerd/api/services/version/v1"
|
||||||
"github.com/containerd/containerd/content"
|
"github.com/containerd/containerd/content"
|
||||||
"github.com/containerd/containerd/content/local"
|
"github.com/containerd/containerd/content/local"
|
||||||
"github.com/containerd/containerd/events"
|
"github.com/containerd/containerd/events/exchange"
|
||||||
"github.com/containerd/containerd/log"
|
"github.com/containerd/containerd/log"
|
||||||
"github.com/containerd/containerd/metadata"
|
"github.com/containerd/containerd/metadata"
|
||||||
"github.com/containerd/containerd/plugin"
|
"github.com/containerd/containerd/plugin"
|
||||||
@ -65,7 +65,7 @@ func New(ctx context.Context, config *Config) (*Server, error) {
|
|||||||
services []plugin.Service
|
services []plugin.Service
|
||||||
s = &Server{
|
s = &Server{
|
||||||
rpc: rpc,
|
rpc: rpc,
|
||||||
events: events.NewExchange(),
|
events: exchange.NewExchange(),
|
||||||
}
|
}
|
||||||
initialized = plugin.NewPluginSet()
|
initialized = plugin.NewPluginSet()
|
||||||
)
|
)
|
||||||
@ -122,7 +122,7 @@ func New(ctx context.Context, config *Config) (*Server, error) {
|
|||||||
// Server is the containerd main daemon
|
// Server is the containerd main daemon
|
||||||
type Server struct {
|
type Server struct {
|
||||||
rpc *grpc.Server
|
rpc *grpc.Server
|
||||||
events *events.Exchange
|
events *exchange.Exchange
|
||||||
}
|
}
|
||||||
|
|
||||||
// ServeGRPC provides the containerd grpc APIs on the provided listener
|
// ServeGRPC provides the containerd grpc APIs on the provided listener
|
||||||
|
@ -3,7 +3,7 @@ package events
|
|||||||
import (
|
import (
|
||||||
api "github.com/containerd/containerd/api/services/events/v1"
|
api "github.com/containerd/containerd/api/services/events/v1"
|
||||||
"github.com/containerd/containerd/errdefs"
|
"github.com/containerd/containerd/errdefs"
|
||||||
"github.com/containerd/containerd/events"
|
"github.com/containerd/containerd/events/exchange"
|
||||||
"github.com/containerd/containerd/plugin"
|
"github.com/containerd/containerd/plugin"
|
||||||
"github.com/golang/protobuf/ptypes/empty"
|
"github.com/golang/protobuf/ptypes/empty"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
@ -22,11 +22,11 @@ func init() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type service struct {
|
type service struct {
|
||||||
events *events.Exchange
|
events *exchange.Exchange
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewService returns the GRPC events server
|
// NewService returns the GRPC events server
|
||||||
func NewService(events *events.Exchange) api.EventsServer {
|
func NewService(events *exchange.Exchange) api.EventsServer {
|
||||||
return &service{events: events}
|
return &service{events: events}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user