From 657856521623ee06177b0583904b42462d74158f Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Fri, 7 Jul 2017 16:12:33 -0700 Subject: [PATCH] Use event service post for shim events Signed-off-by: Michael Crosby --- cmd/containerd-shim/main_unix.go | 5 ++ linux/bundle.go | 4 +- linux/runtime.go | 86 ++++++-------------------------- linux/shim/client.go | 32 +++++++----- linux/shim/client_linux.go | 10 ++++ linux/shim/client_unix.go | 9 ++++ linux/shim/local.go | 17 +++++++ linux/shim/service.go | 63 +++++++++++++++++++++-- metrics/cgroups/cgroups.go | 13 ++--- plugin/context.go | 1 + runtime/monitor.go | 13 ----- server/server.go | 4 ++ 12 files changed, 149 insertions(+), 108 deletions(-) create mode 100644 linux/shim/client_linux.go create mode 100644 linux/shim/client_unix.go diff --git a/cmd/containerd-shim/main_unix.go b/cmd/containerd-shim/main_unix.go index 8c83a9e63..16889183b 100644 --- a/cmd/containerd-shim/main_unix.go +++ b/cmd/containerd-shim/main_unix.go @@ -49,6 +49,10 @@ func main() { Name: "socket,s", Usage: "abstract socket path to serve on", }, + cli.StringFlag{ + Name: "address,a", + Usage: "grpc address back to containerd", + }, } app.Before = func(context *cli.Context) error { if context.GlobalBool("debug") { @@ -74,6 +78,7 @@ func main() { sv, err := shim.NewService( path, context.GlobalString("namespace"), + context.GlobalString("address"), ) if err != nil { return err diff --git a/linux/bundle.go b/linux/bundle.go index 06a7eb3cc..bdac0b1ce 100644 --- a/linux/bundle.go +++ b/linux/bundle.go @@ -56,8 +56,8 @@ type bundle struct { } // NewShim connects to the shim managing the bundle and tasks -func (b *bundle) NewShim(ctx context.Context, binary string, remote bool) (*client.Client, error) { - opt := client.WithStart(binary) +func (b *bundle) NewShim(ctx context.Context, binary, grpcAddress string, remote bool) (*client.Client, error) { + opt := client.WithStart(binary, grpcAddress) if !remote { opt = client.WithLocal } diff --git a/linux/runtime.go b/linux/runtime.go index 32aa6e398..37a06801e 100644 --- a/linux/runtime.go +++ b/linux/runtime.go @@ -8,7 +8,6 @@ import ( "io/ioutil" "os" "path/filepath" - "strings" "google.golang.org/grpc" @@ -84,22 +83,17 @@ func New(ic *plugin.InitContext) (interface{}, error) { return nil, err } cfg := ic.Config.(*Config) - c, cancel := context.WithCancel(ic.Context) r := &Runtime{ - root: ic.Root, - remote: !cfg.NoShim, - shim: cfg.Shim, - runtime: cfg.Runtime, - events: make(chan *eventsapi.RuntimeEvent, 2048), - eventsContext: c, - eventsCancel: cancel, - monitor: monitor.(runtime.TaskMonitor), - tasks: newTaskList(), - emitter: events.GetPoster(ic.Context), - db: m.(*bolt.DB), + root: ic.Root, + remote: !cfg.NoShim, + shim: cfg.Shim, + runtime: cfg.Runtime, + monitor: monitor.(runtime.TaskMonitor), + tasks: newTaskList(), + emitter: events.GetPoster(ic.Context), + db: m.(*bolt.DB), + address: ic.Address, } - // set the events output for a monitor if it generates events - r.monitor.Events(r.events) tasks, err := r.restoreTasks(ic.Context) if err != nil { return nil, err @@ -108,9 +102,6 @@ func New(ic *plugin.InitContext) (interface{}, error) { if err := r.tasks.addWithNamespace(t.namespace, t); err != nil { return nil, err } - if err := r.handleEvents(ic.Context, t.shim); err != nil { - return nil, err - } } return r, nil } @@ -120,14 +111,12 @@ type Runtime struct { shim string runtime string remote bool + address string - events chan *eventsapi.RuntimeEvent - eventsContext context.Context - eventsCancel func() - monitor runtime.TaskMonitor - tasks *taskList - emitter events.Poster - db *bolt.DB + monitor runtime.TaskMonitor + tasks *taskList + emitter events.Poster + db *bolt.DB } func (r *Runtime) ID() string { @@ -148,7 +137,7 @@ func (r *Runtime) Create(ctx context.Context, id string, opts runtime.CreateOpts bundle.Delete() } }() - s, err := bundle.NewShim(ctx, r.shim, r.remote) + s, err := bundle.NewShim(ctx, r.shim, r.address, r.remote) if err != nil { return nil, err } @@ -159,9 +148,6 @@ func (r *Runtime) Create(ctx context.Context, id string, opts runtime.CreateOpts } } }() - if err = r.handleEvents(ctx, s); err != nil { - return nil, err - } sopts := &shim.CreateTaskRequest{ ID: id, Bundle: bundle.path, @@ -332,48 +318,6 @@ func (r *Runtime) loadTasks(ctx context.Context, ns string) ([]*Task, error) { return o, nil } -func (r *Runtime) handleEvents(ctx context.Context, s *client.Client) error { - events, err := s.Stream(r.eventsContext, &shim.StreamEventsRequest{}) - if err != nil { - return err - } - go r.forward(ctx, events) - return nil -} - -// forward forwards events from a shim to the events service and monitors -func (r *Runtime) forward(ctx context.Context, events shim.Shim_StreamClient) { - for { - e, err := events.Recv() - if err != nil { - if !strings.HasSuffix(err.Error(), "transport is closing") { - log.G(r.eventsContext).WithError(err).Error("get event from shim") - } - return - } - r.events <- e - if err := r.emit(ctx, "/runtime/"+getTopic(e), e); err != nil { - return - } - } -} - -func getTopic(e *eventsapi.RuntimeEvent) string { - switch e.Type { - case eventsapi.RuntimeEvent_CREATE: - return "task-create" - case eventsapi.RuntimeEvent_START: - return "task-start" - case eventsapi.RuntimeEvent_EXEC_ADDED: - return "task-execadded" - case eventsapi.RuntimeEvent_OOM: - return "task-oom" - case eventsapi.RuntimeEvent_EXIT: - return "task-exit" - } - return "" -} - func (r *Runtime) terminate(ctx context.Context, bundle *bundle, ns, id string) error { ctx = namespaces.WithNamespace(ctx, ns) rt, err := r.getRuntime(ctx, ns, id) diff --git a/linux/shim/client.go b/linux/shim/client.go index 2b762c068..134bbfda2 100644 --- a/linux/shim/client.go +++ b/linux/shim/client.go @@ -1,4 +1,4 @@ -// +build linux +// +build !windows package shim @@ -10,7 +10,6 @@ import ( "os" "os/exec" "strings" - "syscall" "time" "golang.org/x/sys/unix" @@ -28,7 +27,7 @@ import ( type ClientOpt func(context.Context, Config) (shim.ShimClient, io.Closer, error) // WithStart executes a new shim process -func WithStart(binary string) ClientOpt { +func WithStart(binary, address string) ClientOpt { return func(ctx context.Context, config Config) (shim.ShimClient, io.Closer, error) { socket, err := newSocket(config) if err != nil { @@ -41,7 +40,7 @@ func WithStart(binary string) ClientOpt { } defer f.Close() - cmd := newCommand(binary, config, f) + cmd := newCommand(binary, address, config, f) if err := reaper.Default.Start(cmd); err != nil { return nil, nil, errors.Wrapf(err, "failed to start shim") } @@ -56,9 +55,10 @@ func WithStart(binary string) ClientOpt { } } -func newCommand(binary string, config Config, socket *os.File) *exec.Cmd { +func newCommand(binary, address string, config Config, socket *os.File) *exec.Cmd { args := []string{ "--namespace", config.Namespace, + "--address", address, } if config.Debug { args = append(args, "--debug") @@ -68,11 +68,12 @@ func newCommand(binary string, config Config, socket *os.File) *exec.Cmd { // make sure the shim can be re-parented to system init // and is cloned in a new mount namespace because the overlay/filesystems // will be mounted by the shim - cmd.SysProcAttr = &syscall.SysProcAttr{ - Cloneflags: syscall.CLONE_NEWNS, - Setpgid: true, - } + cmd.SysProcAttr = &atter cmd.ExtraFiles = append(cmd.ExtraFiles, socket) + if config.Debug { + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + } return cmd } @@ -88,12 +89,12 @@ func newSocket(config Config) (*net.UnixListener, error) { return l.(*net.UnixListener), nil } -func connect(address string) (*grpc.ClientConn, error) { +func connect(address string, d func(string, time.Duration) (net.Conn, error)) (*grpc.ClientConn, error) { gopts := []grpc.DialOption{ grpc.WithBlock(), grpc.WithInsecure(), grpc.WithTimeout(100 * time.Second), - grpc.WithDialer(dialer), + grpc.WithDialer(d), grpc.FailOnNonTempDialError(true), } conn, err := grpc.Dial(dialAddress(address), gopts...) @@ -104,6 +105,11 @@ func connect(address string) (*grpc.ClientConn, error) { } func dialer(address string, timeout time.Duration) (net.Conn, error) { + address = strings.TrimPrefix(address, "unix://") + return net.DialTimeout("unix", address, timeout) +} + +func annonDialer(address string, timeout time.Duration) (net.Conn, error) { address = strings.TrimPrefix(address, "unix://") return net.DialTimeout("unix", "\x00"+address, timeout) } @@ -114,7 +120,7 @@ func dialAddress(address string) string { // WithConnect connects to an existing shim func WithConnect(ctx context.Context, config Config) (shim.ShimClient, io.Closer, error) { - conn, err := connect(config.Address) + conn, err := connect(config.Address, annonDialer) if err != nil { return nil, nil, err } @@ -123,7 +129,7 @@ func WithConnect(ctx context.Context, config Config) (shim.ShimClient, io.Closer // WithLocal uses an in process shim func WithLocal(ctx context.Context, config Config) (shim.ShimClient, io.Closer, error) { - service, err := NewService(config.Path, config.Namespace) + service, err := NewService(config.Path, config.Namespace, "") if err != nil { return nil, nil, err } diff --git a/linux/shim/client_linux.go b/linux/shim/client_linux.go new file mode 100644 index 000000000..7bbb516f5 --- /dev/null +++ b/linux/shim/client_linux.go @@ -0,0 +1,10 @@ +// +build linux + +package shim + +import "syscall" + +var atter = syscall.SysProcAttr{ + Cloneflags: syscall.CLONE_NEWNS, + Setpgid: true, +} diff --git a/linux/shim/client_unix.go b/linux/shim/client_unix.go new file mode 100644 index 000000000..9eab8b9c8 --- /dev/null +++ b/linux/shim/client_unix.go @@ -0,0 +1,9 @@ +// +build !linux,!windows + +package shim + +import "syscall" + +var atter = syscall.SysProcAttr{ + Setpgid: true, +} diff --git a/linux/shim/local.go b/linux/shim/local.go index fdde79e90..b74301221 100644 --- a/linux/shim/local.go +++ b/linux/shim/local.go @@ -6,6 +6,7 @@ import ( "path/filepath" events "github.com/containerd/containerd/api/services/events/v1" + evt "github.com/containerd/containerd/events" shimapi "github.com/containerd/containerd/linux/shim/v1" google_protobuf "github.com/golang/protobuf/ptypes/empty" "golang.org/x/net/context" @@ -129,3 +130,19 @@ func (e *streamEvents) SendMsg(m interface{}) error { func (e *streamEvents) RecvMsg(m interface{}) error { return nil } + +type poster interface { + Post(ctx context.Context, in *events.PostEventRequest, opts ...grpc.CallOption) (*google_protobuf.Empty, error) +} + +type localEventsClient struct { + emitter evt.Poster +} + +func (l *localEventsClient) Post(ctx context.Context, r *events.PostEventRequest, opts ...grpc.CallOption) (*google_protobuf.Empty, error) { + ctx = evt.WithTopic(ctx, r.Envelope.Topic) + if err := l.emitter.Post(ctx, r.Envelope); err != nil { + return nil, err + } + return empty, nil +} diff --git a/linux/shim/service.go b/linux/shim/service.go index a530bd61b..cac9cb048 100644 --- a/linux/shim/service.go +++ b/linux/shim/service.go @@ -6,6 +6,7 @@ import ( "fmt" "os" "sync" + "time" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -13,8 +14,12 @@ import ( "github.com/containerd/console" events "github.com/containerd/containerd/api/services/events/v1" "github.com/containerd/containerd/api/types/task" + evt "github.com/containerd/containerd/events" shimapi "github.com/containerd/containerd/linux/shim/v1" + "github.com/containerd/containerd/log" + "github.com/containerd/containerd/namespaces" "github.com/containerd/containerd/reaper" + "github.com/containerd/containerd/typeurl" google_protobuf "github.com/golang/protobuf/ptypes/empty" "github.com/pkg/errors" "golang.org/x/net/context" @@ -29,16 +34,32 @@ var empty = &google_protobuf.Empty{} const RuncRoot = "/run/containerd/runc" // NewService returns a new shim service that can be used via GRPC -func NewService(path, namespace string) (*Service, error) { +func NewService(path, namespace, address string) (*Service, error) { if namespace == "" { return nil, fmt.Errorf("shim namespace cannot be empty") } - return &Service{ + context := namespaces.WithNamespace(context.Background(), namespace) + var client poster + if address != "" { + conn, err := connect(address, dialer) + if err != nil { + return nil, errors.Wrapf(err, "failed to dial %q", address) + } + client = events.NewEventsClient(conn) + } else { + client = &localEventsClient{ + emitter: evt.GetPoster(context), + } + } + s := &Service{ path: path, processes: make(map[string]process), events: make(chan *events.RuntimeEvent, 4096), namespace: namespace, - }, nil + context: context, + } + go s.forward(client) + return s, nil } type Service struct { @@ -52,6 +73,7 @@ type Service struct { eventsMu sync.Mutex deferredEvent *events.RuntimeEvent namespace string + context context.Context } func (s *Service) Create(ctx context.Context, r *shimapi.CreateTaskRequest) (*shimapi.CreateTaskResponse, error) { @@ -367,3 +389,38 @@ func (s *Service) getContainerPids(ctx context.Context, id string) ([]uint32, er } return pids, nil } + +func (s *Service) forward(client poster) { + for e := range s.events { + a, err := typeurl.MarshalAny(e) + if err != nil { + log.G(s.context).WithError(err).Error("marshal event") + continue + } + if _, err := client.Post(s.context, &events.PostEventRequest{ + Envelope: &events.Envelope{ + Timestamp: time.Now(), + Topic: "/runtime/" + getTopic(e), + Event: a, + }, + }); err != nil { + log.G(s.context).WithError(err).Error("post event") + } + } +} + +func getTopic(e *events.RuntimeEvent) string { + switch e.Type { + case events.RuntimeEvent_CREATE: + return "task-create" + case events.RuntimeEvent_START: + return "task-start" + case events.RuntimeEvent_EXEC_ADDED: + return "task-execadded" + case events.RuntimeEvent_OOM: + return "task-oom" + case events.RuntimeEvent_EXIT: + return "task-exit" + } + return "?" +} diff --git a/metrics/cgroups/cgroups.go b/metrics/cgroups/cgroups.go index 373ed079b..ddb811317 100644 --- a/metrics/cgroups/cgroups.go +++ b/metrics/cgroups/cgroups.go @@ -7,6 +7,8 @@ import ( "github.com/containerd/cgroups" events "github.com/containerd/containerd/api/services/events/v1" + evt "github.com/containerd/containerd/events" + "github.com/containerd/containerd/log" "github.com/containerd/containerd/plugin" "github.com/containerd/containerd/runtime" metrics "github.com/docker/go-metrics" @@ -35,6 +37,7 @@ func New(ic *plugin.InitContext) (interface{}, error) { collector: collector, oom: oom, context: ic.Context, + emitter: ic.Emitter, }, nil } @@ -42,7 +45,7 @@ type cgroupsMonitor struct { collector *Collector oom *OOMCollector context context.Context - events chan<- *events.RuntimeEvent + emitter *evt.Emitter } func (m *cgroupsMonitor) Monitor(c runtime.Task) error { @@ -67,15 +70,13 @@ func (m *cgroupsMonitor) Stop(c runtime.Task) error { return nil } -func (m *cgroupsMonitor) Events(events chan<- *events.RuntimeEvent) { - m.events = events -} - func (m *cgroupsMonitor) trigger(id string, cg cgroups.Cgroup) { - m.events <- &events.RuntimeEvent{ + if err := m.emitter.Post(m.context, &events.RuntimeEvent{ Timestamp: time.Now(), Type: events.RuntimeEvent_OOM, ID: id, ContainerID: id, + }); err != nil { + log.G(m.context).WithError(err).Error("post OOM event") } } diff --git a/plugin/context.go b/plugin/context.go index 7b84745bd..57e190d95 100644 --- a/plugin/context.go +++ b/plugin/context.go @@ -19,6 +19,7 @@ func NewContext(ctx context.Context, plugins map[PluginType]map[string]interface type InitContext struct { Root string + Address string Context context.Context Config interface{} Emitter *events.Emitter diff --git a/runtime/monitor.go b/runtime/monitor.go index 9e3c3f11a..4c61858ba 100644 --- a/runtime/monitor.go +++ b/runtime/monitor.go @@ -1,15 +1,11 @@ package runtime -import events "github.com/containerd/containerd/api/services/events/v1" - // TaskMonitor provides an interface for monitoring of containers within containerd type TaskMonitor interface { // Monitor adds the provided container to the monitor Monitor(Task) error // Stop stops and removes the provided container from the monitor Stop(Task) error - // Events emits events to the channel for the monitor - Events(chan<- *events.RuntimeEvent) } func NewMultiTaskMonitor(monitors ...TaskMonitor) TaskMonitor { @@ -33,9 +29,6 @@ func (mm *noopTaskMonitor) Stop(c Task) error { return nil } -func (mm *noopTaskMonitor) Events(events chan<- *events.RuntimeEvent) { -} - type multiTaskMonitor struct { monitors []TaskMonitor } @@ -57,9 +50,3 @@ func (mm *multiTaskMonitor) Stop(c Task) error { } return nil } - -func (mm *multiTaskMonitor) Events(events chan<- *events.RuntimeEvent) { - for _, m := range mm.monitors { - m.Events(events) - } -} diff --git a/server/server.go b/server/server.go index 244346c6a..009fc7b01 100644 --- a/server/server.go +++ b/server/server.go @@ -13,6 +13,7 @@ import ( containers "github.com/containerd/containerd/api/services/containers/v1" content "github.com/containerd/containerd/api/services/content/v1" diff "github.com/containerd/containerd/api/services/diff/v1" + eventsapi "github.com/containerd/containerd/api/services/events/v1" images "github.com/containerd/containerd/api/services/images/v1" namespaces "github.com/containerd/containerd/api/services/namespaces/v1" snapshot "github.com/containerd/containerd/api/services/snapshot/v1" @@ -68,6 +69,7 @@ func New(ctx context.Context, config *Config) (*Server, error) { id, ) initContext.Emitter = s.emitter + initContext.Address = config.GRPC.Address // load the plugin specific configuration if it is provided if p.Config != nil { @@ -203,6 +205,8 @@ func interceptor( ctx = log.WithModule(ctx, "diff") case namespaces.NamespacesServer: ctx = log.WithModule(ctx, "namespaces") + case eventsapi.EventsServer: + ctx = log.WithModule(ctx, "events") default: log.G(ctx).Warnf("unknown GRPC server type: %#v\n", info.Server) }