diff --git a/api/services/events/v1/container.pb.go b/api/services/events/v1/container.pb.go index 1df8f81dc..2c1167f82 100644 --- a/api/services/events/v1/container.pb.go +++ b/api/services/events/v1/container.pb.go @@ -19,8 +19,9 @@ ContainerUpdate ContainerDelete ContentDelete - SubscribeRequest PublishRequest + ForwardRequest + SubscribeRequest Envelope ImageCreate ImageUpdate diff --git a/api/services/events/v1/events.pb.go b/api/services/events/v1/events.pb.go index f736c9b09..40ae004d3 100644 --- a/api/services/events/v1/events.pb.go +++ b/api/services/events/v1/events.pb.go @@ -32,21 +32,30 @@ var _ = fmt.Errorf var _ = math.Inf var _ = time.Kitchen +type PublishRequest struct { + Topic string `protobuf:"bytes,1,opt,name=topic,proto3" json:"topic,omitempty"` + Event *google_protobuf1.Any `protobuf:"bytes,2,opt,name=event" json:"event,omitempty"` +} + +func (m *PublishRequest) Reset() { *m = PublishRequest{} } +func (*PublishRequest) ProtoMessage() {} +func (*PublishRequest) Descriptor() ([]byte, []int) { return fileDescriptorEvents, []int{0} } + +type ForwardRequest struct { + Envelope *Envelope `protobuf:"bytes,1,opt,name=envelope" json:"envelope,omitempty"` +} + +func (m *ForwardRequest) Reset() { *m = ForwardRequest{} } +func (*ForwardRequest) ProtoMessage() {} +func (*ForwardRequest) Descriptor() ([]byte, []int) { return fileDescriptorEvents, []int{1} } + type SubscribeRequest struct { Filters []string `protobuf:"bytes,1,rep,name=filters" json:"filters,omitempty"` } func (m *SubscribeRequest) Reset() { *m = SubscribeRequest{} } func (*SubscribeRequest) ProtoMessage() {} -func (*SubscribeRequest) Descriptor() ([]byte, []int) { return fileDescriptorEvents, []int{0} } - -type PublishRequest struct { - Envelope *Envelope `protobuf:"bytes,1,opt,name=envelope" json:"envelope,omitempty"` -} - -func (m *PublishRequest) Reset() { *m = PublishRequest{} } -func (*PublishRequest) ProtoMessage() {} -func (*PublishRequest) Descriptor() ([]byte, []int) { return fileDescriptorEvents, []int{1} } +func (*SubscribeRequest) Descriptor() ([]byte, []int) { return fileDescriptorEvents, []int{2} } type Envelope struct { Timestamp time.Time `protobuf:"bytes,1,opt,name=timestamp,stdtime" json:"timestamp"` @@ -57,11 +66,12 @@ type Envelope struct { func (m *Envelope) Reset() { *m = Envelope{} } func (*Envelope) ProtoMessage() {} -func (*Envelope) Descriptor() ([]byte, []int) { return fileDescriptorEvents, []int{2} } +func (*Envelope) Descriptor() ([]byte, []int) { return fileDescriptorEvents, []int{3} } func init() { - proto.RegisterType((*SubscribeRequest)(nil), "containerd.services.events.v1.SubscribeRequest") proto.RegisterType((*PublishRequest)(nil), "containerd.services.events.v1.PublishRequest") + proto.RegisterType((*ForwardRequest)(nil), "containerd.services.events.v1.ForwardRequest") + proto.RegisterType((*SubscribeRequest)(nil), "containerd.services.events.v1.SubscribeRequest") proto.RegisterType((*Envelope)(nil), "containerd.services.events.v1.Envelope") } @@ -76,7 +86,24 @@ const _ = grpc.SupportPackageIsVersion4 // Client API for Events service type EventsClient interface { + // Publish an event to a topic. + // + // The event will be packed into a timestamp envelope with the namespace + // introspected from the context. The envelope will then be dispatched. Publish(ctx context.Context, in *PublishRequest, opts ...grpc.CallOption) (*google_protobuf2.Empty, error) + // Forward sends an event that has already been packaged into an envelope + // with a timestamp and namespace. + // + // This is useful if earlier timestamping is required or when fowarding on + // behalf of another component, namespace or publisher. + Forward(ctx context.Context, in *ForwardRequest, opts ...grpc.CallOption) (*google_protobuf2.Empty, error) + // Subscribe to a stream of events, possibly returning only that match any + // of the provided filters. + // + // Unlike many other methods in containerd, subscribers will get messages + // from all namespaces unless otherwise specified. If this is not desired, + // a filter can be provided in the format 'namespace==' to + // restrict the received events. Subscribe(ctx context.Context, in *SubscribeRequest, opts ...grpc.CallOption) (Events_SubscribeClient, error) } @@ -97,6 +124,15 @@ func (c *eventsClient) Publish(ctx context.Context, in *PublishRequest, opts ... return out, nil } +func (c *eventsClient) Forward(ctx context.Context, in *ForwardRequest, opts ...grpc.CallOption) (*google_protobuf2.Empty, error) { + out := new(google_protobuf2.Empty) + err := grpc.Invoke(ctx, "/containerd.services.events.v1.Events/Forward", in, out, c.cc, opts...) + if err != nil { + return nil, err + } + return out, nil +} + func (c *eventsClient) Subscribe(ctx context.Context, in *SubscribeRequest, opts ...grpc.CallOption) (Events_SubscribeClient, error) { stream, err := grpc.NewClientStream(ctx, &_Events_serviceDesc.Streams[0], c.cc, "/containerd.services.events.v1.Events/Subscribe", opts...) if err != nil { @@ -132,7 +168,24 @@ func (x *eventsSubscribeClient) Recv() (*Envelope, error) { // Server API for Events service type EventsServer interface { + // Publish an event to a topic. + // + // The event will be packed into a timestamp envelope with the namespace + // introspected from the context. The envelope will then be dispatched. Publish(context.Context, *PublishRequest) (*google_protobuf2.Empty, error) + // Forward sends an event that has already been packaged into an envelope + // with a timestamp and namespace. + // + // This is useful if earlier timestamping is required or when fowarding on + // behalf of another component, namespace or publisher. + Forward(context.Context, *ForwardRequest) (*google_protobuf2.Empty, error) + // Subscribe to a stream of events, possibly returning only that match any + // of the provided filters. + // + // Unlike many other methods in containerd, subscribers will get messages + // from all namespaces unless otherwise specified. If this is not desired, + // a filter can be provided in the format 'namespace==' to + // restrict the received events. Subscribe(*SubscribeRequest, Events_SubscribeServer) error } @@ -158,6 +211,24 @@ func _Events_Publish_Handler(srv interface{}, ctx context.Context, dec func(inte return interceptor(ctx, in, info, handler) } +func _Events_Forward_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(ForwardRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(EventsServer).Forward(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/containerd.services.events.v1.Events/Forward", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(EventsServer).Forward(ctx, req.(*ForwardRequest)) + } + return interceptor(ctx, in, info, handler) +} + func _Events_Subscribe_Handler(srv interface{}, stream grpc.ServerStream) error { m := new(SubscribeRequest) if err := stream.RecvMsg(m); err != nil { @@ -187,6 +258,10 @@ var _Events_serviceDesc = grpc.ServiceDesc{ MethodName: "Publish", Handler: _Events_Publish_Handler, }, + { + MethodName: "Forward", + Handler: _Events_Forward_Handler, + }, }, Streams: []grpc.StreamDesc{ { @@ -198,6 +273,68 @@ var _Events_serviceDesc = grpc.ServiceDesc{ Metadata: "github.com/containerd/containerd/api/services/events/v1/events.proto", } +func (m *PublishRequest) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *PublishRequest) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if len(m.Topic) > 0 { + dAtA[i] = 0xa + i++ + i = encodeVarintEvents(dAtA, i, uint64(len(m.Topic))) + i += copy(dAtA[i:], m.Topic) + } + if m.Event != nil { + dAtA[i] = 0x12 + i++ + i = encodeVarintEvents(dAtA, i, uint64(m.Event.Size())) + n1, err := m.Event.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n1 + } + return i, nil +} + +func (m *ForwardRequest) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *ForwardRequest) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if m.Envelope != nil { + dAtA[i] = 0xa + i++ + i = encodeVarintEvents(dAtA, i, uint64(m.Envelope.Size())) + n2, err := m.Envelope.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n2 + } + return i, nil +} + func (m *SubscribeRequest) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) @@ -231,34 +368,6 @@ func (m *SubscribeRequest) MarshalTo(dAtA []byte) (int, error) { return i, nil } -func (m *PublishRequest) Marshal() (dAtA []byte, err error) { - size := m.Size() - dAtA = make([]byte, size) - n, err := m.MarshalTo(dAtA) - if err != nil { - return nil, err - } - return dAtA[:n], nil -} - -func (m *PublishRequest) MarshalTo(dAtA []byte) (int, error) { - var i int - _ = i - var l int - _ = l - if m.Envelope != nil { - dAtA[i] = 0xa - i++ - i = encodeVarintEvents(dAtA, i, uint64(m.Envelope.Size())) - n1, err := m.Envelope.MarshalTo(dAtA[i:]) - if err != nil { - return 0, err - } - i += n1 - } - return i, nil -} - func (m *Envelope) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) @@ -277,11 +386,11 @@ func (m *Envelope) MarshalTo(dAtA []byte) (int, error) { dAtA[i] = 0xa i++ i = encodeVarintEvents(dAtA, i, uint64(github_com_gogo_protobuf_types.SizeOfStdTime(m.Timestamp))) - n2, err := github_com_gogo_protobuf_types.StdTimeMarshalTo(m.Timestamp, dAtA[i:]) + n3, err := github_com_gogo_protobuf_types.StdTimeMarshalTo(m.Timestamp, dAtA[i:]) if err != nil { return 0, err } - i += n2 + i += n3 if len(m.Namespace) > 0 { dAtA[i] = 0x12 i++ @@ -298,11 +407,11 @@ func (m *Envelope) MarshalTo(dAtA []byte) (int, error) { dAtA[i] = 0x22 i++ i = encodeVarintEvents(dAtA, i, uint64(m.Event.Size())) - n3, err := m.Event.MarshalTo(dAtA[i:]) + n4, err := m.Event.MarshalTo(dAtA[i:]) if err != nil { return 0, err } - i += n3 + i += n4 } return i, nil } @@ -334,6 +443,30 @@ func encodeVarintEvents(dAtA []byte, offset int, v uint64) int { dAtA[offset] = uint8(v) return offset + 1 } +func (m *PublishRequest) Size() (n int) { + var l int + _ = l + l = len(m.Topic) + if l > 0 { + n += 1 + l + sovEvents(uint64(l)) + } + if m.Event != nil { + l = m.Event.Size() + n += 1 + l + sovEvents(uint64(l)) + } + return n +} + +func (m *ForwardRequest) Size() (n int) { + var l int + _ = l + if m.Envelope != nil { + l = m.Envelope.Size() + n += 1 + l + sovEvents(uint64(l)) + } + return n +} + func (m *SubscribeRequest) Size() (n int) { var l int _ = l @@ -346,16 +479,6 @@ func (m *SubscribeRequest) Size() (n int) { return n } -func (m *PublishRequest) Size() (n int) { - var l int - _ = l - if m.Envelope != nil { - l = m.Envelope.Size() - n += 1 + l + sovEvents(uint64(l)) - } - return n -} - func (m *Envelope) Size() (n int) { var l int _ = l @@ -389,6 +512,27 @@ func sovEvents(x uint64) (n int) { func sozEvents(x uint64) (n int) { return sovEvents(uint64((x << 1) ^ uint64((int64(x) >> 63)))) } +func (this *PublishRequest) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&PublishRequest{`, + `Topic:` + fmt.Sprintf("%v", this.Topic) + `,`, + `Event:` + strings.Replace(fmt.Sprintf("%v", this.Event), "Any", "google_protobuf1.Any", 1) + `,`, + `}`, + }, "") + return s +} +func (this *ForwardRequest) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&ForwardRequest{`, + `Envelope:` + strings.Replace(fmt.Sprintf("%v", this.Envelope), "Envelope", "Envelope", 1) + `,`, + `}`, + }, "") + return s +} func (this *SubscribeRequest) String() string { if this == nil { return "nil" @@ -399,16 +543,6 @@ func (this *SubscribeRequest) String() string { }, "") return s } -func (this *PublishRequest) String() string { - if this == nil { - return "nil" - } - s := strings.Join([]string{`&PublishRequest{`, - `Envelope:` + strings.Replace(fmt.Sprintf("%v", this.Envelope), "Envelope", "Envelope", 1) + `,`, - `}`, - }, "") - return s -} func (this *Envelope) String() string { if this == nil { return "nil" @@ -430,6 +564,201 @@ func valueToStringEvents(v interface{}) string { pv := reflect.Indirect(rv).Interface() return fmt.Sprintf("*%v", pv) } +func (m *PublishRequest) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowEvents + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: PublishRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: PublishRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Topic", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowEvents + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthEvents + } + postIndex := iNdEx + intStringLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Topic = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Event", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowEvents + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthEvents + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Event == nil { + m.Event = &google_protobuf1.Any{} + } + if err := m.Event.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipEvents(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthEvents + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *ForwardRequest) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowEvents + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: ForwardRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: ForwardRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Envelope", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowEvents + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthEvents + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Envelope == nil { + m.Envelope = &Envelope{} + } + if err := m.Envelope.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipEvents(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthEvents + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func (m *SubscribeRequest) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 @@ -509,89 +838,6 @@ func (m *SubscribeRequest) Unmarshal(dAtA []byte) error { } return nil } -func (m *PublishRequest) Unmarshal(dAtA []byte) error { - l := len(dAtA) - iNdEx := 0 - for iNdEx < l { - preIndex := iNdEx - var wire uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowEvents - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - wire |= (uint64(b) & 0x7F) << shift - if b < 0x80 { - break - } - } - fieldNum := int32(wire >> 3) - wireType := int(wire & 0x7) - if wireType == 4 { - return fmt.Errorf("proto: PublishRequest: wiretype end group for non-group") - } - if fieldNum <= 0 { - return fmt.Errorf("proto: PublishRequest: illegal tag %d (wire type %d)", fieldNum, wire) - } - switch fieldNum { - case 1: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Envelope", wireType) - } - var msglen int - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowEvents - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - msglen |= (int(b) & 0x7F) << shift - if b < 0x80 { - break - } - } - if msglen < 0 { - return ErrInvalidLengthEvents - } - postIndex := iNdEx + msglen - if postIndex > l { - return io.ErrUnexpectedEOF - } - if m.Envelope == nil { - m.Envelope = &Envelope{} - } - if err := m.Envelope.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { - return err - } - iNdEx = postIndex - default: - iNdEx = preIndex - skippy, err := skipEvents(dAtA[iNdEx:]) - if err != nil { - return err - } - if skippy < 0 { - return ErrInvalidLengthEvents - } - if (iNdEx + skippy) > l { - return io.ErrUnexpectedEOF - } - iNdEx += skippy - } - } - - if iNdEx > l { - return io.ErrUnexpectedEOF - } - return nil -} func (m *Envelope) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 @@ -873,31 +1119,33 @@ func init() { } var fileDescriptorEvents = []byte{ - // 407 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x92, 0xcd, 0x6e, 0xd3, 0x40, - 0x10, 0xc7, 0xb3, 0x84, 0x7c, 0x78, 0x91, 0x10, 0x5a, 0x45, 0xc8, 0x18, 0x70, 0xa2, 0x5c, 0x88, - 0x10, 0xec, 0x92, 0x70, 0x44, 0x42, 0x22, 0x90, 0x7b, 0x64, 0x40, 0x42, 0xdc, 0x6c, 0x77, 0xe2, - 0xac, 0x64, 0x7b, 0x5d, 0xef, 0xda, 0x52, 0x6e, 0x7d, 0x84, 0x3e, 0x49, 0x5f, 0xa2, 0x97, 0x1c, - 0x7b, 0xec, 0xa9, 0x6d, 0xfc, 0x24, 0x55, 0xfc, 0x91, 0xb4, 0x89, 0xd4, 0x54, 0xbd, 0xcd, 0xec, - 0xff, 0x37, 0x3b, 0xfb, 0x9f, 0x59, 0xfc, 0xcb, 0xe3, 0x6a, 0x9e, 0x38, 0xd4, 0x15, 0x01, 0x73, - 0x45, 0xa8, 0x6c, 0x1e, 0x42, 0x7c, 0x74, 0x37, 0xb4, 0x23, 0xce, 0x24, 0xc4, 0x29, 0x77, 0x41, - 0x32, 0x48, 0x21, 0x54, 0x92, 0xa5, 0xc3, 0x32, 0xa2, 0x51, 0x2c, 0x94, 0x20, 0xef, 0xb7, 0x3c, - 0xad, 0x58, 0x5a, 0x12, 0xe9, 0xd0, 0xe8, 0x78, 0xc2, 0x13, 0x39, 0xc9, 0xd6, 0x51, 0x51, 0x64, - 0xbc, 0xf1, 0x84, 0xf0, 0x7c, 0x60, 0x79, 0xe6, 0x24, 0x33, 0x66, 0x87, 0x8b, 0x52, 0x7a, 0xbb, - 0x2b, 0x41, 0x10, 0xa9, 0x4a, 0xec, 0xee, 0x8a, 0x8a, 0x07, 0x20, 0x95, 0x1d, 0x44, 0x05, 0xd0, - 0xff, 0x84, 0x5f, 0xfd, 0x4e, 0x1c, 0xe9, 0xc6, 0xdc, 0x01, 0x0b, 0x8e, 0x13, 0x90, 0x8a, 0xe8, - 0xb8, 0x35, 0xe3, 0xbe, 0x82, 0x58, 0xea, 0xa8, 0x57, 0x1f, 0x68, 0x56, 0x95, 0xf6, 0xff, 0xe2, - 0x97, 0xd3, 0xc4, 0xf1, 0xb9, 0x9c, 0x57, 0xec, 0x4f, 0xdc, 0x86, 0x30, 0x05, 0x5f, 0x44, 0xa0, - 0xa3, 0x1e, 0x1a, 0xbc, 0x18, 0x7d, 0xa0, 0x0f, 0x1a, 0xa4, 0x93, 0x12, 0xb7, 0x36, 0x85, 0xfd, - 0x33, 0x84, 0xdb, 0xd5, 0x31, 0x19, 0x63, 0x6d, 0xf3, 0xc8, 0xf2, 0x4a, 0x83, 0x16, 0x36, 0x68, - 0x65, 0x83, 0xfe, 0xa9, 0x88, 0x71, 0x7b, 0x79, 0xd5, 0xad, 0x9d, 0x5e, 0x77, 0x91, 0xb5, 0x2d, - 0x23, 0xef, 0xb0, 0x16, 0xda, 0x01, 0xc8, 0xc8, 0x76, 0x41, 0x7f, 0xd6, 0x43, 0x03, 0xcd, 0xda, - 0x1e, 0x90, 0x0e, 0x6e, 0x28, 0x11, 0x71, 0x57, 0xaf, 0xe7, 0x4a, 0x91, 0x90, 0x8f, 0xb8, 0x91, - 0x3f, 0x52, 0x7f, 0x9e, 0xf7, 0xec, 0xec, 0xf5, 0xfc, 0x11, 0x2e, 0xac, 0x02, 0x19, 0x9d, 0x23, - 0xdc, 0x9c, 0xe4, 0x8e, 0xc8, 0x14, 0xb7, 0xca, 0x91, 0x90, 0xcf, 0x07, 0x9c, 0xdf, 0x1f, 0x9d, - 0xf1, 0x7a, 0xaf, 0xc3, 0x64, 0xbd, 0x39, 0xe2, 0x61, 0x6d, 0xb3, 0x12, 0xc2, 0x0e, 0xdc, 0xb9, - 0xbb, 0x3c, 0xe3, 0xb1, 0xe3, 0xff, 0x82, 0xc6, 0xff, 0x96, 0x2b, 0xb3, 0x76, 0xb9, 0x32, 0x6b, - 0x27, 0x99, 0x89, 0x96, 0x99, 0x89, 0x2e, 0x32, 0x13, 0xdd, 0x64, 0x26, 0xfa, 0xff, 0xfd, 0x89, - 0x3f, 0xfd, 0x5b, 0x11, 0x39, 0xcd, 0xdc, 0xd2, 0xd7, 0xdb, 0x00, 0x00, 0x00, 0xff, 0xff, 0x13, - 0x35, 0xd0, 0x60, 0x32, 0x03, 0x00, 0x00, + // 438 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x93, 0x41, 0x6f, 0xd3, 0x30, + 0x14, 0xc7, 0xeb, 0x8d, 0x6d, 0x8d, 0x27, 0x4d, 0xc8, 0xaa, 0x50, 0x08, 0x90, 0x56, 0xb9, 0x50, + 0x21, 0xb0, 0x59, 0x39, 0x22, 0x21, 0x51, 0x28, 0xe7, 0xc9, 0x80, 0x84, 0xb8, 0x25, 0xd9, 0x5b, + 0x66, 0x29, 0x89, 0x43, 0xec, 0x04, 0xed, 0xc6, 0x47, 0xe0, 0xc2, 0xd7, 0xe0, 0x73, 0xf4, 0xc8, + 0x91, 0x13, 0xb0, 0x7c, 0x12, 0xd4, 0x24, 0x6e, 0x58, 0x0b, 0x04, 0x71, 0x7b, 0xce, 0xfb, 0xbf, + 0x5f, 0xfc, 0xfe, 0xff, 0x04, 0x3f, 0x8f, 0x84, 0x3e, 0x2f, 0x02, 0x1a, 0xca, 0x84, 0x85, 0x32, + 0xd5, 0xbe, 0x48, 0x21, 0x3f, 0xfd, 0xb5, 0xf4, 0x33, 0xc1, 0x14, 0xe4, 0xa5, 0x08, 0x41, 0x31, + 0x28, 0x21, 0xd5, 0x8a, 0x95, 0xc7, 0x6d, 0x45, 0xb3, 0x5c, 0x6a, 0x49, 0xee, 0x74, 0x7a, 0x6a, + 0xb4, 0xb4, 0x55, 0x94, 0xc7, 0xce, 0x28, 0x92, 0x91, 0xac, 0x95, 0x6c, 0x55, 0x35, 0x43, 0xce, + 0xcd, 0x48, 0xca, 0x28, 0x06, 0x56, 0x9f, 0x82, 0xe2, 0x8c, 0xf9, 0xe9, 0x45, 0xdb, 0xba, 0xb5, + 0xd9, 0x82, 0x24, 0xd3, 0xa6, 0x39, 0xde, 0x6c, 0x6a, 0x91, 0x80, 0xd2, 0x7e, 0x92, 0x35, 0x02, + 0x8f, 0xe3, 0xa3, 0x93, 0x22, 0x88, 0x85, 0x3a, 0xe7, 0xf0, 0xae, 0x00, 0xa5, 0xc9, 0x08, 0xef, + 0x69, 0x99, 0x89, 0xd0, 0x46, 0x13, 0x34, 0xb5, 0x78, 0x73, 0x20, 0xf7, 0xf0, 0x5e, 0x7d, 0x47, + 0x7b, 0x67, 0x82, 0xa6, 0x87, 0xb3, 0x11, 0x6d, 0xc0, 0xd4, 0x80, 0xe9, 0xd3, 0xf4, 0x82, 0x37, + 0x12, 0xef, 0x35, 0x3e, 0x7a, 0x21, 0xf3, 0xf7, 0x7e, 0x7e, 0x6a, 0x98, 0xcf, 0xf0, 0x10, 0xd2, + 0x12, 0x62, 0x99, 0x41, 0x8d, 0x3d, 0x9c, 0xdd, 0xa5, 0x7f, 0xb5, 0x81, 0x2e, 0x5a, 0x39, 0x5f, + 0x0f, 0x7a, 0xf7, 0xf1, 0xf5, 0x97, 0x45, 0xa0, 0xc2, 0x5c, 0x04, 0x60, 0xc0, 0x36, 0x3e, 0x38, + 0x13, 0xb1, 0x86, 0x5c, 0xd9, 0x68, 0xb2, 0x3b, 0xb5, 0xb8, 0x39, 0x7a, 0x9f, 0x11, 0x1e, 0x1a, + 0x08, 0x99, 0x63, 0x6b, 0xbd, 0x78, 0x7b, 0x01, 0x67, 0x6b, 0x83, 0x57, 0x46, 0x31, 0x1f, 0x2e, + 0xbf, 0x8d, 0x07, 0x1f, 0xbf, 0x8f, 0x11, 0xef, 0xc6, 0xc8, 0x6d, 0x6c, 0xa5, 0x7e, 0x02, 0x2a, + 0xf3, 0x43, 0xa8, 0x5d, 0xb0, 0x78, 0xf7, 0xa0, 0x73, 0x6d, 0xf7, 0xb7, 0xae, 0x5d, 0xeb, 0x75, + 0x6d, 0xf6, 0x69, 0x07, 0xef, 0x2f, 0xea, 0xfd, 0xc9, 0x09, 0x3e, 0x68, 0x43, 0x21, 0x0f, 0x7a, + 0x7c, 0xba, 0x1a, 0x9e, 0x73, 0x63, 0xeb, 0x0d, 0x8b, 0xd5, 0xd7, 0xb0, 0x22, 0xb6, 0x91, 0xf4, + 0x12, 0xaf, 0x46, 0xf7, 0x47, 0x62, 0x84, 0xad, 0x75, 0x1a, 0x84, 0xf5, 0x30, 0x37, 0x73, 0x73, + 0xfe, 0x35, 0xfe, 0x87, 0x68, 0xfe, 0x66, 0x79, 0xe9, 0x0e, 0xbe, 0x5e, 0xba, 0x83, 0x0f, 0x95, + 0x8b, 0x96, 0x95, 0x8b, 0xbe, 0x54, 0x2e, 0xfa, 0x51, 0xb9, 0xe8, 0xed, 0x93, 0xff, 0xfc, 0x1f, + 0x1f, 0x37, 0x55, 0xb0, 0x5f, 0xaf, 0xf4, 0xe8, 0x67, 0x00, 0x00, 0x00, 0xff, 0xff, 0x3b, 0x5d, + 0x09, 0xd6, 0xd8, 0x03, 0x00, 0x00, } diff --git a/api/services/events/v1/events.proto b/api/services/events/v1/events.proto index cb470d1c1..154bdff96 100644 --- a/api/services/events/v1/events.proto +++ b/api/services/events/v1/events.proto @@ -10,18 +10,42 @@ import "google/protobuf/timestamp.proto"; option go_package = "github.com/containerd/containerd/api/services/events/v1;events"; service Events { + // Publish an event to a topic. + // + // The event will be packed into a timestamp envelope with the namespace + // introspected from the context. The envelope will then be dispatched. rpc Publish(PublishRequest) returns (google.protobuf.Empty); + + // Forward sends an event that has already been packaged into an envelope + // with a timestamp and namespace. + // + // This is useful if earlier timestamping is required or when fowarding on + // behalf of another component, namespace or publisher. + rpc Forward(ForwardRequest) returns (google.protobuf.Empty); + + // Subscribe to a stream of events, possibly returning only that match any + // of the provided filters. + // + // Unlike many other methods in containerd, subscribers will get messages + // from all namespaces unless otherwise specified. If this is not desired, + // a filter can be provided in the format 'namespace==' to + // restrict the received events. rpc Subscribe(SubscribeRequest) returns (stream Envelope); } +message PublishRequest { + string topic = 1; + google.protobuf.Any event = 2; +} + +message ForwardRequest { + Envelope envelope = 1; +} + message SubscribeRequest { repeated string filters = 1; } -message PublishRequest { - Envelope envelope = 1; -} - message Envelope { google.protobuf.Timestamp timestamp = 1 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false]; string namespace = 2; diff --git a/cmd/ctr/events.go b/cmd/ctr/events.go index 253b7c81b..df4e54a7e 100644 --- a/cmd/ctr/events.go +++ b/cmd/ctr/events.go @@ -3,8 +3,6 @@ package main import ( "encoding/json" "fmt" - "os" - "text/tabwriter" eventsapi "github.com/containerd/containerd/api/services/events/v1" "github.com/containerd/containerd/typeurl" @@ -22,39 +20,38 @@ var eventsCommand = cli.Command{ ctx, cancel := appContext(context) defer cancel() - events, err := eventsClient.Subscribe(ctx, &eventsapi.SubscribeRequest{}) + events, err := eventsClient.Subscribe(ctx, &eventsapi.SubscribeRequest{ + Filters: context.Args(), + }) if err != nil { return err } - w := tabwriter.NewWriter(os.Stdout, 10, 1, 3, ' ', 0) for { e, err := events.Recv() if err != nil { return err } - v, err := typeurl.UnmarshalAny(e.Event) - if err != nil { - return err - } - out, err := json.Marshal(v) - if err != nil { - return err + var out []byte + if e.Event != nil { + v, err := typeurl.UnmarshalAny(e.Event) + if err != nil { + return err + } + out, err = json.Marshal(v) + if err != nil { + return err + } } - if _, err := fmt.Fprintf(w, - "%s\t%s", + if _, err := fmt.Println( e.Timestamp, + e.Namespace, e.Topic, + string(out), ); err != nil { return err } - if _, err := fmt.Fprintf(w, "\t%s\n", out); err != nil { - return err - } - if err := w.Flush(); err != nil { - return err - } } }, } diff --git a/events/exchange.go b/events/exchange.go index 5b8891ae4..a48886211 100644 --- a/events/exchange.go +++ b/events/exchange.go @@ -13,6 +13,7 @@ import ( "github.com/containerd/containerd/namespaces" "github.com/containerd/containerd/typeurl" goevents "github.com/docker/go-events" + "github.com/gogo/protobuf/types" "github.com/pkg/errors" "github.com/sirupsen/logrus" ) @@ -28,20 +29,27 @@ func NewExchange() *Exchange { } // Forward accepts an envelope to be direcly distributed on the exchange. -func (e *Exchange) Forward(ctx context.Context, envelope *events.Envelope) error { - log.G(ctx).WithFields(logrus.Fields{ - "topic": envelope.Topic, - "ns": envelope.Namespace, - "type": envelope.Event.TypeUrl, - }).Debug("forward event") - - if err := namespaces.Validate(envelope.Namespace); err != nil { - return errors.Wrapf(err, "event envelope has invalid namespace") +// +// This is useful when an event is forwaded on behalf of another namespace or +// when the event is propagated on behalf of another publisher. +func (e *Exchange) Forward(ctx context.Context, envelope *events.Envelope) (err error) { + if err := validateEnvelope(envelope); err != nil { + return err } - if err := validateTopic(envelope.Topic); err != nil { - return errors.Wrapf(err, "envelope topic %q", envelope.Topic) - } + defer func() { + logger := log.G(ctx).WithFields(logrus.Fields{ + "topic": envelope.Topic, + "ns": envelope.Namespace, + "type": envelope.Event.TypeUrl, + }) + + if err != nil { + logger.WithError(err).Error("error forwarding event") + } else { + logger.Debug("event forwarded") + } + }() return e.broadcaster.Write(envelope) } @@ -49,8 +57,14 @@ func (e *Exchange) Forward(ctx context.Context, envelope *events.Envelope) error // Publish packages and sends an event. The caller will be considered the // initial publisher of the event. This means the timestamp will be calculated // at this point and this method may read from the calling context. -func (e *Exchange) Publish(ctx context.Context, topic string, event Event) error { - namespace, err := namespaces.NamespaceRequired(ctx) +func (e *Exchange) Publish(ctx context.Context, topic string, event Event) (err error) { + var ( + namespace string + encoded *types.Any + envelope events.Envelope + ) + + namespace, err = namespaces.NamespaceRequired(ctx) if err != nil { return errors.Wrapf(err, "failed publishing event") } @@ -58,47 +72,76 @@ func (e *Exchange) Publish(ctx context.Context, topic string, event Event) error return errors.Wrapf(err, "envelope topic %q", topic) } - evany, err := typeurl.MarshalAny(event) + encoded, err = typeurl.MarshalAny(event) if err != nil { return err } - env := events.Envelope{ - Timestamp: time.Now().UTC(), - Topic: topic, - Event: evany, - } - if err := e.broadcaster.Write(&env); err != nil { - return err - } - log.G(ctx).WithFields(logrus.Fields{ - "topic": topic, - "type": evany.TypeUrl, - "ns": namespace, - }).Debug("published event") - return nil + envelope.Timestamp = time.Now().UTC() + envelope.Namespace = namespace + envelope.Topic = topic + envelope.Event = encoded + + defer func() { + logger := log.G(ctx).WithFields(logrus.Fields{ + "topic": envelope.Topic, + "ns": envelope.Namespace, + "type": envelope.Event.TypeUrl, + }) + + if err != nil { + logger.WithError(err).Error("error publishing event") + } else { + logger.Debug("event published") + } + }() + + return e.broadcaster.Write(&envelope) } // Subscribe to events on the exchange. Events are sent through the returned // channel ch. If an error is encountered, it will be sent on channel errs and // errs will be closed. To end the subscription, cancel the provided context. -func (e *Exchange) Subscribe(ctx context.Context, filters ...filters.Filter) (ch <-chan *events.Envelope, errs <-chan error) { +// +// Zero or more filters may be provided as strings. Only events that match +// *any* of the provided filters will be sent on the channel. The filters use +// the standard containerd filters package syntax. +func (e *Exchange) Subscribe(ctx context.Context, fs ...string) (ch <-chan *events.Envelope, errs <-chan error) { var ( - evch = make(chan *events.Envelope) - errq = make(chan error, 1) - channel = goevents.NewChannel(0) - queue = goevents.NewQueue(channel) + evch = make(chan *events.Envelope) + errq = make(chan error, 1) + channel = goevents.NewChannel(0) + queue = goevents.NewQueue(channel) + dst goevents.Sink = queue ) - // TODO(stevvooe): Insert the filter! - - e.broadcaster.Add(queue) - - go func() { + closeAll := func() { defer close(errq) - defer e.broadcaster.Remove(queue) + defer e.broadcaster.Remove(dst) defer queue.Close() defer channel.Close() + } + + ch = evch + errs = errq + + if len(fs) > 0 { + filter, err := filters.ParseAll(fs...) + if err != nil { + errq <- errors.Wrapf(err, "failed parsing subscription filters") + closeAll() + return + } + + dst = goevents.NewFilter(queue, goevents.MatcherFunc(func(gev goevents.Event) bool { + return filter.Match(adapt(gev)) + })) + } + + e.broadcaster.Add(dst) + + go func() { + defer closeAll() var err error loop: @@ -133,9 +176,6 @@ func (e *Exchange) Subscribe(ctx context.Context, filters ...filters.Filter) (ch errq <- err }() - ch = evch - errs = errq - return } @@ -161,3 +201,46 @@ func validateTopic(topic string) error { return nil } + +func validateEnvelope(envelope *events.Envelope) error { + if err := namespaces.Validate(envelope.Namespace); err != nil { + return errors.Wrapf(err, "event envelope has invalid namespace") + } + + if err := validateTopic(envelope.Topic); err != nil { + return errors.Wrapf(err, "envelope topic %q", envelope.Topic) + } + + if envelope.Timestamp.IsZero() { + return errors.Wrapf(errdefs.ErrInvalidArgument, "timestamp must be set on forwarded event") + } + + return nil +} + +func adapt(ev interface{}) filters.Adaptor { + switch ev := ev.(type) { + case *events.Envelope: + return filters.AdapterFunc(func(fieldpath []string) (string, bool) { + if len(fieldpath) == 0 { + return "", false + } + + switch fieldpath[0] { + case "namespace": + return ev.Namespace, len(ev.Namespace) > 0 + case "topic": + return ev.Topic, len(ev.Topic) > 0 + default: + // TODO(stevvooe): Handle event fields. + return "", false + } + }) + case filters.Adaptor: + return ev + } + + return filters.AdapterFunc(func(fieldpath []string) (string, bool) { + return "", false + }) +} diff --git a/events/exchange_test.go b/events/exchange_test.go index dcd5859bf..38f059c67 100644 --- a/events/exchange_test.go +++ b/events/exchange_test.go @@ -6,6 +6,7 @@ import ( "reflect" "sync" "testing" + "time" events "github.com/containerd/containerd/api/services/events/v1" "github.com/containerd/containerd/errdefs" @@ -131,10 +132,12 @@ func TestExchangeValidateTopic(t *testing.T) { } envelope := events.Envelope{ + Timestamp: time.Now().UTC(), Namespace: namespace, Topic: testcase.input, Event: evany, } + // make sure we get same errors with forward. if err := exchange.Forward(ctx, &envelope); errors.Cause(err) != testcase.err { if err == nil { diff --git a/linux/shim/client.go b/linux/shim/client.go index 30b54a909..d2c1c02be 100644 --- a/linux/shim/client.go +++ b/linux/shim/client.go @@ -152,7 +152,7 @@ func WithConnect(ctx context.Context, config Config) (shim.ShimClient, io.Closer // WithLocal uses an in process shim func WithLocal(events *events.Exchange) func(context.Context, Config) (shim.ShimClient, io.Closer, error) { return func(ctx context.Context, config Config) (shim.ShimClient, io.Closer, error) { - service, err := NewService(config.Path, config.Namespace, &localEventsClient{forwarder: events}) + service, err := NewService(config.Path, config.Namespace, &localEventsClient{publisher: events}) if err != nil { return nil, nil, err } diff --git a/linux/shim/local.go b/linux/shim/local.go index c7d23b8a6..fee614222 100644 --- a/linux/shim/local.go +++ b/linux/shim/local.go @@ -94,11 +94,11 @@ type publisher interface { } type localEventsClient struct { - forwarder evt.Forwarder + publisher evt.Publisher } func (l *localEventsClient) Publish(ctx context.Context, r *events.PublishRequest, opts ...grpc.CallOption) (*google_protobuf.Empty, error) { - if err := l.forwarder.Forward(ctx, r.Envelope); err != nil { + if err := l.publisher.Publish(ctx, r.Topic, r.Event); err != nil { return nil, err } return empty, nil diff --git a/linux/shim/service.go b/linux/shim/service.go index ce7027e54..3b354eda4 100644 --- a/linux/shim/service.go +++ b/linux/shim/service.go @@ -6,7 +6,6 @@ import ( "fmt" "os" "sync" - "time" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -375,12 +374,8 @@ func (s *Service) forward(client publisher) { } if _, err := client.Publish(s.context, &events.PublishRequest{ - Envelope: &events.Envelope{ - Namespace: s.namespace, - Timestamp: time.Now().UTC(), - Topic: getTopic(e), - Event: a, - }, + Topic: getTopic(e), + Event: a, }); err != nil { log.G(s.context).WithError(err).Error("post event") } diff --git a/process.go b/process.go index 82473ea2b..440be26ea 100644 --- a/process.go +++ b/process.go @@ -6,6 +6,7 @@ import ( eventsapi "github.com/containerd/containerd/api/services/events/v1" "github.com/containerd/containerd/api/services/tasks/v1" + "github.com/containerd/containerd/runtime" "github.com/containerd/containerd/typeurl" specs "github.com/opencontainers/runtime-spec/specs-go" ) @@ -64,7 +65,9 @@ func (p *process) Kill(ctx context.Context, s syscall.Signal) error { } func (p *process) Wait(ctx context.Context) (uint32, error) { - eventstream, err := p.task.client.EventService().Subscribe(ctx, &eventsapi.SubscribeRequest{}) + eventstream, err := p.task.client.EventService().Subscribe(ctx, &eventsapi.SubscribeRequest{ + Filters: []string{"topic==" + runtime.TaskExitEventTopic}, + }) if err != nil { return UnknownExitStatus, err } diff --git a/services/events/service.go b/services/events/service.go index 30f630d27..eb9e99810 100644 --- a/services/events/service.go +++ b/services/events/service.go @@ -4,7 +4,6 @@ import ( api "github.com/containerd/containerd/api/services/events/v1" "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/events" - "github.com/containerd/containerd/filters" "github.com/containerd/containerd/plugin" "github.com/golang/protobuf/ptypes/empty" "github.com/pkg/errors" @@ -35,16 +34,27 @@ func (s *Service) Register(server *grpc.Server) error { return nil } +func (s *Service) Publish(ctx context.Context, r *api.PublishRequest) (*empty.Empty, error) { + if err := s.events.Publish(ctx, r.Topic, r.Event); err != nil { + return nil, errdefs.ToGRPC(err) + } + + return &empty.Empty{}, nil +} + +func (s *Service) Forward(ctx context.Context, r *api.ForwardRequest) (*empty.Empty, error) { + if err := s.events.Forward(ctx, r.Envelope); err != nil { + return nil, errdefs.ToGRPC(err) + } + + return &empty.Empty{}, nil +} + func (s *Service) Subscribe(req *api.SubscribeRequest, srv api.Events_SubscribeServer) error { ctx, cancel := context.WithCancel(srv.Context()) defer cancel() - filter, err := filters.ParseAll(req.Filters...) - if err != nil { - return errdefs.ToGRPC(err) - } - - eventq, errq := s.events.Subscribe(ctx, filter) + eventq, errq := s.events.Subscribe(ctx, req.Filters...) for { select { case ev := <-eventq: @@ -60,11 +70,3 @@ func (s *Service) Subscribe(req *api.SubscribeRequest, srv api.Events_SubscribeS } } } - -func (s *Service) Publish(ctx context.Context, r *api.PublishRequest) (*empty.Empty, error) { - if err := s.events.Forward(ctx, r.Envelope); err != nil { - return nil, errdefs.ToGRPC(err) - } - - return &empty.Empty{}, nil -} diff --git a/task.go b/task.go index 87f95a11d..640506497 100644 --- a/task.go +++ b/task.go @@ -18,6 +18,7 @@ import ( "github.com/containerd/containerd/linux/runcopts" "github.com/containerd/containerd/mount" "github.com/containerd/containerd/rootfs" + "github.com/containerd/containerd/runtime" "github.com/containerd/containerd/typeurl" digest "github.com/opencontainers/go-digest" "github.com/opencontainers/image-spec/specs-go/v1" @@ -163,7 +164,9 @@ func (t *task) Status(ctx context.Context) (TaskStatus, error) { // Wait is a blocking call that will wait for the task to exit and return the exit status func (t *task) Wait(ctx context.Context) (uint32, error) { - eventstream, err := t.client.EventService().Subscribe(ctx, &eventsapi.SubscribeRequest{}) + eventstream, err := t.client.EventService().Subscribe(ctx, &eventsapi.SubscribeRequest{ + Filters: []string{"topic==" + runtime.TaskExitEventTopic}, + }) if err != nil { return UnknownExitStatus, errdefs.FromGRPC(err) } diff --git a/typeurl/types.go b/typeurl/types.go index e12b8a3bf..63b214b5e 100644 --- a/typeurl/types.go +++ b/typeurl/types.go @@ -66,17 +66,25 @@ func Is(any *types.Any, v interface{}) bool { // MarshalAny marshals the value v into an any with the correct TypeUrl func MarshalAny(v interface{}) (*types.Any, error) { - var data []byte + var marshal func(v interface{}) ([]byte, error) + switch t := v.(type) { + case *types.Any: + // avoid reserializing the type if we have an any. + return t, nil + case proto.Message: + marshal = func(v interface{}) ([]byte, error) { + return proto.Marshal(t) + } + default: + marshal = json.Marshal + } + url, err := TypeURL(v) if err != nil { return nil, err } - switch t := v.(type) { - case proto.Message: - data, err = proto.Marshal(t) - default: - data, err = json.Marshal(v) - } + + data, err := marshal(v) if err != nil { return nil, err } diff --git a/typeurl/types_test.go b/typeurl/types_test.go index fa5339237..0c17ea654 100644 --- a/typeurl/types_test.go +++ b/typeurl/types_test.go @@ -48,6 +48,17 @@ func TestMarshal(t *testing.T) { if any.TypeUrl != expected { t.Fatalf("expected %q but received %q", expected, any.TypeUrl) } + + // marshal it again and make sure we get the same thing back. + newany, err := MarshalAny(any) + if err != nil { + t.Fatal(err) + } + + if newany != any { // you that right: we want the same *pointer*! + t.Fatalf("expected to get back same object: %v != %v", newany, any) + } + } func TestMarshalUnmarshal(t *testing.T) {