Fix API forward events for shims

Signed-off-by: Michael Crosby <crosbymichael@gmail.com>
This commit is contained in:
Michael Crosby 2019-04-10 13:34:34 -04:00
parent 475619c29e
commit 4ba756edda
5 changed files with 448 additions and 77 deletions

View File

@ -4211,28 +4211,61 @@ file {
dependency: "google/protobuf/empty.proto"
dependency: "google/protobuf/timestamp.proto"
message_type {
name: "PublishRequest"
name: "ForwardRequest"
field {
name: "envelope"
number: 1
label: LABEL_OPTIONAL
type: TYPE_MESSAGE
type_name: ".containerd.services.events.ttrpc.v1.Envelope"
json_name: "envelope"
}
}
message_type {
name: "Envelope"
field {
name: "timestamp"
number: 1
label: LABEL_OPTIONAL
type: TYPE_MESSAGE
type_name: ".google.protobuf.Timestamp"
options {
65001: 0
65010: 1
}
json_name: "timestamp"
}
field {
name: "namespace"
number: 2
label: LABEL_OPTIONAL
type: TYPE_STRING
json_name: "namespace"
}
field {
name: "topic"
number: 1
number: 3
label: LABEL_OPTIONAL
type: TYPE_STRING
json_name: "topic"
}
field {
name: "event"
number: 2
number: 4
label: LABEL_OPTIONAL
type: TYPE_MESSAGE
type_name: ".google.protobuf.Any"
json_name: "event"
}
options {
64400: 1
}
}
service {
name: "Events"
method {
name: "Publish"
input_type: ".containerd.services.events.ttrpc.v1.PublishRequest"
name: "Forward"
input_type: ".containerd.services.events.ttrpc.v1.ForwardRequest"
output_type: ".google.protobuf.Empty"
}
}

View File

@ -7,18 +7,22 @@ import (
context "context"
fmt "fmt"
github_com_containerd_ttrpc "github.com/containerd/ttrpc"
github_com_containerd_typeurl "github.com/containerd/typeurl"
proto "github.com/gogo/protobuf/proto"
github_com_gogo_protobuf_types "github.com/gogo/protobuf/types"
types "github.com/gogo/protobuf/types"
io "io"
math "math"
reflect "reflect"
strings "strings"
time "time"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
var _ = time.Kitchen
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
@ -26,25 +30,24 @@ var _ = math.Inf
// proto package needs to be updated.
const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package
type PublishRequest struct {
Topic string `protobuf:"bytes,1,opt,name=topic,proto3" json:"topic,omitempty"`
Event *types.Any `protobuf:"bytes,2,opt,name=event,proto3" json:"event,omitempty"`
type ForwardRequest struct {
Envelope *Envelope `protobuf:"bytes,1,opt,name=envelope,proto3" json:"envelope,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *PublishRequest) Reset() { *m = PublishRequest{} }
func (*PublishRequest) ProtoMessage() {}
func (*PublishRequest) Descriptor() ([]byte, []int) {
func (m *ForwardRequest) Reset() { *m = ForwardRequest{} }
func (*ForwardRequest) ProtoMessage() {}
func (*ForwardRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_19f98672016720b5, []int{0}
}
func (m *PublishRequest) XXX_Unmarshal(b []byte) error {
func (m *ForwardRequest) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *PublishRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
func (m *ForwardRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_PublishRequest.Marshal(b, m, deterministic)
return xxx_messageInfo_ForwardRequest.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalTo(b)
@ -54,20 +57,63 @@ func (m *PublishRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, erro
return b[:n], nil
}
}
func (m *PublishRequest) XXX_Merge(src proto.Message) {
xxx_messageInfo_PublishRequest.Merge(m, src)
func (m *ForwardRequest) XXX_Merge(src proto.Message) {
xxx_messageInfo_ForwardRequest.Merge(m, src)
}
func (m *PublishRequest) XXX_Size() int {
func (m *ForwardRequest) XXX_Size() int {
return m.Size()
}
func (m *PublishRequest) XXX_DiscardUnknown() {
xxx_messageInfo_PublishRequest.DiscardUnknown(m)
func (m *ForwardRequest) XXX_DiscardUnknown() {
xxx_messageInfo_ForwardRequest.DiscardUnknown(m)
}
var xxx_messageInfo_PublishRequest proto.InternalMessageInfo
var xxx_messageInfo_ForwardRequest proto.InternalMessageInfo
type Envelope struct {
Timestamp time.Time `protobuf:"bytes,1,opt,name=timestamp,proto3,stdtime" json:"timestamp"`
Namespace string `protobuf:"bytes,2,opt,name=namespace,proto3" json:"namespace,omitempty"`
Topic string `protobuf:"bytes,3,opt,name=topic,proto3" json:"topic,omitempty"`
Event *types.Any `protobuf:"bytes,4,opt,name=event,proto3" json:"event,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *Envelope) Reset() { *m = Envelope{} }
func (*Envelope) ProtoMessage() {}
func (*Envelope) Descriptor() ([]byte, []int) {
return fileDescriptor_19f98672016720b5, []int{1}
}
func (m *Envelope) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *Envelope) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_Envelope.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalTo(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
}
func (m *Envelope) XXX_Merge(src proto.Message) {
xxx_messageInfo_Envelope.Merge(m, src)
}
func (m *Envelope) XXX_Size() int {
return m.Size()
}
func (m *Envelope) XXX_DiscardUnknown() {
xxx_messageInfo_Envelope.DiscardUnknown(m)
}
var xxx_messageInfo_Envelope proto.InternalMessageInfo
func init() {
proto.RegisterType((*PublishRequest)(nil), "containerd.services.events.ttrpc.v1.PublishRequest")
proto.RegisterType((*ForwardRequest)(nil), "containerd.services.events.ttrpc.v1.ForwardRequest")
proto.RegisterType((*Envelope)(nil), "containerd.services.events.ttrpc.v1.Envelope")
}
func init() {
@ -75,30 +121,62 @@ func init() {
}
var fileDescriptor_19f98672016720b5 = []byte{
// 311 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0x51, 0x3f, 0x4b, 0x33, 0x31,
0x18, 0x6f, 0x5e, 0x68, 0x5f, 0x8c, 0xe0, 0x70, 0x14, 0xa9, 0x15, 0x62, 0xd1, 0xa5, 0x38, 0x24,
0xb4, 0x1d, 0x5d, 0x54, 0xec, 0xe2, 0x24, 0x37, 0x48, 0x71, 0x10, 0xef, 0xae, 0x69, 0x1a, 0xb8,
0x4b, 0xe2, 0xe5, 0xb9, 0x83, 0x6e, 0x7e, 0xbc, 0x8e, 0x8e, 0x8e, 0xf6, 0x3e, 0x89, 0x34, 0xb9,
0x52, 0xad, 0x83, 0x82, 0xdb, 0x2f, 0xf9, 0xfd, 0xcb, 0xf3, 0x04, 0xdf, 0x0a, 0x09, 0xf3, 0x22,
0xa6, 0x89, 0xce, 0x58, 0xa2, 0x15, 0x44, 0x52, 0xf1, 0x7c, 0xfa, 0x19, 0x46, 0x46, 0x32, 0xcb,
0xf3, 0x52, 0x26, 0xdc, 0x32, 0x80, 0xdc, 0x24, 0x8c, 0x97, 0x5c, 0x81, 0x65, 0xe5, 0xa0, 0x46,
0xd4, 0xe4, 0x1a, 0x74, 0x70, 0xb6, 0x75, 0xd1, 0x8d, 0x83, 0xd6, 0x0a, 0x67, 0xa4, 0xe5, 0xa0,
0x7b, 0xf9, 0x63, 0xa1, 0x0b, 0x8b, 0x8b, 0x19, 0x33, 0x69, 0x21, 0xa4, 0x62, 0x33, 0xc9, 0xd3,
0xa9, 0x89, 0x60, 0xee, 0x6b, 0xba, 0x6d, 0xa1, 0x85, 0x76, 0x90, 0xad, 0x51, 0x7d, 0x7b, 0x24,
0xb4, 0x16, 0x29, 0xdf, 0xba, 0x23, 0xb5, 0xa8, 0xa9, 0xe3, 0x5d, 0x8a, 0x67, 0x06, 0x36, 0xe4,
0xc9, 0x2e, 0x09, 0x32, 0xe3, 0x16, 0xa2, 0xcc, 0x78, 0xc1, 0x69, 0x88, 0x0f, 0xee, 0x8a, 0x38,
0x95, 0x76, 0x1e, 0xf2, 0xe7, 0x82, 0x5b, 0x08, 0xda, 0xb8, 0x09, 0xda, 0xc8, 0xa4, 0x83, 0x7a,
0xa8, 0xbf, 0x17, 0xfa, 0x43, 0x70, 0x8e, 0x9b, 0x6e, 0xd6, 0xce, 0xbf, 0x1e, 0xea, 0xef, 0x0f,
0xdb, 0xd4, 0x07, 0xd3, 0x4d, 0x30, 0xbd, 0x52, 0x8b, 0xd0, 0x4b, 0x86, 0x4f, 0xb8, 0x35, 0x76,
0x7b, 0x09, 0xee, 0xf1, 0xff, 0x3a, 0x3d, 0x18, 0xd1, 0x5f, 0xec, 0x8f, 0x7e, 0x7d, 0x4b, 0xf7,
0xf0, 0x5b, 0xcd, 0x78, 0x3d, 0xdc, 0xf5, 0xe3, 0x72, 0x45, 0x1a, 0x6f, 0x2b, 0xd2, 0x78, 0xa9,
0x08, 0x5a, 0x56, 0x04, 0xbd, 0x56, 0x04, 0xbd, 0x57, 0x04, 0x3d, 0xdc, 0xfc, 0xe9, 0xc7, 0x2f,
0x3c, 0x9a, 0x34, 0x26, 0x28, 0x6e, 0xb9, 0xce, 0xd1, 0x47, 0x00, 0x00, 0x00, 0xff, 0xff, 0x8d,
0x47, 0xe0, 0xf5, 0x44, 0x02, 0x00, 0x00,
// 396 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0x52, 0xc1, 0x8e, 0xd3, 0x30,
0x10, 0x8d, 0x61, 0x77, 0x69, 0x8d, 0xc4, 0xc1, 0xaa, 0x50, 0x08, 0x28, 0x59, 0x2d, 0x97, 0x15,
0x12, 0xb6, 0x76, 0xf7, 0x06, 0x17, 0xa8, 0x28, 0x12, 0x1c, 0x23, 0x84, 0x2a, 0x90, 0x10, 0x6e,
0x3a, 0x4d, 0x2d, 0x25, 0xb6, 0x49, 0x9c, 0xa0, 0xde, 0xfa, 0x09, 0x7c, 0x0c, 0x17, 0xfe, 0xa0,
0x47, 0x8e, 0x9c, 0x80, 0xe6, 0x4b, 0x50, 0x9d, 0xa4, 0x81, 0xf6, 0x40, 0xa5, 0xbd, 0xbd, 0xcc,
0x7b, 0x6f, 0xde, 0xcc, 0xc4, 0xf8, 0x75, 0x2c, 0xcc, 0xbc, 0x98, 0xd0, 0x48, 0xa5, 0x2c, 0x52,
0xd2, 0x70, 0x21, 0x21, 0x9b, 0xfe, 0x0d, 0xb9, 0x16, 0x2c, 0x87, 0xac, 0x14, 0x11, 0xe4, 0xcc,
0x98, 0x4c, 0x47, 0x0c, 0x4a, 0x90, 0x26, 0x67, 0xe5, 0x45, 0x83, 0xa8, 0xce, 0x94, 0x51, 0xe4,
0x61, 0xe7, 0xa2, 0xad, 0x83, 0x36, 0x0a, 0x6b, 0xa4, 0xe5, 0x85, 0xf7, 0xec, 0xbf, 0x81, 0xb6,
0xd9, 0xa4, 0x98, 0x31, 0x9d, 0x14, 0xb1, 0x90, 0x6c, 0x26, 0x20, 0x99, 0x6a, 0x6e, 0xe6, 0x75,
0x8c, 0x37, 0x88, 0x55, 0xac, 0x2c, 0x64, 0x1b, 0xd4, 0x54, 0xef, 0xc5, 0x4a, 0xc5, 0x09, 0x74,
0x6e, 0x2e, 0x17, 0x0d, 0x75, 0x7f, 0x97, 0x82, 0x54, 0x9b, 0x96, 0x0c, 0x76, 0x49, 0x23, 0x52,
0xc8, 0x0d, 0x4f, 0x75, 0x2d, 0x38, 0x7b, 0x8f, 0xef, 0xbc, 0x54, 0xd9, 0x67, 0x9e, 0x4d, 0x43,
0xf8, 0x54, 0x40, 0x6e, 0xc8, 0x2b, 0xdc, 0x03, 0x59, 0x42, 0xa2, 0x34, 0xb8, 0xe8, 0x14, 0x9d,
0xdf, 0xbe, 0x7c, 0x4c, 0x0f, 0x58, 0x9d, 0x8e, 0x1a, 0x53, 0xb8, 0xb5, 0x9f, 0x7d, 0x45, 0xb8,
0xd7, 0x96, 0xc9, 0x10, 0xf7, 0xb7, 0xe1, 0x4d, 0x63, 0x8f, 0xd6, 0xe3, 0xd1, 0x76, 0x3c, 0xfa,
0xa6, 0x55, 0x0c, 0x7b, 0xab, 0x9f, 0x81, 0xf3, 0xe5, 0x57, 0x80, 0xc2, 0xce, 0x46, 0x1e, 0xe0,
0xbe, 0xe4, 0x29, 0xe4, 0x9a, 0x47, 0xe0, 0xde, 0x38, 0x45, 0xe7, 0xfd, 0xb0, 0x2b, 0x90, 0x01,
0x3e, 0x36, 0x4a, 0x8b, 0xc8, 0xbd, 0x69, 0x99, 0xfa, 0x83, 0x3c, 0xc2, 0xc7, 0x76, 0x54, 0xf7,
0xc8, 0x66, 0x0e, 0xf6, 0x32, 0x9f, 0xcb, 0x45, 0x58, 0x4b, 0x9e, 0x1c, 0x2d, 0xbf, 0x05, 0xe8,
0xf2, 0x23, 0x3e, 0x19, 0xd9, 0xe5, 0xc8, 0x5b, 0x7c, 0xab, 0xb9, 0x0e, 0xb9, 0x3a, 0xe8, 0x08,
0xff, 0xde, 0xd2, 0xbb, 0xbb, 0x17, 0x36, 0xda, 0xfc, 0x9c, 0xe1, 0x87, 0xd5, 0xda, 0x77, 0x7e,
0xac, 0x7d, 0x67, 0x59, 0xf9, 0x68, 0x55, 0xf9, 0xe8, 0x7b, 0xe5, 0xa3, 0xdf, 0x95, 0x8f, 0xde,
0xbd, 0xb8, 0xd6, 0x8b, 0x7d, 0x5a, 0xa3, 0xb1, 0x33, 0x46, 0x93, 0x13, 0x9b, 0x79, 0xf5, 0x27,
0x00, 0x00, 0xff, 0xff, 0xd4, 0x90, 0xbd, 0x09, 0x04, 0x03, 0x00, 0x00,
}
func (m *PublishRequest) Marshal() (dAtA []byte, err error) {
// Field returns the value for the given fieldpath as a string, if defined.
// If the value is not defined, the second value will be false.
func (m *Envelope) Field(fieldpath []string) (string, bool) {
if len(fieldpath) == 0 {
return "", false
}
switch fieldpath[0] {
// unhandled: timestamp
case "namespace":
return string(m.Namespace), len(m.Namespace) > 0
case "topic":
return string(m.Topic), len(m.Topic) > 0
case "event":
decoded, err := github_com_containerd_typeurl.UnmarshalAny(m.Event)
if err != nil {
return "", false
}
adaptor, ok := decoded.(interface{ Field([]string) (string, bool) })
if !ok {
return "", false
}
return adaptor.Field(fieldpath[1:])
}
return "", false
}
func (m *ForwardRequest) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalTo(dAtA)
@ -108,26 +186,71 @@ func (m *PublishRequest) Marshal() (dAtA []byte, err error) {
return dAtA[:n], nil
}
func (m *PublishRequest) MarshalTo(dAtA []byte) (int, error) {
func (m *ForwardRequest) MarshalTo(dAtA []byte) (int, error) {
var i int
_ = i
var l int
_ = l
if len(m.Topic) > 0 {
if m.Envelope != nil {
dAtA[i] = 0xa
i++
i = encodeVarintEvents(dAtA, i, uint64(m.Envelope.Size()))
n1, err := m.Envelope.MarshalTo(dAtA[i:])
if err != nil {
return 0, err
}
i += n1
}
if m.XXX_unrecognized != nil {
i += copy(dAtA[i:], m.XXX_unrecognized)
}
return i, nil
}
func (m *Envelope) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalTo(dAtA)
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *Envelope) MarshalTo(dAtA []byte) (int, error) {
var i int
_ = i
var l int
_ = l
dAtA[i] = 0xa
i++
i = encodeVarintEvents(dAtA, i, uint64(github_com_gogo_protobuf_types.SizeOfStdTime(m.Timestamp)))
n2, err := github_com_gogo_protobuf_types.StdTimeMarshalTo(m.Timestamp, dAtA[i:])
if err != nil {
return 0, err
}
i += n2
if len(m.Namespace) > 0 {
dAtA[i] = 0x12
i++
i = encodeVarintEvents(dAtA, i, uint64(len(m.Namespace)))
i += copy(dAtA[i:], m.Namespace)
}
if len(m.Topic) > 0 {
dAtA[i] = 0x1a
i++
i = encodeVarintEvents(dAtA, i, uint64(len(m.Topic)))
i += copy(dAtA[i:], m.Topic)
}
if m.Event != nil {
dAtA[i] = 0x12
dAtA[i] = 0x22
i++
i = encodeVarintEvents(dAtA, i, uint64(m.Event.Size()))
n1, err := m.Event.MarshalTo(dAtA[i:])
n3, err := m.Event.MarshalTo(dAtA[i:])
if err != nil {
return 0, err
}
i += n1
i += n3
}
if m.XXX_unrecognized != nil {
i += copy(dAtA[i:], m.XXX_unrecognized)
@ -144,12 +267,34 @@ func encodeVarintEvents(dAtA []byte, offset int, v uint64) int {
dAtA[offset] = uint8(v)
return offset + 1
}
func (m *PublishRequest) Size() (n int) {
func (m *ForwardRequest) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
if m.Envelope != nil {
l = m.Envelope.Size()
n += 1 + l + sovEvents(uint64(l))
}
if m.XXX_unrecognized != nil {
n += len(m.XXX_unrecognized)
}
return n
}
func (m *Envelope) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
l = github_com_gogo_protobuf_types.SizeOfStdTime(m.Timestamp)
n += 1 + l + sovEvents(uint64(l))
l = len(m.Namespace)
if l > 0 {
n += 1 + l + sovEvents(uint64(l))
}
l = len(m.Topic)
if l > 0 {
n += 1 + l + sovEvents(uint64(l))
@ -177,11 +322,24 @@ func sovEvents(x uint64) (n int) {
func sozEvents(x uint64) (n int) {
return sovEvents(uint64((x << 1) ^ uint64((int64(x) >> 63))))
}
func (this *PublishRequest) String() string {
func (this *ForwardRequest) String() string {
if this == nil {
return "nil"
}
s := strings.Join([]string{`&PublishRequest{`,
s := strings.Join([]string{`&ForwardRequest{`,
`Envelope:` + strings.Replace(fmt.Sprintf("%v", this.Envelope), "Envelope", "Envelope", 1) + `,`,
`XXX_unrecognized:` + fmt.Sprintf("%v", this.XXX_unrecognized) + `,`,
`}`,
}, "")
return s
}
func (this *Envelope) String() string {
if this == nil {
return "nil"
}
s := strings.Join([]string{`&Envelope{`,
`Timestamp:` + strings.Replace(strings.Replace(this.Timestamp.String(), "Timestamp", "types.Timestamp", 1), `&`, ``, 1) + `,`,
`Namespace:` + fmt.Sprintf("%v", this.Namespace) + `,`,
`Topic:` + fmt.Sprintf("%v", this.Topic) + `,`,
`Event:` + strings.Replace(fmt.Sprintf("%v", this.Event), "Any", "types.Any", 1) + `,`,
`XXX_unrecognized:` + fmt.Sprintf("%v", this.XXX_unrecognized) + `,`,
@ -199,17 +357,17 @@ func valueToStringEvents(v interface{}) string {
}
type EventsService interface {
Publish(ctx context.Context, req *PublishRequest) (*types.Empty, error)
Forward(ctx context.Context, req *ForwardRequest) (*types.Empty, error)
}
func RegisterEventsService(srv *github_com_containerd_ttrpc.Server, svc EventsService) {
srv.Register("containerd.services.events.ttrpc.v1.Events", map[string]github_com_containerd_ttrpc.Method{
"Publish": func(ctx context.Context, unmarshal func(interface{}) error) (interface{}, error) {
var req PublishRequest
"Forward": func(ctx context.Context, unmarshal func(interface{}) error) (interface{}, error) {
var req ForwardRequest
if err := unmarshal(&req); err != nil {
return nil, err
}
return svc.Publish(ctx, &req)
return svc.Forward(ctx, &req)
},
})
}
@ -224,14 +382,14 @@ func NewEventsClient(client *github_com_containerd_ttrpc.Client) EventsService {
}
}
func (c *eventsClient) Publish(ctx context.Context, req *PublishRequest) (*types.Empty, error) {
func (c *eventsClient) Forward(ctx context.Context, req *ForwardRequest) (*types.Empty, error) {
var resp types.Empty
if err := c.client.Call(ctx, "containerd.services.events.ttrpc.v1.Events", "Publish", req, &resp); err != nil {
if err := c.client.Call(ctx, "containerd.services.events.ttrpc.v1.Events", "Forward", req, &resp); err != nil {
return nil, err
}
return &resp, nil
}
func (m *PublishRequest) Unmarshal(dAtA []byte) error {
func (m *ForwardRequest) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
@ -254,13 +412,168 @@ func (m *PublishRequest) Unmarshal(dAtA []byte) error {
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: PublishRequest: wiretype end group for non-group")
return fmt.Errorf("proto: ForwardRequest: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: PublishRequest: illegal tag %d (wire type %d)", fieldNum, wire)
return fmt.Errorf("proto: ForwardRequest: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Envelope", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowEvents
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthEvents
}
postIndex := iNdEx + msglen
if postIndex < 0 {
return ErrInvalidLengthEvents
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
if m.Envelope == nil {
m.Envelope = &Envelope{}
}
if err := m.Envelope.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipEvents(dAtA[iNdEx:])
if err != nil {
return err
}
if skippy < 0 {
return ErrInvalidLengthEvents
}
if (iNdEx + skippy) < 0 {
return ErrInvalidLengthEvents
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...)
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func (m *Envelope) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowEvents
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: Envelope: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: Envelope: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Timestamp", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowEvents
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthEvents
}
postIndex := iNdEx + msglen
if postIndex < 0 {
return ErrInvalidLengthEvents
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
if err := github_com_gogo_protobuf_types.StdTimeUnmarshal(&m.Timestamp, dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
case 2:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Namespace", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowEvents
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthEvents
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLengthEvents
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Namespace = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
case 3:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Topic", wireType)
}
@ -292,7 +605,7 @@ func (m *PublishRequest) Unmarshal(dAtA []byte) error {
}
m.Topic = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
case 2:
case 4:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Event", wireType)
}

View File

@ -11,15 +11,22 @@ import "google/protobuf/timestamp.proto";
option go_package = "github.com/containerd/containerd/api/services/ttrpc/events/v1;events";
service Events {
// Publish an event to a topic.
// Forward sends an event that has already been packaged into an envelope
// with a timestamp and namespace.
//
// The event will be packed into a timestamp envelope with the namespace
// introspected from the context. The envelope will then be dispatched.
rpc Publish(PublishRequest) returns (google.protobuf.Empty);
// This is useful if earlier timestamping is required or when forwarding on
// behalf of another component, namespace or publisher.
rpc Forward(ForwardRequest) returns (google.protobuf.Empty);
}
message PublishRequest {
string topic = 1;
google.protobuf.Any event = 2;
message ForwardRequest {
Envelope envelope = 1;
}
message Envelope {
option (containerd.plugin.fieldpath) = true;
google.protobuf.Timestamp timestamp = 1 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
string namespace = 2;
string topic = 3;
google.protobuf.Any event = 4;
}

View File

@ -295,13 +295,21 @@ type remoteEventsPublisher struct {
}
func (l *remoteEventsPublisher) Publish(ctx context.Context, topic string, event events.Event) error {
ns, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return err
}
any, err := typeurl.MarshalAny(event)
if err != nil {
return err
}
_, err = l.client.Publish(ctx, &v1.PublishRequest{
_, err = l.client.Forward(ctx, &v1.ForwardRequest{
Envelope: &v1.Envelope{
Timestamp: time.Now(),
Namespace: ns,
Topic: topic,
Event: any,
},
})
return err
}

View File

@ -21,6 +21,7 @@ import (
api "github.com/containerd/containerd/api/services/ttrpc/events/v1"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/events/exchange"
ptypes "github.com/gogo/protobuf/types"
)
@ -29,10 +30,19 @@ type ttrpcService struct {
events *exchange.Exchange
}
func (s *ttrpcService) Publish(ctx context.Context, r *api.PublishRequest) (*ptypes.Empty, error) {
if err := s.events.Publish(ctx, r.Topic, r.Event); err != nil {
func (s *ttrpcService) Forward(ctx context.Context, r *api.ForwardRequest) (*ptypes.Empty, error) {
if err := s.events.Forward(ctx, fromTProto(r.Envelope)); err != nil {
return nil, errdefs.ToGRPC(err)
}
return &ptypes.Empty{}, nil
}
func fromTProto(env *api.Envelope) *events.Envelope {
return &events.Envelope{
Timestamp: env.Timestamp,
Namespace: env.Namespace,
Topic: env.Topic,
Event: env.Event,
}
}