From 7ed88c1e36bc11f591ca0df7d293808ee4e7e283 Mon Sep 17 00:00:00 2001 From: Stephen J Day Date: Mon, 31 Jul 2017 13:27:45 -0700 Subject: [PATCH] linux/shim: use events.Publisher interface Signed-off-by: Stephen J Day --- cmd/containerd-shim/main_unix.go | 36 +++++++++++++++----- linux/shim/client.go | 4 +-- linux/shim/local.go | 17 ---------- linux/shim/service.go | 57 ++++++++++++++------------------ 4 files changed, 54 insertions(+), 60 deletions(-) diff --git a/cmd/containerd-shim/main_unix.go b/cmd/containerd-shim/main_unix.go index 88fbffda9..1259de596 100644 --- a/cmd/containerd-shim/main_unix.go +++ b/cmd/containerd-shim/main_unix.go @@ -3,6 +3,7 @@ package main import ( + "context" "fmt" "net" "os" @@ -10,18 +11,19 @@ import ( "strings" "time" - "golang.org/x/sys/unix" - - "google.golang.org/grpc" - - events "github.com/containerd/containerd/api/services/events/v1" + eventsapi "github.com/containerd/containerd/api/services/events/v1" + "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/events" "github.com/containerd/containerd/linux/shim" shimapi "github.com/containerd/containerd/linux/shim/v1" "github.com/containerd/containerd/reaper" + "github.com/containerd/containerd/typeurl" "github.com/containerd/containerd/version" "github.com/pkg/errors" "github.com/sirupsen/logrus" "github.com/urfave/cli" + "golang.org/x/sys/unix" + "google.golang.org/grpc" ) const usage = ` @@ -82,7 +84,7 @@ func main() { sv, err := shim.NewService( path, context.GlobalString("namespace"), - e, + &remoteEventsPublisher{client: e}, ) if err != nil { return err @@ -163,12 +165,12 @@ func dumpStacks() { logrus.Infof("=== BEGIN goroutine stack dump ===\n%s\n=== END goroutine stack dump ===", buf) } -func connectEvents(address string) (events.EventsClient, error) { +func connectEvents(address string) (eventsapi.EventsClient, error) { conn, err := connect(address, dialer) if err != nil { return nil, errors.Wrapf(err, "failed to dial %q", address) } - return events.NewEventsClient(conn), nil + return eventsapi.NewEventsClient(conn), nil } func connect(address string, d func(string, time.Duration) (net.Conn, error)) (*grpc.ClientConn, error) { @@ -194,3 +196,21 @@ func dialer(address string, timeout time.Duration) (net.Conn, error) { func dialAddress(address string) string { return fmt.Sprintf("unix://%s", address) } + +type remoteEventsPublisher struct { + client eventsapi.EventsClient +} + +func (l *remoteEventsPublisher) Publish(ctx context.Context, topic string, event events.Event) error { + encoded, err := typeurl.MarshalAny(event) + if err != nil { + return err + } + if _, err := l.client.Publish(ctx, &eventsapi.PublishRequest{ + Topic: topic, + Event: encoded, + }); err != nil { + return errdefs.FromGRPC(err) + } + return nil +} diff --git a/linux/shim/client.go b/linux/shim/client.go index d2c1c02be..e80cb239d 100644 --- a/linux/shim/client.go +++ b/linux/shim/client.go @@ -150,9 +150,9 @@ func WithConnect(ctx context.Context, config Config) (shim.ShimClient, io.Closer } // WithLocal uses an in process shim -func WithLocal(events *events.Exchange) func(context.Context, Config) (shim.ShimClient, io.Closer, error) { +func WithLocal(publisher events.Publisher) func(context.Context, Config) (shim.ShimClient, io.Closer, error) { return func(ctx context.Context, config Config) (shim.ShimClient, io.Closer, error) { - service, err := NewService(config.Path, config.Namespace, &localEventsClient{publisher: events}) + service, err := NewService(config.Path, config.Namespace, publisher) if err != nil { return nil, nil, err } diff --git a/linux/shim/local.go b/linux/shim/local.go index fee614222..eda8ec52b 100644 --- a/linux/shim/local.go +++ b/linux/shim/local.go @@ -5,8 +5,6 @@ package shim import ( "path/filepath" - events "github.com/containerd/containerd/api/services/events/v1" - evt "github.com/containerd/containerd/events" shimapi "github.com/containerd/containerd/linux/shim/v1" google_protobuf "github.com/golang/protobuf/ptypes/empty" "golang.org/x/net/context" @@ -88,18 +86,3 @@ func (c *local) ShimInfo(ctx context.Context, in *google_protobuf.Empty, opts .. func (c *local) Update(ctx context.Context, in *shimapi.UpdateTaskRequest, opts ...grpc.CallOption) (*google_protobuf.Empty, error) { return c.s.Update(ctx, in) } - -type publisher interface { - Publish(ctx context.Context, in *events.PublishRequest, opts ...grpc.CallOption) (*google_protobuf.Empty, error) -} - -type localEventsClient struct { - publisher evt.Publisher -} - -func (l *localEventsClient) Publish(ctx context.Context, r *events.PublishRequest, opts ...grpc.CallOption) (*google_protobuf.Empty, error) { - if err := l.publisher.Publish(ctx, r.Topic, r.Event); err != nil { - return nil, err - } - return empty, nil -} diff --git a/linux/shim/service.go b/linux/shim/service.go index 53e995484..9ed22b36d 100644 --- a/linux/shim/service.go +++ b/linux/shim/service.go @@ -11,15 +11,15 @@ import ( "google.golang.org/grpc/codes" "github.com/containerd/console" - events "github.com/containerd/containerd/api/services/events/v1" + eventsapi "github.com/containerd/containerd/api/services/events/v1" "github.com/containerd/containerd/api/types/task" "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/events" shimapi "github.com/containerd/containerd/linux/shim/v1" "github.com/containerd/containerd/log" "github.com/containerd/containerd/namespaces" "github.com/containerd/containerd/reaper" "github.com/containerd/containerd/runtime" - "github.com/containerd/containerd/typeurl" google_protobuf "github.com/golang/protobuf/ptypes/empty" "github.com/pkg/errors" "golang.org/x/net/context" @@ -30,7 +30,7 @@ var empty = &google_protobuf.Empty{} const RuncRoot = "/run/containerd/runc" // NewService returns a new shim service that can be used via GRPC -func NewService(path, namespace string, client publisher) (*Service, error) { +func NewService(path, namespace string, publisher events.Publisher) (*Service, error) { if namespace == "" { return nil, fmt.Errorf("shim namespace cannot be empty") } @@ -45,7 +45,7 @@ func NewService(path, namespace string, client publisher) (*Service, error) { if err := s.initPlatform(); err != nil { return nil, errors.Wrap(err, "failed to initialized platform behavior") } - go s.forward(client) + go s.forward(publisher) return s, nil } @@ -89,11 +89,11 @@ func (s *Service) Create(ctx context.Context, r *shimapi.CreateTaskRequest) (*sh ExitCh: make(chan int, 1), } reaper.Default.Register(pid, cmd) - s.events <- &events.TaskCreate{ + s.events <- &eventsapi.TaskCreate{ ContainerID: r.ID, Bundle: r.Bundle, Rootfs: r.Rootfs, - IO: &events.TaskIO{ + IO: &eventsapi.TaskIO{ Stdin: r.Stdin, Stdout: r.Stdout, Stderr: r.Stderr, @@ -115,7 +115,7 @@ func (s *Service) Start(ctx context.Context, r *google_protobuf.Empty) (*google_ if err := s.initProcess.Start(ctx); err != nil { return nil, err } - s.events <- &events.TaskStart{ + s.events <- &eventsapi.TaskStart{ ContainerID: s.id, Pid: uint32(s.initProcess.Pid()), } @@ -132,7 +132,7 @@ func (s *Service) Delete(ctx context.Context, r *google_protobuf.Empty) (*shimap s.mu.Lock() delete(s.processes, p.ID()) s.mu.Unlock() - s.events <- &events.TaskDelete{ + s.events <- &eventsapi.TaskDelete{ ContainerID: s.id, ExitStatus: uint32(p.Status()), ExitedAt: p.ExitedAt(), @@ -188,7 +188,7 @@ func (s *Service) Exec(ctx context.Context, r *shimapi.ExecProcessRequest) (*shi reaper.Default.Register(pid, cmd) s.processes[r.ID] = process - s.events <- &events.TaskExecAdded{ + s.events <- &eventsapi.TaskExecAdded{ ContainerID: s.id, ExecID: r.ID, Pid: uint32(pid), @@ -262,7 +262,7 @@ func (s *Service) Pause(ctx context.Context, r *google_protobuf.Empty) (*google_ if err := s.initProcess.Pause(ctx); err != nil { return nil, err } - s.events <- &events.TaskPaused{ + s.events <- &eventsapi.TaskPaused{ ContainerID: s.id, } return empty, nil @@ -275,7 +275,7 @@ func (s *Service) Resume(ctx context.Context, r *google_protobuf.Empty) (*google if err := s.initProcess.Resume(ctx); err != nil { return nil, err } - s.events <- &events.TaskResumed{ + s.events <- &eventsapi.TaskResumed{ ContainerID: s.id, } return empty, nil @@ -329,7 +329,7 @@ func (s *Service) Checkpoint(ctx context.Context, r *shimapi.CheckpointTaskReque if err := s.initProcess.Checkpoint(ctx, r); err != nil { return nil, errdefs.ToGRPC(err) } - s.events <- &events.TaskCheckpointed{ + s.events <- &eventsapi.TaskCheckpointed{ ContainerID: s.id, } return empty, nil @@ -356,7 +356,7 @@ func (s *Service) waitExit(p process, pid int, cmd *reaper.Cmd) { p.Exited(status) reaper.Default.Delete(pid) - s.events <- &events.TaskExit{ + s.events <- &eventsapi.TaskExit{ ContainerID: s.id, ID: p.ID(), Pid: uint32(pid), @@ -377,18 +377,9 @@ func (s *Service) getContainerPids(ctx context.Context, id string) ([]uint32, er return pids, nil } -func (s *Service) forward(client publisher) { +func (s *Service) forward(publisher events.Publisher) { for e := range s.events { - a, err := typeurl.MarshalAny(e) - if err != nil { - log.G(s.context).WithError(err).Error("marshal event") - continue - } - - if _, err := client.Publish(s.context, &events.PublishRequest{ - Topic: getTopic(e), - Event: a, - }); err != nil { + if err := publisher.Publish(s.context, getTopic(e), e); err != nil { log.G(s.context).WithError(err).Error("post event") } } @@ -396,23 +387,23 @@ func (s *Service) forward(client publisher) { func getTopic(e interface{}) string { switch e.(type) { - case *events.TaskCreate: + case *eventsapi.TaskCreate: return runtime.TaskCreateEventTopic - case *events.TaskStart: + case *eventsapi.TaskStart: return runtime.TaskStartEventTopic - case *events.TaskOOM: + case *eventsapi.TaskOOM: return runtime.TaskOOMEventTopic - case *events.TaskExit: + case *eventsapi.TaskExit: return runtime.TaskExitEventTopic - case *events.TaskDelete: + case *eventsapi.TaskDelete: return runtime.TaskDeleteEventTopic - case *events.TaskExecAdded: + case *eventsapi.TaskExecAdded: return runtime.TaskExecAddedEventTopic - case *events.TaskPaused: + case *eventsapi.TaskPaused: return runtime.TaskPausedEventTopic - case *events.TaskResumed: + case *eventsapi.TaskResumed: return runtime.TaskResumedEventTopic - case *events.TaskCheckpointed: + case *eventsapi.TaskCheckpointed: return runtime.TaskCheckpointedEventTopic } return "?"