diff --git a/api/services/events/v1/container.pb.go b/api/services/events/v1/container.pb.go index 362cafede..1df8f81dc 100644 --- a/api/services/events/v1/container.pb.go +++ b/api/services/events/v1/container.pb.go @@ -19,8 +19,8 @@ ContainerUpdate ContainerDelete ContentDelete - StreamEventsRequest - PostEventRequest + SubscribeRequest + PublishRequest Envelope ImageCreate ImageUpdate diff --git a/api/services/events/v1/events.pb.go b/api/services/events/v1/events.pb.go index 61632cd52..f736c9b09 100644 --- a/api/services/events/v1/events.pb.go +++ b/api/services/events/v1/events.pb.go @@ -32,25 +32,27 @@ var _ = fmt.Errorf var _ = math.Inf var _ = time.Kitchen -type StreamEventsRequest struct { +type SubscribeRequest struct { + Filters []string `protobuf:"bytes,1,rep,name=filters" json:"filters,omitempty"` } -func (m *StreamEventsRequest) Reset() { *m = StreamEventsRequest{} } -func (*StreamEventsRequest) ProtoMessage() {} -func (*StreamEventsRequest) Descriptor() ([]byte, []int) { return fileDescriptorEvents, []int{0} } +func (m *SubscribeRequest) Reset() { *m = SubscribeRequest{} } +func (*SubscribeRequest) ProtoMessage() {} +func (*SubscribeRequest) Descriptor() ([]byte, []int) { return fileDescriptorEvents, []int{0} } -type PostEventRequest struct { +type PublishRequest struct { Envelope *Envelope `protobuf:"bytes,1,opt,name=envelope" json:"envelope,omitempty"` } -func (m *PostEventRequest) Reset() { *m = PostEventRequest{} } -func (*PostEventRequest) ProtoMessage() {} -func (*PostEventRequest) Descriptor() ([]byte, []int) { return fileDescriptorEvents, []int{1} } +func (m *PublishRequest) Reset() { *m = PublishRequest{} } +func (*PublishRequest) ProtoMessage() {} +func (*PublishRequest) Descriptor() ([]byte, []int) { return fileDescriptorEvents, []int{1} } type Envelope struct { Timestamp time.Time `protobuf:"bytes,1,opt,name=timestamp,stdtime" json:"timestamp"` - Topic string `protobuf:"bytes,2,opt,name=topic,proto3" json:"topic,omitempty"` - Event *google_protobuf1.Any `protobuf:"bytes,3,opt,name=event" json:"event,omitempty"` + Namespace string `protobuf:"bytes,2,opt,name=namespace,proto3" json:"namespace,omitempty"` + Topic string `protobuf:"bytes,3,opt,name=topic,proto3" json:"topic,omitempty"` + Event *google_protobuf1.Any `protobuf:"bytes,4,opt,name=event" json:"event,omitempty"` } func (m *Envelope) Reset() { *m = Envelope{} } @@ -58,8 +60,8 @@ func (*Envelope) ProtoMessage() {} func (*Envelope) Descriptor() ([]byte, []int) { return fileDescriptorEvents, []int{2} } func init() { - proto.RegisterType((*StreamEventsRequest)(nil), "containerd.services.events.v1.StreamEventsRequest") - proto.RegisterType((*PostEventRequest)(nil), "containerd.services.events.v1.PostEventRequest") + proto.RegisterType((*SubscribeRequest)(nil), "containerd.services.events.v1.SubscribeRequest") + proto.RegisterType((*PublishRequest)(nil), "containerd.services.events.v1.PublishRequest") proto.RegisterType((*Envelope)(nil), "containerd.services.events.v1.Envelope") } @@ -74,8 +76,8 @@ const _ = grpc.SupportPackageIsVersion4 // Client API for Events service type EventsClient interface { - Stream(ctx context.Context, in *StreamEventsRequest, opts ...grpc.CallOption) (Events_StreamClient, error) - Post(ctx context.Context, in *PostEventRequest, opts ...grpc.CallOption) (*google_protobuf2.Empty, error) + Publish(ctx context.Context, in *PublishRequest, opts ...grpc.CallOption) (*google_protobuf2.Empty, error) + Subscribe(ctx context.Context, in *SubscribeRequest, opts ...grpc.CallOption) (Events_SubscribeClient, error) } type eventsClient struct { @@ -86,12 +88,21 @@ func NewEventsClient(cc *grpc.ClientConn) EventsClient { return &eventsClient{cc} } -func (c *eventsClient) Stream(ctx context.Context, in *StreamEventsRequest, opts ...grpc.CallOption) (Events_StreamClient, error) { - stream, err := grpc.NewClientStream(ctx, &_Events_serviceDesc.Streams[0], c.cc, "/containerd.services.events.v1.Events/Stream", opts...) +func (c *eventsClient) Publish(ctx context.Context, in *PublishRequest, opts ...grpc.CallOption) (*google_protobuf2.Empty, error) { + out := new(google_protobuf2.Empty) + err := grpc.Invoke(ctx, "/containerd.services.events.v1.Events/Publish", in, out, c.cc, opts...) if err != nil { return nil, err } - x := &eventsStreamClient{stream} + return out, nil +} + +func (c *eventsClient) Subscribe(ctx context.Context, in *SubscribeRequest, opts ...grpc.CallOption) (Events_SubscribeClient, error) { + stream, err := grpc.NewClientStream(ctx, &_Events_serviceDesc.Streams[0], c.cc, "/containerd.services.events.v1.Events/Subscribe", opts...) + if err != nil { + return nil, err + } + x := &eventsSubscribeClient{stream} if err := x.ClientStream.SendMsg(in); err != nil { return nil, err } @@ -101,16 +112,16 @@ func (c *eventsClient) Stream(ctx context.Context, in *StreamEventsRequest, opts return x, nil } -type Events_StreamClient interface { +type Events_SubscribeClient interface { Recv() (*Envelope, error) grpc.ClientStream } -type eventsStreamClient struct { +type eventsSubscribeClient struct { grpc.ClientStream } -func (x *eventsStreamClient) Recv() (*Envelope, error) { +func (x *eventsSubscribeClient) Recv() (*Envelope, error) { m := new(Envelope) if err := x.ClientStream.RecvMsg(m); err != nil { return nil, err @@ -118,85 +129,76 @@ func (x *eventsStreamClient) Recv() (*Envelope, error) { return m, nil } -func (c *eventsClient) Post(ctx context.Context, in *PostEventRequest, opts ...grpc.CallOption) (*google_protobuf2.Empty, error) { - out := new(google_protobuf2.Empty) - err := grpc.Invoke(ctx, "/containerd.services.events.v1.Events/Post", in, out, c.cc, opts...) - if err != nil { - return nil, err - } - return out, nil -} - // Server API for Events service type EventsServer interface { - Stream(*StreamEventsRequest, Events_StreamServer) error - Post(context.Context, *PostEventRequest) (*google_protobuf2.Empty, error) + Publish(context.Context, *PublishRequest) (*google_protobuf2.Empty, error) + Subscribe(*SubscribeRequest, Events_SubscribeServer) error } func RegisterEventsServer(s *grpc.Server, srv EventsServer) { s.RegisterService(&_Events_serviceDesc, srv) } -func _Events_Stream_Handler(srv interface{}, stream grpc.ServerStream) error { - m := new(StreamEventsRequest) - if err := stream.RecvMsg(m); err != nil { - return err - } - return srv.(EventsServer).Stream(m, &eventsStreamServer{stream}) -} - -type Events_StreamServer interface { - Send(*Envelope) error - grpc.ServerStream -} - -type eventsStreamServer struct { - grpc.ServerStream -} - -func (x *eventsStreamServer) Send(m *Envelope) error { - return x.ServerStream.SendMsg(m) -} - -func _Events_Post_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(PostEventRequest) +func _Events_Publish_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(PublishRequest) if err := dec(in); err != nil { return nil, err } if interceptor == nil { - return srv.(EventsServer).Post(ctx, in) + return srv.(EventsServer).Publish(ctx, in) } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/containerd.services.events.v1.Events/Post", + FullMethod: "/containerd.services.events.v1.Events/Publish", } handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(EventsServer).Post(ctx, req.(*PostEventRequest)) + return srv.(EventsServer).Publish(ctx, req.(*PublishRequest)) } return interceptor(ctx, in, info, handler) } +func _Events_Subscribe_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(SubscribeRequest) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(EventsServer).Subscribe(m, &eventsSubscribeServer{stream}) +} + +type Events_SubscribeServer interface { + Send(*Envelope) error + grpc.ServerStream +} + +type eventsSubscribeServer struct { + grpc.ServerStream +} + +func (x *eventsSubscribeServer) Send(m *Envelope) error { + return x.ServerStream.SendMsg(m) +} + var _Events_serviceDesc = grpc.ServiceDesc{ ServiceName: "containerd.services.events.v1.Events", HandlerType: (*EventsServer)(nil), Methods: []grpc.MethodDesc{ { - MethodName: "Post", - Handler: _Events_Post_Handler, + MethodName: "Publish", + Handler: _Events_Publish_Handler, }, }, Streams: []grpc.StreamDesc{ { - StreamName: "Stream", - Handler: _Events_Stream_Handler, + StreamName: "Subscribe", + Handler: _Events_Subscribe_Handler, ServerStreams: true, }, }, Metadata: "github.com/containerd/containerd/api/services/events/v1/events.proto", } -func (m *StreamEventsRequest) Marshal() (dAtA []byte, err error) { +func (m *SubscribeRequest) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) n, err := m.MarshalTo(dAtA) @@ -206,15 +208,30 @@ func (m *StreamEventsRequest) Marshal() (dAtA []byte, err error) { return dAtA[:n], nil } -func (m *StreamEventsRequest) MarshalTo(dAtA []byte) (int, error) { +func (m *SubscribeRequest) MarshalTo(dAtA []byte) (int, error) { var i int _ = i var l int _ = l + if len(m.Filters) > 0 { + for _, s := range m.Filters { + dAtA[i] = 0xa + i++ + l = len(s) + for l >= 1<<7 { + dAtA[i] = uint8(uint64(l)&0x7f | 0x80) + l >>= 7 + i++ + } + dAtA[i] = uint8(l) + i++ + i += copy(dAtA[i:], s) + } + } return i, nil } -func (m *PostEventRequest) Marshal() (dAtA []byte, err error) { +func (m *PublishRequest) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) n, err := m.MarshalTo(dAtA) @@ -224,7 +241,7 @@ func (m *PostEventRequest) Marshal() (dAtA []byte, err error) { return dAtA[:n], nil } -func (m *PostEventRequest) MarshalTo(dAtA []byte) (int, error) { +func (m *PublishRequest) MarshalTo(dAtA []byte) (int, error) { var i int _ = i var l int @@ -265,14 +282,20 @@ func (m *Envelope) MarshalTo(dAtA []byte) (int, error) { return 0, err } i += n2 - if len(m.Topic) > 0 { + if len(m.Namespace) > 0 { dAtA[i] = 0x12 i++ + i = encodeVarintEvents(dAtA, i, uint64(len(m.Namespace))) + i += copy(dAtA[i:], m.Namespace) + } + if len(m.Topic) > 0 { + dAtA[i] = 0x1a + i++ i = encodeVarintEvents(dAtA, i, uint64(len(m.Topic))) i += copy(dAtA[i:], m.Topic) } if m.Event != nil { - dAtA[i] = 0x1a + dAtA[i] = 0x22 i++ i = encodeVarintEvents(dAtA, i, uint64(m.Event.Size())) n3, err := m.Event.MarshalTo(dAtA[i:]) @@ -311,13 +334,19 @@ func encodeVarintEvents(dAtA []byte, offset int, v uint64) int { dAtA[offset] = uint8(v) return offset + 1 } -func (m *StreamEventsRequest) Size() (n int) { +func (m *SubscribeRequest) Size() (n int) { var l int _ = l + if len(m.Filters) > 0 { + for _, s := range m.Filters { + l = len(s) + n += 1 + l + sovEvents(uint64(l)) + } + } return n } -func (m *PostEventRequest) Size() (n int) { +func (m *PublishRequest) Size() (n int) { var l int _ = l if m.Envelope != nil { @@ -332,6 +361,10 @@ func (m *Envelope) Size() (n int) { _ = l l = github_com_gogo_protobuf_types.SizeOfStdTime(m.Timestamp) n += 1 + l + sovEvents(uint64(l)) + l = len(m.Namespace) + if l > 0 { + n += 1 + l + sovEvents(uint64(l)) + } l = len(m.Topic) if l > 0 { n += 1 + l + sovEvents(uint64(l)) @@ -356,20 +389,21 @@ func sovEvents(x uint64) (n int) { func sozEvents(x uint64) (n int) { return sovEvents(uint64((x << 1) ^ uint64((int64(x) >> 63)))) } -func (this *StreamEventsRequest) String() string { +func (this *SubscribeRequest) String() string { if this == nil { return "nil" } - s := strings.Join([]string{`&StreamEventsRequest{`, + s := strings.Join([]string{`&SubscribeRequest{`, + `Filters:` + fmt.Sprintf("%v", this.Filters) + `,`, `}`, }, "") return s } -func (this *PostEventRequest) String() string { +func (this *PublishRequest) String() string { if this == nil { return "nil" } - s := strings.Join([]string{`&PostEventRequest{`, + s := strings.Join([]string{`&PublishRequest{`, `Envelope:` + strings.Replace(fmt.Sprintf("%v", this.Envelope), "Envelope", "Envelope", 1) + `,`, `}`, }, "") @@ -381,6 +415,7 @@ func (this *Envelope) String() string { } s := strings.Join([]string{`&Envelope{`, `Timestamp:` + strings.Replace(strings.Replace(this.Timestamp.String(), "Timestamp", "google_protobuf3.Timestamp", 1), `&`, ``, 1) + `,`, + `Namespace:` + fmt.Sprintf("%v", this.Namespace) + `,`, `Topic:` + fmt.Sprintf("%v", this.Topic) + `,`, `Event:` + strings.Replace(fmt.Sprintf("%v", this.Event), "Any", "google_protobuf1.Any", 1) + `,`, `}`, @@ -395,7 +430,7 @@ func valueToStringEvents(v interface{}) string { pv := reflect.Indirect(rv).Interface() return fmt.Sprintf("*%v", pv) } -func (m *StreamEventsRequest) Unmarshal(dAtA []byte) error { +func (m *SubscribeRequest) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 for iNdEx < l { @@ -418,12 +453,41 @@ func (m *StreamEventsRequest) Unmarshal(dAtA []byte) error { fieldNum := int32(wire >> 3) wireType := int(wire & 0x7) if wireType == 4 { - return fmt.Errorf("proto: StreamEventsRequest: wiretype end group for non-group") + return fmt.Errorf("proto: SubscribeRequest: wiretype end group for non-group") } if fieldNum <= 0 { - return fmt.Errorf("proto: StreamEventsRequest: illegal tag %d (wire type %d)", fieldNum, wire) + return fmt.Errorf("proto: SubscribeRequest: illegal tag %d (wire type %d)", fieldNum, wire) } switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Filters", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowEvents + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthEvents + } + postIndex := iNdEx + intStringLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Filters = append(m.Filters, string(dAtA[iNdEx:postIndex])) + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipEvents(dAtA[iNdEx:]) @@ -445,7 +509,7 @@ func (m *StreamEventsRequest) Unmarshal(dAtA []byte) error { } return nil } -func (m *PostEventRequest) Unmarshal(dAtA []byte) error { +func (m *PublishRequest) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 for iNdEx < l { @@ -468,10 +532,10 @@ func (m *PostEventRequest) Unmarshal(dAtA []byte) error { fieldNum := int32(wire >> 3) wireType := int(wire & 0x7) if wireType == 4 { - return fmt.Errorf("proto: PostEventRequest: wiretype end group for non-group") + return fmt.Errorf("proto: PublishRequest: wiretype end group for non-group") } if fieldNum <= 0 { - return fmt.Errorf("proto: PostEventRequest: illegal tag %d (wire type %d)", fieldNum, wire) + return fmt.Errorf("proto: PublishRequest: illegal tag %d (wire type %d)", fieldNum, wire) } switch fieldNum { case 1: @@ -588,6 +652,35 @@ func (m *Envelope) Unmarshal(dAtA []byte) error { } iNdEx = postIndex case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Namespace", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowEvents + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthEvents + } + postIndex := iNdEx + intStringLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Namespace = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 3: if wireType != 2 { return fmt.Errorf("proto: wrong wireType = %d for field Topic", wireType) } @@ -616,7 +709,7 @@ func (m *Envelope) Unmarshal(dAtA []byte) error { } m.Topic = string(dAtA[iNdEx:postIndex]) iNdEx = postIndex - case 3: + case 4: if wireType != 2 { return fmt.Errorf("proto: wrong wireType = %d for field Event", wireType) } @@ -780,28 +873,31 @@ func init() { } var fileDescriptorEvents = []byte{ - // 367 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x92, 0xc1, 0x4e, 0xc2, 0x40, - 0x10, 0x86, 0x59, 0x15, 0x02, 0xeb, 0xc5, 0xac, 0x68, 0xb0, 0xc6, 0x42, 0xb8, 0x48, 0x3c, 0xec, - 0x0a, 0x1e, 0x4d, 0x4c, 0x44, 0x39, 0x6b, 0xaa, 0x89, 0xc6, 0x5b, 0xa9, 0x63, 0x6d, 0x42, 0xbb, - 0xb5, 0x5d, 0x9a, 0x70, 0xf3, 0x11, 0x78, 0x26, 0x4f, 0x1c, 0x3d, 0x7a, 0x52, 0xe9, 0x93, 0x18, - 0x76, 0xb7, 0x60, 0xc0, 0x88, 0xf1, 0x36, 0x3b, 0xf3, 0xcd, 0xf4, 0x9f, 0x7f, 0x8a, 0xcf, 0x5d, - 0x4f, 0x3c, 0xf6, 0xbb, 0xd4, 0xe1, 0x3e, 0x73, 0x78, 0x20, 0x6c, 0x2f, 0x80, 0xe8, 0xfe, 0x7b, - 0x68, 0x87, 0x1e, 0x8b, 0x21, 0x4a, 0x3c, 0x07, 0x62, 0x06, 0x09, 0x04, 0x22, 0x66, 0x49, 0x53, - 0x47, 0x34, 0x8c, 0xb8, 0xe0, 0x64, 0x6f, 0xc6, 0xd3, 0x8c, 0xa5, 0x9a, 0x48, 0x9a, 0x46, 0xd9, - 0xe5, 0x2e, 0x97, 0x24, 0x9b, 0x44, 0xaa, 0xc9, 0xd8, 0x71, 0x39, 0x77, 0x7b, 0xc0, 0xe4, 0xab, - 0xdb, 0x7f, 0x60, 0x76, 0x30, 0xd0, 0xa5, 0xdd, 0xf9, 0x12, 0xf8, 0xa1, 0xc8, 0x8a, 0xd5, 0xf9, - 0xa2, 0xf0, 0x7c, 0x88, 0x85, 0xed, 0x87, 0x0a, 0xa8, 0x6f, 0xe1, 0xcd, 0x2b, 0x11, 0x81, 0xed, - 0x77, 0xa4, 0x02, 0x0b, 0x9e, 0xfa, 0x10, 0x8b, 0xfa, 0x0d, 0xde, 0xb8, 0xe4, 0xb1, 0x90, 0x49, - 0x9d, 0x23, 0x67, 0xb8, 0x08, 0x41, 0x02, 0x3d, 0x1e, 0x42, 0x05, 0xd5, 0x50, 0x63, 0xbd, 0xb5, - 0x4f, 0x7f, 0xdd, 0x85, 0x76, 0x34, 0x6e, 0x4d, 0x1b, 0xeb, 0x43, 0x84, 0x8b, 0x59, 0x9a, 0xb4, - 0x71, 0x69, 0xaa, 0x47, 0x8f, 0x34, 0xa8, 0x52, 0x4c, 0x33, 0xc5, 0xf4, 0x3a, 0x23, 0xda, 0xc5, - 0xd1, 0x7b, 0x35, 0x37, 0xfc, 0xa8, 0x22, 0x6b, 0xd6, 0x46, 0xca, 0x38, 0x2f, 0x78, 0xe8, 0x39, - 0x95, 0x95, 0x1a, 0x6a, 0x94, 0x2c, 0xf5, 0x20, 0x07, 0x38, 0x2f, 0x65, 0x54, 0x56, 0xe5, 0xd4, - 0xf2, 0xc2, 0xd4, 0xd3, 0x60, 0x60, 0x29, 0xa4, 0xf5, 0x82, 0x70, 0x41, 0x6d, 0x4f, 0x5c, 0x5c, - 0x50, 0x6e, 0x90, 0xd6, 0x92, 0xd5, 0x7e, 0x30, 0xcd, 0xf8, 0xab, 0x1d, 0x87, 0x88, 0x5c, 0xe0, - 0xb5, 0x89, 0xbf, 0x84, 0x2d, 0x69, 0x99, 0x3f, 0x82, 0xb1, 0xbd, 0xb0, 0x49, 0x67, 0x72, 0xee, - 0xf6, 0xed, 0x68, 0x6c, 0xe6, 0xde, 0xc6, 0x66, 0xee, 0x39, 0x35, 0xd1, 0x28, 0x35, 0xd1, 0x6b, - 0x6a, 0xa2, 0xcf, 0xd4, 0x44, 0x77, 0x27, 0xff, 0xfc, 0x6b, 0x8f, 0x55, 0xd4, 0x2d, 0xc8, 0x2f, - 0x1d, 0x7d, 0x05, 0x00, 0x00, 0xff, 0xff, 0xdd, 0x37, 0xcb, 0x0e, 0xfe, 0x02, 0x00, 0x00, + // 407 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x92, 0xcd, 0x6e, 0xd3, 0x40, + 0x10, 0xc7, 0xb3, 0x84, 0x7c, 0x78, 0x91, 0x10, 0x5a, 0x45, 0xc8, 0x18, 0x70, 0xa2, 0x5c, 0x88, + 0x10, 0xec, 0x92, 0x70, 0x44, 0x42, 0x22, 0x90, 0x7b, 0x64, 0x40, 0x42, 0xdc, 0x6c, 0x77, 0xe2, + 0xac, 0x64, 0x7b, 0x5d, 0xef, 0xda, 0x52, 0x6e, 0x7d, 0x84, 0x3e, 0x49, 0x5f, 0xa2, 0x97, 0x1c, + 0x7b, 0xec, 0xa9, 0x6d, 0xfc, 0x24, 0x55, 0xfc, 0x91, 0xb4, 0x89, 0xd4, 0x54, 0xbd, 0xcd, 0xec, + 0xff, 0x37, 0x3b, 0xfb, 0x9f, 0x59, 0xfc, 0xcb, 0xe3, 0x6a, 0x9e, 0x38, 0xd4, 0x15, 0x01, 0x73, + 0x45, 0xa8, 0x6c, 0x1e, 0x42, 0x7c, 0x74, 0x37, 0xb4, 0x23, 0xce, 0x24, 0xc4, 0x29, 0x77, 0x41, + 0x32, 0x48, 0x21, 0x54, 0x92, 0xa5, 0xc3, 0x32, 0xa2, 0x51, 0x2c, 0x94, 0x20, 0xef, 0xb7, 0x3c, + 0xad, 0x58, 0x5a, 0x12, 0xe9, 0xd0, 0xe8, 0x78, 0xc2, 0x13, 0x39, 0xc9, 0xd6, 0x51, 0x51, 0x64, + 0xbc, 0xf1, 0x84, 0xf0, 0x7c, 0x60, 0x79, 0xe6, 0x24, 0x33, 0x66, 0x87, 0x8b, 0x52, 0x7a, 0xbb, + 0x2b, 0x41, 0x10, 0xa9, 0x4a, 0xec, 0xee, 0x8a, 0x8a, 0x07, 0x20, 0x95, 0x1d, 0x44, 0x05, 0xd0, + 0xff, 0x84, 0x5f, 0xfd, 0x4e, 0x1c, 0xe9, 0xc6, 0xdc, 0x01, 0x0b, 0x8e, 0x13, 0x90, 0x8a, 0xe8, + 0xb8, 0x35, 0xe3, 0xbe, 0x82, 0x58, 0xea, 0xa8, 0x57, 0x1f, 0x68, 0x56, 0x95, 0xf6, 0xff, 0xe2, + 0x97, 0xd3, 0xc4, 0xf1, 0xb9, 0x9c, 0x57, 0xec, 0x4f, 0xdc, 0x86, 0x30, 0x05, 0x5f, 0x44, 0xa0, + 0xa3, 0x1e, 0x1a, 0xbc, 0x18, 0x7d, 0xa0, 0x0f, 0x1a, 0xa4, 0x93, 0x12, 0xb7, 0x36, 0x85, 0xfd, + 0x33, 0x84, 0xdb, 0xd5, 0x31, 0x19, 0x63, 0x6d, 0xf3, 0xc8, 0xf2, 0x4a, 0x83, 0x16, 0x36, 0x68, + 0x65, 0x83, 0xfe, 0xa9, 0x88, 0x71, 0x7b, 0x79, 0xd5, 0xad, 0x9d, 0x5e, 0x77, 0x91, 0xb5, 0x2d, + 0x23, 0xef, 0xb0, 0x16, 0xda, 0x01, 0xc8, 0xc8, 0x76, 0x41, 0x7f, 0xd6, 0x43, 0x03, 0xcd, 0xda, + 0x1e, 0x90, 0x0e, 0x6e, 0x28, 0x11, 0x71, 0x57, 0xaf, 0xe7, 0x4a, 0x91, 0x90, 0x8f, 0xb8, 0x91, + 0x3f, 0x52, 0x7f, 0x9e, 0xf7, 0xec, 0xec, 0xf5, 0xfc, 0x11, 0x2e, 0xac, 0x02, 0x19, 0x9d, 0x23, + 0xdc, 0x9c, 0xe4, 0x8e, 0xc8, 0x14, 0xb7, 0xca, 0x91, 0x90, 0xcf, 0x07, 0x9c, 0xdf, 0x1f, 0x9d, + 0xf1, 0x7a, 0xaf, 0xc3, 0x64, 0xbd, 0x39, 0xe2, 0x61, 0x6d, 0xb3, 0x12, 0xc2, 0x0e, 0xdc, 0xb9, + 0xbb, 0x3c, 0xe3, 0xb1, 0xe3, 0xff, 0x82, 0xc6, 0xff, 0x96, 0x2b, 0xb3, 0x76, 0xb9, 0x32, 0x6b, + 0x27, 0x99, 0x89, 0x96, 0x99, 0x89, 0x2e, 0x32, 0x13, 0xdd, 0x64, 0x26, 0xfa, 0xff, 0xfd, 0x89, + 0x3f, 0xfd, 0x5b, 0x11, 0x39, 0xcd, 0xdc, 0xd2, 0xd7, 0xdb, 0x00, 0x00, 0x00, 0xff, 0xff, 0x13, + 0x35, 0xd0, 0x60, 0x32, 0x03, 0x00, 0x00, } diff --git a/api/services/events/v1/events.proto b/api/services/events/v1/events.proto index 5f7d9a4e1..cb470d1c1 100644 --- a/api/services/events/v1/events.proto +++ b/api/services/events/v1/events.proto @@ -10,18 +10,21 @@ import "google/protobuf/timestamp.proto"; option go_package = "github.com/containerd/containerd/api/services/events/v1;events"; service Events { - rpc Stream(StreamEventsRequest) returns (stream Envelope); - rpc Post(PostEventRequest) returns (google.protobuf.Empty); + rpc Publish(PublishRequest) returns (google.protobuf.Empty); + rpc Subscribe(SubscribeRequest) returns (stream Envelope); } -message StreamEventsRequest {} +message SubscribeRequest { + repeated string filters = 1; +} -message PostEventRequest { +message PublishRequest { Envelope envelope = 1; } message Envelope { google.protobuf.Timestamp timestamp = 1 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false]; - string topic = 2; - google.protobuf.Any event = 3; + string namespace = 2; + string topic = 3; + google.protobuf.Any event = 4; } diff --git a/cmd/ctr/events.go b/cmd/ctr/events.go index 9cd9d29b1..253b7c81b 100644 --- a/cmd/ctr/events.go +++ b/cmd/ctr/events.go @@ -22,7 +22,7 @@ var eventsCommand = cli.Command{ ctx, cancel := appContext(context) defer cancel() - events, err := eventsClient.Stream(ctx, &eventsapi.StreamEventsRequest{}) + events, err := eventsClient.Subscribe(ctx, &eventsapi.SubscribeRequest{}) if err != nil { return err } diff --git a/events/emitter.go b/events/emitter.go deleted file mode 100644 index d69105647..000000000 --- a/events/emitter.go +++ /dev/null @@ -1,67 +0,0 @@ -package events - -import ( - "context" - "sync" - - events "github.com/containerd/containerd/api/services/events/v1" - "github.com/containerd/containerd/namespaces" - goevents "github.com/docker/go-events" -) - -const ( - EventVersion = "v1" -) - -type Emitter struct { - sinks map[string]*eventSink - broadcaster *goevents.Broadcaster - m sync.Mutex -} - -func NewEmitter() *Emitter { - return &Emitter{ - sinks: make(map[string]*eventSink), - broadcaster: goevents.NewBroadcaster(), - m: sync.Mutex{}, - } -} - -func (e *Emitter) Post(ctx context.Context, evt Event) error { - if err := e.broadcaster.Write(&sinkEvent{ - ctx: ctx, - event: evt, - }); err != nil { - return err - } - - return nil -} - -func (e *Emitter) Events(ctx context.Context, clientID string) chan *events.Envelope { - e.m.Lock() - if _, ok := e.sinks[clientID]; !ok { - ns, _ := namespaces.Namespace(ctx) - s := &eventSink{ - ch: make(chan *events.Envelope), - ns: ns, - } - e.sinks[clientID] = s - e.m.Unlock() - e.broadcaster.Add(s) - return s.ch - } - ch := e.sinks[clientID].ch - e.m.Unlock() - - return ch -} - -func (e *Emitter) Remove(clientID string) { - e.m.Lock() - if v, ok := e.sinks[clientID]; ok { - e.broadcaster.Remove(v) - delete(e.sinks, clientID) - } - e.m.Unlock() -} diff --git a/events/event.go b/events/event.go deleted file mode 100644 index f59f0c4a8..000000000 --- a/events/event.go +++ /dev/null @@ -1,3 +0,0 @@ -package events - -type Event interface{} diff --git a/events/events.go b/events/events.go new file mode 100644 index 000000000..d00c9e578 --- /dev/null +++ b/events/events.go @@ -0,0 +1,24 @@ +package events + +import ( + "context" + + events "github.com/containerd/containerd/api/services/events/v1" +) + +type Event interface{} + +// Publisher posts the event. +type Publisher interface { + Publish(ctx context.Context, topic string, event Event) error +} + +type Forwarder interface { + Forward(ctx context.Context, envelope *events.Envelope) error +} + +type publisherFunc func(ctx context.Context, topic string, event Event) error + +func (fn publisherFunc) Publish(ctx context.Context, topic string, event Event) error { + return fn(ctx, topic, event) +} diff --git a/events/events_test.go b/events/events_test.go deleted file mode 100644 index c58fc6225..000000000 --- a/events/events_test.go +++ /dev/null @@ -1,33 +0,0 @@ -package events - -import ( - "context" - "fmt" - "testing" -) - -func TestBasicEvent(t *testing.T) { - ctx := context.Background() - - // simulate a layer pull with events - ctx, commit, _ := WithTx(ctx) - - G(ctx).Post(ctx, "pull ubuntu") - - for layer := 0; layer < 4; layer++ { - // make a subtransaction for each layer - ctx, commit, _ := WithTx(ctx) - - G(ctx).Post(ctx, fmt.Sprintf("fetch layer %v", layer)) - - ctx = WithTopic(ctx, "content") - // simulate sub-operations with a separate topic, on the content store - G(ctx).Post(ctx, fmt.Sprint("received sha:256")) - - G(ctx).Post(ctx, fmt.Sprintf("unpack layer %v", layer)) - - commit() - } - - commit() -} diff --git a/events/exchange.go b/events/exchange.go new file mode 100644 index 000000000..e78e5c204 --- /dev/null +++ b/events/exchange.go @@ -0,0 +1,162 @@ +package events + +import ( + "context" + "strings" + "time" + + events "github.com/containerd/containerd/api/services/events/v1" + "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/filters" + "github.com/containerd/containerd/identifiers" + "github.com/containerd/containerd/log" + "github.com/containerd/containerd/namespaces" + "github.com/containerd/containerd/typeurl" + goevents "github.com/docker/go-events" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" +) + +type Exchange struct { + broadcaster *goevents.Broadcaster +} + +func NewExchange() *Exchange { + return &Exchange{ + broadcaster: goevents.NewBroadcaster(), + } +} + +// Forward accepts an envelope to be direcly distributed on the exchange. +func (e *Exchange) Forward(ctx context.Context, envelope *events.Envelope) error { + log.G(ctx).WithFields(logrus.Fields{ + "topic": envelope.Topic, + "ns": envelope.Namespace, + "type": envelope.Event.TypeUrl, + }).Debug("forward event") + + if err := namespaces.Validate(envelope.Namespace); err != nil { + return errors.Wrapf(err, "event envelope has invalid namespace") + } + + if err := validateTopic(envelope.Topic); err != nil { + return errors.Wrapf(err, "envelope topic %q", envelope.Topic) + } + + return e.broadcaster.Write(envelope) +} + +// Publish packages and sends an event. The caller will be considered the +// initial publisher of the event. This means the timestamp will be calculated +// at this point and this method may read from the calling context. +func (e *Exchange) Publish(ctx context.Context, topic string, event Event) error { + namespace, err := namespaces.NamespaceRequired(ctx) + if err != nil { + return errors.Wrapf(err, "failed publishing event") + } + if err := validateTopic(topic); err != nil { + return errors.Wrapf(err, "envelope topic %q", topic) + } + + evany, err := typeurl.MarshalAny(event) + if err != nil { + return err + } + env := events.Envelope{ + Timestamp: time.Now().UTC(), + Topic: topic, + Event: evany, + } + if err := e.broadcaster.Write(&env); err != nil { + return err + } + + log.G(ctx).WithFields(logrus.Fields{ + "topic": topic, + "type": evany.TypeUrl, + "ns": namespace, + }).Debug("published event") + return nil +} + +// Subscribe to events on the exchange. Events are sent through the returned +// channel ch. If an error is encountered, it will be sent on channel errs and +// errs will be closed. To end the subscription, cancel the provided context. +func (e *Exchange) Subscribe(ctx context.Context, filters ...filters.Filter) (ch <-chan *events.Envelope, errs <-chan error) { + var ( + evch = make(chan *events.Envelope) + errq = make(chan error, 1) + channel = goevents.NewChannel(0) + queue = goevents.NewQueue(channel) + ) + + // TODO(stevvooe): Insert the filter! + + e.broadcaster.Add(queue) + + go func() { + defer close(errq) + defer e.broadcaster.Remove(queue) + defer queue.Close() + + var err error + loop: + for { + select { + case ev := <-channel.C: + env, ok := ev.(*events.Envelope) + if !ok { + // TODO(stevvooe): For the most part, we are well protected + // from this condition. Both Forward and Publish protect + // from this. + err = errors.Errorf("invalid envelope encountered %#v; please file a bug", ev) + break + } + + select { + case evch <- env: + case <-ctx.Done(): + break loop + } + case <-ctx.Done(): + break loop + } + } + + if err == nil { + if cerr := ctx.Err(); cerr != context.Canceled { + err = cerr + } + } + + errq <- err + }() + + ch = evch + errs = errq + + return +} + +func validateTopic(topic string) error { + if topic == "" { + return errors.Wrap(errdefs.ErrInvalidArgument, "must not be empty") + } + + if topic[0] != '/' { + return errors.Wrapf(errdefs.ErrInvalidArgument, "must start with '/'", topic) + } + + if len(topic) == 1 { + return errors.Wrapf(errdefs.ErrInvalidArgument, "must have at least one component", topic) + } + + components := strings.Split(topic[1:], "/") + for _, component := range components { + if err := identifiers.Validate(component); err != nil { + return errors.Wrapf(err, "failed validation on component %q", component) + } + } + + return nil +} diff --git a/events/exchange_test.go b/events/exchange_test.go new file mode 100644 index 000000000..dcd5859bf --- /dev/null +++ b/events/exchange_test.go @@ -0,0 +1,149 @@ +package events + +import ( + "context" + "fmt" + "reflect" + "sync" + "testing" + + events "github.com/containerd/containerd/api/services/events/v1" + "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/namespaces" + "github.com/containerd/containerd/typeurl" + "github.com/pkg/errors" +) + +func TestExchangeBasic(t *testing.T) { + ctx := namespaces.WithNamespace(context.Background(), t.Name()) + testevents := []Event{ + &events.ContainerCreate{ID: "asdf"}, + &events.ContainerCreate{ID: "qwer"}, + &events.ContainerCreate{ID: "zxcv"}, + } + exchange := NewExchange() + + t.Log("subscribe") + var cancel1, cancel2 func() + + // Create two subscribers for same set of events and make sure they + // traverse the exchange. + ctx1, cancel1 := context.WithCancel(ctx) + eventq1, errq1 := exchange.Subscribe(ctx1) + + ctx2, cancel2 := context.WithCancel(ctx) + eventq2, errq2 := exchange.Subscribe(ctx2) + + t.Log("publish") + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + for _, event := range testevents { + fmt.Println("publish", event) + if err := exchange.Publish(ctx, "/test", event); err != nil { + fmt.Println("publish error", err) + t.Fatal(err) + } + } + + t.Log("finished publishing") + }() + + t.Log("waiting") + wg.Wait() + + for _, subscriber := range []struct { + eventq <-chan *events.Envelope + errq <-chan error + cancel func() + }{ + { + eventq: eventq1, + errq: errq1, + cancel: cancel1, + }, + { + eventq: eventq2, + errq: errq2, + cancel: cancel2, + }, + } { + var received []Event + subscribercheck: + for { + select { + case env := <-subscriber.eventq: + ev, err := typeurl.UnmarshalAny(env.Event) + if err != nil { + t.Fatal(err) + } + received = append(received, ev.(*events.ContainerCreate)) + case err := <-subscriber.errq: + if err != nil { + t.Fatal(err) + } + break subscribercheck + } + + if reflect.DeepEqual(received, testevents) { + // when we do this, we expect the errs channel to be closed and + // this will return. + subscriber.cancel() + } + } + } +} + +func TestExchangeValidateTopic(t *testing.T) { + namespace := t.Name() + ctx := namespaces.WithNamespace(context.Background(), namespace) + exchange := NewExchange() + + for _, testcase := range []struct { + input string + err error + }{ + { + input: "/test", + }, + { + input: "/test/test", + }, + { + input: "test", + err: errdefs.ErrInvalidArgument, + }, + } { + t.Run(testcase.input, func(t *testing.T) { + event := &events.ContainerCreate{ID: t.Name()} + if err := exchange.Publish(ctx, testcase.input, event); errors.Cause(err) != testcase.err { + if err == nil { + t.Fatalf("expected error %v, received nil", testcase.err) + } else { + t.Fatalf("expected error %v, received %v", testcase.err, err) + } + } + + evany, err := typeurl.MarshalAny(event) + if err != nil { + t.Fatal(err) + } + + envelope := events.Envelope{ + Namespace: namespace, + Topic: testcase.input, + Event: evany, + } + // make sure we get same errors with forward. + if err := exchange.Forward(ctx, &envelope); errors.Cause(err) != testcase.err { + if err == nil { + t.Fatalf("expected error %v, received nil", testcase.err) + } else { + t.Fatalf("expected error %v, received %v", testcase.err, err) + } + } + + }) + } +} diff --git a/events/poster.go b/events/poster.go deleted file mode 100644 index 124f8b79b..000000000 --- a/events/poster.go +++ /dev/null @@ -1,65 +0,0 @@ -package events - -import ( - "context" - - "github.com/containerd/containerd/log" - "github.com/containerd/containerd/namespaces" - "github.com/sirupsen/logrus" -) - -var ( - G = GetPoster -) - -// Poster posts the event. -type Poster interface { - Post(ctx context.Context, evt Event) error -} - -type posterKey struct{} - -func WithPoster(ctx context.Context, poster Poster) context.Context { - return context.WithValue(ctx, posterKey{}, poster) -} - -func GetPoster(ctx context.Context) Poster { - poster := ctx.Value(posterKey{}) - - if poster == nil { - tx, _ := getTx(ctx) - topic := getTopic(ctx) - - // likely means we don't have a configured event system. Just return - // the default poster, which merely logs events. - return posterFunc(func(ctx context.Context, evt Event) error { - fields := logrus.Fields{"event": evt} - - if topic != "" { - fields["topic"] = topic - } - ns, _ := namespaces.Namespace(ctx) - fields["ns"] = ns - - if tx != nil { - fields["tx.id"] = tx.id - if tx.parent != nil { - fields["tx.parent.id"] = tx.parent.id - } - } - - log.G(ctx).WithFields(fields).Debug("event fired") - - return nil - }) - } - - return poster.(Poster) -} - -type posterFunc func(ctx context.Context, evt Event) error - -func (fn posterFunc) Post(ctx context.Context, evt Event) error { - fn(ctx, evt) - return nil -} diff --git a/events/sink.go b/events/sink.go deleted file mode 100644 index 9246e40aa..000000000 --- a/events/sink.go +++ /dev/null @@ -1,65 +0,0 @@ -package events - -import ( - "context" - "time" - - "github.com/containerd/containerd/api/services/events/v1" - "github.com/containerd/containerd/log" - "github.com/containerd/containerd/namespaces" - "github.com/containerd/containerd/typeurl" - goevents "github.com/docker/go-events" - "github.com/pkg/errors" - "github.com/sirupsen/logrus" -) - -type sinkEvent struct { - ctx context.Context - event Event -} - -type eventSink struct { - ns string - ch chan *events.Envelope -} - -func (s *eventSink) Write(evt goevents.Event) error { - e, ok := evt.(*sinkEvent) - if !ok { - return errors.New("event is not a sink event") - } - - ns, _ := namespaces.Namespace(e.ctx) - if ns != "" && ns != s.ns { - // ignore events not intended for this ns - return nil - } - - if ev, ok := e.event.(*events.Envelope); ok { - s.ch <- ev - return nil - } - topic := getTopic(e.ctx) - - eventData, err := typeurl.MarshalAny(e.event) - if err != nil { - return err - } - - log.G(e.ctx).WithFields(logrus.Fields{ - "topic": topic, - "type": eventData.TypeUrl, - "ns": ns, - }).Debug("event") - - s.ch <- &events.Envelope{ - Timestamp: time.Now(), - Topic: topic, - Event: eventData, - } - return nil -} - -func (s *eventSink) Close() error { - return nil -} diff --git a/events/topic.go b/events/topic.go deleted file mode 100644 index 5f371a118..000000000 --- a/events/topic.go +++ /dev/null @@ -1,35 +0,0 @@ -package events - -import "context" - -type topicKey struct{} - -// WithTopic returns a context with a new topic set, such that events emitted -// from the resulting context will be marked with the topic. -// -// A topic groups events by the target module they operate on. This is -// primarily designed to support multi-module log compaction of events. In -// typical journaling systems, the entries operate on a single data structure. -// When compacting the journal, we can replace all former log entries with a -// summary data structure that will result in the same state. -// -// By providing a compaction mechanism by topic, we can prune down to a data -// structure oriented towards a single topic, leaving unrelated messages alone. -func WithTopic(ctx context.Context, topic string) context.Context { - return context.WithValue(ctx, topicKey{}, topic) -} - -func getTopic(ctx context.Context) string { - topic := ctx.Value(topicKey{}) - - if topic == nil { - return "" - } - - return topic.(string) -} - -// RegisterCompactor sets the compacter for the given topic. -func RegisterCompactor(topic string, compactor interface{}) { - panic("not implemented") -} diff --git a/events/transaction.go b/events/transaction.go deleted file mode 100644 index 11a2cd40b..000000000 --- a/events/transaction.go +++ /dev/null @@ -1,96 +0,0 @@ -package events - -import ( - "context" - "fmt" - "sync" - "sync/atomic" - "time" -) - -var txCounter int64 // replace this with something that won't break - -// nextTXID provides the next transaction identifier. -func nexttxID() int64 { - // TODO(stevvooe): Need to coordinate this with existing transaction logs. - // For now, this is a toy, but not a racy one. - return atomic.AddInt64(&txCounter, 1) -} - -type transaction struct { - ctx context.Context - id int64 - parent *transaction // if nil, no parent transaction - finish sync.Once - start, end time.Time // informational -} - -// begin creates a sub-transaction. -func (tx *transaction) begin(ctx context.Context, poster Poster) *transaction { - id := nexttxID() - - child := &transaction{ - ctx: ctx, - id: id, - parent: tx, - start: time.Now(), - } - - // post the transaction started event - poster.Post(ctx, child.makeTransactionEvent("begin")) // transactions are really just events - - return child -} - -// commit writes out the transaction. -func (tx *transaction) commit(poster Poster) { - tx.finish.Do(func() { - tx.end = time.Now() - poster.Post(tx.ctx, tx.makeTransactionEvent("commit")) - }) -} - -func (tx *transaction) rollback(poster Poster, cause error) { - tx.finish.Do(func() { - tx.end = time.Now() - event := tx.makeTransactionEvent("rollback") - event = fmt.Sprintf("%s error=%q", event, cause.Error()) - poster.Post(tx.ctx, event) - }) -} - -func (tx *transaction) makeTransactionEvent(status string) Event { - // TODO(stevvooe): obviously, we need more structure than this. - event := fmt.Sprintf("%v %v", status, tx.id) - if tx.parent != nil { - event += " parent=" + fmt.Sprint(tx.parent.id) - } - - return event -} - -type txKey struct{} - -func getTx(ctx context.Context) (*transaction, bool) { - tx := ctx.Value(txKey{}) - if tx == nil { - return nil, false - } - - return tx.(*transaction), true -} - -// WithTx returns a new context with an event transaction, such that events -// posted to the underlying context will be committed to the event log as a -// group, organized by a transaction id, when commit is called. -func WithTx(pctx context.Context) (ctx context.Context, commit func(), rollback func(err error)) { - poster := G(pctx) - parent, _ := getTx(pctx) - tx := parent.begin(pctx, poster) - - return context.WithValue(pctx, txKey{}, tx), func() { - tx.commit(poster) - }, func(err error) { - tx.rollback(poster, err) - } -} diff --git a/linux/shim/local.go b/linux/shim/local.go index 10215ea41..c7d23b8a6 100644 --- a/linux/shim/local.go +++ b/linux/shim/local.go @@ -89,17 +89,16 @@ func (c *local) Update(ctx context.Context, in *shimapi.UpdateTaskRequest, opts return c.s.Update(ctx, in) } -type poster interface { - Post(ctx context.Context, in *events.PostEventRequest, opts ...grpc.CallOption) (*google_protobuf.Empty, error) +type publisher interface { + Publish(ctx context.Context, in *events.PublishRequest, opts ...grpc.CallOption) (*google_protobuf.Empty, error) } type localEventsClient struct { - emitter evt.Poster + forwarder evt.Forwarder } -func (l *localEventsClient) Post(ctx context.Context, r *events.PostEventRequest, opts ...grpc.CallOption) (*google_protobuf.Empty, error) { - ctx = evt.WithTopic(ctx, r.Envelope.Topic) - if err := l.emitter.Post(ctx, r.Envelope); err != nil { +func (l *localEventsClient) Publish(ctx context.Context, r *events.PublishRequest, opts ...grpc.CallOption) (*google_protobuf.Empty, error) { + if err := l.forwarder.Forward(ctx, r.Envelope); err != nil { return nil, err } return empty, nil diff --git a/linux/shim/service.go b/linux/shim/service.go index a767f3710..5555394a8 100644 --- a/linux/shim/service.go +++ b/linux/shim/service.go @@ -37,7 +37,7 @@ func NewService(path, namespace, address string) (*Service, error) { return nil, fmt.Errorf("shim namespace cannot be empty") } context := namespaces.WithNamespace(context.Background(), namespace) - var client poster + var client publisher if address != "" { conn, err := connect(address, dialer) if err != nil { @@ -46,7 +46,7 @@ func NewService(path, namespace, address string) (*Service, error) { client = events.NewEventsClient(conn) } else { client = &localEventsClient{ - emitter: evt.GetPoster(context), + forwarder: evt.NewExchange(), } } s := &Service{ @@ -379,16 +379,18 @@ func (s *Service) getContainerPids(ctx context.Context, id string) ([]uint32, er return pids, nil } -func (s *Service) forward(client poster) { +func (s *Service) forward(client publisher) { for e := range s.events { a, err := typeurl.MarshalAny(e) if err != nil { log.G(s.context).WithError(err).Error("marshal event") continue } - if _, err := client.Post(s.context, &events.PostEventRequest{ + + if _, err := client.Publish(s.context, &events.PublishRequest{ Envelope: &events.Envelope{ - Timestamp: time.Now(), + Namespace: s.namespace, + Timestamp: time.Now().UTC(), Topic: getTopic(e), Event: a, }, diff --git a/metrics/cgroups/cgroups.go b/metrics/cgroups/cgroups.go index e2adc7d15..5ef16c0c3 100644 --- a/metrics/cgroups/cgroups.go +++ b/metrics/cgroups/cgroups.go @@ -4,8 +4,8 @@ package cgroups import ( "github.com/containerd/cgroups" - events "github.com/containerd/containerd/api/services/events/v1" - evt "github.com/containerd/containerd/events" + eventsapi "github.com/containerd/containerd/api/services/events/v1" + "github.com/containerd/containerd/events" "github.com/containerd/containerd/log" "github.com/containerd/containerd/plugin" "github.com/containerd/containerd/runtime" @@ -35,7 +35,7 @@ func New(ic *plugin.InitContext) (interface{}, error) { collector: collector, oom: oom, context: ic.Context, - emitter: ic.Emitter, + publisher: ic.Events, }, nil } @@ -43,7 +43,7 @@ type cgroupsMonitor struct { collector *Collector oom *OOMCollector context context.Context - emitter *evt.Emitter + publisher events.Publisher } func (m *cgroupsMonitor) Monitor(c runtime.Task) error { @@ -69,7 +69,7 @@ func (m *cgroupsMonitor) Stop(c runtime.Task) error { } func (m *cgroupsMonitor) trigger(id string, cg cgroups.Cgroup) { - if err := m.emitter.Post(m.context, &events.TaskOOM{ + if err := m.publisher.Publish(m.context, runtime.TaskOOMEventTopic, &eventsapi.TaskOOM{ ContainerID: id, }); err != nil { log.G(m.context).WithError(err).Error("post OOM event") diff --git a/plugin/context.go b/plugin/context.go index 57e190d95..16cf1b8b7 100644 --- a/plugin/context.go +++ b/plugin/context.go @@ -22,7 +22,7 @@ type InitContext struct { Address string Context context.Context Config interface{} - Emitter *events.Emitter + Events *events.Exchange plugins map[PluginType]map[string]interface{} } diff --git a/process.go b/process.go index a3b4b3565..82473ea2b 100644 --- a/process.go +++ b/process.go @@ -64,7 +64,7 @@ func (p *process) Kill(ctx context.Context, s syscall.Signal) error { } func (p *process) Wait(ctx context.Context) (uint32, error) { - eventstream, err := p.task.client.EventService().Stream(ctx, &eventsapi.StreamEventsRequest{}) + eventstream, err := p.task.client.EventService().Subscribe(ctx, &eventsapi.SubscribeRequest{}) if err != nil { return UnknownExitStatus, err } diff --git a/server/server.go b/server/server.go index b2a260b11..1d205261f 100644 --- a/server/server.go +++ b/server/server.go @@ -53,8 +53,8 @@ func New(ctx context.Context, config *Config) (*Server, error) { var ( services []plugin.Service s = &Server{ - rpc: rpc, - emitter: events.NewEmitter(), + rpc: rpc, + events: events.NewExchange(), } initialized = make(map[plugin.PluginType]map[string]interface{}) ) @@ -63,12 +63,12 @@ func New(ctx context.Context, config *Config) (*Server, error) { log.G(ctx).WithField("type", p.Type).Infof("loading plugin %q...", id) initContext := plugin.NewContext( - events.WithPoster(ctx, s.emitter), + ctx, initialized, config.Root, id, ) - initContext.Emitter = s.emitter + initContext.Events = s.events initContext.Address = config.GRPC.Address // load the plugin specific configuration if it is provided @@ -112,8 +112,8 @@ func New(ctx context.Context, config *Config) (*Server, error) { // Server is the containerd main daemon type Server struct { - rpc *grpc.Server - emitter *events.Emitter + rpc *grpc.Server + events *events.Exchange } // ServeGRPC provides the containerd grpc APIs on the provided listener diff --git a/services/containers/service.go b/services/containers/service.go index c83c5b67a..74db689df 100644 --- a/services/containers/service.go +++ b/services/containers/service.go @@ -24,23 +24,22 @@ func init() { plugin.MetadataPlugin, }, Init: func(ic *plugin.InitContext) (interface{}, error) { - e := events.GetPoster(ic.Context) m, err := ic.Get(plugin.MetadataPlugin) if err != nil { return nil, err } - return NewService(m.(*bolt.DB), e), nil + return NewService(m.(*bolt.DB), ic.Events), nil }, }) } type Service struct { - db *bolt.DB - emitter events.Poster + db *bolt.DB + publisher events.Publisher } -func NewService(db *bolt.DB, evts events.Poster) api.ContainersServer { - return &Service{db: db, emitter: evts} +func NewService(db *bolt.DB, publisher events.Publisher) api.ContainersServer { + return &Service{db: db, publisher: publisher} } func (s *Service) Register(server *grpc.Server) error { @@ -94,7 +93,7 @@ func (s *Service) Create(ctx context.Context, req *api.CreateContainerRequest) ( }); err != nil { return &resp, errdefs.ToGRPC(err) } - if err := s.emit(ctx, "/containers/create", &eventsapi.ContainerCreate{ + if err := s.publisher.Publish(ctx, "/containers/create", &eventsapi.ContainerCreate{ ID: resp.Container.ID, Image: resp.Container.Image, Runtime: &eventsapi.ContainerCreate_Runtime{ @@ -136,7 +135,7 @@ func (s *Service) Update(ctx context.Context, req *api.UpdateContainerRequest) ( return &resp, errdefs.ToGRPC(err) } - if err := s.emit(ctx, "/containers/update", &eventsapi.ContainerUpdate{ + if err := s.publisher.Publish(ctx, "/containers/update", &eventsapi.ContainerUpdate{ ID: resp.Container.ID, Image: resp.Container.Image, Labels: resp.Container.Labels, @@ -155,7 +154,7 @@ func (s *Service) Delete(ctx context.Context, req *api.DeleteContainerRequest) ( return &empty.Empty{}, errdefs.ToGRPC(err) } - if err := s.emit(ctx, "/containers/delete", &eventsapi.ContainerDelete{ + if err := s.publisher.Publish(ctx, "/containers/delete", &eventsapi.ContainerDelete{ ID: req.ID, }); err != nil { return &empty.Empty{}, err @@ -175,12 +174,3 @@ func (s *Service) withStoreView(ctx context.Context, fn func(ctx context.Context func (s *Service) withStoreUpdate(ctx context.Context, fn func(ctx context.Context, store containers.Store) error) error { return s.db.Update(s.withStore(ctx, fn)) } - -func (s *Service) emit(ctx context.Context, topic string, evt interface{}) error { - emitterCtx := events.WithTopic(ctx, topic) - if err := s.emitter.Post(emitterCtx, evt); err != nil { - return err - } - - return nil -} diff --git a/services/content/service.go b/services/content/service.go index c8698df8a..3b972e481 100644 --- a/services/content/service.go +++ b/services/content/service.go @@ -23,8 +23,8 @@ import ( ) type Service struct { - store content.Store - emitter events.Poster + store content.Store + publisher events.Publisher } var bufPool = sync.Pool{ @@ -58,8 +58,8 @@ func NewService(ic *plugin.InitContext) (interface{}, error) { } cs := metadata.NewContentStore(m.(*bolt.DB), c.(content.Store)) return &Service{ - store: cs, - emitter: events.GetPoster(ic.Context), + store: cs, + publisher: ic.Events, }, nil } @@ -149,7 +149,7 @@ func (s *Service) Delete(ctx context.Context, req *api.DeleteContentRequest) (*e return nil, errdefs.ToGRPC(err) } - if err := s.emit(ctx, "/content/delete", &eventsapi.ContentDelete{ + if err := s.publisher.Publish(ctx, "/content/delete", &eventsapi.ContentDelete{ Digest: req.Digest, }); err != nil { return nil, err @@ -459,12 +459,3 @@ func (s *Service) Abort(ctx context.Context, req *api.AbortRequest) (*empty.Empt return &empty.Empty{}, nil } - -func (s *Service) emit(ctx context.Context, topic string, evt interface{}) error { - emitterCtx := events.WithTopic(ctx, topic) - if err := s.emitter.Post(emitterCtx, evt); err != nil { - return err - } - - return nil -} diff --git a/services/events/service.go b/services/events/service.go index 48822a5ea..30f630d27 100644 --- a/services/events/service.go +++ b/services/events/service.go @@ -1,14 +1,13 @@ package events import ( - "fmt" - "time" - api "github.com/containerd/containerd/api/services/events/v1" + "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/events" + "github.com/containerd/containerd/filters" "github.com/containerd/containerd/plugin" "github.com/golang/protobuf/ptypes/empty" - "github.com/sirupsen/logrus" + "github.com/pkg/errors" "golang.org/x/net/context" "google.golang.org/grpc" ) @@ -18,18 +17,17 @@ func init() { Type: plugin.GRPCPlugin, ID: "events", Init: func(ic *plugin.InitContext) (interface{}, error) { - return NewService(ic.Emitter), nil + return NewService(ic.Events), nil }, }) } type Service struct { - emitter *events.Emitter - timeouts map[string]*time.Timer + events *events.Exchange } -func NewService(e *events.Emitter) api.EventsServer { - return &Service{emitter: e} +func NewService(events *events.Exchange) api.EventsServer { + return &Service{events: events} } func (s *Service) Register(server *grpc.Server) error { @@ -37,28 +35,36 @@ func (s *Service) Register(server *grpc.Server) error { return nil } -func (s *Service) Stream(req *api.StreamEventsRequest, srv api.Events_StreamServer) error { - clientID := fmt.Sprintf("%d", time.Now().UnixNano()) +func (s *Service) Subscribe(req *api.SubscribeRequest, srv api.Events_SubscribeServer) error { + ctx, cancel := context.WithCancel(srv.Context()) + defer cancel() + + filter, err := filters.ParseAll(req.Filters...) + if err != nil { + return errdefs.ToGRPC(err) + } + + eventq, errq := s.events.Subscribe(ctx, filter) for { - e := <-s.emitter.Events(srv.Context(), clientID) - // upon the client event timeout this will be nil; ignore - if e == nil { + select { + case ev := <-eventq: + if err := srv.Send(ev); err != nil { + return errors.Wrapf(err, "failed sending event to subscriber") + } + case err := <-errq: + if err != nil { + return errors.Wrapf(err, "subscription error") + } + return nil } - if err := srv.Send(e); err != nil { - logrus.WithFields(logrus.Fields{ - "client": clientID, - }).Debug("error sending event; unsubscribing client") - s.emitter.Remove(clientID) - return err - } } } -func (s *Service) Post(ctx context.Context, r *api.PostEventRequest) (*empty.Empty, error) { - ctx = events.WithTopic(ctx, r.Envelope.Topic) - if err := s.emitter.Post(ctx, r.Envelope); err != nil { - return nil, err +func (s *Service) Publish(ctx context.Context, r *api.PublishRequest) (*empty.Empty, error) { + if err := s.events.Forward(ctx, r.Envelope); err != nil { + return nil, errdefs.ToGRPC(err) } + return &empty.Empty{}, nil } diff --git a/services/images/service.go b/services/images/service.go index a6a78ff97..d8499aa43 100644 --- a/services/images/service.go +++ b/services/images/service.go @@ -24,25 +24,24 @@ func init() { plugin.MetadataPlugin, }, Init: func(ic *plugin.InitContext) (interface{}, error) { - e := events.GetPoster(ic.Context) m, err := ic.Get(plugin.MetadataPlugin) if err != nil { return nil, err } - return NewService(m.(*bolt.DB), e), nil + return NewService(m.(*bolt.DB), ic.Events), nil }, }) } type Service struct { - db *bolt.DB - emitter events.Poster + db *bolt.DB + publisher events.Publisher } -func NewService(db *bolt.DB, evts events.Poster) imagesapi.ImagesServer { +func NewService(db *bolt.DB, publisher events.Publisher) imagesapi.ImagesServer { return &Service{ - db: db, - emitter: evts, + db: db, + publisher: publisher, } } @@ -100,7 +99,7 @@ func (s *Service) Create(ctx context.Context, req *imagesapi.CreateImageRequest) return nil, errdefs.ToGRPC(err) } - if err := s.emit(ctx, "/images/create", &eventsapi.ImageCreate{ + if err := s.publisher.Publish(ctx, "/images/create", &eventsapi.ImageCreate{ Name: resp.Image.Name, Labels: resp.Image.Labels, }); err != nil { @@ -139,7 +138,7 @@ func (s *Service) Update(ctx context.Context, req *imagesapi.UpdateImageRequest) return nil, errdefs.ToGRPC(err) } - if err := s.emit(ctx, "/images/update", &eventsapi.ImageUpdate{ + if err := s.publisher.Publish(ctx, "/images/update", &eventsapi.ImageUpdate{ Name: resp.Image.Name, Labels: resp.Image.Labels, }); err != nil { @@ -156,7 +155,7 @@ func (s *Service) Delete(ctx context.Context, req *imagesapi.DeleteImageRequest) return nil, err } - if err := s.emit(ctx, "/images/delete", &eventsapi.ImageDelete{ + if err := s.publisher.Publish(ctx, "/images/delete", &eventsapi.ImageDelete{ Name: req.Name, }); err != nil { return nil, err @@ -176,12 +175,3 @@ func (s *Service) withStoreView(ctx context.Context, fn func(ctx context.Context func (s *Service) withStoreUpdate(ctx context.Context, fn func(ctx context.Context, store images.Store) error) error { return s.db.Update(s.withStore(ctx, fn)) } - -func (s *Service) emit(ctx context.Context, topic string, evt interface{}) error { - emitterCtx := events.WithTopic(ctx, topic) - if err := s.emitter.Post(emitterCtx, evt); err != nil { - return err - } - - return nil -} diff --git a/services/namespaces/service.go b/services/namespaces/service.go index d0de39295..0891670cb 100644 --- a/services/namespaces/service.go +++ b/services/namespaces/service.go @@ -25,27 +25,26 @@ func init() { plugin.MetadataPlugin, }, Init: func(ic *plugin.InitContext) (interface{}, error) { - e := events.GetPoster(ic.Context) m, err := ic.Get(plugin.MetadataPlugin) if err != nil { return nil, err } - return NewService(m.(*bolt.DB), e), nil + return NewService(m.(*bolt.DB), ic.Events), nil }, }) } type Service struct { - db *bolt.DB - emitter events.Poster + db *bolt.DB + publisher events.Publisher } var _ api.NamespacesServer = &Service{} -func NewService(db *bolt.DB, evts events.Poster) api.NamespacesServer { +func NewService(db *bolt.DB, publisher events.Publisher) api.NamespacesServer { return &Service{ - db: db, - emitter: evts, + db: db, + publisher: publisher, } } @@ -119,7 +118,7 @@ func (s *Service) Create(ctx context.Context, req *api.CreateNamespaceRequest) ( return &resp, err } - if err := s.emit(ctx, "/namespaces/create", &eventsapi.NamespaceCreate{ + if err := s.publisher.Publish(ctx, "/namespaces/create", &eventsapi.NamespaceCreate{ Name: req.Namespace.Name, Labels: req.Namespace.Labels, }); err != nil { @@ -172,7 +171,7 @@ func (s *Service) Update(ctx context.Context, req *api.UpdateNamespaceRequest) ( return &resp, err } - if err := s.emit(ctx, "/namespaces/update", &eventsapi.NamespaceUpdate{ + if err := s.publisher.Publish(ctx, "/namespaces/update", &eventsapi.NamespaceUpdate{ Name: req.Namespace.Name, Labels: req.Namespace.Labels, }); err != nil { @@ -189,7 +188,7 @@ func (s *Service) Delete(ctx context.Context, req *api.DeleteNamespaceRequest) ( return &empty.Empty{}, err } - if err := s.emit(ctx, "/namespaces/delete", &eventsapi.NamespaceDelete{ + if err := s.publisher.Publish(ctx, "/namespaces/delete", &eventsapi.NamespaceDelete{ Name: req.Name, }); err != nil { return &empty.Empty{}, err @@ -209,12 +208,3 @@ func (s *Service) withStoreView(ctx context.Context, fn func(ctx context.Context func (s *Service) withStoreUpdate(ctx context.Context, fn func(ctx context.Context, store namespaces.Store) error) error { return s.db.Update(s.withStore(ctx, fn)) } - -func (s *Service) emit(ctx context.Context, topic string, evt interface{}) error { - emitterCtx := events.WithTopic(ctx, topic) - if err := s.emitter.Post(emitterCtx, evt); err != nil { - return err - } - - return nil -} diff --git a/services/snapshot/service.go b/services/snapshot/service.go index 3972f0f0d..8b105e8a2 100644 --- a/services/snapshot/service.go +++ b/services/snapshot/service.go @@ -45,11 +45,10 @@ var empty = &protoempty.Empty{} type service struct { snapshotters map[string]snapshot.Snapshotter defaultSnapshotterName string - emitter events.Poster + publisher events.Publisher } func newService(ic *plugin.InitContext) (interface{}, error) { - evts := events.GetPoster(ic.Context) rawSnapshotters, err := ic.GetAll(plugin.SnapshotPlugin) if err != nil { return nil, err @@ -72,7 +71,7 @@ func newService(ic *plugin.InitContext) (interface{}, error) { return &service{ snapshotters: snapshotters, defaultSnapshotterName: cfg.Default, - emitter: evts, + publisher: ic.Events, }, nil } @@ -105,7 +104,7 @@ func (s *service) Prepare(ctx context.Context, pr *snapshotapi.PrepareSnapshotRe return nil, errdefs.ToGRPC(err) } - if err := s.emit(ctx, "/snapshot/prepare", &eventsapi.SnapshotPrepare{ + if err := s.publisher.Publish(ctx, "/snapshot/prepare", &eventsapi.SnapshotPrepare{ Key: pr.Key, Parent: pr.Parent, }); err != nil { @@ -162,7 +161,7 @@ func (s *service) Commit(ctx context.Context, cr *snapshotapi.CommitSnapshotRequ return nil, errdefs.ToGRPC(err) } - if err := s.emit(ctx, "/snapshot/commit", &eventsapi.SnapshotCommit{ + if err := s.publisher.Publish(ctx, "/snapshot/commit", &eventsapi.SnapshotCommit{ Key: cr.Key, Name: cr.Name, }); err != nil { @@ -183,7 +182,7 @@ func (s *service) Remove(ctx context.Context, rr *snapshotapi.RemoveSnapshotRequ return nil, errdefs.ToGRPC(err) } - if err := s.emit(ctx, "/snapshot/remove", &eventsapi.SnapshotRemove{ + if err := s.publisher.Publish(ctx, "/snapshot/remove", &eventsapi.SnapshotRemove{ Key: rr.Key, }); err != nil { return nil, err @@ -293,12 +292,3 @@ func fromMounts(mounts []mount.Mount) []*types.Mount { } return out } - -func (s *service) emit(ctx context.Context, topic string, evt interface{}) error { - emitterCtx := events.WithTopic(ctx, topic) - if err := s.emitter.Post(emitterCtx, evt); err != nil { - return err - } - - return nil -} diff --git a/services/tasks/service.go b/services/tasks/service.go index e794d6ae8..a6764ccdb 100644 --- a/services/tasks/service.go +++ b/services/tasks/service.go @@ -67,20 +67,19 @@ func New(ic *plugin.InitContext) (interface{}, error) { r := rr.(runtime.Runtime) runtimes[r.ID()] = r } - e := events.GetPoster(ic.Context) return &Service{ - runtimes: runtimes, - db: m.(*bolt.DB), - store: cs, - emitter: e, + runtimes: runtimes, + db: m.(*bolt.DB), + store: cs, + publisher: ic.Events, }, nil } type Service struct { - runtimes map[string]runtime.Runtime - db *bolt.DB - store content.Store - emitter events.Poster + runtimes map[string]runtime.Runtime + db *bolt.DB + store content.Store + publisher events.Publisher } func (s *Service) Register(server *grpc.Server) error { @@ -502,12 +501,3 @@ func (s *Service) getRuntime(name string) (runtime.Runtime, error) { } return runtime, nil } - -func (s *Service) emit(ctx context.Context, topic string, evt interface{}) error { - emitterCtx := events.WithTopic(ctx, topic) - if err := s.emitter.Post(emitterCtx, evt); err != nil { - return err - } - - return nil -} diff --git a/task.go b/task.go index 015c8cc7c..87f95a11d 100644 --- a/task.go +++ b/task.go @@ -163,7 +163,7 @@ func (t *task) Status(ctx context.Context) (TaskStatus, error) { // Wait is a blocking call that will wait for the task to exit and return the exit status func (t *task) Wait(ctx context.Context) (uint32, error) { - eventstream, err := t.client.EventService().Stream(ctx, &eventsapi.StreamEventsRequest{}) + eventstream, err := t.client.EventService().Subscribe(ctx, &eventsapi.SubscribeRequest{}) if err != nil { return UnknownExitStatus, errdefs.FromGRPC(err) } diff --git a/windows/runtime.go b/windows/runtime.go index 5ab31c034..2ea448477 100644 --- a/windows/runtime.go +++ b/windows/runtime.go @@ -63,8 +63,8 @@ func New(ic *plugin.InitContext) (interface{}, error) { root: ic.Root, pidPool: newPidPool(), - events: make(chan interface{}, 4096), - emitter: ic.Emitter, + events: make(chan interface{}, 4096), + publisher: ic.Events, // TODO(mlaventure): windows needs a stat monitor monitor: nil, tasks: runtime.NewTaskList(), @@ -84,8 +84,8 @@ type windowsRuntime struct { root string pidPool *pidPool - emitter *events.Emitter - events chan interface{} + publisher events.Publisher + events chan interface{} monitor runtime.TaskMonitor tasks *runtime.TaskList @@ -194,7 +194,8 @@ func (r *windowsRuntime) Delete(ctx context.Context, t runtime.Task) (*runtime.E wt.cleanup() r.tasks.Delete(ctx, t) - r.emitter.Post(events.WithTopic(ctx, runtime.TaskDeleteEventTopic), + r.publisher.Publish(ctx, + runtime.TaskDeleteEventTopic, &eventsapi.TaskDelete{ ContainerID: wt.id, Pid: wt.pid, @@ -296,7 +297,7 @@ func (r *windowsRuntime) newTask(ctx context.Context, namespace, id string, spec spec: spec, processes: make(map[string]*process), hyperV: spec.Windows.HyperV != nil, - emitter: r.emitter, + publisher: r.publisher, rwLayer: conf.LayerFolderPath, pidPool: r.pidPool, hcsContainer: ctr, @@ -312,7 +313,8 @@ func (r *windowsRuntime) newTask(ctx context.Context, namespace, id string, spec }) } - r.emitter.Post(events.WithTopic(ctx, runtime.TaskCreateEventTopic), + r.publisher.Publish(ctx, + runtime.TaskCreateEventTopic, &eventsapi.TaskCreate{ ContainerID: id, IO: &eventsapi.TaskIO{ diff --git a/windows/task.go b/windows/task.go index 7e5bbb521..5114a2990 100644 --- a/windows/task.go +++ b/windows/task.go @@ -34,8 +34,8 @@ type task struct { processes map[string]*process hyperV bool - emitter *events.Emitter - rwLayer string + publisher events.Publisher + rwLayer string pidPool *pidPool hcsContainer hcsshim.Container @@ -112,7 +112,8 @@ func (t *task) Start(ctx context.Context) error { return err } - t.emitter.Post(events.WithTopic(ctx, runtime.TaskStartEventTopic), + t.publisher.Publish(ctx, + runtime.TaskStartEventTopic, &eventsapi.TaskStart{ ContainerID: t.id, Pid: t.pid, @@ -130,7 +131,8 @@ func (t *task) Pause(ctx context.Context) error { t.Unlock() } if err == nil { - t.emitter.Post(events.WithTopic(ctx, runtime.TaskPausedEventTopic), + t.publisher.Publish(ctx, + runtime.TaskPausedEventTopic, &eventsapi.TaskPaused{ ContainerID: t.id, }) @@ -150,7 +152,8 @@ func (t *task) Resume(ctx context.Context) error { t.Unlock() } if err == nil { - t.emitter.Post(events.WithTopic(ctx, runtime.TaskResumedEventTopic), + t.publisher.Publish(ctx, + runtime.TaskResumedEventTopic, &eventsapi.TaskResumed{ ContainerID: t.id, }) @@ -195,7 +198,8 @@ func (t *task) Exec(ctx context.Context, id string, opts runtime.ExecOpts) (runt return nil, err } - t.emitter.Post(events.WithTopic(ctx, runtime.TaskExecAddedEventTopic), + t.publisher.Publish(ctx, + runtime.TaskExecAddedEventTopic, &eventsapi.TaskExecAdded{ ContainerID: t.id, ExecID: id, @@ -358,7 +362,8 @@ func (t *task) newProcess(ctx context.Context, id string, conf *hcsshim.ProcessC } wp.exitCode = uint32(ec) - t.emitter.Post(events.WithTopic(ctx, runtime.TaskExitEventTopic), + t.publisher.Publish(ctx, + runtime.TaskExitEventTopic, &eventsapi.TaskExit{ ContainerID: t.id, ID: id,