diff --git a/events/exchange.go b/events/exchange/exchange.go similarity index 92% rename from events/exchange.go rename to events/exchange/exchange.go index eeeeea362..3fefb9c25 100644 --- a/events/exchange.go +++ b/events/exchange/exchange.go @@ -1,12 +1,13 @@ -package events +package exchange import ( "context" "strings" "time" - events "github.com/containerd/containerd/api/services/events/v1" + v1 "github.com/containerd/containerd/api/services/events/v1" "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/events" "github.com/containerd/containerd/filters" "github.com/containerd/containerd/identifiers" "github.com/containerd/containerd/log" @@ -34,7 +35,7 @@ func NewExchange() *Exchange { // // This is useful when an event is forwaded on behalf of another namespace or // when the event is propagated on behalf of another publisher. -func (e *Exchange) Forward(ctx context.Context, envelope *events.Envelope) (err error) { +func (e *Exchange) Forward(ctx context.Context, envelope *v1.Envelope) (err error) { if err := validateEnvelope(envelope); err != nil { return err } @@ -59,11 +60,11 @@ func (e *Exchange) Forward(ctx context.Context, envelope *events.Envelope) (err // Publish packages and sends an event. The caller will be considered the // initial publisher of the event. This means the timestamp will be calculated // at this point and this method may read from the calling context. -func (e *Exchange) Publish(ctx context.Context, topic string, event Event) (err error) { +func (e *Exchange) Publish(ctx context.Context, topic string, event events.Event) (err error) { var ( namespace string encoded *types.Any - envelope events.Envelope + envelope v1.Envelope ) namespace, err = namespaces.NamespaceRequired(ctx) @@ -108,9 +109,9 @@ func (e *Exchange) Publish(ctx context.Context, topic string, event Event) (err // Zero or more filters may be provided as strings. Only events that match // *any* of the provided filters will be sent on the channel. The filters use // the standard containerd filters package syntax. -func (e *Exchange) Subscribe(ctx context.Context, fs ...string) (ch <-chan *events.Envelope, errs <-chan error) { +func (e *Exchange) Subscribe(ctx context.Context, fs ...string) (ch <-chan *v1.Envelope, errs <-chan error) { var ( - evch = make(chan *events.Envelope) + evch = make(chan *v1.Envelope) errq = make(chan error, 1) channel = goevents.NewChannel(0) queue = goevents.NewQueue(channel) @@ -150,7 +151,7 @@ func (e *Exchange) Subscribe(ctx context.Context, fs ...string) (ch <-chan *even for { select { case ev := <-channel.C: - env, ok := ev.(*events.Envelope) + env, ok := ev.(*v1.Envelope) if !ok { // TODO(stevvooe): For the most part, we are well protected // from this condition. Both Forward and Publish protect @@ -204,7 +205,7 @@ func validateTopic(topic string) error { return nil } -func validateEnvelope(envelope *events.Envelope) error { +func validateEnvelope(envelope *v1.Envelope) error { if err := namespaces.Validate(envelope.Namespace); err != nil { return errors.Wrapf(err, "event envelope has invalid namespace") } diff --git a/events/exchange_test.go b/events/exchange/exchange_test.go similarity index 87% rename from events/exchange_test.go rename to events/exchange/exchange_test.go index 5493fa305..9aa8f9960 100644 --- a/events/exchange_test.go +++ b/events/exchange/exchange_test.go @@ -1,4 +1,4 @@ -package events +package exchange import ( "context" @@ -8,8 +8,9 @@ import ( "testing" "time" - events "github.com/containerd/containerd/api/services/events/v1" + v1 "github.com/containerd/containerd/api/services/events/v1" "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/events" "github.com/containerd/containerd/namespaces" "github.com/containerd/typeurl" "github.com/pkg/errors" @@ -17,10 +18,10 @@ import ( func TestExchangeBasic(t *testing.T) { ctx := namespaces.WithNamespace(context.Background(), t.Name()) - testevents := []Event{ - &events.ContainerCreate{ID: "asdf"}, - &events.ContainerCreate{ID: "qwer"}, - &events.ContainerCreate{ID: "zxcv"}, + testevents := []events.Event{ + &v1.ContainerCreate{ID: "asdf"}, + &v1.ContainerCreate{ID: "qwer"}, + &v1.ContainerCreate{ID: "zxcv"}, } exchange := NewExchange() @@ -55,7 +56,7 @@ func TestExchangeBasic(t *testing.T) { wg.Wait() for _, subscriber := range []struct { - eventq <-chan *events.Envelope + eventq <-chan *v1.Envelope errq <-chan error cancel func() }{ @@ -79,7 +80,7 @@ func TestExchangeBasic(t *testing.T) { if err != nil { t.Fatal(err) } - received = append(received, ev.(*events.ContainerCreate)) + received = append(received, ev.(*v1.ContainerCreate)) case err := <-subscriber.errq: if err != nil { t.Fatal(err) @@ -117,7 +118,7 @@ func TestExchangeValidateTopic(t *testing.T) { }, } { t.Run(testcase.input, func(t *testing.T) { - event := &events.ContainerCreate{ID: t.Name()} + event := &v1.ContainerCreate{ID: t.Name()} if err := exchange.Publish(ctx, testcase.input, event); errors.Cause(err) != testcase.err { if err == nil { t.Fatalf("expected error %v, received nil", testcase.err) @@ -131,7 +132,7 @@ func TestExchangeValidateTopic(t *testing.T) { t.Fatal(err) } - envelope := events.Envelope{ + envelope := v1.Envelope{ Timestamp: time.Now().UTC(), Namespace: namespace, Topic: testcase.input, diff --git a/linux/bundle.go b/linux/bundle.go index 29ab6ba5f..07a96d37b 100644 --- a/linux/bundle.go +++ b/linux/bundle.go @@ -9,7 +9,7 @@ import ( "os" "path/filepath" - "github.com/containerd/containerd/events" + "github.com/containerd/containerd/events/exchange" "github.com/containerd/containerd/linux/runcopts" client "github.com/containerd/containerd/linux/shim" "github.com/pkg/errors" @@ -82,7 +82,7 @@ func ShimRemote(shim, daemonAddress, cgroup string, nonewns, debug bool, exitHan } // ShimLocal is a ShimOpt for using an in process shim implementation -func ShimLocal(exchange *events.Exchange) ShimOpt { +func ShimLocal(exchange *exchange.Exchange) ShimOpt { return func(b *bundle, ns string, ropts *runcopts.RuncOptions) (client.Config, client.ClientOpt) { return b.shimConfig(ns, ropts), client.WithLocal(exchange) } diff --git a/linux/runtime.go b/linux/runtime.go index 26d001f8a..44219e40d 100644 --- a/linux/runtime.go +++ b/linux/runtime.go @@ -15,7 +15,7 @@ import ( "github.com/containerd/containerd/api/types" "github.com/containerd/containerd/containers" "github.com/containerd/containerd/errdefs" - "github.com/containerd/containerd/events" + "github.com/containerd/containerd/events/exchange" "github.com/containerd/containerd/identifiers" "github.com/containerd/containerd/linux/runcopts" client "github.com/containerd/containerd/linux/shim" @@ -143,7 +143,7 @@ type Runtime struct { monitor runtime.TaskMonitor tasks *runtime.TaskList db *metadata.DB - events *events.Exchange + events *exchange.Exchange config *Config } diff --git a/plugin/context.go b/plugin/context.go index 67ad2aa44..87e53b84f 100644 --- a/plugin/context.go +++ b/plugin/context.go @@ -5,7 +5,7 @@ import ( "path/filepath" "github.com/containerd/containerd/errdefs" - "github.com/containerd/containerd/events" + "github.com/containerd/containerd/events/exchange" "github.com/containerd/containerd/log" ocispec "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" @@ -18,7 +18,7 @@ type InitContext struct { State string Config interface{} Address string - Events *events.Exchange + Events *exchange.Exchange Meta *Meta // plugins can fill in metadata at init. diff --git a/server/server.go b/server/server.go index d1a58e915..b585b4a36 100644 --- a/server/server.go +++ b/server/server.go @@ -22,7 +22,7 @@ import ( version "github.com/containerd/containerd/api/services/version/v1" "github.com/containerd/containerd/content" "github.com/containerd/containerd/content/local" - "github.com/containerd/containerd/events" + "github.com/containerd/containerd/events/exchange" "github.com/containerd/containerd/log" "github.com/containerd/containerd/metadata" "github.com/containerd/containerd/plugin" @@ -65,7 +65,7 @@ func New(ctx context.Context, config *Config) (*Server, error) { services []plugin.Service s = &Server{ rpc: rpc, - events: events.NewExchange(), + events: exchange.NewExchange(), } initialized = plugin.NewPluginSet() ) @@ -122,7 +122,7 @@ func New(ctx context.Context, config *Config) (*Server, error) { // Server is the containerd main daemon type Server struct { rpc *grpc.Server - events *events.Exchange + events *exchange.Exchange } // ServeGRPC provides the containerd grpc APIs on the provided listener diff --git a/services/events/service.go b/services/events/service.go index c6f080b8c..eb5c33e6f 100644 --- a/services/events/service.go +++ b/services/events/service.go @@ -3,7 +3,7 @@ package events import ( api "github.com/containerd/containerd/api/services/events/v1" "github.com/containerd/containerd/errdefs" - "github.com/containerd/containerd/events" + "github.com/containerd/containerd/events/exchange" "github.com/containerd/containerd/plugin" "github.com/golang/protobuf/ptypes/empty" "github.com/pkg/errors" @@ -22,11 +22,11 @@ func init() { } type service struct { - events *events.Exchange + events *exchange.Exchange } // NewService returns the GRPC events server -func NewService(events *events.Exchange) api.EventsServer { +func NewService(events *exchange.Exchange) api.EventsServer { return &service{events: events} }