From af2d7f0e558627f3b29d6346b525807948a9654d Mon Sep 17 00:00:00 2001 From: Stephen J Day Date: Fri, 28 Jul 2017 15:21:55 -0700 Subject: [PATCH] events: initial support for filters This change further plumbs the components required for implementing event filters. Specifically, we now have the ability to filter on the `topic` and `namespace`. In the course of implementing this functionality, it was found that there were mismatches in the events API that created extra serialization round trips. A modification to `typeurl.MarshalAny` and a clear separation between publishing and forwarding allow us to avoid these serialization issues. Unfortunately, this has required a few tweaks to the GRPC API, so this is a breaking change. `Publish` and `Forward` have been clearly separated in the GRPC API. `Publish` honors the contextual namespace and performs timestamping while `Forward` simply validates and forwards. The behavior of `Subscribe` is to propagate events for all namespaces unless specifically filtered (and hence the relation to this particular change. The following is an example of using filters to monitor the task events generated while running the [bucketbench tool](https://github.com/estesp/bucketbench): ``` $ ctr events 'topic~=/tasks/.+,namespace==bb' ... 2017-07-28 22:19:51.78944874 +0000 UTC bb /tasks/start {"container_id":"bb-ctr-6-8","pid":25889} 2017-07-28 22:19:51.791893688 +0000 UTC bb /tasks/start {"container_id":"bb-ctr-4-8","pid":25882} 2017-07-28 22:19:51.792608389 +0000 UTC bb /tasks/start {"container_id":"bb-ctr-2-9","pid":25860} 2017-07-28 22:19:51.793035217 +0000 UTC bb /tasks/start {"container_id":"bb-ctr-5-6","pid":25869} 2017-07-28 22:19:51.802659622 +0000 UTC bb /tasks/start {"container_id":"bb-ctr-0-7","pid":25877} 2017-07-28 22:19:51.805192898 +0000 UTC bb /tasks/start {"container_id":"bb-ctr-3-6","pid":25856} 2017-07-28 22:19:51.832374931 +0000 UTC bb /tasks/exit {"container_id":"bb-ctr-8-6","id":"bb-ctr-8-6","pid":25864,"exited_at":"2017-07-28T22:19:51.832013043Z"} 2017-07-28 22:19:51.84001249 +0000 UTC bb /tasks/exit {"container_id":"bb-ctr-2-9","id":"bb-ctr-2-9","pid":25860,"exited_at":"2017-07-28T22:19:51.839717714Z"} 2017-07-28 22:19:51.840272635 +0000 UTC bb /tasks/exit {"container_id":"bb-ctr-7-6","id":"bb-ctr-7-6","pid":25855,"exited_at":"2017-07-28T22:19:51.839796335Z"} ... ``` In addition to the events changes, we now display the namespace origin of the event in the cli tool. This will be followed by a PR to add individual field filtering for the events API for each event type. Signed-off-by: Stephen J Day --- api/services/events/v1/container.pb.go | 3 +- api/services/events/v1/events.pb.go | 594 ++++++++++++++++++------- api/services/events/v1/events.proto | 32 +- cmd/ctr/events.go | 35 +- events/exchange.go | 169 +++++-- events/exchange_test.go | 3 + linux/shim/client.go | 2 +- linux/shim/local.go | 4 +- linux/shim/service.go | 9 +- process.go | 5 +- services/events/service.go | 32 +- task.go | 5 +- typeurl/types.go | 22 +- typeurl/types_test.go | 11 + 14 files changed, 652 insertions(+), 274 deletions(-) diff --git a/api/services/events/v1/container.pb.go b/api/services/events/v1/container.pb.go index 1df8f81dc..2c1167f82 100644 --- a/api/services/events/v1/container.pb.go +++ b/api/services/events/v1/container.pb.go @@ -19,8 +19,9 @@ ContainerUpdate ContainerDelete ContentDelete - SubscribeRequest PublishRequest + ForwardRequest + SubscribeRequest Envelope ImageCreate ImageUpdate diff --git a/api/services/events/v1/events.pb.go b/api/services/events/v1/events.pb.go index f736c9b09..40ae004d3 100644 --- a/api/services/events/v1/events.pb.go +++ b/api/services/events/v1/events.pb.go @@ -32,21 +32,30 @@ var _ = fmt.Errorf var _ = math.Inf var _ = time.Kitchen +type PublishRequest struct { + Topic string `protobuf:"bytes,1,opt,name=topic,proto3" json:"topic,omitempty"` + Event *google_protobuf1.Any `protobuf:"bytes,2,opt,name=event" json:"event,omitempty"` +} + +func (m *PublishRequest) Reset() { *m = PublishRequest{} } +func (*PublishRequest) ProtoMessage() {} +func (*PublishRequest) Descriptor() ([]byte, []int) { return fileDescriptorEvents, []int{0} } + +type ForwardRequest struct { + Envelope *Envelope `protobuf:"bytes,1,opt,name=envelope" json:"envelope,omitempty"` +} + +func (m *ForwardRequest) Reset() { *m = ForwardRequest{} } +func (*ForwardRequest) ProtoMessage() {} +func (*ForwardRequest) Descriptor() ([]byte, []int) { return fileDescriptorEvents, []int{1} } + type SubscribeRequest struct { Filters []string `protobuf:"bytes,1,rep,name=filters" json:"filters,omitempty"` } func (m *SubscribeRequest) Reset() { *m = SubscribeRequest{} } func (*SubscribeRequest) ProtoMessage() {} -func (*SubscribeRequest) Descriptor() ([]byte, []int) { return fileDescriptorEvents, []int{0} } - -type PublishRequest struct { - Envelope *Envelope `protobuf:"bytes,1,opt,name=envelope" json:"envelope,omitempty"` -} - -func (m *PublishRequest) Reset() { *m = PublishRequest{} } -func (*PublishRequest) ProtoMessage() {} -func (*PublishRequest) Descriptor() ([]byte, []int) { return fileDescriptorEvents, []int{1} } +func (*SubscribeRequest) Descriptor() ([]byte, []int) { return fileDescriptorEvents, []int{2} } type Envelope struct { Timestamp time.Time `protobuf:"bytes,1,opt,name=timestamp,stdtime" json:"timestamp"` @@ -57,11 +66,12 @@ type Envelope struct { func (m *Envelope) Reset() { *m = Envelope{} } func (*Envelope) ProtoMessage() {} -func (*Envelope) Descriptor() ([]byte, []int) { return fileDescriptorEvents, []int{2} } +func (*Envelope) Descriptor() ([]byte, []int) { return fileDescriptorEvents, []int{3} } func init() { - proto.RegisterType((*SubscribeRequest)(nil), "containerd.services.events.v1.SubscribeRequest") proto.RegisterType((*PublishRequest)(nil), "containerd.services.events.v1.PublishRequest") + proto.RegisterType((*ForwardRequest)(nil), "containerd.services.events.v1.ForwardRequest") + proto.RegisterType((*SubscribeRequest)(nil), "containerd.services.events.v1.SubscribeRequest") proto.RegisterType((*Envelope)(nil), "containerd.services.events.v1.Envelope") } @@ -76,7 +86,24 @@ const _ = grpc.SupportPackageIsVersion4 // Client API for Events service type EventsClient interface { + // Publish an event to a topic. + // + // The event will be packed into a timestamp envelope with the namespace + // introspected from the context. The envelope will then be dispatched. Publish(ctx context.Context, in *PublishRequest, opts ...grpc.CallOption) (*google_protobuf2.Empty, error) + // Forward sends an event that has already been packaged into an envelope + // with a timestamp and namespace. + // + // This is useful if earlier timestamping is required or when fowarding on + // behalf of another component, namespace or publisher. + Forward(ctx context.Context, in *ForwardRequest, opts ...grpc.CallOption) (*google_protobuf2.Empty, error) + // Subscribe to a stream of events, possibly returning only that match any + // of the provided filters. + // + // Unlike many other methods in containerd, subscribers will get messages + // from all namespaces unless otherwise specified. If this is not desired, + // a filter can be provided in the format 'namespace==' to + // restrict the received events. Subscribe(ctx context.Context, in *SubscribeRequest, opts ...grpc.CallOption) (Events_SubscribeClient, error) } @@ -97,6 +124,15 @@ func (c *eventsClient) Publish(ctx context.Context, in *PublishRequest, opts ... return out, nil } +func (c *eventsClient) Forward(ctx context.Context, in *ForwardRequest, opts ...grpc.CallOption) (*google_protobuf2.Empty, error) { + out := new(google_protobuf2.Empty) + err := grpc.Invoke(ctx, "/containerd.services.events.v1.Events/Forward", in, out, c.cc, opts...) + if err != nil { + return nil, err + } + return out, nil +} + func (c *eventsClient) Subscribe(ctx context.Context, in *SubscribeRequest, opts ...grpc.CallOption) (Events_SubscribeClient, error) { stream, err := grpc.NewClientStream(ctx, &_Events_serviceDesc.Streams[0], c.cc, "/containerd.services.events.v1.Events/Subscribe", opts...) if err != nil { @@ -132,7 +168,24 @@ func (x *eventsSubscribeClient) Recv() (*Envelope, error) { // Server API for Events service type EventsServer interface { + // Publish an event to a topic. + // + // The event will be packed into a timestamp envelope with the namespace + // introspected from the context. The envelope will then be dispatched. Publish(context.Context, *PublishRequest) (*google_protobuf2.Empty, error) + // Forward sends an event that has already been packaged into an envelope + // with a timestamp and namespace. + // + // This is useful if earlier timestamping is required or when fowarding on + // behalf of another component, namespace or publisher. + Forward(context.Context, *ForwardRequest) (*google_protobuf2.Empty, error) + // Subscribe to a stream of events, possibly returning only that match any + // of the provided filters. + // + // Unlike many other methods in containerd, subscribers will get messages + // from all namespaces unless otherwise specified. If this is not desired, + // a filter can be provided in the format 'namespace==' to + // restrict the received events. Subscribe(*SubscribeRequest, Events_SubscribeServer) error } @@ -158,6 +211,24 @@ func _Events_Publish_Handler(srv interface{}, ctx context.Context, dec func(inte return interceptor(ctx, in, info, handler) } +func _Events_Forward_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(ForwardRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(EventsServer).Forward(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/containerd.services.events.v1.Events/Forward", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(EventsServer).Forward(ctx, req.(*ForwardRequest)) + } + return interceptor(ctx, in, info, handler) +} + func _Events_Subscribe_Handler(srv interface{}, stream grpc.ServerStream) error { m := new(SubscribeRequest) if err := stream.RecvMsg(m); err != nil { @@ -187,6 +258,10 @@ var _Events_serviceDesc = grpc.ServiceDesc{ MethodName: "Publish", Handler: _Events_Publish_Handler, }, + { + MethodName: "Forward", + Handler: _Events_Forward_Handler, + }, }, Streams: []grpc.StreamDesc{ { @@ -198,6 +273,68 @@ var _Events_serviceDesc = grpc.ServiceDesc{ Metadata: "github.com/containerd/containerd/api/services/events/v1/events.proto", } +func (m *PublishRequest) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *PublishRequest) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if len(m.Topic) > 0 { + dAtA[i] = 0xa + i++ + i = encodeVarintEvents(dAtA, i, uint64(len(m.Topic))) + i += copy(dAtA[i:], m.Topic) + } + if m.Event != nil { + dAtA[i] = 0x12 + i++ + i = encodeVarintEvents(dAtA, i, uint64(m.Event.Size())) + n1, err := m.Event.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n1 + } + return i, nil +} + +func (m *ForwardRequest) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *ForwardRequest) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if m.Envelope != nil { + dAtA[i] = 0xa + i++ + i = encodeVarintEvents(dAtA, i, uint64(m.Envelope.Size())) + n2, err := m.Envelope.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n2 + } + return i, nil +} + func (m *SubscribeRequest) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) @@ -231,34 +368,6 @@ func (m *SubscribeRequest) MarshalTo(dAtA []byte) (int, error) { return i, nil } -func (m *PublishRequest) Marshal() (dAtA []byte, err error) { - size := m.Size() - dAtA = make([]byte, size) - n, err := m.MarshalTo(dAtA) - if err != nil { - return nil, err - } - return dAtA[:n], nil -} - -func (m *PublishRequest) MarshalTo(dAtA []byte) (int, error) { - var i int - _ = i - var l int - _ = l - if m.Envelope != nil { - dAtA[i] = 0xa - i++ - i = encodeVarintEvents(dAtA, i, uint64(m.Envelope.Size())) - n1, err := m.Envelope.MarshalTo(dAtA[i:]) - if err != nil { - return 0, err - } - i += n1 - } - return i, nil -} - func (m *Envelope) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) @@ -277,11 +386,11 @@ func (m *Envelope) MarshalTo(dAtA []byte) (int, error) { dAtA[i] = 0xa i++ i = encodeVarintEvents(dAtA, i, uint64(github_com_gogo_protobuf_types.SizeOfStdTime(m.Timestamp))) - n2, err := github_com_gogo_protobuf_types.StdTimeMarshalTo(m.Timestamp, dAtA[i:]) + n3, err := github_com_gogo_protobuf_types.StdTimeMarshalTo(m.Timestamp, dAtA[i:]) if err != nil { return 0, err } - i += n2 + i += n3 if len(m.Namespace) > 0 { dAtA[i] = 0x12 i++ @@ -298,11 +407,11 @@ func (m *Envelope) MarshalTo(dAtA []byte) (int, error) { dAtA[i] = 0x22 i++ i = encodeVarintEvents(dAtA, i, uint64(m.Event.Size())) - n3, err := m.Event.MarshalTo(dAtA[i:]) + n4, err := m.Event.MarshalTo(dAtA[i:]) if err != nil { return 0, err } - i += n3 + i += n4 } return i, nil } @@ -334,6 +443,30 @@ func encodeVarintEvents(dAtA []byte, offset int, v uint64) int { dAtA[offset] = uint8(v) return offset + 1 } +func (m *PublishRequest) Size() (n int) { + var l int + _ = l + l = len(m.Topic) + if l > 0 { + n += 1 + l + sovEvents(uint64(l)) + } + if m.Event != nil { + l = m.Event.Size() + n += 1 + l + sovEvents(uint64(l)) + } + return n +} + +func (m *ForwardRequest) Size() (n int) { + var l int + _ = l + if m.Envelope != nil { + l = m.Envelope.Size() + n += 1 + l + sovEvents(uint64(l)) + } + return n +} + func (m *SubscribeRequest) Size() (n int) { var l int _ = l @@ -346,16 +479,6 @@ func (m *SubscribeRequest) Size() (n int) { return n } -func (m *PublishRequest) Size() (n int) { - var l int - _ = l - if m.Envelope != nil { - l = m.Envelope.Size() - n += 1 + l + sovEvents(uint64(l)) - } - return n -} - func (m *Envelope) Size() (n int) { var l int _ = l @@ -389,6 +512,27 @@ func sovEvents(x uint64) (n int) { func sozEvents(x uint64) (n int) { return sovEvents(uint64((x << 1) ^ uint64((int64(x) >> 63)))) } +func (this *PublishRequest) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&PublishRequest{`, + `Topic:` + fmt.Sprintf("%v", this.Topic) + `,`, + `Event:` + strings.Replace(fmt.Sprintf("%v", this.Event), "Any", "google_protobuf1.Any", 1) + `,`, + `}`, + }, "") + return s +} +func (this *ForwardRequest) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&ForwardRequest{`, + `Envelope:` + strings.Replace(fmt.Sprintf("%v", this.Envelope), "Envelope", "Envelope", 1) + `,`, + `}`, + }, "") + return s +} func (this *SubscribeRequest) String() string { if this == nil { return "nil" @@ -399,16 +543,6 @@ func (this *SubscribeRequest) String() string { }, "") return s } -func (this *PublishRequest) String() string { - if this == nil { - return "nil" - } - s := strings.Join([]string{`&PublishRequest{`, - `Envelope:` + strings.Replace(fmt.Sprintf("%v", this.Envelope), "Envelope", "Envelope", 1) + `,`, - `}`, - }, "") - return s -} func (this *Envelope) String() string { if this == nil { return "nil" @@ -430,6 +564,201 @@ func valueToStringEvents(v interface{}) string { pv := reflect.Indirect(rv).Interface() return fmt.Sprintf("*%v", pv) } +func (m *PublishRequest) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowEvents + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: PublishRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: PublishRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Topic", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowEvents + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthEvents + } + postIndex := iNdEx + intStringLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Topic = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Event", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowEvents + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthEvents + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Event == nil { + m.Event = &google_protobuf1.Any{} + } + if err := m.Event.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipEvents(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthEvents + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *ForwardRequest) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowEvents + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: ForwardRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: ForwardRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Envelope", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowEvents + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthEvents + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Envelope == nil { + m.Envelope = &Envelope{} + } + if err := m.Envelope.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipEvents(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthEvents + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func (m *SubscribeRequest) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 @@ -509,89 +838,6 @@ func (m *SubscribeRequest) Unmarshal(dAtA []byte) error { } return nil } -func (m *PublishRequest) Unmarshal(dAtA []byte) error { - l := len(dAtA) - iNdEx := 0 - for iNdEx < l { - preIndex := iNdEx - var wire uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowEvents - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - wire |= (uint64(b) & 0x7F) << shift - if b < 0x80 { - break - } - } - fieldNum := int32(wire >> 3) - wireType := int(wire & 0x7) - if wireType == 4 { - return fmt.Errorf("proto: PublishRequest: wiretype end group for non-group") - } - if fieldNum <= 0 { - return fmt.Errorf("proto: PublishRequest: illegal tag %d (wire type %d)", fieldNum, wire) - } - switch fieldNum { - case 1: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Envelope", wireType) - } - var msglen int - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowEvents - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - msglen |= (int(b) & 0x7F) << shift - if b < 0x80 { - break - } - } - if msglen < 0 { - return ErrInvalidLengthEvents - } - postIndex := iNdEx + msglen - if postIndex > l { - return io.ErrUnexpectedEOF - } - if m.Envelope == nil { - m.Envelope = &Envelope{} - } - if err := m.Envelope.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { - return err - } - iNdEx = postIndex - default: - iNdEx = preIndex - skippy, err := skipEvents(dAtA[iNdEx:]) - if err != nil { - return err - } - if skippy < 0 { - return ErrInvalidLengthEvents - } - if (iNdEx + skippy) > l { - return io.ErrUnexpectedEOF - } - iNdEx += skippy - } - } - - if iNdEx > l { - return io.ErrUnexpectedEOF - } - return nil -} func (m *Envelope) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 @@ -873,31 +1119,33 @@ func init() { } var fileDescriptorEvents = []byte{ - // 407 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x92, 0xcd, 0x6e, 0xd3, 0x40, - 0x10, 0xc7, 0xb3, 0x84, 0x7c, 0x78, 0x91, 0x10, 0x5a, 0x45, 0xc8, 0x18, 0x70, 0xa2, 0x5c, 0x88, - 0x10, 0xec, 0x92, 0x70, 0x44, 0x42, 0x22, 0x90, 0x7b, 0x64, 0x40, 0x42, 0xdc, 0x6c, 0x77, 0xe2, - 0xac, 0x64, 0x7b, 0x5d, 0xef, 0xda, 0x52, 0x6e, 0x7d, 0x84, 0x3e, 0x49, 0x5f, 0xa2, 0x97, 0x1c, - 0x7b, 0xec, 0xa9, 0x6d, 0xfc, 0x24, 0x55, 0xfc, 0x91, 0xb4, 0x89, 0xd4, 0x54, 0xbd, 0xcd, 0xec, - 0xff, 0x37, 0x3b, 0xfb, 0x9f, 0x59, 0xfc, 0xcb, 0xe3, 0x6a, 0x9e, 0x38, 0xd4, 0x15, 0x01, 0x73, - 0x45, 0xa8, 0x6c, 0x1e, 0x42, 0x7c, 0x74, 0x37, 0xb4, 0x23, 0xce, 0x24, 0xc4, 0x29, 0x77, 0x41, - 0x32, 0x48, 0x21, 0x54, 0x92, 0xa5, 0xc3, 0x32, 0xa2, 0x51, 0x2c, 0x94, 0x20, 0xef, 0xb7, 0x3c, - 0xad, 0x58, 0x5a, 0x12, 0xe9, 0xd0, 0xe8, 0x78, 0xc2, 0x13, 0x39, 0xc9, 0xd6, 0x51, 0x51, 0x64, - 0xbc, 0xf1, 0x84, 0xf0, 0x7c, 0x60, 0x79, 0xe6, 0x24, 0x33, 0x66, 0x87, 0x8b, 0x52, 0x7a, 0xbb, - 0x2b, 0x41, 0x10, 0xa9, 0x4a, 0xec, 0xee, 0x8a, 0x8a, 0x07, 0x20, 0x95, 0x1d, 0x44, 0x05, 0xd0, - 0xff, 0x84, 0x5f, 0xfd, 0x4e, 0x1c, 0xe9, 0xc6, 0xdc, 0x01, 0x0b, 0x8e, 0x13, 0x90, 0x8a, 0xe8, - 0xb8, 0x35, 0xe3, 0xbe, 0x82, 0x58, 0xea, 0xa8, 0x57, 0x1f, 0x68, 0x56, 0x95, 0xf6, 0xff, 0xe2, - 0x97, 0xd3, 0xc4, 0xf1, 0xb9, 0x9c, 0x57, 0xec, 0x4f, 0xdc, 0x86, 0x30, 0x05, 0x5f, 0x44, 0xa0, - 0xa3, 0x1e, 0x1a, 0xbc, 0x18, 0x7d, 0xa0, 0x0f, 0x1a, 0xa4, 0x93, 0x12, 0xb7, 0x36, 0x85, 0xfd, - 0x33, 0x84, 0xdb, 0xd5, 0x31, 0x19, 0x63, 0x6d, 0xf3, 0xc8, 0xf2, 0x4a, 0x83, 0x16, 0x36, 0x68, - 0x65, 0x83, 0xfe, 0xa9, 0x88, 0x71, 0x7b, 0x79, 0xd5, 0xad, 0x9d, 0x5e, 0x77, 0x91, 0xb5, 0x2d, - 0x23, 0xef, 0xb0, 0x16, 0xda, 0x01, 0xc8, 0xc8, 0x76, 0x41, 0x7f, 0xd6, 0x43, 0x03, 0xcd, 0xda, - 0x1e, 0x90, 0x0e, 0x6e, 0x28, 0x11, 0x71, 0x57, 0xaf, 0xe7, 0x4a, 0x91, 0x90, 0x8f, 0xb8, 0x91, - 0x3f, 0x52, 0x7f, 0x9e, 0xf7, 0xec, 0xec, 0xf5, 0xfc, 0x11, 0x2e, 0xac, 0x02, 0x19, 0x9d, 0x23, - 0xdc, 0x9c, 0xe4, 0x8e, 0xc8, 0x14, 0xb7, 0xca, 0x91, 0x90, 0xcf, 0x07, 0x9c, 0xdf, 0x1f, 0x9d, - 0xf1, 0x7a, 0xaf, 0xc3, 0x64, 0xbd, 0x39, 0xe2, 0x61, 0x6d, 0xb3, 0x12, 0xc2, 0x0e, 0xdc, 0xb9, - 0xbb, 0x3c, 0xe3, 0xb1, 0xe3, 0xff, 0x82, 0xc6, 0xff, 0x96, 0x2b, 0xb3, 0x76, 0xb9, 0x32, 0x6b, - 0x27, 0x99, 0x89, 0x96, 0x99, 0x89, 0x2e, 0x32, 0x13, 0xdd, 0x64, 0x26, 0xfa, 0xff, 0xfd, 0x89, - 0x3f, 0xfd, 0x5b, 0x11, 0x39, 0xcd, 0xdc, 0xd2, 0xd7, 0xdb, 0x00, 0x00, 0x00, 0xff, 0xff, 0x13, - 0x35, 0xd0, 0x60, 0x32, 0x03, 0x00, 0x00, + // 438 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x93, 0x41, 0x6f, 0xd3, 0x30, + 0x14, 0xc7, 0xeb, 0x8d, 0x6d, 0x8d, 0x27, 0x4d, 0xc8, 0xaa, 0x50, 0x08, 0x90, 0x56, 0xb9, 0x50, + 0x21, 0xb0, 0x59, 0x39, 0x22, 0x21, 0x51, 0x28, 0xe7, 0xc9, 0x80, 0x84, 0xb8, 0x25, 0xd9, 0x5b, + 0x66, 0x29, 0x89, 0x43, 0xec, 0x04, 0xed, 0xc6, 0x47, 0xe0, 0xc2, 0xd7, 0xe0, 0x73, 0xf4, 0xc8, + 0x91, 0x13, 0xb0, 0x7c, 0x12, 0xd4, 0x24, 0x6e, 0x58, 0x0b, 0x04, 0x71, 0x7b, 0xce, 0xfb, 0xbf, + 0x5f, 0xfc, 0xfe, 0xff, 0x04, 0x3f, 0x8f, 0x84, 0x3e, 0x2f, 0x02, 0x1a, 0xca, 0x84, 0x85, 0x32, + 0xd5, 0xbe, 0x48, 0x21, 0x3f, 0xfd, 0xb5, 0xf4, 0x33, 0xc1, 0x14, 0xe4, 0xa5, 0x08, 0x41, 0x31, + 0x28, 0x21, 0xd5, 0x8a, 0x95, 0xc7, 0x6d, 0x45, 0xb3, 0x5c, 0x6a, 0x49, 0xee, 0x74, 0x7a, 0x6a, + 0xb4, 0xb4, 0x55, 0x94, 0xc7, 0xce, 0x28, 0x92, 0x91, 0xac, 0x95, 0x6c, 0x55, 0x35, 0x43, 0xce, + 0xcd, 0x48, 0xca, 0x28, 0x06, 0x56, 0x9f, 0x82, 0xe2, 0x8c, 0xf9, 0xe9, 0x45, 0xdb, 0xba, 0xb5, + 0xd9, 0x82, 0x24, 0xd3, 0xa6, 0x39, 0xde, 0x6c, 0x6a, 0x91, 0x80, 0xd2, 0x7e, 0x92, 0x35, 0x02, + 0x8f, 0xe3, 0xa3, 0x93, 0x22, 0x88, 0x85, 0x3a, 0xe7, 0xf0, 0xae, 0x00, 0xa5, 0xc9, 0x08, 0xef, + 0x69, 0x99, 0x89, 0xd0, 0x46, 0x13, 0x34, 0xb5, 0x78, 0x73, 0x20, 0xf7, 0xf0, 0x5e, 0x7d, 0x47, + 0x7b, 0x67, 0x82, 0xa6, 0x87, 0xb3, 0x11, 0x6d, 0xc0, 0xd4, 0x80, 0xe9, 0xd3, 0xf4, 0x82, 0x37, + 0x12, 0xef, 0x35, 0x3e, 0x7a, 0x21, 0xf3, 0xf7, 0x7e, 0x7e, 0x6a, 0x98, 0xcf, 0xf0, 0x10, 0xd2, + 0x12, 0x62, 0x99, 0x41, 0x8d, 0x3d, 0x9c, 0xdd, 0xa5, 0x7f, 0xb5, 0x81, 0x2e, 0x5a, 0x39, 0x5f, + 0x0f, 0x7a, 0xf7, 0xf1, 0xf5, 0x97, 0x45, 0xa0, 0xc2, 0x5c, 0x04, 0x60, 0xc0, 0x36, 0x3e, 0x38, + 0x13, 0xb1, 0x86, 0x5c, 0xd9, 0x68, 0xb2, 0x3b, 0xb5, 0xb8, 0x39, 0x7a, 0x9f, 0x11, 0x1e, 0x1a, + 0x08, 0x99, 0x63, 0x6b, 0xbd, 0x78, 0x7b, 0x01, 0x67, 0x6b, 0x83, 0x57, 0x46, 0x31, 0x1f, 0x2e, + 0xbf, 0x8d, 0x07, 0x1f, 0xbf, 0x8f, 0x11, 0xef, 0xc6, 0xc8, 0x6d, 0x6c, 0xa5, 0x7e, 0x02, 0x2a, + 0xf3, 0x43, 0xa8, 0x5d, 0xb0, 0x78, 0xf7, 0xa0, 0x73, 0x6d, 0xf7, 0xb7, 0xae, 0x5d, 0xeb, 0x75, + 0x6d, 0xf6, 0x69, 0x07, 0xef, 0x2f, 0xea, 0xfd, 0xc9, 0x09, 0x3e, 0x68, 0x43, 0x21, 0x0f, 0x7a, + 0x7c, 0xba, 0x1a, 0x9e, 0x73, 0x63, 0xeb, 0x0d, 0x8b, 0xd5, 0xd7, 0xb0, 0x22, 0xb6, 0x91, 0xf4, + 0x12, 0xaf, 0x46, 0xf7, 0x47, 0x62, 0x84, 0xad, 0x75, 0x1a, 0x84, 0xf5, 0x30, 0x37, 0x73, 0x73, + 0xfe, 0x35, 0xfe, 0x87, 0x68, 0xfe, 0x66, 0x79, 0xe9, 0x0e, 0xbe, 0x5e, 0xba, 0x83, 0x0f, 0x95, + 0x8b, 0x96, 0x95, 0x8b, 0xbe, 0x54, 0x2e, 0xfa, 0x51, 0xb9, 0xe8, 0xed, 0x93, 0xff, 0xfc, 0x1f, + 0x1f, 0x37, 0x55, 0xb0, 0x5f, 0xaf, 0xf4, 0xe8, 0x67, 0x00, 0x00, 0x00, 0xff, 0xff, 0x3b, 0x5d, + 0x09, 0xd6, 0xd8, 0x03, 0x00, 0x00, } diff --git a/api/services/events/v1/events.proto b/api/services/events/v1/events.proto index cb470d1c1..154bdff96 100644 --- a/api/services/events/v1/events.proto +++ b/api/services/events/v1/events.proto @@ -10,18 +10,42 @@ import "google/protobuf/timestamp.proto"; option go_package = "github.com/containerd/containerd/api/services/events/v1;events"; service Events { + // Publish an event to a topic. + // + // The event will be packed into a timestamp envelope with the namespace + // introspected from the context. The envelope will then be dispatched. rpc Publish(PublishRequest) returns (google.protobuf.Empty); + + // Forward sends an event that has already been packaged into an envelope + // with a timestamp and namespace. + // + // This is useful if earlier timestamping is required or when fowarding on + // behalf of another component, namespace or publisher. + rpc Forward(ForwardRequest) returns (google.protobuf.Empty); + + // Subscribe to a stream of events, possibly returning only that match any + // of the provided filters. + // + // Unlike many other methods in containerd, subscribers will get messages + // from all namespaces unless otherwise specified. If this is not desired, + // a filter can be provided in the format 'namespace==' to + // restrict the received events. rpc Subscribe(SubscribeRequest) returns (stream Envelope); } +message PublishRequest { + string topic = 1; + google.protobuf.Any event = 2; +} + +message ForwardRequest { + Envelope envelope = 1; +} + message SubscribeRequest { repeated string filters = 1; } -message PublishRequest { - Envelope envelope = 1; -} - message Envelope { google.protobuf.Timestamp timestamp = 1 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false]; string namespace = 2; diff --git a/cmd/ctr/events.go b/cmd/ctr/events.go index 253b7c81b..df4e54a7e 100644 --- a/cmd/ctr/events.go +++ b/cmd/ctr/events.go @@ -3,8 +3,6 @@ package main import ( "encoding/json" "fmt" - "os" - "text/tabwriter" eventsapi "github.com/containerd/containerd/api/services/events/v1" "github.com/containerd/containerd/typeurl" @@ -22,39 +20,38 @@ var eventsCommand = cli.Command{ ctx, cancel := appContext(context) defer cancel() - events, err := eventsClient.Subscribe(ctx, &eventsapi.SubscribeRequest{}) + events, err := eventsClient.Subscribe(ctx, &eventsapi.SubscribeRequest{ + Filters: context.Args(), + }) if err != nil { return err } - w := tabwriter.NewWriter(os.Stdout, 10, 1, 3, ' ', 0) for { e, err := events.Recv() if err != nil { return err } - v, err := typeurl.UnmarshalAny(e.Event) - if err != nil { - return err - } - out, err := json.Marshal(v) - if err != nil { - return err + var out []byte + if e.Event != nil { + v, err := typeurl.UnmarshalAny(e.Event) + if err != nil { + return err + } + out, err = json.Marshal(v) + if err != nil { + return err + } } - if _, err := fmt.Fprintf(w, - "%s\t%s", + if _, err := fmt.Println( e.Timestamp, + e.Namespace, e.Topic, + string(out), ); err != nil { return err } - if _, err := fmt.Fprintf(w, "\t%s\n", out); err != nil { - return err - } - if err := w.Flush(); err != nil { - return err - } } }, } diff --git a/events/exchange.go b/events/exchange.go index 5b8891ae4..a48886211 100644 --- a/events/exchange.go +++ b/events/exchange.go @@ -13,6 +13,7 @@ import ( "github.com/containerd/containerd/namespaces" "github.com/containerd/containerd/typeurl" goevents "github.com/docker/go-events" + "github.com/gogo/protobuf/types" "github.com/pkg/errors" "github.com/sirupsen/logrus" ) @@ -28,20 +29,27 @@ func NewExchange() *Exchange { } // Forward accepts an envelope to be direcly distributed on the exchange. -func (e *Exchange) Forward(ctx context.Context, envelope *events.Envelope) error { - log.G(ctx).WithFields(logrus.Fields{ - "topic": envelope.Topic, - "ns": envelope.Namespace, - "type": envelope.Event.TypeUrl, - }).Debug("forward event") - - if err := namespaces.Validate(envelope.Namespace); err != nil { - return errors.Wrapf(err, "event envelope has invalid namespace") +// +// This is useful when an event is forwaded on behalf of another namespace or +// when the event is propagated on behalf of another publisher. +func (e *Exchange) Forward(ctx context.Context, envelope *events.Envelope) (err error) { + if err := validateEnvelope(envelope); err != nil { + return err } - if err := validateTopic(envelope.Topic); err != nil { - return errors.Wrapf(err, "envelope topic %q", envelope.Topic) - } + defer func() { + logger := log.G(ctx).WithFields(logrus.Fields{ + "topic": envelope.Topic, + "ns": envelope.Namespace, + "type": envelope.Event.TypeUrl, + }) + + if err != nil { + logger.WithError(err).Error("error forwarding event") + } else { + logger.Debug("event forwarded") + } + }() return e.broadcaster.Write(envelope) } @@ -49,8 +57,14 @@ func (e *Exchange) Forward(ctx context.Context, envelope *events.Envelope) error // Publish packages and sends an event. The caller will be considered the // initial publisher of the event. This means the timestamp will be calculated // at this point and this method may read from the calling context. -func (e *Exchange) Publish(ctx context.Context, topic string, event Event) error { - namespace, err := namespaces.NamespaceRequired(ctx) +func (e *Exchange) Publish(ctx context.Context, topic string, event Event) (err error) { + var ( + namespace string + encoded *types.Any + envelope events.Envelope + ) + + namespace, err = namespaces.NamespaceRequired(ctx) if err != nil { return errors.Wrapf(err, "failed publishing event") } @@ -58,47 +72,76 @@ func (e *Exchange) Publish(ctx context.Context, topic string, event Event) error return errors.Wrapf(err, "envelope topic %q", topic) } - evany, err := typeurl.MarshalAny(event) + encoded, err = typeurl.MarshalAny(event) if err != nil { return err } - env := events.Envelope{ - Timestamp: time.Now().UTC(), - Topic: topic, - Event: evany, - } - if err := e.broadcaster.Write(&env); err != nil { - return err - } - log.G(ctx).WithFields(logrus.Fields{ - "topic": topic, - "type": evany.TypeUrl, - "ns": namespace, - }).Debug("published event") - return nil + envelope.Timestamp = time.Now().UTC() + envelope.Namespace = namespace + envelope.Topic = topic + envelope.Event = encoded + + defer func() { + logger := log.G(ctx).WithFields(logrus.Fields{ + "topic": envelope.Topic, + "ns": envelope.Namespace, + "type": envelope.Event.TypeUrl, + }) + + if err != nil { + logger.WithError(err).Error("error publishing event") + } else { + logger.Debug("event published") + } + }() + + return e.broadcaster.Write(&envelope) } // Subscribe to events on the exchange. Events are sent through the returned // channel ch. If an error is encountered, it will be sent on channel errs and // errs will be closed. To end the subscription, cancel the provided context. -func (e *Exchange) Subscribe(ctx context.Context, filters ...filters.Filter) (ch <-chan *events.Envelope, errs <-chan error) { +// +// Zero or more filters may be provided as strings. Only events that match +// *any* of the provided filters will be sent on the channel. The filters use +// the standard containerd filters package syntax. +func (e *Exchange) Subscribe(ctx context.Context, fs ...string) (ch <-chan *events.Envelope, errs <-chan error) { var ( - evch = make(chan *events.Envelope) - errq = make(chan error, 1) - channel = goevents.NewChannel(0) - queue = goevents.NewQueue(channel) + evch = make(chan *events.Envelope) + errq = make(chan error, 1) + channel = goevents.NewChannel(0) + queue = goevents.NewQueue(channel) + dst goevents.Sink = queue ) - // TODO(stevvooe): Insert the filter! - - e.broadcaster.Add(queue) - - go func() { + closeAll := func() { defer close(errq) - defer e.broadcaster.Remove(queue) + defer e.broadcaster.Remove(dst) defer queue.Close() defer channel.Close() + } + + ch = evch + errs = errq + + if len(fs) > 0 { + filter, err := filters.ParseAll(fs...) + if err != nil { + errq <- errors.Wrapf(err, "failed parsing subscription filters") + closeAll() + return + } + + dst = goevents.NewFilter(queue, goevents.MatcherFunc(func(gev goevents.Event) bool { + return filter.Match(adapt(gev)) + })) + } + + e.broadcaster.Add(dst) + + go func() { + defer closeAll() var err error loop: @@ -133,9 +176,6 @@ func (e *Exchange) Subscribe(ctx context.Context, filters ...filters.Filter) (ch errq <- err }() - ch = evch - errs = errq - return } @@ -161,3 +201,46 @@ func validateTopic(topic string) error { return nil } + +func validateEnvelope(envelope *events.Envelope) error { + if err := namespaces.Validate(envelope.Namespace); err != nil { + return errors.Wrapf(err, "event envelope has invalid namespace") + } + + if err := validateTopic(envelope.Topic); err != nil { + return errors.Wrapf(err, "envelope topic %q", envelope.Topic) + } + + if envelope.Timestamp.IsZero() { + return errors.Wrapf(errdefs.ErrInvalidArgument, "timestamp must be set on forwarded event") + } + + return nil +} + +func adapt(ev interface{}) filters.Adaptor { + switch ev := ev.(type) { + case *events.Envelope: + return filters.AdapterFunc(func(fieldpath []string) (string, bool) { + if len(fieldpath) == 0 { + return "", false + } + + switch fieldpath[0] { + case "namespace": + return ev.Namespace, len(ev.Namespace) > 0 + case "topic": + return ev.Topic, len(ev.Topic) > 0 + default: + // TODO(stevvooe): Handle event fields. + return "", false + } + }) + case filters.Adaptor: + return ev + } + + return filters.AdapterFunc(func(fieldpath []string) (string, bool) { + return "", false + }) +} diff --git a/events/exchange_test.go b/events/exchange_test.go index dcd5859bf..38f059c67 100644 --- a/events/exchange_test.go +++ b/events/exchange_test.go @@ -6,6 +6,7 @@ import ( "reflect" "sync" "testing" + "time" events "github.com/containerd/containerd/api/services/events/v1" "github.com/containerd/containerd/errdefs" @@ -131,10 +132,12 @@ func TestExchangeValidateTopic(t *testing.T) { } envelope := events.Envelope{ + Timestamp: time.Now().UTC(), Namespace: namespace, Topic: testcase.input, Event: evany, } + // make sure we get same errors with forward. if err := exchange.Forward(ctx, &envelope); errors.Cause(err) != testcase.err { if err == nil { diff --git a/linux/shim/client.go b/linux/shim/client.go index 30b54a909..d2c1c02be 100644 --- a/linux/shim/client.go +++ b/linux/shim/client.go @@ -152,7 +152,7 @@ func WithConnect(ctx context.Context, config Config) (shim.ShimClient, io.Closer // WithLocal uses an in process shim func WithLocal(events *events.Exchange) func(context.Context, Config) (shim.ShimClient, io.Closer, error) { return func(ctx context.Context, config Config) (shim.ShimClient, io.Closer, error) { - service, err := NewService(config.Path, config.Namespace, &localEventsClient{forwarder: events}) + service, err := NewService(config.Path, config.Namespace, &localEventsClient{publisher: events}) if err != nil { return nil, nil, err } diff --git a/linux/shim/local.go b/linux/shim/local.go index c7d23b8a6..fee614222 100644 --- a/linux/shim/local.go +++ b/linux/shim/local.go @@ -94,11 +94,11 @@ type publisher interface { } type localEventsClient struct { - forwarder evt.Forwarder + publisher evt.Publisher } func (l *localEventsClient) Publish(ctx context.Context, r *events.PublishRequest, opts ...grpc.CallOption) (*google_protobuf.Empty, error) { - if err := l.forwarder.Forward(ctx, r.Envelope); err != nil { + if err := l.publisher.Publish(ctx, r.Topic, r.Event); err != nil { return nil, err } return empty, nil diff --git a/linux/shim/service.go b/linux/shim/service.go index ce7027e54..3b354eda4 100644 --- a/linux/shim/service.go +++ b/linux/shim/service.go @@ -6,7 +6,6 @@ import ( "fmt" "os" "sync" - "time" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -375,12 +374,8 @@ func (s *Service) forward(client publisher) { } if _, err := client.Publish(s.context, &events.PublishRequest{ - Envelope: &events.Envelope{ - Namespace: s.namespace, - Timestamp: time.Now().UTC(), - Topic: getTopic(e), - Event: a, - }, + Topic: getTopic(e), + Event: a, }); err != nil { log.G(s.context).WithError(err).Error("post event") } diff --git a/process.go b/process.go index 82473ea2b..440be26ea 100644 --- a/process.go +++ b/process.go @@ -6,6 +6,7 @@ import ( eventsapi "github.com/containerd/containerd/api/services/events/v1" "github.com/containerd/containerd/api/services/tasks/v1" + "github.com/containerd/containerd/runtime" "github.com/containerd/containerd/typeurl" specs "github.com/opencontainers/runtime-spec/specs-go" ) @@ -64,7 +65,9 @@ func (p *process) Kill(ctx context.Context, s syscall.Signal) error { } func (p *process) Wait(ctx context.Context) (uint32, error) { - eventstream, err := p.task.client.EventService().Subscribe(ctx, &eventsapi.SubscribeRequest{}) + eventstream, err := p.task.client.EventService().Subscribe(ctx, &eventsapi.SubscribeRequest{ + Filters: []string{"topic==" + runtime.TaskExitEventTopic}, + }) if err != nil { return UnknownExitStatus, err } diff --git a/services/events/service.go b/services/events/service.go index 30f630d27..eb9e99810 100644 --- a/services/events/service.go +++ b/services/events/service.go @@ -4,7 +4,6 @@ import ( api "github.com/containerd/containerd/api/services/events/v1" "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/events" - "github.com/containerd/containerd/filters" "github.com/containerd/containerd/plugin" "github.com/golang/protobuf/ptypes/empty" "github.com/pkg/errors" @@ -35,16 +34,27 @@ func (s *Service) Register(server *grpc.Server) error { return nil } +func (s *Service) Publish(ctx context.Context, r *api.PublishRequest) (*empty.Empty, error) { + if err := s.events.Publish(ctx, r.Topic, r.Event); err != nil { + return nil, errdefs.ToGRPC(err) + } + + return &empty.Empty{}, nil +} + +func (s *Service) Forward(ctx context.Context, r *api.ForwardRequest) (*empty.Empty, error) { + if err := s.events.Forward(ctx, r.Envelope); err != nil { + return nil, errdefs.ToGRPC(err) + } + + return &empty.Empty{}, nil +} + func (s *Service) Subscribe(req *api.SubscribeRequest, srv api.Events_SubscribeServer) error { ctx, cancel := context.WithCancel(srv.Context()) defer cancel() - filter, err := filters.ParseAll(req.Filters...) - if err != nil { - return errdefs.ToGRPC(err) - } - - eventq, errq := s.events.Subscribe(ctx, filter) + eventq, errq := s.events.Subscribe(ctx, req.Filters...) for { select { case ev := <-eventq: @@ -60,11 +70,3 @@ func (s *Service) Subscribe(req *api.SubscribeRequest, srv api.Events_SubscribeS } } } - -func (s *Service) Publish(ctx context.Context, r *api.PublishRequest) (*empty.Empty, error) { - if err := s.events.Forward(ctx, r.Envelope); err != nil { - return nil, errdefs.ToGRPC(err) - } - - return &empty.Empty{}, nil -} diff --git a/task.go b/task.go index 87f95a11d..640506497 100644 --- a/task.go +++ b/task.go @@ -18,6 +18,7 @@ import ( "github.com/containerd/containerd/linux/runcopts" "github.com/containerd/containerd/mount" "github.com/containerd/containerd/rootfs" + "github.com/containerd/containerd/runtime" "github.com/containerd/containerd/typeurl" digest "github.com/opencontainers/go-digest" "github.com/opencontainers/image-spec/specs-go/v1" @@ -163,7 +164,9 @@ func (t *task) Status(ctx context.Context) (TaskStatus, error) { // Wait is a blocking call that will wait for the task to exit and return the exit status func (t *task) Wait(ctx context.Context) (uint32, error) { - eventstream, err := t.client.EventService().Subscribe(ctx, &eventsapi.SubscribeRequest{}) + eventstream, err := t.client.EventService().Subscribe(ctx, &eventsapi.SubscribeRequest{ + Filters: []string{"topic==" + runtime.TaskExitEventTopic}, + }) if err != nil { return UnknownExitStatus, errdefs.FromGRPC(err) } diff --git a/typeurl/types.go b/typeurl/types.go index e12b8a3bf..63b214b5e 100644 --- a/typeurl/types.go +++ b/typeurl/types.go @@ -66,17 +66,25 @@ func Is(any *types.Any, v interface{}) bool { // MarshalAny marshals the value v into an any with the correct TypeUrl func MarshalAny(v interface{}) (*types.Any, error) { - var data []byte + var marshal func(v interface{}) ([]byte, error) + switch t := v.(type) { + case *types.Any: + // avoid reserializing the type if we have an any. + return t, nil + case proto.Message: + marshal = func(v interface{}) ([]byte, error) { + return proto.Marshal(t) + } + default: + marshal = json.Marshal + } + url, err := TypeURL(v) if err != nil { return nil, err } - switch t := v.(type) { - case proto.Message: - data, err = proto.Marshal(t) - default: - data, err = json.Marshal(v) - } + + data, err := marshal(v) if err != nil { return nil, err } diff --git a/typeurl/types_test.go b/typeurl/types_test.go index fa5339237..0c17ea654 100644 --- a/typeurl/types_test.go +++ b/typeurl/types_test.go @@ -48,6 +48,17 @@ func TestMarshal(t *testing.T) { if any.TypeUrl != expected { t.Fatalf("expected %q but received %q", expected, any.TypeUrl) } + + // marshal it again and make sure we get the same thing back. + newany, err := MarshalAny(any) + if err != nil { + t.Fatal(err) + } + + if newany != any { // you that right: we want the same *pointer*! + t.Fatalf("expected to get back same object: %v != %v", newany, any) + } + } func TestMarshalUnmarshal(t *testing.T) {