Add Post to events service

Signed-off-by: Michael Crosby <crosbymichael@gmail.com>
This commit is contained in:
Michael Crosby 2017-07-07 14:23:10 -07:00
parent b9fb2793a8
commit f39693eabe
6 changed files with 234 additions and 34 deletions

View File

@ -21,6 +21,7 @@
ContainerDelete
ContentDelete
StreamEventsRequest
PostEventRequest
Envelope
ImageUpdate
ImageDelete

View File

@ -8,8 +8,9 @@ import proto "github.com/gogo/protobuf/proto"
import fmt "fmt"
import math "math"
import _ "github.com/gogo/protobuf/gogoproto"
import _ "github.com/gogo/protobuf/types"
import google_protobuf1 "github.com/gogo/protobuf/types"
import google_protobuf2 "github.com/golang/protobuf/ptypes/empty"
import _ "github.com/gogo/protobuf/types"
import time "time"
@ -38,6 +39,14 @@ func (m *StreamEventsRequest) Reset() { *m = StreamEventsRequ
func (*StreamEventsRequest) ProtoMessage() {}
func (*StreamEventsRequest) Descriptor() ([]byte, []int) { return fileDescriptorEvents, []int{0} }
type PostEventRequest struct {
Envelope *Envelope `protobuf:"bytes,1,opt,name=envelope" json:"envelope,omitempty"`
}
func (m *PostEventRequest) Reset() { *m = PostEventRequest{} }
func (*PostEventRequest) ProtoMessage() {}
func (*PostEventRequest) Descriptor() ([]byte, []int) { return fileDescriptorEvents, []int{1} }
type Envelope struct {
Timestamp time.Time `protobuf:"bytes,1,opt,name=timestamp,stdtime" json:"timestamp"`
Topic string `protobuf:"bytes,2,opt,name=topic,proto3" json:"topic,omitempty"`
@ -46,10 +55,11 @@ type Envelope struct {
func (m *Envelope) Reset() { *m = Envelope{} }
func (*Envelope) ProtoMessage() {}
func (*Envelope) Descriptor() ([]byte, []int) { return fileDescriptorEvents, []int{1} }
func (*Envelope) Descriptor() ([]byte, []int) { return fileDescriptorEvents, []int{2} }
func init() {
proto.RegisterType((*StreamEventsRequest)(nil), "containerd.services.events.v1.StreamEventsRequest")
proto.RegisterType((*PostEventRequest)(nil), "containerd.services.events.v1.PostEventRequest")
proto.RegisterType((*Envelope)(nil), "containerd.services.events.v1.Envelope")
}
@ -65,6 +75,7 @@ const _ = grpc.SupportPackageIsVersion4
type EventsClient interface {
Stream(ctx context.Context, in *StreamEventsRequest, opts ...grpc.CallOption) (Events_StreamClient, error)
Post(ctx context.Context, in *PostEventRequest, opts ...grpc.CallOption) (*google_protobuf2.Empty, error)
}
type eventsClient struct {
@ -107,10 +118,20 @@ func (x *eventsStreamClient) Recv() (*Envelope, error) {
return m, nil
}
func (c *eventsClient) Post(ctx context.Context, in *PostEventRequest, opts ...grpc.CallOption) (*google_protobuf2.Empty, error) {
out := new(google_protobuf2.Empty)
err := grpc.Invoke(ctx, "/containerd.services.events.v1.Events/Post", in, out, c.cc, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// Server API for Events service
type EventsServer interface {
Stream(*StreamEventsRequest, Events_StreamServer) error
Post(context.Context, *PostEventRequest) (*google_protobuf2.Empty, error)
}
func RegisterEventsServer(s *grpc.Server, srv EventsServer) {
@ -138,10 +159,33 @@ func (x *eventsStreamServer) Send(m *Envelope) error {
return x.ServerStream.SendMsg(m)
}
func _Events_Post_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(PostEventRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(EventsServer).Post(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/containerd.services.events.v1.Events/Post",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(EventsServer).Post(ctx, req.(*PostEventRequest))
}
return interceptor(ctx, in, info, handler)
}
var _Events_serviceDesc = grpc.ServiceDesc{
ServiceName: "containerd.services.events.v1.Events",
HandlerType: (*EventsServer)(nil),
Methods: []grpc.MethodDesc{},
Methods: []grpc.MethodDesc{
{
MethodName: "Post",
Handler: _Events_Post_Handler,
},
},
Streams: []grpc.StreamDesc{
{
StreamName: "Stream",
@ -170,6 +214,34 @@ func (m *StreamEventsRequest) MarshalTo(dAtA []byte) (int, error) {
return i, nil
}
func (m *PostEventRequest) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalTo(dAtA)
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *PostEventRequest) MarshalTo(dAtA []byte) (int, error) {
var i int
_ = i
var l int
_ = l
if m.Envelope != nil {
dAtA[i] = 0xa
i++
i = encodeVarintEvents(dAtA, i, uint64(m.Envelope.Size()))
n1, err := m.Envelope.MarshalTo(dAtA[i:])
if err != nil {
return 0, err
}
i += n1
}
return i, nil
}
func (m *Envelope) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
@ -188,11 +260,11 @@ func (m *Envelope) MarshalTo(dAtA []byte) (int, error) {
dAtA[i] = 0xa
i++
i = encodeVarintEvents(dAtA, i, uint64(github_com_gogo_protobuf_types.SizeOfStdTime(m.Timestamp)))
n1, err := github_com_gogo_protobuf_types.StdTimeMarshalTo(m.Timestamp, dAtA[i:])
n2, err := github_com_gogo_protobuf_types.StdTimeMarshalTo(m.Timestamp, dAtA[i:])
if err != nil {
return 0, err
}
i += n1
i += n2
if len(m.Topic) > 0 {
dAtA[i] = 0x12
i++
@ -203,11 +275,11 @@ func (m *Envelope) MarshalTo(dAtA []byte) (int, error) {
dAtA[i] = 0x1a
i++
i = encodeVarintEvents(dAtA, i, uint64(m.Event.Size()))
n2, err := m.Event.MarshalTo(dAtA[i:])
n3, err := m.Event.MarshalTo(dAtA[i:])
if err != nil {
return 0, err
}
i += n2
i += n3
}
return i, nil
}
@ -245,6 +317,16 @@ func (m *StreamEventsRequest) Size() (n int) {
return n
}
func (m *PostEventRequest) Size() (n int) {
var l int
_ = l
if m.Envelope != nil {
l = m.Envelope.Size()
n += 1 + l + sovEvents(uint64(l))
}
return n
}
func (m *Envelope) Size() (n int) {
var l int
_ = l
@ -283,12 +365,22 @@ func (this *StreamEventsRequest) String() string {
}, "")
return s
}
func (this *PostEventRequest) String() string {
if this == nil {
return "nil"
}
s := strings.Join([]string{`&PostEventRequest{`,
`Envelope:` + strings.Replace(fmt.Sprintf("%v", this.Envelope), "Envelope", "Envelope", 1) + `,`,
`}`,
}, "")
return s
}
func (this *Envelope) String() string {
if this == nil {
return "nil"
}
s := strings.Join([]string{`&Envelope{`,
`Timestamp:` + strings.Replace(strings.Replace(this.Timestamp.String(), "Timestamp", "google_protobuf2.Timestamp", 1), `&`, ``, 1) + `,`,
`Timestamp:` + strings.Replace(strings.Replace(this.Timestamp.String(), "Timestamp", "google_protobuf3.Timestamp", 1), `&`, ``, 1) + `,`,
`Topic:` + fmt.Sprintf("%v", this.Topic) + `,`,
`Event:` + strings.Replace(fmt.Sprintf("%v", this.Event), "Any", "google_protobuf1.Any", 1) + `,`,
`}`,
@ -353,6 +445,89 @@ func (m *StreamEventsRequest) Unmarshal(dAtA []byte) error {
}
return nil
}
func (m *PostEventRequest) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowEvents
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: PostEventRequest: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: PostEventRequest: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Envelope", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowEvents
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthEvents
}
postIndex := iNdEx + msglen
if postIndex > l {
return io.ErrUnexpectedEOF
}
if m.Envelope == nil {
m.Envelope = &Envelope{}
}
if err := m.Envelope.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipEvents(dAtA[iNdEx:])
if err != nil {
return err
}
if skippy < 0 {
return ErrInvalidLengthEvents
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func (m *Envelope) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
@ -605,25 +780,28 @@ func init() {
}
var fileDescriptorEvents = []byte{
// 313 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x91, 0xc1, 0x4a, 0xc3, 0x30,
0x1c, 0xc6, 0x17, 0x65, 0x63, 0x8b, 0xb7, 0x38, 0x61, 0x16, 0xcc, 0xc6, 0x2e, 0x0e, 0x0f, 0x89,
0x9b, 0x47, 0x41, 0x70, 0xb8, 0x17, 0xa8, 0x1e, 0xc4, 0x5b, 0x57, 0xff, 0xc6, 0xc0, 0x9a, 0x74,
0x6d, 0x56, 0xd8, 0xcd, 0x47, 0xd8, 0x63, 0xf5, 0xe8, 0xd1, 0x93, 0xba, 0x3e, 0x89, 0x98, 0xb4,
0x4e, 0x54, 0x14, 0xbc, 0x7d, 0x7f, 0xf2, 0xfb, 0xbe, 0xfc, 0xbf, 0x04, 0x5f, 0x08, 0x69, 0xee,
0x17, 0x53, 0x16, 0xea, 0x88, 0x87, 0x5a, 0x99, 0x40, 0x2a, 0x48, 0x6e, 0x3f, 0xcb, 0x20, 0x96,
0x3c, 0x85, 0x24, 0x93, 0x21, 0xa4, 0x1c, 0x32, 0x50, 0x26, 0xe5, 0xd9, 0xb0, 0x54, 0x2c, 0x4e,
0xb4, 0xd1, 0xe4, 0x60, 0xc3, 0xb3, 0x8a, 0x65, 0x25, 0x91, 0x0d, 0xbd, 0xb6, 0xd0, 0x42, 0x5b,
0x92, 0xbf, 0x2b, 0x67, 0xf2, 0xba, 0x42, 0x6b, 0x31, 0x03, 0x6e, 0xa7, 0xe9, 0xe2, 0x8e, 0x1b,
0x19, 0x41, 0x6a, 0x82, 0x28, 0x2e, 0x81, 0xfd, 0xaf, 0x40, 0xa0, 0x96, 0xee, 0xa8, 0xbf, 0x87,
0x77, 0x2f, 0x4d, 0x02, 0x41, 0x34, 0xb1, 0x97, 0xf8, 0x30, 0x5f, 0x40, 0x6a, 0xfa, 0x2b, 0x84,
0x9b, 0x13, 0x95, 0xc1, 0x4c, 0xc7, 0x40, 0xc6, 0xb8, 0xf5, 0x91, 0xd8, 0x41, 0x3d, 0x34, 0xd8,
0x19, 0x79, 0xcc, 0x45, 0xb2, 0x2a, 0x92, 0x5d, 0x55, 0xc4, 0xb8, 0x99, 0x3f, 0x77, 0x6b, 0xab,
0x97, 0x2e, 0xf2, 0x37, 0x36, 0xd2, 0xc6, 0x75, 0xa3, 0x63, 0x19, 0x76, 0xb6, 0x7a, 0x68, 0xd0,
0xf2, 0xdd, 0x40, 0x8e, 0x70, 0xdd, 0x96, 0xeb, 0x6c, 0xdb, 0xd4, 0xf6, 0xb7, 0xd4, 0x73, 0xb5,
0xf4, 0x1d, 0x32, 0x9a, 0xe3, 0x86, 0xdb, 0x91, 0x08, 0xdc, 0x70, 0x3b, 0x93, 0x11, 0xfb, 0xf5,
0xbd, 0xd8, 0x0f, 0xd5, 0xbc, 0xc3, 0x3f, 0x3c, 0x55, 0xed, 0x63, 0x34, 0xbe, 0xce, 0xd7, 0xb4,
0xf6, 0xb4, 0xa6, 0xb5, 0x87, 0x82, 0xa2, 0xbc, 0xa0, 0xe8, 0xb1, 0xa0, 0xe8, 0xb5, 0xa0, 0xe8,
0xe6, 0xec, 0x9f, 0xbf, 0x7d, 0xea, 0xd4, 0xb4, 0x61, 0x1b, 0x9e, 0xbc, 0x05, 0x00, 0x00, 0xff,
0xff, 0xc3, 0x54, 0xa2, 0xf3, 0x36, 0x02, 0x00, 0x00,
// 367 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x92, 0xc1, 0x4e, 0xc2, 0x40,
0x10, 0x86, 0x59, 0x15, 0x02, 0xeb, 0xc5, 0xac, 0x68, 0xb0, 0xc6, 0x42, 0xb8, 0x48, 0x3c, 0xec,
0x0a, 0x1e, 0x4d, 0x4c, 0x44, 0x39, 0x6b, 0xaa, 0x89, 0xc6, 0x5b, 0xa9, 0x63, 0x6d, 0x42, 0xbb,
0xb5, 0x5d, 0x9a, 0x70, 0xf3, 0x11, 0x78, 0x26, 0x4f, 0x1c, 0x3d, 0x7a, 0x52, 0xe9, 0x93, 0x18,
0x76, 0xb7, 0x60, 0xc0, 0x88, 0xf1, 0x36, 0x3b, 0xf3, 0xcd, 0xf4, 0x9f, 0x7f, 0x8a, 0xcf, 0x5d,
0x4f, 0x3c, 0xf6, 0xbb, 0xd4, 0xe1, 0x3e, 0x73, 0x78, 0x20, 0x6c, 0x2f, 0x80, 0xe8, 0xfe, 0x7b,
0x68, 0x87, 0x1e, 0x8b, 0x21, 0x4a, 0x3c, 0x07, 0x62, 0x06, 0x09, 0x04, 0x22, 0x66, 0x49, 0x53,
0x47, 0x34, 0x8c, 0xb8, 0xe0, 0x64, 0x6f, 0xc6, 0xd3, 0x8c, 0xa5, 0x9a, 0x48, 0x9a, 0x46, 0xd9,
0xe5, 0x2e, 0x97, 0x24, 0x9b, 0x44, 0xaa, 0xc9, 0xd8, 0x71, 0x39, 0x77, 0x7b, 0xc0, 0xe4, 0xab,
0xdb, 0x7f, 0x60, 0x76, 0x30, 0xd0, 0xa5, 0xdd, 0xf9, 0x12, 0xf8, 0xa1, 0xc8, 0x8a, 0xd5, 0xf9,
0xa2, 0xf0, 0x7c, 0x88, 0x85, 0xed, 0x87, 0x0a, 0xa8, 0x6f, 0xe1, 0xcd, 0x2b, 0x11, 0x81, 0xed,
0x77, 0xa4, 0x02, 0x0b, 0x9e, 0xfa, 0x10, 0x8b, 0xfa, 0x0d, 0xde, 0xb8, 0xe4, 0xb1, 0x90, 0x49,
0x9d, 0x23, 0x67, 0xb8, 0x08, 0x41, 0x02, 0x3d, 0x1e, 0x42, 0x05, 0xd5, 0x50, 0x63, 0xbd, 0xb5,
0x4f, 0x7f, 0xdd, 0x85, 0x76, 0x34, 0x6e, 0x4d, 0x1b, 0xeb, 0x43, 0x84, 0x8b, 0x59, 0x9a, 0xb4,
0x71, 0x69, 0xaa, 0x47, 0x8f, 0x34, 0xa8, 0x52, 0x4c, 0x33, 0xc5, 0xf4, 0x3a, 0x23, 0xda, 0xc5,
0xd1, 0x7b, 0x35, 0x37, 0xfc, 0xa8, 0x22, 0x6b, 0xd6, 0x46, 0xca, 0x38, 0x2f, 0x78, 0xe8, 0x39,
0x95, 0x95, 0x1a, 0x6a, 0x94, 0x2c, 0xf5, 0x20, 0x07, 0x38, 0x2f, 0x65, 0x54, 0x56, 0xe5, 0xd4,
0xf2, 0xc2, 0xd4, 0xd3, 0x60, 0x60, 0x29, 0xa4, 0xf5, 0x82, 0x70, 0x41, 0x6d, 0x4f, 0x5c, 0x5c,
0x50, 0x6e, 0x90, 0xd6, 0x92, 0xd5, 0x7e, 0x30, 0xcd, 0xf8, 0xab, 0x1d, 0x87, 0x88, 0x5c, 0xe0,
0xb5, 0x89, 0xbf, 0x84, 0x2d, 0x69, 0x99, 0x3f, 0x82, 0xb1, 0xbd, 0xb0, 0x49, 0x67, 0x72, 0xee,
0xf6, 0xed, 0x68, 0x6c, 0xe6, 0xde, 0xc6, 0x66, 0xee, 0x39, 0x35, 0xd1, 0x28, 0x35, 0xd1, 0x6b,
0x6a, 0xa2, 0xcf, 0xd4, 0x44, 0x77, 0x27, 0xff, 0xfc, 0x6b, 0x8f, 0x55, 0xd4, 0x2d, 0xc8, 0x2f,
0x1d, 0x7d, 0x05, 0x00, 0x00, 0xff, 0xff, 0xdd, 0x37, 0xcb, 0x0e, 0xfe, 0x02, 0x00, 0x00,
}

View File

@ -3,17 +3,23 @@ syntax = "proto3";
package containerd.services.events.v1;
import "gogoproto/gogo.proto";
import "google/protobuf/timestamp.proto";
import "google/protobuf/any.proto";
import "google/protobuf/empty.proto";
import "google/protobuf/timestamp.proto";
option go_package = "github.com/containerd/containerd/api/services/events/v1;events";
service Events {
rpc Stream(StreamEventsRequest) returns (stream Envelope);
rpc Post(PostEventRequest) returns (google.protobuf.Empty);
}
message StreamEventsRequest {}
message PostEventRequest {
Envelope envelope = 1;
}
message Envelope {
google.protobuf.Timestamp timestamp = 1 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
string topic = 2;

View File

@ -583,8 +583,8 @@ func (this *RuntimeEvent) String() string {
`Type:` + fmt.Sprintf("%v", this.Type) + `,`,
`Pid:` + fmt.Sprintf("%v", this.Pid) + `,`,
`ExitStatus:` + fmt.Sprintf("%v", this.ExitStatus) + `,`,
`ExitedAt:` + strings.Replace(strings.Replace(this.ExitedAt.String(), "Timestamp", "google_protobuf2.Timestamp", 1), `&`, ``, 1) + `,`,
`Timestamp:` + strings.Replace(strings.Replace(this.Timestamp.String(), "Timestamp", "google_protobuf2.Timestamp", 1), `&`, ``, 1) + `,`,
`ExitedAt:` + strings.Replace(strings.Replace(this.ExitedAt.String(), "Timestamp", "google_protobuf3.Timestamp", 1), `&`, ``, 1) + `,`,
`Timestamp:` + strings.Replace(strings.Replace(this.Timestamp.String(), "Timestamp", "google_protobuf3.Timestamp", 1), `&`, ``, 1) + `,`,
`}`,
}, "")
return s
@ -597,7 +597,7 @@ func (this *RuntimeDelete) String() string {
`ContainerID:` + fmt.Sprintf("%v", this.ContainerID) + `,`,
`Runtime:` + fmt.Sprintf("%v", this.Runtime) + `,`,
`ExitStatus:` + fmt.Sprintf("%v", this.ExitStatus) + `,`,
`ExitedAt:` + strings.Replace(strings.Replace(this.ExitedAt.String(), "Timestamp", "google_protobuf2.Timestamp", 1), `&`, ``, 1) + `,`,
`ExitedAt:` + strings.Replace(strings.Replace(this.ExitedAt.String(), "Timestamp", "google_protobuf3.Timestamp", 1), `&`, ``, 1) + `,`,
`}`,
}, "")
return s

View File

@ -28,7 +28,6 @@ func (s *eventSink) Write(evt goevents.Event) error {
if !ok {
return errors.New("event is not a sink event")
}
topic := getTopic(e.ctx)
ns, _ := namespaces.Namespace(e.ctx)
if ns != "" && ns != s.ns {
@ -36,6 +35,12 @@ func (s *eventSink) Write(evt goevents.Event) error {
return nil
}
if ev, ok := e.event.(*events.Envelope); ok {
s.ch <- ev
return nil
}
topic := getTopic(e.ctx)
eventData, err := typeurl.MarshalAny(e.event)
if err != nil {
return err

View File

@ -8,6 +8,8 @@ import (
api "github.com/containerd/containerd/api/services/events/v1"
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/plugin"
"github.com/golang/protobuf/ptypes/empty"
"golang.org/x/net/context"
"google.golang.org/grpc"
)
@ -52,3 +54,11 @@ func (s *Service) Stream(req *api.StreamEventsRequest, srv api.Events_StreamServ
}
}
}
func (s *Service) Post(ctx context.Context, r *api.PostEventRequest) (*empty.Empty, error) {
ctx = events.WithTopic(ctx, r.Envelope.Topic)
if err := s.emitter.Post(ctx, r.Envelope); err != nil {
return nil, err
}
return &empty.Empty{}, nil
}