diff --git a/cmd/containerd-shim/main_unix.go b/cmd/containerd-shim/main_unix.go index 1f51b1aea..5e719b19c 100644 --- a/cmd/containerd-shim/main_unix.go +++ b/cmd/containerd-shim/main_unix.go @@ -3,28 +3,27 @@ package main import ( + "bytes" "context" + "errors" "flag" "fmt" "net" "os" + "os/exec" "runtime" "strings" "sync" "syscall" - "time" - eventsapi "github.com/containerd/containerd/api/services/events/v1" - "github.com/containerd/containerd/dialer" - "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/events" "github.com/containerd/containerd/linux/proc" "github.com/containerd/containerd/linux/shim" shimapi "github.com/containerd/containerd/linux/shim/v1" + "github.com/containerd/containerd/namespaces" "github.com/containerd/containerd/reaper" "github.com/containerd/typeurl" google_protobuf "github.com/golang/protobuf/ptypes/empty" - "github.com/pkg/errors" "github.com/sirupsen/logrus" "golang.org/x/sys/unix" "google.golang.org/grpc" @@ -85,10 +84,6 @@ func executeShim() error { return err } server := newServer() - e, err := connectEvents(addressFlag) - if err != nil { - return err - } sv, err := shim.NewService( shim.Config{ Path: path, @@ -98,7 +93,7 @@ func executeShim() error { SystemdCgroup: systemdCgroupFlag, RuntimeRoot: runtimeRootFlag, }, - &remoteEventsPublisher{client: e}, + &remoteEventsPublisher{address: addressFlag}, ) if err != nil { return err @@ -193,44 +188,29 @@ func dumpStacks(logger *logrus.Entry) { logger.Infof("=== BEGIN goroutine stack dump ===\n%s\n=== END goroutine stack dump ===", buf) } -func connectEvents(address string) (eventsapi.EventsClient, error) { - conn, err := connect(address, dialer.Dialer) - if err != nil { - return nil, errors.Wrapf(err, "failed to dial %q", address) - } - return eventsapi.NewEventsClient(conn), nil -} - -func connect(address string, d func(string, time.Duration) (net.Conn, error)) (*grpc.ClientConn, error) { - gopts := []grpc.DialOption{ - grpc.WithBlock(), - grpc.WithInsecure(), - grpc.WithTimeout(60 * time.Second), - grpc.WithDialer(d), - grpc.FailOnNonTempDialError(true), - grpc.WithBackoffMaxDelay(3 * time.Second), - } - conn, err := grpc.Dial(dialer.DialAddress(address), gopts...) - if err != nil { - return nil, errors.Wrapf(err, "failed to dial %q", address) - } - return conn, nil -} - type remoteEventsPublisher struct { - client eventsapi.EventsClient + address string } func (l *remoteEventsPublisher) Publish(ctx context.Context, topic string, event events.Event) error { + ns, _ := namespaces.Namespace(ctx) encoded, err := typeurl.MarshalAny(event) if err != nil { return err } - if _, err := l.client.Publish(ctx, &eventsapi.PublishRequest{ - Topic: topic, - Event: encoded, - }); err != nil { - return errdefs.FromGRPC(err) + data, err := encoded.Marshal() + if err != nil { + return err + } + cmd := exec.CommandContext(ctx, "containerd", "--address", l.address, "publish", "--topic", topic, "--namespace", ns) + cmd.Stdin = bytes.NewReader(data) + c, err := reaper.Default.Start(cmd) + if err != nil { + return err + } + exit := <-c + if exit.Status != 0 { + return errors.New("failed to publish event") } return nil } diff --git a/cmd/containerd/main.go b/cmd/containerd/main.go index 3b6d2fcb1..04a178af8 100644 --- a/cmd/containerd/main.go +++ b/cmd/containerd/main.go @@ -73,6 +73,7 @@ func main() { } app.Commands = []cli.Command{ configCommand, + publishCommand, } app.Action = func(context *cli.Context) error { var ( diff --git a/cmd/containerd/publish.go b/cmd/containerd/publish.go new file mode 100644 index 000000000..b975c0759 --- /dev/null +++ b/cmd/containerd/publish.go @@ -0,0 +1,92 @@ +package main + +import ( + gocontext "context" + "io" + "io/ioutil" + "net" + "os" + "time" + + eventsapi "github.com/containerd/containerd/api/services/events/v1" + "github.com/containerd/containerd/dialer" + "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/namespaces" + "github.com/gogo/protobuf/types" + "github.com/pkg/errors" + "github.com/urfave/cli" + "google.golang.org/grpc" +) + +var publishCommand = cli.Command{ + Name: "publish", + Usage: "binary to publish events to containerd", + Flags: []cli.Flag{ + cli.StringFlag{ + Name: "namespace", + Usage: "namespace to publish to", + }, + cli.StringFlag{ + Name: "topic", + Usage: "topic of the event", + }, + }, + Action: func(context *cli.Context) error { + ctx := namespaces.WithNamespace(gocontext.Background(), context.String("namespace")) + topic := context.String("topic") + if topic == "" { + return errors.New("topic required to publish event") + } + payload, err := getEventPayload(os.Stdin) + if err != nil { + return err + } + client, err := connectEvents(context.GlobalString("address")) + if err != nil { + return err + } + if _, err := client.Publish(ctx, &eventsapi.PublishRequest{ + Topic: topic, + Event: payload, + }); err != nil { + return errdefs.FromGRPC(err) + } + return nil + }, +} + +func getEventPayload(r io.Reader) (*types.Any, error) { + data, err := ioutil.ReadAll(r) + if err != nil { + return nil, err + } + var any types.Any + if err := any.Unmarshal(data); err != nil { + return nil, err + } + return &any, nil +} + +func connectEvents(address string) (eventsapi.EventsClient, error) { + conn, err := connect(address, dialer.Dialer) + if err != nil { + return nil, errors.Wrapf(err, "failed to dial %q", address) + } + return eventsapi.NewEventsClient(conn), nil +} + +func connect(address string, d func(string, time.Duration) (net.Conn, error)) (*grpc.ClientConn, error) { + gopts := []grpc.DialOption{ + grpc.WithBlock(), + grpc.WithInsecure(), + grpc.WithTimeout(60 * time.Second), + grpc.WithDialer(d), + grpc.FailOnNonTempDialError(true), + grpc.WithBackoffMaxDelay(3 * time.Second), + } + conn, err := grpc.Dial(dialer.DialAddress(address), gopts...) + if err != nil { + return nil, errors.Wrapf(err, "failed to dial %q", address) + } + return conn, nil +}