From a615a6fe5dd3a6a6689ccb119b0a9632100c7a50 Mon Sep 17 00:00:00 2001 From: Stephen J Day Date: Mon, 24 Jul 2017 22:19:02 -0700 Subject: [PATCH] events: refactor event distribution In the course of setting out to add filters and address some cleanup, it was found that we had a few problems in the events subsystem that needed addressing before moving forward. The biggest change was to move to the more standard terminology of publish and subscribe. We make this terminology change across the Go interface and the GRPC API, making the behavior more familier. The previous system was very context-oriented, which is no longer required. With this, we've removed a large amount of dead and unneeded code. Event transactions, context storage and the concept of `Poster` is gone. This has been replaced in most places with a `Publisher`, which matches the actual usage throughout the codebase, removing the need for helpers. There are still some questions around the way events are handled in the shim. Right now, we've preserved some of the existing bugs which may require more extensive changes to resolve correctly. Signed-off-by: Stephen J Day --- api/services/events/v1/container.pb.go | 4 +- api/services/events/v1/events.pb.go | 304 ++++++++++++++++--------- api/services/events/v1/events.proto | 15 +- cmd/ctr/events.go | 2 +- events/emitter.go | 67 ------ events/event.go | 3 - events/events.go | 24 ++ events/events_test.go | 33 --- events/exchange.go | 162 +++++++++++++ events/exchange_test.go | 149 ++++++++++++ events/poster.go | 65 ------ events/sink.go | 65 ------ events/topic.go | 35 --- events/transaction.go | 96 -------- linux/shim/local.go | 11 +- linux/shim/service.go | 12 +- metrics/cgroups/cgroups.go | 10 +- plugin/context.go | 2 +- process.go | 2 +- server/server.go | 12 +- services/containers/service.go | 26 +-- services/content/service.go | 19 +- services/events/service.go | 56 +++-- services/images/service.go | 28 +-- services/namespaces/service.go | 28 +-- services/snapshot/service.go | 20 +- services/tasks/service.go | 26 +-- task.go | 2 +- windows/runtime.go | 16 +- windows/task.go | 19 +- 30 files changed, 669 insertions(+), 644 deletions(-) delete mode 100644 events/emitter.go delete mode 100644 events/event.go create mode 100644 events/events.go delete mode 100644 events/events_test.go create mode 100644 events/exchange.go create mode 100644 events/exchange_test.go delete mode 100644 events/poster.go delete mode 100644 events/sink.go delete mode 100644 events/topic.go delete mode 100644 events/transaction.go diff --git a/api/services/events/v1/container.pb.go b/api/services/events/v1/container.pb.go index 362cafede..1df8f81dc 100644 --- a/api/services/events/v1/container.pb.go +++ b/api/services/events/v1/container.pb.go @@ -19,8 +19,8 @@ ContainerUpdate ContainerDelete ContentDelete - StreamEventsRequest - PostEventRequest + SubscribeRequest + PublishRequest Envelope ImageCreate ImageUpdate diff --git a/api/services/events/v1/events.pb.go b/api/services/events/v1/events.pb.go index 61632cd52..f736c9b09 100644 --- a/api/services/events/v1/events.pb.go +++ b/api/services/events/v1/events.pb.go @@ -32,25 +32,27 @@ var _ = fmt.Errorf var _ = math.Inf var _ = time.Kitchen -type StreamEventsRequest struct { +type SubscribeRequest struct { + Filters []string `protobuf:"bytes,1,rep,name=filters" json:"filters,omitempty"` } -func (m *StreamEventsRequest) Reset() { *m = StreamEventsRequest{} } -func (*StreamEventsRequest) ProtoMessage() {} -func (*StreamEventsRequest) Descriptor() ([]byte, []int) { return fileDescriptorEvents, []int{0} } +func (m *SubscribeRequest) Reset() { *m = SubscribeRequest{} } +func (*SubscribeRequest) ProtoMessage() {} +func (*SubscribeRequest) Descriptor() ([]byte, []int) { return fileDescriptorEvents, []int{0} } -type PostEventRequest struct { +type PublishRequest struct { Envelope *Envelope `protobuf:"bytes,1,opt,name=envelope" json:"envelope,omitempty"` } -func (m *PostEventRequest) Reset() { *m = PostEventRequest{} } -func (*PostEventRequest) ProtoMessage() {} -func (*PostEventRequest) Descriptor() ([]byte, []int) { return fileDescriptorEvents, []int{1} } +func (m *PublishRequest) Reset() { *m = PublishRequest{} } +func (*PublishRequest) ProtoMessage() {} +func (*PublishRequest) Descriptor() ([]byte, []int) { return fileDescriptorEvents, []int{1} } type Envelope struct { Timestamp time.Time `protobuf:"bytes,1,opt,name=timestamp,stdtime" json:"timestamp"` - Topic string `protobuf:"bytes,2,opt,name=topic,proto3" json:"topic,omitempty"` - Event *google_protobuf1.Any `protobuf:"bytes,3,opt,name=event" json:"event,omitempty"` + Namespace string `protobuf:"bytes,2,opt,name=namespace,proto3" json:"namespace,omitempty"` + Topic string `protobuf:"bytes,3,opt,name=topic,proto3" json:"topic,omitempty"` + Event *google_protobuf1.Any `protobuf:"bytes,4,opt,name=event" json:"event,omitempty"` } func (m *Envelope) Reset() { *m = Envelope{} } @@ -58,8 +60,8 @@ func (*Envelope) ProtoMessage() {} func (*Envelope) Descriptor() ([]byte, []int) { return fileDescriptorEvents, []int{2} } func init() { - proto.RegisterType((*StreamEventsRequest)(nil), "containerd.services.events.v1.StreamEventsRequest") - proto.RegisterType((*PostEventRequest)(nil), "containerd.services.events.v1.PostEventRequest") + proto.RegisterType((*SubscribeRequest)(nil), "containerd.services.events.v1.SubscribeRequest") + proto.RegisterType((*PublishRequest)(nil), "containerd.services.events.v1.PublishRequest") proto.RegisterType((*Envelope)(nil), "containerd.services.events.v1.Envelope") } @@ -74,8 +76,8 @@ const _ = grpc.SupportPackageIsVersion4 // Client API for Events service type EventsClient interface { - Stream(ctx context.Context, in *StreamEventsRequest, opts ...grpc.CallOption) (Events_StreamClient, error) - Post(ctx context.Context, in *PostEventRequest, opts ...grpc.CallOption) (*google_protobuf2.Empty, error) + Publish(ctx context.Context, in *PublishRequest, opts ...grpc.CallOption) (*google_protobuf2.Empty, error) + Subscribe(ctx context.Context, in *SubscribeRequest, opts ...grpc.CallOption) (Events_SubscribeClient, error) } type eventsClient struct { @@ -86,12 +88,21 @@ func NewEventsClient(cc *grpc.ClientConn) EventsClient { return &eventsClient{cc} } -func (c *eventsClient) Stream(ctx context.Context, in *StreamEventsRequest, opts ...grpc.CallOption) (Events_StreamClient, error) { - stream, err := grpc.NewClientStream(ctx, &_Events_serviceDesc.Streams[0], c.cc, "/containerd.services.events.v1.Events/Stream", opts...) +func (c *eventsClient) Publish(ctx context.Context, in *PublishRequest, opts ...grpc.CallOption) (*google_protobuf2.Empty, error) { + out := new(google_protobuf2.Empty) + err := grpc.Invoke(ctx, "/containerd.services.events.v1.Events/Publish", in, out, c.cc, opts...) if err != nil { return nil, err } - x := &eventsStreamClient{stream} + return out, nil +} + +func (c *eventsClient) Subscribe(ctx context.Context, in *SubscribeRequest, opts ...grpc.CallOption) (Events_SubscribeClient, error) { + stream, err := grpc.NewClientStream(ctx, &_Events_serviceDesc.Streams[0], c.cc, "/containerd.services.events.v1.Events/Subscribe", opts...) + if err != nil { + return nil, err + } + x := &eventsSubscribeClient{stream} if err := x.ClientStream.SendMsg(in); err != nil { return nil, err } @@ -101,16 +112,16 @@ func (c *eventsClient) Stream(ctx context.Context, in *StreamEventsRequest, opts return x, nil } -type Events_StreamClient interface { +type Events_SubscribeClient interface { Recv() (*Envelope, error) grpc.ClientStream } -type eventsStreamClient struct { +type eventsSubscribeClient struct { grpc.ClientStream } -func (x *eventsStreamClient) Recv() (*Envelope, error) { +func (x *eventsSubscribeClient) Recv() (*Envelope, error) { m := new(Envelope) if err := x.ClientStream.RecvMsg(m); err != nil { return nil, err @@ -118,85 +129,76 @@ func (x *eventsStreamClient) Recv() (*Envelope, error) { return m, nil } -func (c *eventsClient) Post(ctx context.Context, in *PostEventRequest, opts ...grpc.CallOption) (*google_protobuf2.Empty, error) { - out := new(google_protobuf2.Empty) - err := grpc.Invoke(ctx, "/containerd.services.events.v1.Events/Post", in, out, c.cc, opts...) - if err != nil { - return nil, err - } - return out, nil -} - // Server API for Events service type EventsServer interface { - Stream(*StreamEventsRequest, Events_StreamServer) error - Post(context.Context, *PostEventRequest) (*google_protobuf2.Empty, error) + Publish(context.Context, *PublishRequest) (*google_protobuf2.Empty, error) + Subscribe(*SubscribeRequest, Events_SubscribeServer) error } func RegisterEventsServer(s *grpc.Server, srv EventsServer) { s.RegisterService(&_Events_serviceDesc, srv) } -func _Events_Stream_Handler(srv interface{}, stream grpc.ServerStream) error { - m := new(StreamEventsRequest) - if err := stream.RecvMsg(m); err != nil { - return err - } - return srv.(EventsServer).Stream(m, &eventsStreamServer{stream}) -} - -type Events_StreamServer interface { - Send(*Envelope) error - grpc.ServerStream -} - -type eventsStreamServer struct { - grpc.ServerStream -} - -func (x *eventsStreamServer) Send(m *Envelope) error { - return x.ServerStream.SendMsg(m) -} - -func _Events_Post_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(PostEventRequest) +func _Events_Publish_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(PublishRequest) if err := dec(in); err != nil { return nil, err } if interceptor == nil { - return srv.(EventsServer).Post(ctx, in) + return srv.(EventsServer).Publish(ctx, in) } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/containerd.services.events.v1.Events/Post", + FullMethod: "/containerd.services.events.v1.Events/Publish", } handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(EventsServer).Post(ctx, req.(*PostEventRequest)) + return srv.(EventsServer).Publish(ctx, req.(*PublishRequest)) } return interceptor(ctx, in, info, handler) } +func _Events_Subscribe_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(SubscribeRequest) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(EventsServer).Subscribe(m, &eventsSubscribeServer{stream}) +} + +type Events_SubscribeServer interface { + Send(*Envelope) error + grpc.ServerStream +} + +type eventsSubscribeServer struct { + grpc.ServerStream +} + +func (x *eventsSubscribeServer) Send(m *Envelope) error { + return x.ServerStream.SendMsg(m) +} + var _Events_serviceDesc = grpc.ServiceDesc{ ServiceName: "containerd.services.events.v1.Events", HandlerType: (*EventsServer)(nil), Methods: []grpc.MethodDesc{ { - MethodName: "Post", - Handler: _Events_Post_Handler, + MethodName: "Publish", + Handler: _Events_Publish_Handler, }, }, Streams: []grpc.StreamDesc{ { - StreamName: "Stream", - Handler: _Events_Stream_Handler, + StreamName: "Subscribe", + Handler: _Events_Subscribe_Handler, ServerStreams: true, }, }, Metadata: "github.com/containerd/containerd/api/services/events/v1/events.proto", } -func (m *StreamEventsRequest) Marshal() (dAtA []byte, err error) { +func (m *SubscribeRequest) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) n, err := m.MarshalTo(dAtA) @@ -206,15 +208,30 @@ func (m *StreamEventsRequest) Marshal() (dAtA []byte, err error) { return dAtA[:n], nil } -func (m *StreamEventsRequest) MarshalTo(dAtA []byte) (int, error) { +func (m *SubscribeRequest) MarshalTo(dAtA []byte) (int, error) { var i int _ = i var l int _ = l + if len(m.Filters) > 0 { + for _, s := range m.Filters { + dAtA[i] = 0xa + i++ + l = len(s) + for l >= 1<<7 { + dAtA[i] = uint8(uint64(l)&0x7f | 0x80) + l >>= 7 + i++ + } + dAtA[i] = uint8(l) + i++ + i += copy(dAtA[i:], s) + } + } return i, nil } -func (m *PostEventRequest) Marshal() (dAtA []byte, err error) { +func (m *PublishRequest) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) n, err := m.MarshalTo(dAtA) @@ -224,7 +241,7 @@ func (m *PostEventRequest) Marshal() (dAtA []byte, err error) { return dAtA[:n], nil } -func (m *PostEventRequest) MarshalTo(dAtA []byte) (int, error) { +func (m *PublishRequest) MarshalTo(dAtA []byte) (int, error) { var i int _ = i var l int @@ -265,14 +282,20 @@ func (m *Envelope) MarshalTo(dAtA []byte) (int, error) { return 0, err } i += n2 - if len(m.Topic) > 0 { + if len(m.Namespace) > 0 { dAtA[i] = 0x12 i++ + i = encodeVarintEvents(dAtA, i, uint64(len(m.Namespace))) + i += copy(dAtA[i:], m.Namespace) + } + if len(m.Topic) > 0 { + dAtA[i] = 0x1a + i++ i = encodeVarintEvents(dAtA, i, uint64(len(m.Topic))) i += copy(dAtA[i:], m.Topic) } if m.Event != nil { - dAtA[i] = 0x1a + dAtA[i] = 0x22 i++ i = encodeVarintEvents(dAtA, i, uint64(m.Event.Size())) n3, err := m.Event.MarshalTo(dAtA[i:]) @@ -311,13 +334,19 @@ func encodeVarintEvents(dAtA []byte, offset int, v uint64) int { dAtA[offset] = uint8(v) return offset + 1 } -func (m *StreamEventsRequest) Size() (n int) { +func (m *SubscribeRequest) Size() (n int) { var l int _ = l + if len(m.Filters) > 0 { + for _, s := range m.Filters { + l = len(s) + n += 1 + l + sovEvents(uint64(l)) + } + } return n } -func (m *PostEventRequest) Size() (n int) { +func (m *PublishRequest) Size() (n int) { var l int _ = l if m.Envelope != nil { @@ -332,6 +361,10 @@ func (m *Envelope) Size() (n int) { _ = l l = github_com_gogo_protobuf_types.SizeOfStdTime(m.Timestamp) n += 1 + l + sovEvents(uint64(l)) + l = len(m.Namespace) + if l > 0 { + n += 1 + l + sovEvents(uint64(l)) + } l = len(m.Topic) if l > 0 { n += 1 + l + sovEvents(uint64(l)) @@ -356,20 +389,21 @@ func sovEvents(x uint64) (n int) { func sozEvents(x uint64) (n int) { return sovEvents(uint64((x << 1) ^ uint64((int64(x) >> 63)))) } -func (this *StreamEventsRequest) String() string { +func (this *SubscribeRequest) String() string { if this == nil { return "nil" } - s := strings.Join([]string{`&StreamEventsRequest{`, + s := strings.Join([]string{`&SubscribeRequest{`, + `Filters:` + fmt.Sprintf("%v", this.Filters) + `,`, `}`, }, "") return s } -func (this *PostEventRequest) String() string { +func (this *PublishRequest) String() string { if this == nil { return "nil" } - s := strings.Join([]string{`&PostEventRequest{`, + s := strings.Join([]string{`&PublishRequest{`, `Envelope:` + strings.Replace(fmt.Sprintf("%v", this.Envelope), "Envelope", "Envelope", 1) + `,`, `}`, }, "") @@ -381,6 +415,7 @@ func (this *Envelope) String() string { } s := strings.Join([]string{`&Envelope{`, `Timestamp:` + strings.Replace(strings.Replace(this.Timestamp.String(), "Timestamp", "google_protobuf3.Timestamp", 1), `&`, ``, 1) + `,`, + `Namespace:` + fmt.Sprintf("%v", this.Namespace) + `,`, `Topic:` + fmt.Sprintf("%v", this.Topic) + `,`, `Event:` + strings.Replace(fmt.Sprintf("%v", this.Event), "Any", "google_protobuf1.Any", 1) + `,`, `}`, @@ -395,7 +430,7 @@ func valueToStringEvents(v interface{}) string { pv := reflect.Indirect(rv).Interface() return fmt.Sprintf("*%v", pv) } -func (m *StreamEventsRequest) Unmarshal(dAtA []byte) error { +func (m *SubscribeRequest) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 for iNdEx < l { @@ -418,12 +453,41 @@ func (m *StreamEventsRequest) Unmarshal(dAtA []byte) error { fieldNum := int32(wire >> 3) wireType := int(wire & 0x7) if wireType == 4 { - return fmt.Errorf("proto: StreamEventsRequest: wiretype end group for non-group") + return fmt.Errorf("proto: SubscribeRequest: wiretype end group for non-group") } if fieldNum <= 0 { - return fmt.Errorf("proto: StreamEventsRequest: illegal tag %d (wire type %d)", fieldNum, wire) + return fmt.Errorf("proto: SubscribeRequest: illegal tag %d (wire type %d)", fieldNum, wire) } switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Filters", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowEvents + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthEvents + } + postIndex := iNdEx + intStringLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Filters = append(m.Filters, string(dAtA[iNdEx:postIndex])) + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipEvents(dAtA[iNdEx:]) @@ -445,7 +509,7 @@ func (m *StreamEventsRequest) Unmarshal(dAtA []byte) error { } return nil } -func (m *PostEventRequest) Unmarshal(dAtA []byte) error { +func (m *PublishRequest) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 for iNdEx < l { @@ -468,10 +532,10 @@ func (m *PostEventRequest) Unmarshal(dAtA []byte) error { fieldNum := int32(wire >> 3) wireType := int(wire & 0x7) if wireType == 4 { - return fmt.Errorf("proto: PostEventRequest: wiretype end group for non-group") + return fmt.Errorf("proto: PublishRequest: wiretype end group for non-group") } if fieldNum <= 0 { - return fmt.Errorf("proto: PostEventRequest: illegal tag %d (wire type %d)", fieldNum, wire) + return fmt.Errorf("proto: PublishRequest: illegal tag %d (wire type %d)", fieldNum, wire) } switch fieldNum { case 1: @@ -588,6 +652,35 @@ func (m *Envelope) Unmarshal(dAtA []byte) error { } iNdEx = postIndex case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Namespace", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowEvents + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthEvents + } + postIndex := iNdEx + intStringLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Namespace = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 3: if wireType != 2 { return fmt.Errorf("proto: wrong wireType = %d for field Topic", wireType) } @@ -616,7 +709,7 @@ func (m *Envelope) Unmarshal(dAtA []byte) error { } m.Topic = string(dAtA[iNdEx:postIndex]) iNdEx = postIndex - case 3: + case 4: if wireType != 2 { return fmt.Errorf("proto: wrong wireType = %d for field Event", wireType) } @@ -780,28 +873,31 @@ func init() { } var fileDescriptorEvents = []byte{ - // 367 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x92, 0xc1, 0x4e, 0xc2, 0x40, - 0x10, 0x86, 0x59, 0x15, 0x02, 0xeb, 0xc5, 0xac, 0x68, 0xb0, 0xc6, 0x42, 0xb8, 0x48, 0x3c, 0xec, - 0x0a, 0x1e, 0x4d, 0x4c, 0x44, 0x39, 0x6b, 0xaa, 0x89, 0xc6, 0x5b, 0xa9, 0x63, 0x6d, 0x42, 0xbb, - 0xb5, 0x5d, 0x9a, 0x70, 0xf3, 0x11, 0x78, 0x26, 0x4f, 0x1c, 0x3d, 0x7a, 0x52, 0xe9, 0x93, 0x18, - 0x76, 0xb7, 0x60, 0xc0, 0x88, 0xf1, 0x36, 0x3b, 0xf3, 0xcd, 0xf4, 0x9f, 0x7f, 0x8a, 0xcf, 0x5d, - 0x4f, 0x3c, 0xf6, 0xbb, 0xd4, 0xe1, 0x3e, 0x73, 0x78, 0x20, 0x6c, 0x2f, 0x80, 0xe8, 0xfe, 0x7b, - 0x68, 0x87, 0x1e, 0x8b, 0x21, 0x4a, 0x3c, 0x07, 0x62, 0x06, 0x09, 0x04, 0x22, 0x66, 0x49, 0x53, - 0x47, 0x34, 0x8c, 0xb8, 0xe0, 0x64, 0x6f, 0xc6, 0xd3, 0x8c, 0xa5, 0x9a, 0x48, 0x9a, 0x46, 0xd9, - 0xe5, 0x2e, 0x97, 0x24, 0x9b, 0x44, 0xaa, 0xc9, 0xd8, 0x71, 0x39, 0x77, 0x7b, 0xc0, 0xe4, 0xab, - 0xdb, 0x7f, 0x60, 0x76, 0x30, 0xd0, 0xa5, 0xdd, 0xf9, 0x12, 0xf8, 0xa1, 0xc8, 0x8a, 0xd5, 0xf9, - 0xa2, 0xf0, 0x7c, 0x88, 0x85, 0xed, 0x87, 0x0a, 0xa8, 0x6f, 0xe1, 0xcd, 0x2b, 0x11, 0x81, 0xed, - 0x77, 0xa4, 0x02, 0x0b, 0x9e, 0xfa, 0x10, 0x8b, 0xfa, 0x0d, 0xde, 0xb8, 0xe4, 0xb1, 0x90, 0x49, - 0x9d, 0x23, 0x67, 0xb8, 0x08, 0x41, 0x02, 0x3d, 0x1e, 0x42, 0x05, 0xd5, 0x50, 0x63, 0xbd, 0xb5, - 0x4f, 0x7f, 0xdd, 0x85, 0x76, 0x34, 0x6e, 0x4d, 0x1b, 0xeb, 0x43, 0x84, 0x8b, 0x59, 0x9a, 0xb4, - 0x71, 0x69, 0xaa, 0x47, 0x8f, 0x34, 0xa8, 0x52, 0x4c, 0x33, 0xc5, 0xf4, 0x3a, 0x23, 0xda, 0xc5, - 0xd1, 0x7b, 0x35, 0x37, 0xfc, 0xa8, 0x22, 0x6b, 0xd6, 0x46, 0xca, 0x38, 0x2f, 0x78, 0xe8, 0x39, - 0x95, 0x95, 0x1a, 0x6a, 0x94, 0x2c, 0xf5, 0x20, 0x07, 0x38, 0x2f, 0x65, 0x54, 0x56, 0xe5, 0xd4, - 0xf2, 0xc2, 0xd4, 0xd3, 0x60, 0x60, 0x29, 0xa4, 0xf5, 0x82, 0x70, 0x41, 0x6d, 0x4f, 0x5c, 0x5c, - 0x50, 0x6e, 0x90, 0xd6, 0x92, 0xd5, 0x7e, 0x30, 0xcd, 0xf8, 0xab, 0x1d, 0x87, 0x88, 0x5c, 0xe0, - 0xb5, 0x89, 0xbf, 0x84, 0x2d, 0x69, 0x99, 0x3f, 0x82, 0xb1, 0xbd, 0xb0, 0x49, 0x67, 0x72, 0xee, - 0xf6, 0xed, 0x68, 0x6c, 0xe6, 0xde, 0xc6, 0x66, 0xee, 0x39, 0x35, 0xd1, 0x28, 0x35, 0xd1, 0x6b, - 0x6a, 0xa2, 0xcf, 0xd4, 0x44, 0x77, 0x27, 0xff, 0xfc, 0x6b, 0x8f, 0x55, 0xd4, 0x2d, 0xc8, 0x2f, - 0x1d, 0x7d, 0x05, 0x00, 0x00, 0xff, 0xff, 0xdd, 0x37, 0xcb, 0x0e, 0xfe, 0x02, 0x00, 0x00, + // 407 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x92, 0xcd, 0x6e, 0xd3, 0x40, + 0x10, 0xc7, 0xb3, 0x84, 0x7c, 0x78, 0x91, 0x10, 0x5a, 0x45, 0xc8, 0x18, 0x70, 0xa2, 0x5c, 0x88, + 0x10, 0xec, 0x92, 0x70, 0x44, 0x42, 0x22, 0x90, 0x7b, 0x64, 0x40, 0x42, 0xdc, 0x6c, 0x77, 0xe2, + 0xac, 0x64, 0x7b, 0x5d, 0xef, 0xda, 0x52, 0x6e, 0x7d, 0x84, 0x3e, 0x49, 0x5f, 0xa2, 0x97, 0x1c, + 0x7b, 0xec, 0xa9, 0x6d, 0xfc, 0x24, 0x55, 0xfc, 0x91, 0xb4, 0x89, 0xd4, 0x54, 0xbd, 0xcd, 0xec, + 0xff, 0x37, 0x3b, 0xfb, 0x9f, 0x59, 0xfc, 0xcb, 0xe3, 0x6a, 0x9e, 0x38, 0xd4, 0x15, 0x01, 0x73, + 0x45, 0xa8, 0x6c, 0x1e, 0x42, 0x7c, 0x74, 0x37, 0xb4, 0x23, 0xce, 0x24, 0xc4, 0x29, 0x77, 0x41, + 0x32, 0x48, 0x21, 0x54, 0x92, 0xa5, 0xc3, 0x32, 0xa2, 0x51, 0x2c, 0x94, 0x20, 0xef, 0xb7, 0x3c, + 0xad, 0x58, 0x5a, 0x12, 0xe9, 0xd0, 0xe8, 0x78, 0xc2, 0x13, 0x39, 0xc9, 0xd6, 0x51, 0x51, 0x64, + 0xbc, 0xf1, 0x84, 0xf0, 0x7c, 0x60, 0x79, 0xe6, 0x24, 0x33, 0x66, 0x87, 0x8b, 0x52, 0x7a, 0xbb, + 0x2b, 0x41, 0x10, 0xa9, 0x4a, 0xec, 0xee, 0x8a, 0x8a, 0x07, 0x20, 0x95, 0x1d, 0x44, 0x05, 0xd0, + 0xff, 0x84, 0x5f, 0xfd, 0x4e, 0x1c, 0xe9, 0xc6, 0xdc, 0x01, 0x0b, 0x8e, 0x13, 0x90, 0x8a, 0xe8, + 0xb8, 0x35, 0xe3, 0xbe, 0x82, 0x58, 0xea, 0xa8, 0x57, 0x1f, 0x68, 0x56, 0x95, 0xf6, 0xff, 0xe2, + 0x97, 0xd3, 0xc4, 0xf1, 0xb9, 0x9c, 0x57, 0xec, 0x4f, 0xdc, 0x86, 0x30, 0x05, 0x5f, 0x44, 0xa0, + 0xa3, 0x1e, 0x1a, 0xbc, 0x18, 0x7d, 0xa0, 0x0f, 0x1a, 0xa4, 0x93, 0x12, 0xb7, 0x36, 0x85, 0xfd, + 0x33, 0x84, 0xdb, 0xd5, 0x31, 0x19, 0x63, 0x6d, 0xf3, 0xc8, 0xf2, 0x4a, 0x83, 0x16, 0x36, 0x68, + 0x65, 0x83, 0xfe, 0xa9, 0x88, 0x71, 0x7b, 0x79, 0xd5, 0xad, 0x9d, 0x5e, 0x77, 0x91, 0xb5, 0x2d, + 0x23, 0xef, 0xb0, 0x16, 0xda, 0x01, 0xc8, 0xc8, 0x76, 0x41, 0x7f, 0xd6, 0x43, 0x03, 0xcd, 0xda, + 0x1e, 0x90, 0x0e, 0x6e, 0x28, 0x11, 0x71, 0x57, 0xaf, 0xe7, 0x4a, 0x91, 0x90, 0x8f, 0xb8, 0x91, + 0x3f, 0x52, 0x7f, 0x9e, 0xf7, 0xec, 0xec, 0xf5, 0xfc, 0x11, 0x2e, 0xac, 0x02, 0x19, 0x9d, 0x23, + 0xdc, 0x9c, 0xe4, 0x8e, 0xc8, 0x14, 0xb7, 0xca, 0x91, 0x90, 0xcf, 0x07, 0x9c, 0xdf, 0x1f, 0x9d, + 0xf1, 0x7a, 0xaf, 0xc3, 0x64, 0xbd, 0x39, 0xe2, 0x61, 0x6d, 0xb3, 0x12, 0xc2, 0x0e, 0xdc, 0xb9, + 0xbb, 0x3c, 0xe3, 0xb1, 0xe3, 0xff, 0x82, 0xc6, 0xff, 0x96, 0x2b, 0xb3, 0x76, 0xb9, 0x32, 0x6b, + 0x27, 0x99, 0x89, 0x96, 0x99, 0x89, 0x2e, 0x32, 0x13, 0xdd, 0x64, 0x26, 0xfa, 0xff, 0xfd, 0x89, + 0x3f, 0xfd, 0x5b, 0x11, 0x39, 0xcd, 0xdc, 0xd2, 0xd7, 0xdb, 0x00, 0x00, 0x00, 0xff, 0xff, 0x13, + 0x35, 0xd0, 0x60, 0x32, 0x03, 0x00, 0x00, } diff --git a/api/services/events/v1/events.proto b/api/services/events/v1/events.proto index 5f7d9a4e1..cb470d1c1 100644 --- a/api/services/events/v1/events.proto +++ b/api/services/events/v1/events.proto @@ -10,18 +10,21 @@ import "google/protobuf/timestamp.proto"; option go_package = "github.com/containerd/containerd/api/services/events/v1;events"; service Events { - rpc Stream(StreamEventsRequest) returns (stream Envelope); - rpc Post(PostEventRequest) returns (google.protobuf.Empty); + rpc Publish(PublishRequest) returns (google.protobuf.Empty); + rpc Subscribe(SubscribeRequest) returns (stream Envelope); } -message StreamEventsRequest {} +message SubscribeRequest { + repeated string filters = 1; +} -message PostEventRequest { +message PublishRequest { Envelope envelope = 1; } message Envelope { google.protobuf.Timestamp timestamp = 1 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false]; - string topic = 2; - google.protobuf.Any event = 3; + string namespace = 2; + string topic = 3; + google.protobuf.Any event = 4; } diff --git a/cmd/ctr/events.go b/cmd/ctr/events.go index 9cd9d29b1..253b7c81b 100644 --- a/cmd/ctr/events.go +++ b/cmd/ctr/events.go @@ -22,7 +22,7 @@ var eventsCommand = cli.Command{ ctx, cancel := appContext(context) defer cancel() - events, err := eventsClient.Stream(ctx, &eventsapi.StreamEventsRequest{}) + events, err := eventsClient.Subscribe(ctx, &eventsapi.SubscribeRequest{}) if err != nil { return err } diff --git a/events/emitter.go b/events/emitter.go deleted file mode 100644 index d69105647..000000000 --- a/events/emitter.go +++ /dev/null @@ -1,67 +0,0 @@ -package events - -import ( - "context" - "sync" - - events "github.com/containerd/containerd/api/services/events/v1" - "github.com/containerd/containerd/namespaces" - goevents "github.com/docker/go-events" -) - -const ( - EventVersion = "v1" -) - -type Emitter struct { - sinks map[string]*eventSink - broadcaster *goevents.Broadcaster - m sync.Mutex -} - -func NewEmitter() *Emitter { - return &Emitter{ - sinks: make(map[string]*eventSink), - broadcaster: goevents.NewBroadcaster(), - m: sync.Mutex{}, - } -} - -func (e *Emitter) Post(ctx context.Context, evt Event) error { - if err := e.broadcaster.Write(&sinkEvent{ - ctx: ctx, - event: evt, - }); err != nil { - return err - } - - return nil -} - -func (e *Emitter) Events(ctx context.Context, clientID string) chan *events.Envelope { - e.m.Lock() - if _, ok := e.sinks[clientID]; !ok { - ns, _ := namespaces.Namespace(ctx) - s := &eventSink{ - ch: make(chan *events.Envelope), - ns: ns, - } - e.sinks[clientID] = s - e.m.Unlock() - e.broadcaster.Add(s) - return s.ch - } - ch := e.sinks[clientID].ch - e.m.Unlock() - - return ch -} - -func (e *Emitter) Remove(clientID string) { - e.m.Lock() - if v, ok := e.sinks[clientID]; ok { - e.broadcaster.Remove(v) - delete(e.sinks, clientID) - } - e.m.Unlock() -} diff --git a/events/event.go b/events/event.go deleted file mode 100644 index f59f0c4a8..000000000 --- a/events/event.go +++ /dev/null @@ -1,3 +0,0 @@ -package events - -type Event interface{} diff --git a/events/events.go b/events/events.go new file mode 100644 index 000000000..d00c9e578 --- /dev/null +++ b/events/events.go @@ -0,0 +1,24 @@ +package events + +import ( + "context" + + events "github.com/containerd/containerd/api/services/events/v1" +) + +type Event interface{} + +// Publisher posts the event. +type Publisher interface { + Publish(ctx context.Context, topic string, event Event) error +} + +type Forwarder interface { + Forward(ctx context.Context, envelope *events.Envelope) error +} + +type publisherFunc func(ctx context.Context, topic string, event Event) error + +func (fn publisherFunc) Publish(ctx context.Context, topic string, event Event) error { + return fn(ctx, topic, event) +} diff --git a/events/events_test.go b/events/events_test.go deleted file mode 100644 index c58fc6225..000000000 --- a/events/events_test.go +++ /dev/null @@ -1,33 +0,0 @@ -package events - -import ( - "context" - "fmt" - "testing" -) - -func TestBasicEvent(t *testing.T) { - ctx := context.Background() - - // simulate a layer pull with events - ctx, commit, _ := WithTx(ctx) - - G(ctx).Post(ctx, "pull ubuntu") - - for layer := 0; layer < 4; layer++ { - // make a subtransaction for each layer - ctx, commit, _ := WithTx(ctx) - - G(ctx).Post(ctx, fmt.Sprintf("fetch layer %v", layer)) - - ctx = WithTopic(ctx, "content") - // simulate sub-operations with a separate topic, on the content store - G(ctx).Post(ctx, fmt.Sprint("received sha:256")) - - G(ctx).Post(ctx, fmt.Sprintf("unpack layer %v", layer)) - - commit() - } - - commit() -} diff --git a/events/exchange.go b/events/exchange.go new file mode 100644 index 000000000..e78e5c204 --- /dev/null +++ b/events/exchange.go @@ -0,0 +1,162 @@ +package events + +import ( + "context" + "strings" + "time" + + events "github.com/containerd/containerd/api/services/events/v1" + "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/filters" + "github.com/containerd/containerd/identifiers" + "github.com/containerd/containerd/log" + "github.com/containerd/containerd/namespaces" + "github.com/containerd/containerd/typeurl" + goevents "github.com/docker/go-events" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" +) + +type Exchange struct { + broadcaster *goevents.Broadcaster +} + +func NewExchange() *Exchange { + return &Exchange{ + broadcaster: goevents.NewBroadcaster(), + } +} + +// Forward accepts an envelope to be direcly distributed on the exchange. +func (e *Exchange) Forward(ctx context.Context, envelope *events.Envelope) error { + log.G(ctx).WithFields(logrus.Fields{ + "topic": envelope.Topic, + "ns": envelope.Namespace, + "type": envelope.Event.TypeUrl, + }).Debug("forward event") + + if err := namespaces.Validate(envelope.Namespace); err != nil { + return errors.Wrapf(err, "event envelope has invalid namespace") + } + + if err := validateTopic(envelope.Topic); err != nil { + return errors.Wrapf(err, "envelope topic %q", envelope.Topic) + } + + return e.broadcaster.Write(envelope) +} + +// Publish packages and sends an event. The caller will be considered the +// initial publisher of the event. This means the timestamp will be calculated +// at this point and this method may read from the calling context. +func (e *Exchange) Publish(ctx context.Context, topic string, event Event) error { + namespace, err := namespaces.NamespaceRequired(ctx) + if err != nil { + return errors.Wrapf(err, "failed publishing event") + } + if err := validateTopic(topic); err != nil { + return errors.Wrapf(err, "envelope topic %q", topic) + } + + evany, err := typeurl.MarshalAny(event) + if err != nil { + return err + } + env := events.Envelope{ + Timestamp: time.Now().UTC(), + Topic: topic, + Event: evany, + } + if err := e.broadcaster.Write(&env); err != nil { + return err + } + + log.G(ctx).WithFields(logrus.Fields{ + "topic": topic, + "type": evany.TypeUrl, + "ns": namespace, + }).Debug("published event") + return nil +} + +// Subscribe to events on the exchange. Events are sent through the returned +// channel ch. If an error is encountered, it will be sent on channel errs and +// errs will be closed. To end the subscription, cancel the provided context. +func (e *Exchange) Subscribe(ctx context.Context, filters ...filters.Filter) (ch <-chan *events.Envelope, errs <-chan error) { + var ( + evch = make(chan *events.Envelope) + errq = make(chan error, 1) + channel = goevents.NewChannel(0) + queue = goevents.NewQueue(channel) + ) + + // TODO(stevvooe): Insert the filter! + + e.broadcaster.Add(queue) + + go func() { + defer close(errq) + defer e.broadcaster.Remove(queue) + defer queue.Close() + + var err error + loop: + for { + select { + case ev := <-channel.C: + env, ok := ev.(*events.Envelope) + if !ok { + // TODO(stevvooe): For the most part, we are well protected + // from this condition. Both Forward and Publish protect + // from this. + err = errors.Errorf("invalid envelope encountered %#v; please file a bug", ev) + break + } + + select { + case evch <- env: + case <-ctx.Done(): + break loop + } + case <-ctx.Done(): + break loop + } + } + + if err == nil { + if cerr := ctx.Err(); cerr != context.Canceled { + err = cerr + } + } + + errq <- err + }() + + ch = evch + errs = errq + + return +} + +func validateTopic(topic string) error { + if topic == "" { + return errors.Wrap(errdefs.ErrInvalidArgument, "must not be empty") + } + + if topic[0] != '/' { + return errors.Wrapf(errdefs.ErrInvalidArgument, "must start with '/'", topic) + } + + if len(topic) == 1 { + return errors.Wrapf(errdefs.ErrInvalidArgument, "must have at least one component", topic) + } + + components := strings.Split(topic[1:], "/") + for _, component := range components { + if err := identifiers.Validate(component); err != nil { + return errors.Wrapf(err, "failed validation on component %q", component) + } + } + + return nil +} diff --git a/events/exchange_test.go b/events/exchange_test.go new file mode 100644 index 000000000..dcd5859bf --- /dev/null +++ b/events/exchange_test.go @@ -0,0 +1,149 @@ +package events + +import ( + "context" + "fmt" + "reflect" + "sync" + "testing" + + events "github.com/containerd/containerd/api/services/events/v1" + "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/namespaces" + "github.com/containerd/containerd/typeurl" + "github.com/pkg/errors" +) + +func TestExchangeBasic(t *testing.T) { + ctx := namespaces.WithNamespace(context.Background(), t.Name()) + testevents := []Event{ + &events.ContainerCreate{ID: "asdf"}, + &events.ContainerCreate{ID: "qwer"}, + &events.ContainerCreate{ID: "zxcv"}, + } + exchange := NewExchange() + + t.Log("subscribe") + var cancel1, cancel2 func() + + // Create two subscribers for same set of events and make sure they + // traverse the exchange. + ctx1, cancel1 := context.WithCancel(ctx) + eventq1, errq1 := exchange.Subscribe(ctx1) + + ctx2, cancel2 := context.WithCancel(ctx) + eventq2, errq2 := exchange.Subscribe(ctx2) + + t.Log("publish") + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + for _, event := range testevents { + fmt.Println("publish", event) + if err := exchange.Publish(ctx, "/test", event); err != nil { + fmt.Println("publish error", err) + t.Fatal(err) + } + } + + t.Log("finished publishing") + }() + + t.Log("waiting") + wg.Wait() + + for _, subscriber := range []struct { + eventq <-chan *events.Envelope + errq <-chan error + cancel func() + }{ + { + eventq: eventq1, + errq: errq1, + cancel: cancel1, + }, + { + eventq: eventq2, + errq: errq2, + cancel: cancel2, + }, + } { + var received []Event + subscribercheck: + for { + select { + case env := <-subscriber.eventq: + ev, err := typeurl.UnmarshalAny(env.Event) + if err != nil { + t.Fatal(err) + } + received = append(received, ev.(*events.ContainerCreate)) + case err := <-subscriber.errq: + if err != nil { + t.Fatal(err) + } + break subscribercheck + } + + if reflect.DeepEqual(received, testevents) { + // when we do this, we expect the errs channel to be closed and + // this will return. + subscriber.cancel() + } + } + } +} + +func TestExchangeValidateTopic(t *testing.T) { + namespace := t.Name() + ctx := namespaces.WithNamespace(context.Background(), namespace) + exchange := NewExchange() + + for _, testcase := range []struct { + input string + err error + }{ + { + input: "/test", + }, + { + input: "/test/test", + }, + { + input: "test", + err: errdefs.ErrInvalidArgument, + }, + } { + t.Run(testcase.input, func(t *testing.T) { + event := &events.ContainerCreate{ID: t.Name()} + if err := exchange.Publish(ctx, testcase.input, event); errors.Cause(err) != testcase.err { + if err == nil { + t.Fatalf("expected error %v, received nil", testcase.err) + } else { + t.Fatalf("expected error %v, received %v", testcase.err, err) + } + } + + evany, err := typeurl.MarshalAny(event) + if err != nil { + t.Fatal(err) + } + + envelope := events.Envelope{ + Namespace: namespace, + Topic: testcase.input, + Event: evany, + } + // make sure we get same errors with forward. + if err := exchange.Forward(ctx, &envelope); errors.Cause(err) != testcase.err { + if err == nil { + t.Fatalf("expected error %v, received nil", testcase.err) + } else { + t.Fatalf("expected error %v, received %v", testcase.err, err) + } + } + + }) + } +} diff --git a/events/poster.go b/events/poster.go deleted file mode 100644 index 124f8b79b..000000000 --- a/events/poster.go +++ /dev/null @@ -1,65 +0,0 @@ -package events - -import ( - "context" - - "github.com/containerd/containerd/log" - "github.com/containerd/containerd/namespaces" - "github.com/sirupsen/logrus" -) - -var ( - G = GetPoster -) - -// Poster posts the event. -type Poster interface { - Post(ctx context.Context, evt Event) error -} - -type posterKey struct{} - -func WithPoster(ctx context.Context, poster Poster) context.Context { - return context.WithValue(ctx, posterKey{}, poster) -} - -func GetPoster(ctx context.Context) Poster { - poster := ctx.Value(posterKey{}) - - if poster == nil { - tx, _ := getTx(ctx) - topic := getTopic(ctx) - - // likely means we don't have a configured event system. Just return - // the default poster, which merely logs events. - return posterFunc(func(ctx context.Context, evt Event) error { - fields := logrus.Fields{"event": evt} - - if topic != "" { - fields["topic"] = topic - } - ns, _ := namespaces.Namespace(ctx) - fields["ns"] = ns - - if tx != nil { - fields["tx.id"] = tx.id - if tx.parent != nil { - fields["tx.parent.id"] = tx.parent.id - } - } - - log.G(ctx).WithFields(fields).Debug("event fired") - - return nil - }) - } - - return poster.(Poster) -} - -type posterFunc func(ctx context.Context, evt Event) error - -func (fn posterFunc) Post(ctx context.Context, evt Event) error { - fn(ctx, evt) - return nil -} diff --git a/events/sink.go b/events/sink.go deleted file mode 100644 index 9246e40aa..000000000 --- a/events/sink.go +++ /dev/null @@ -1,65 +0,0 @@ -package events - -import ( - "context" - "time" - - "github.com/containerd/containerd/api/services/events/v1" - "github.com/containerd/containerd/log" - "github.com/containerd/containerd/namespaces" - "github.com/containerd/containerd/typeurl" - goevents "github.com/docker/go-events" - "github.com/pkg/errors" - "github.com/sirupsen/logrus" -) - -type sinkEvent struct { - ctx context.Context - event Event -} - -type eventSink struct { - ns string - ch chan *events.Envelope -} - -func (s *eventSink) Write(evt goevents.Event) error { - e, ok := evt.(*sinkEvent) - if !ok { - return errors.New("event is not a sink event") - } - - ns, _ := namespaces.Namespace(e.ctx) - if ns != "" && ns != s.ns { - // ignore events not intended for this ns - return nil - } - - if ev, ok := e.event.(*events.Envelope); ok { - s.ch <- ev - return nil - } - topic := getTopic(e.ctx) - - eventData, err := typeurl.MarshalAny(e.event) - if err != nil { - return err - } - - log.G(e.ctx).WithFields(logrus.Fields{ - "topic": topic, - "type": eventData.TypeUrl, - "ns": ns, - }).Debug("event") - - s.ch <- &events.Envelope{ - Timestamp: time.Now(), - Topic: topic, - Event: eventData, - } - return nil -} - -func (s *eventSink) Close() error { - return nil -} diff --git a/events/topic.go b/events/topic.go deleted file mode 100644 index 5f371a118..000000000 --- a/events/topic.go +++ /dev/null @@ -1,35 +0,0 @@ -package events - -import "context" - -type topicKey struct{} - -// WithTopic returns a context with a new topic set, such that events emitted -// from the resulting context will be marked with the topic. -// -// A topic groups events by the target module they operate on. This is -// primarily designed to support multi-module log compaction of events. In -// typical journaling systems, the entries operate on a single data structure. -// When compacting the journal, we can replace all former log entries with a -// summary data structure that will result in the same state. -// -// By providing a compaction mechanism by topic, we can prune down to a data -// structure oriented towards a single topic, leaving unrelated messages alone. -func WithTopic(ctx context.Context, topic string) context.Context { - return context.WithValue(ctx, topicKey{}, topic) -} - -func getTopic(ctx context.Context) string { - topic := ctx.Value(topicKey{}) - - if topic == nil { - return "" - } - - return topic.(string) -} - -// RegisterCompactor sets the compacter for the given topic. -func RegisterCompactor(topic string, compactor interface{}) { - panic("not implemented") -} diff --git a/events/transaction.go b/events/transaction.go deleted file mode 100644 index 11a2cd40b..000000000 --- a/events/transaction.go +++ /dev/null @@ -1,96 +0,0 @@ -package events - -import ( - "context" - "fmt" - "sync" - "sync/atomic" - "time" -) - -var txCounter int64 // replace this with something that won't break - -// nextTXID provides the next transaction identifier. -func nexttxID() int64 { - // TODO(stevvooe): Need to coordinate this with existing transaction logs. - // For now, this is a toy, but not a racy one. - return atomic.AddInt64(&txCounter, 1) -} - -type transaction struct { - ctx context.Context - id int64 - parent *transaction // if nil, no parent transaction - finish sync.Once - start, end time.Time // informational -} - -// begin creates a sub-transaction. -func (tx *transaction) begin(ctx context.Context, poster Poster) *transaction { - id := nexttxID() - - child := &transaction{ - ctx: ctx, - id: id, - parent: tx, - start: time.Now(), - } - - // post the transaction started event - poster.Post(ctx, child.makeTransactionEvent("begin")) // transactions are really just events - - return child -} - -// commit writes out the transaction. -func (tx *transaction) commit(poster Poster) { - tx.finish.Do(func() { - tx.end = time.Now() - poster.Post(tx.ctx, tx.makeTransactionEvent("commit")) - }) -} - -func (tx *transaction) rollback(poster Poster, cause error) { - tx.finish.Do(func() { - tx.end = time.Now() - event := tx.makeTransactionEvent("rollback") - event = fmt.Sprintf("%s error=%q", event, cause.Error()) - poster.Post(tx.ctx, event) - }) -} - -func (tx *transaction) makeTransactionEvent(status string) Event { - // TODO(stevvooe): obviously, we need more structure than this. - event := fmt.Sprintf("%v %v", status, tx.id) - if tx.parent != nil { - event += " parent=" + fmt.Sprint(tx.parent.id) - } - - return event -} - -type txKey struct{} - -func getTx(ctx context.Context) (*transaction, bool) { - tx := ctx.Value(txKey{}) - if tx == nil { - return nil, false - } - - return tx.(*transaction), true -} - -// WithTx returns a new context with an event transaction, such that events -// posted to the underlying context will be committed to the event log as a -// group, organized by a transaction id, when commit is called. -func WithTx(pctx context.Context) (ctx context.Context, commit func(), rollback func(err error)) { - poster := G(pctx) - parent, _ := getTx(pctx) - tx := parent.begin(pctx, poster) - - return context.WithValue(pctx, txKey{}, tx), func() { - tx.commit(poster) - }, func(err error) { - tx.rollback(poster, err) - } -} diff --git a/linux/shim/local.go b/linux/shim/local.go index 10215ea41..c7d23b8a6 100644 --- a/linux/shim/local.go +++ b/linux/shim/local.go @@ -89,17 +89,16 @@ func (c *local) Update(ctx context.Context, in *shimapi.UpdateTaskRequest, opts return c.s.Update(ctx, in) } -type poster interface { - Post(ctx context.Context, in *events.PostEventRequest, opts ...grpc.CallOption) (*google_protobuf.Empty, error) +type publisher interface { + Publish(ctx context.Context, in *events.PublishRequest, opts ...grpc.CallOption) (*google_protobuf.Empty, error) } type localEventsClient struct { - emitter evt.Poster + forwarder evt.Forwarder } -func (l *localEventsClient) Post(ctx context.Context, r *events.PostEventRequest, opts ...grpc.CallOption) (*google_protobuf.Empty, error) { - ctx = evt.WithTopic(ctx, r.Envelope.Topic) - if err := l.emitter.Post(ctx, r.Envelope); err != nil { +func (l *localEventsClient) Publish(ctx context.Context, r *events.PublishRequest, opts ...grpc.CallOption) (*google_protobuf.Empty, error) { + if err := l.forwarder.Forward(ctx, r.Envelope); err != nil { return nil, err } return empty, nil diff --git a/linux/shim/service.go b/linux/shim/service.go index a767f3710..5555394a8 100644 --- a/linux/shim/service.go +++ b/linux/shim/service.go @@ -37,7 +37,7 @@ func NewService(path, namespace, address string) (*Service, error) { return nil, fmt.Errorf("shim namespace cannot be empty") } context := namespaces.WithNamespace(context.Background(), namespace) - var client poster + var client publisher if address != "" { conn, err := connect(address, dialer) if err != nil { @@ -46,7 +46,7 @@ func NewService(path, namespace, address string) (*Service, error) { client = events.NewEventsClient(conn) } else { client = &localEventsClient{ - emitter: evt.GetPoster(context), + forwarder: evt.NewExchange(), } } s := &Service{ @@ -379,16 +379,18 @@ func (s *Service) getContainerPids(ctx context.Context, id string) ([]uint32, er return pids, nil } -func (s *Service) forward(client poster) { +func (s *Service) forward(client publisher) { for e := range s.events { a, err := typeurl.MarshalAny(e) if err != nil { log.G(s.context).WithError(err).Error("marshal event") continue } - if _, err := client.Post(s.context, &events.PostEventRequest{ + + if _, err := client.Publish(s.context, &events.PublishRequest{ Envelope: &events.Envelope{ - Timestamp: time.Now(), + Namespace: s.namespace, + Timestamp: time.Now().UTC(), Topic: getTopic(e), Event: a, }, diff --git a/metrics/cgroups/cgroups.go b/metrics/cgroups/cgroups.go index e2adc7d15..5ef16c0c3 100644 --- a/metrics/cgroups/cgroups.go +++ b/metrics/cgroups/cgroups.go @@ -4,8 +4,8 @@ package cgroups import ( "github.com/containerd/cgroups" - events "github.com/containerd/containerd/api/services/events/v1" - evt "github.com/containerd/containerd/events" + eventsapi "github.com/containerd/containerd/api/services/events/v1" + "github.com/containerd/containerd/events" "github.com/containerd/containerd/log" "github.com/containerd/containerd/plugin" "github.com/containerd/containerd/runtime" @@ -35,7 +35,7 @@ func New(ic *plugin.InitContext) (interface{}, error) { collector: collector, oom: oom, context: ic.Context, - emitter: ic.Emitter, + publisher: ic.Events, }, nil } @@ -43,7 +43,7 @@ type cgroupsMonitor struct { collector *Collector oom *OOMCollector context context.Context - emitter *evt.Emitter + publisher events.Publisher } func (m *cgroupsMonitor) Monitor(c runtime.Task) error { @@ -69,7 +69,7 @@ func (m *cgroupsMonitor) Stop(c runtime.Task) error { } func (m *cgroupsMonitor) trigger(id string, cg cgroups.Cgroup) { - if err := m.emitter.Post(m.context, &events.TaskOOM{ + if err := m.publisher.Publish(m.context, runtime.TaskOOMEventTopic, &eventsapi.TaskOOM{ ContainerID: id, }); err != nil { log.G(m.context).WithError(err).Error("post OOM event") diff --git a/plugin/context.go b/plugin/context.go index 57e190d95..16cf1b8b7 100644 --- a/plugin/context.go +++ b/plugin/context.go @@ -22,7 +22,7 @@ type InitContext struct { Address string Context context.Context Config interface{} - Emitter *events.Emitter + Events *events.Exchange plugins map[PluginType]map[string]interface{} } diff --git a/process.go b/process.go index a3b4b3565..82473ea2b 100644 --- a/process.go +++ b/process.go @@ -64,7 +64,7 @@ func (p *process) Kill(ctx context.Context, s syscall.Signal) error { } func (p *process) Wait(ctx context.Context) (uint32, error) { - eventstream, err := p.task.client.EventService().Stream(ctx, &eventsapi.StreamEventsRequest{}) + eventstream, err := p.task.client.EventService().Subscribe(ctx, &eventsapi.SubscribeRequest{}) if err != nil { return UnknownExitStatus, err } diff --git a/server/server.go b/server/server.go index b2a260b11..1d205261f 100644 --- a/server/server.go +++ b/server/server.go @@ -53,8 +53,8 @@ func New(ctx context.Context, config *Config) (*Server, error) { var ( services []plugin.Service s = &Server{ - rpc: rpc, - emitter: events.NewEmitter(), + rpc: rpc, + events: events.NewExchange(), } initialized = make(map[plugin.PluginType]map[string]interface{}) ) @@ -63,12 +63,12 @@ func New(ctx context.Context, config *Config) (*Server, error) { log.G(ctx).WithField("type", p.Type).Infof("loading plugin %q...", id) initContext := plugin.NewContext( - events.WithPoster(ctx, s.emitter), + ctx, initialized, config.Root, id, ) - initContext.Emitter = s.emitter + initContext.Events = s.events initContext.Address = config.GRPC.Address // load the plugin specific configuration if it is provided @@ -112,8 +112,8 @@ func New(ctx context.Context, config *Config) (*Server, error) { // Server is the containerd main daemon type Server struct { - rpc *grpc.Server - emitter *events.Emitter + rpc *grpc.Server + events *events.Exchange } // ServeGRPC provides the containerd grpc APIs on the provided listener diff --git a/services/containers/service.go b/services/containers/service.go index c83c5b67a..74db689df 100644 --- a/services/containers/service.go +++ b/services/containers/service.go @@ -24,23 +24,22 @@ func init() { plugin.MetadataPlugin, }, Init: func(ic *plugin.InitContext) (interface{}, error) { - e := events.GetPoster(ic.Context) m, err := ic.Get(plugin.MetadataPlugin) if err != nil { return nil, err } - return NewService(m.(*bolt.DB), e), nil + return NewService(m.(*bolt.DB), ic.Events), nil }, }) } type Service struct { - db *bolt.DB - emitter events.Poster + db *bolt.DB + publisher events.Publisher } -func NewService(db *bolt.DB, evts events.Poster) api.ContainersServer { - return &Service{db: db, emitter: evts} +func NewService(db *bolt.DB, publisher events.Publisher) api.ContainersServer { + return &Service{db: db, publisher: publisher} } func (s *Service) Register(server *grpc.Server) error { @@ -94,7 +93,7 @@ func (s *Service) Create(ctx context.Context, req *api.CreateContainerRequest) ( }); err != nil { return &resp, errdefs.ToGRPC(err) } - if err := s.emit(ctx, "/containers/create", &eventsapi.ContainerCreate{ + if err := s.publisher.Publish(ctx, "/containers/create", &eventsapi.ContainerCreate{ ID: resp.Container.ID, Image: resp.Container.Image, Runtime: &eventsapi.ContainerCreate_Runtime{ @@ -136,7 +135,7 @@ func (s *Service) Update(ctx context.Context, req *api.UpdateContainerRequest) ( return &resp, errdefs.ToGRPC(err) } - if err := s.emit(ctx, "/containers/update", &eventsapi.ContainerUpdate{ + if err := s.publisher.Publish(ctx, "/containers/update", &eventsapi.ContainerUpdate{ ID: resp.Container.ID, Image: resp.Container.Image, Labels: resp.Container.Labels, @@ -155,7 +154,7 @@ func (s *Service) Delete(ctx context.Context, req *api.DeleteContainerRequest) ( return &empty.Empty{}, errdefs.ToGRPC(err) } - if err := s.emit(ctx, "/containers/delete", &eventsapi.ContainerDelete{ + if err := s.publisher.Publish(ctx, "/containers/delete", &eventsapi.ContainerDelete{ ID: req.ID, }); err != nil { return &empty.Empty{}, err @@ -175,12 +174,3 @@ func (s *Service) withStoreView(ctx context.Context, fn func(ctx context.Context func (s *Service) withStoreUpdate(ctx context.Context, fn func(ctx context.Context, store containers.Store) error) error { return s.db.Update(s.withStore(ctx, fn)) } - -func (s *Service) emit(ctx context.Context, topic string, evt interface{}) error { - emitterCtx := events.WithTopic(ctx, topic) - if err := s.emitter.Post(emitterCtx, evt); err != nil { - return err - } - - return nil -} diff --git a/services/content/service.go b/services/content/service.go index c8698df8a..3b972e481 100644 --- a/services/content/service.go +++ b/services/content/service.go @@ -23,8 +23,8 @@ import ( ) type Service struct { - store content.Store - emitter events.Poster + store content.Store + publisher events.Publisher } var bufPool = sync.Pool{ @@ -58,8 +58,8 @@ func NewService(ic *plugin.InitContext) (interface{}, error) { } cs := metadata.NewContentStore(m.(*bolt.DB), c.(content.Store)) return &Service{ - store: cs, - emitter: events.GetPoster(ic.Context), + store: cs, + publisher: ic.Events, }, nil } @@ -149,7 +149,7 @@ func (s *Service) Delete(ctx context.Context, req *api.DeleteContentRequest) (*e return nil, errdefs.ToGRPC(err) } - if err := s.emit(ctx, "/content/delete", &eventsapi.ContentDelete{ + if err := s.publisher.Publish(ctx, "/content/delete", &eventsapi.ContentDelete{ Digest: req.Digest, }); err != nil { return nil, err @@ -459,12 +459,3 @@ func (s *Service) Abort(ctx context.Context, req *api.AbortRequest) (*empty.Empt return &empty.Empty{}, nil } - -func (s *Service) emit(ctx context.Context, topic string, evt interface{}) error { - emitterCtx := events.WithTopic(ctx, topic) - if err := s.emitter.Post(emitterCtx, evt); err != nil { - return err - } - - return nil -} diff --git a/services/events/service.go b/services/events/service.go index 48822a5ea..30f630d27 100644 --- a/services/events/service.go +++ b/services/events/service.go @@ -1,14 +1,13 @@ package events import ( - "fmt" - "time" - api "github.com/containerd/containerd/api/services/events/v1" + "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/events" + "github.com/containerd/containerd/filters" "github.com/containerd/containerd/plugin" "github.com/golang/protobuf/ptypes/empty" - "github.com/sirupsen/logrus" + "github.com/pkg/errors" "golang.org/x/net/context" "google.golang.org/grpc" ) @@ -18,18 +17,17 @@ func init() { Type: plugin.GRPCPlugin, ID: "events", Init: func(ic *plugin.InitContext) (interface{}, error) { - return NewService(ic.Emitter), nil + return NewService(ic.Events), nil }, }) } type Service struct { - emitter *events.Emitter - timeouts map[string]*time.Timer + events *events.Exchange } -func NewService(e *events.Emitter) api.EventsServer { - return &Service{emitter: e} +func NewService(events *events.Exchange) api.EventsServer { + return &Service{events: events} } func (s *Service) Register(server *grpc.Server) error { @@ -37,28 +35,36 @@ func (s *Service) Register(server *grpc.Server) error { return nil } -func (s *Service) Stream(req *api.StreamEventsRequest, srv api.Events_StreamServer) error { - clientID := fmt.Sprintf("%d", time.Now().UnixNano()) +func (s *Service) Subscribe(req *api.SubscribeRequest, srv api.Events_SubscribeServer) error { + ctx, cancel := context.WithCancel(srv.Context()) + defer cancel() + + filter, err := filters.ParseAll(req.Filters...) + if err != nil { + return errdefs.ToGRPC(err) + } + + eventq, errq := s.events.Subscribe(ctx, filter) for { - e := <-s.emitter.Events(srv.Context(), clientID) - // upon the client event timeout this will be nil; ignore - if e == nil { + select { + case ev := <-eventq: + if err := srv.Send(ev); err != nil { + return errors.Wrapf(err, "failed sending event to subscriber") + } + case err := <-errq: + if err != nil { + return errors.Wrapf(err, "subscription error") + } + return nil } - if err := srv.Send(e); err != nil { - logrus.WithFields(logrus.Fields{ - "client": clientID, - }).Debug("error sending event; unsubscribing client") - s.emitter.Remove(clientID) - return err - } } } -func (s *Service) Post(ctx context.Context, r *api.PostEventRequest) (*empty.Empty, error) { - ctx = events.WithTopic(ctx, r.Envelope.Topic) - if err := s.emitter.Post(ctx, r.Envelope); err != nil { - return nil, err +func (s *Service) Publish(ctx context.Context, r *api.PublishRequest) (*empty.Empty, error) { + if err := s.events.Forward(ctx, r.Envelope); err != nil { + return nil, errdefs.ToGRPC(err) } + return &empty.Empty{}, nil } diff --git a/services/images/service.go b/services/images/service.go index a6a78ff97..d8499aa43 100644 --- a/services/images/service.go +++ b/services/images/service.go @@ -24,25 +24,24 @@ func init() { plugin.MetadataPlugin, }, Init: func(ic *plugin.InitContext) (interface{}, error) { - e := events.GetPoster(ic.Context) m, err := ic.Get(plugin.MetadataPlugin) if err != nil { return nil, err } - return NewService(m.(*bolt.DB), e), nil + return NewService(m.(*bolt.DB), ic.Events), nil }, }) } type Service struct { - db *bolt.DB - emitter events.Poster + db *bolt.DB + publisher events.Publisher } -func NewService(db *bolt.DB, evts events.Poster) imagesapi.ImagesServer { +func NewService(db *bolt.DB, publisher events.Publisher) imagesapi.ImagesServer { return &Service{ - db: db, - emitter: evts, + db: db, + publisher: publisher, } } @@ -100,7 +99,7 @@ func (s *Service) Create(ctx context.Context, req *imagesapi.CreateImageRequest) return nil, errdefs.ToGRPC(err) } - if err := s.emit(ctx, "/images/create", &eventsapi.ImageCreate{ + if err := s.publisher.Publish(ctx, "/images/create", &eventsapi.ImageCreate{ Name: resp.Image.Name, Labels: resp.Image.Labels, }); err != nil { @@ -139,7 +138,7 @@ func (s *Service) Update(ctx context.Context, req *imagesapi.UpdateImageRequest) return nil, errdefs.ToGRPC(err) } - if err := s.emit(ctx, "/images/update", &eventsapi.ImageUpdate{ + if err := s.publisher.Publish(ctx, "/images/update", &eventsapi.ImageUpdate{ Name: resp.Image.Name, Labels: resp.Image.Labels, }); err != nil { @@ -156,7 +155,7 @@ func (s *Service) Delete(ctx context.Context, req *imagesapi.DeleteImageRequest) return nil, err } - if err := s.emit(ctx, "/images/delete", &eventsapi.ImageDelete{ + if err := s.publisher.Publish(ctx, "/images/delete", &eventsapi.ImageDelete{ Name: req.Name, }); err != nil { return nil, err @@ -176,12 +175,3 @@ func (s *Service) withStoreView(ctx context.Context, fn func(ctx context.Context func (s *Service) withStoreUpdate(ctx context.Context, fn func(ctx context.Context, store images.Store) error) error { return s.db.Update(s.withStore(ctx, fn)) } - -func (s *Service) emit(ctx context.Context, topic string, evt interface{}) error { - emitterCtx := events.WithTopic(ctx, topic) - if err := s.emitter.Post(emitterCtx, evt); err != nil { - return err - } - - return nil -} diff --git a/services/namespaces/service.go b/services/namespaces/service.go index d0de39295..0891670cb 100644 --- a/services/namespaces/service.go +++ b/services/namespaces/service.go @@ -25,27 +25,26 @@ func init() { plugin.MetadataPlugin, }, Init: func(ic *plugin.InitContext) (interface{}, error) { - e := events.GetPoster(ic.Context) m, err := ic.Get(plugin.MetadataPlugin) if err != nil { return nil, err } - return NewService(m.(*bolt.DB), e), nil + return NewService(m.(*bolt.DB), ic.Events), nil }, }) } type Service struct { - db *bolt.DB - emitter events.Poster + db *bolt.DB + publisher events.Publisher } var _ api.NamespacesServer = &Service{} -func NewService(db *bolt.DB, evts events.Poster) api.NamespacesServer { +func NewService(db *bolt.DB, publisher events.Publisher) api.NamespacesServer { return &Service{ - db: db, - emitter: evts, + db: db, + publisher: publisher, } } @@ -119,7 +118,7 @@ func (s *Service) Create(ctx context.Context, req *api.CreateNamespaceRequest) ( return &resp, err } - if err := s.emit(ctx, "/namespaces/create", &eventsapi.NamespaceCreate{ + if err := s.publisher.Publish(ctx, "/namespaces/create", &eventsapi.NamespaceCreate{ Name: req.Namespace.Name, Labels: req.Namespace.Labels, }); err != nil { @@ -172,7 +171,7 @@ func (s *Service) Update(ctx context.Context, req *api.UpdateNamespaceRequest) ( return &resp, err } - if err := s.emit(ctx, "/namespaces/update", &eventsapi.NamespaceUpdate{ + if err := s.publisher.Publish(ctx, "/namespaces/update", &eventsapi.NamespaceUpdate{ Name: req.Namespace.Name, Labels: req.Namespace.Labels, }); err != nil { @@ -189,7 +188,7 @@ func (s *Service) Delete(ctx context.Context, req *api.DeleteNamespaceRequest) ( return &empty.Empty{}, err } - if err := s.emit(ctx, "/namespaces/delete", &eventsapi.NamespaceDelete{ + if err := s.publisher.Publish(ctx, "/namespaces/delete", &eventsapi.NamespaceDelete{ Name: req.Name, }); err != nil { return &empty.Empty{}, err @@ -209,12 +208,3 @@ func (s *Service) withStoreView(ctx context.Context, fn func(ctx context.Context func (s *Service) withStoreUpdate(ctx context.Context, fn func(ctx context.Context, store namespaces.Store) error) error { return s.db.Update(s.withStore(ctx, fn)) } - -func (s *Service) emit(ctx context.Context, topic string, evt interface{}) error { - emitterCtx := events.WithTopic(ctx, topic) - if err := s.emitter.Post(emitterCtx, evt); err != nil { - return err - } - - return nil -} diff --git a/services/snapshot/service.go b/services/snapshot/service.go index 089812e09..adc3cc186 100644 --- a/services/snapshot/service.go +++ b/services/snapshot/service.go @@ -45,11 +45,10 @@ var empty = &protoempty.Empty{} type service struct { snapshotters map[string]snapshot.Snapshotter defaultSnapshotterName string - emitter events.Poster + publisher events.Publisher } func newService(ic *plugin.InitContext) (interface{}, error) { - evts := events.GetPoster(ic.Context) rawSnapshotters, err := ic.GetAll(plugin.SnapshotPlugin) if err != nil { return nil, err @@ -72,7 +71,7 @@ func newService(ic *plugin.InitContext) (interface{}, error) { return &service{ snapshotters: snapshotters, defaultSnapshotterName: cfg.Default, - emitter: evts, + publisher: ic.Events, }, nil } @@ -105,7 +104,7 @@ func (s *service) Prepare(ctx context.Context, pr *snapshotapi.PrepareSnapshotRe return nil, errdefs.ToGRPC(err) } - if err := s.emit(ctx, "/snapshot/prepare", &eventsapi.SnapshotPrepare{ + if err := s.publisher.Publish(ctx, "/snapshot/prepare", &eventsapi.SnapshotPrepare{ Key: pr.Key, Parent: pr.Parent, }); err != nil { @@ -162,7 +161,7 @@ func (s *service) Commit(ctx context.Context, cr *snapshotapi.CommitSnapshotRequ return nil, errdefs.ToGRPC(err) } - if err := s.emit(ctx, "/snapshot/commit", &eventsapi.SnapshotCommit{ + if err := s.publisher.Publish(ctx, "/snapshot/commit", &eventsapi.SnapshotCommit{ Key: cr.Key, Name: cr.Name, }); err != nil { @@ -183,7 +182,7 @@ func (s *service) Remove(ctx context.Context, rr *snapshotapi.RemoveSnapshotRequ return nil, errdefs.ToGRPC(err) } - if err := s.emit(ctx, "/snapshot/remove", &eventsapi.SnapshotRemove{ + if err := s.publisher.Publish(ctx, "/snapshot/remove", &eventsapi.SnapshotRemove{ Key: rr.Key, }); err != nil { return nil, err @@ -294,12 +293,3 @@ func fromMounts(mounts []mount.Mount) []*types.Mount { } return out } - -func (s *service) emit(ctx context.Context, topic string, evt interface{}) error { - emitterCtx := events.WithTopic(ctx, topic) - if err := s.emitter.Post(emitterCtx, evt); err != nil { - return err - } - - return nil -} diff --git a/services/tasks/service.go b/services/tasks/service.go index e794d6ae8..a6764ccdb 100644 --- a/services/tasks/service.go +++ b/services/tasks/service.go @@ -67,20 +67,19 @@ func New(ic *plugin.InitContext) (interface{}, error) { r := rr.(runtime.Runtime) runtimes[r.ID()] = r } - e := events.GetPoster(ic.Context) return &Service{ - runtimes: runtimes, - db: m.(*bolt.DB), - store: cs, - emitter: e, + runtimes: runtimes, + db: m.(*bolt.DB), + store: cs, + publisher: ic.Events, }, nil } type Service struct { - runtimes map[string]runtime.Runtime - db *bolt.DB - store content.Store - emitter events.Poster + runtimes map[string]runtime.Runtime + db *bolt.DB + store content.Store + publisher events.Publisher } func (s *Service) Register(server *grpc.Server) error { @@ -502,12 +501,3 @@ func (s *Service) getRuntime(name string) (runtime.Runtime, error) { } return runtime, nil } - -func (s *Service) emit(ctx context.Context, topic string, evt interface{}) error { - emitterCtx := events.WithTopic(ctx, topic) - if err := s.emitter.Post(emitterCtx, evt); err != nil { - return err - } - - return nil -} diff --git a/task.go b/task.go index 61191d312..fc7b74437 100644 --- a/task.go +++ b/task.go @@ -161,7 +161,7 @@ func (t *task) Status(ctx context.Context) (TaskStatus, error) { // Wait is a blocking call that will wait for the task to exit and return the exit status func (t *task) Wait(ctx context.Context) (uint32, error) { - eventstream, err := t.client.EventService().Stream(ctx, &eventsapi.StreamEventsRequest{}) + eventstream, err := t.client.EventService().Subscribe(ctx, &eventsapi.SubscribeRequest{}) if err != nil { return UnknownExitStatus, errdefs.FromGRPC(err) } diff --git a/windows/runtime.go b/windows/runtime.go index 5ab31c034..2ea448477 100644 --- a/windows/runtime.go +++ b/windows/runtime.go @@ -63,8 +63,8 @@ func New(ic *plugin.InitContext) (interface{}, error) { root: ic.Root, pidPool: newPidPool(), - events: make(chan interface{}, 4096), - emitter: ic.Emitter, + events: make(chan interface{}, 4096), + publisher: ic.Events, // TODO(mlaventure): windows needs a stat monitor monitor: nil, tasks: runtime.NewTaskList(), @@ -84,8 +84,8 @@ type windowsRuntime struct { root string pidPool *pidPool - emitter *events.Emitter - events chan interface{} + publisher events.Publisher + events chan interface{} monitor runtime.TaskMonitor tasks *runtime.TaskList @@ -194,7 +194,8 @@ func (r *windowsRuntime) Delete(ctx context.Context, t runtime.Task) (*runtime.E wt.cleanup() r.tasks.Delete(ctx, t) - r.emitter.Post(events.WithTopic(ctx, runtime.TaskDeleteEventTopic), + r.publisher.Publish(ctx, + runtime.TaskDeleteEventTopic, &eventsapi.TaskDelete{ ContainerID: wt.id, Pid: wt.pid, @@ -296,7 +297,7 @@ func (r *windowsRuntime) newTask(ctx context.Context, namespace, id string, spec spec: spec, processes: make(map[string]*process), hyperV: spec.Windows.HyperV != nil, - emitter: r.emitter, + publisher: r.publisher, rwLayer: conf.LayerFolderPath, pidPool: r.pidPool, hcsContainer: ctr, @@ -312,7 +313,8 @@ func (r *windowsRuntime) newTask(ctx context.Context, namespace, id string, spec }) } - r.emitter.Post(events.WithTopic(ctx, runtime.TaskCreateEventTopic), + r.publisher.Publish(ctx, + runtime.TaskCreateEventTopic, &eventsapi.TaskCreate{ ContainerID: id, IO: &eventsapi.TaskIO{ diff --git a/windows/task.go b/windows/task.go index 7e5bbb521..5114a2990 100644 --- a/windows/task.go +++ b/windows/task.go @@ -34,8 +34,8 @@ type task struct { processes map[string]*process hyperV bool - emitter *events.Emitter - rwLayer string + publisher events.Publisher + rwLayer string pidPool *pidPool hcsContainer hcsshim.Container @@ -112,7 +112,8 @@ func (t *task) Start(ctx context.Context) error { return err } - t.emitter.Post(events.WithTopic(ctx, runtime.TaskStartEventTopic), + t.publisher.Publish(ctx, + runtime.TaskStartEventTopic, &eventsapi.TaskStart{ ContainerID: t.id, Pid: t.pid, @@ -130,7 +131,8 @@ func (t *task) Pause(ctx context.Context) error { t.Unlock() } if err == nil { - t.emitter.Post(events.WithTopic(ctx, runtime.TaskPausedEventTopic), + t.publisher.Publish(ctx, + runtime.TaskPausedEventTopic, &eventsapi.TaskPaused{ ContainerID: t.id, }) @@ -150,7 +152,8 @@ func (t *task) Resume(ctx context.Context) error { t.Unlock() } if err == nil { - t.emitter.Post(events.WithTopic(ctx, runtime.TaskResumedEventTopic), + t.publisher.Publish(ctx, + runtime.TaskResumedEventTopic, &eventsapi.TaskResumed{ ContainerID: t.id, }) @@ -195,7 +198,8 @@ func (t *task) Exec(ctx context.Context, id string, opts runtime.ExecOpts) (runt return nil, err } - t.emitter.Post(events.WithTopic(ctx, runtime.TaskExecAddedEventTopic), + t.publisher.Publish(ctx, + runtime.TaskExecAddedEventTopic, &eventsapi.TaskExecAdded{ ContainerID: t.id, ExecID: id, @@ -358,7 +362,8 @@ func (t *task) newProcess(ctx context.Context, id string, conf *hcsshim.ProcessC } wp.exitCode = uint32(ec) - t.emitter.Post(events.WithTopic(ctx, runtime.TaskExitEventTopic), + t.publisher.Publish(ctx, + runtime.TaskExitEventTopic, &eventsapi.TaskExit{ ContainerID: t.id, ID: id,