Merge pull request #3204 from crosbymichael/fix-forward

Fix API forward events for shims
This commit is contained in:
Derek McGowan 2019-04-11 11:46:46 -07:00 committed by GitHub
commit acca107732
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 680 additions and 145 deletions

View File

@ -4211,28 +4211,61 @@ file {
dependency: "google/protobuf/empty.proto" dependency: "google/protobuf/empty.proto"
dependency: "google/protobuf/timestamp.proto" dependency: "google/protobuf/timestamp.proto"
message_type { message_type {
name: "PublishRequest" name: "ForwardRequest"
field {
name: "envelope"
number: 1
label: LABEL_OPTIONAL
type: TYPE_MESSAGE
type_name: ".containerd.services.events.ttrpc.v1.Envelope"
json_name: "envelope"
}
}
message_type {
name: "Envelope"
field {
name: "timestamp"
number: 1
label: LABEL_OPTIONAL
type: TYPE_MESSAGE
type_name: ".google.protobuf.Timestamp"
options {
65001: 0
65010: 1
}
json_name: "timestamp"
}
field {
name: "namespace"
number: 2
label: LABEL_OPTIONAL
type: TYPE_STRING
json_name: "namespace"
}
field { field {
name: "topic" name: "topic"
number: 1 number: 3
label: LABEL_OPTIONAL label: LABEL_OPTIONAL
type: TYPE_STRING type: TYPE_STRING
json_name: "topic" json_name: "topic"
} }
field { field {
name: "event" name: "event"
number: 2 number: 4
label: LABEL_OPTIONAL label: LABEL_OPTIONAL
type: TYPE_MESSAGE type: TYPE_MESSAGE
type_name: ".google.protobuf.Any" type_name: ".google.protobuf.Any"
json_name: "event" json_name: "event"
} }
options {
64400: 1
}
} }
service { service {
name: "Events" name: "Events"
method { method {
name: "Publish" name: "Forward"
input_type: ".containerd.services.events.ttrpc.v1.PublishRequest" input_type: ".containerd.services.events.ttrpc.v1.ForwardRequest"
output_type: ".google.protobuf.Empty" output_type: ".google.protobuf.Empty"
} }
} }

View File

@ -7,18 +7,22 @@ import (
context "context" context "context"
fmt "fmt" fmt "fmt"
github_com_containerd_ttrpc "github.com/containerd/ttrpc" github_com_containerd_ttrpc "github.com/containerd/ttrpc"
github_com_containerd_typeurl "github.com/containerd/typeurl"
proto "github.com/gogo/protobuf/proto" proto "github.com/gogo/protobuf/proto"
github_com_gogo_protobuf_types "github.com/gogo/protobuf/types"
types "github.com/gogo/protobuf/types" types "github.com/gogo/protobuf/types"
io "io" io "io"
math "math" math "math"
reflect "reflect" reflect "reflect"
strings "strings" strings "strings"
time "time"
) )
// Reference imports to suppress errors if they are not otherwise used. // Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal var _ = proto.Marshal
var _ = fmt.Errorf var _ = fmt.Errorf
var _ = math.Inf var _ = math.Inf
var _ = time.Kitchen
// This is a compile-time assertion to ensure that this generated file // This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against. // is compatible with the proto package it is being compiled against.
@ -26,25 +30,24 @@ var _ = math.Inf
// proto package needs to be updated. // proto package needs to be updated.
const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package
type PublishRequest struct { type ForwardRequest struct {
Topic string `protobuf:"bytes,1,opt,name=topic,proto3" json:"topic,omitempty"` Envelope *Envelope `protobuf:"bytes,1,opt,name=envelope,proto3" json:"envelope,omitempty"`
Event *types.Any `protobuf:"bytes,2,opt,name=event,proto3" json:"event,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"` XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"` XXX_sizecache int32 `json:"-"`
} }
func (m *PublishRequest) Reset() { *m = PublishRequest{} } func (m *ForwardRequest) Reset() { *m = ForwardRequest{} }
func (*PublishRequest) ProtoMessage() {} func (*ForwardRequest) ProtoMessage() {}
func (*PublishRequest) Descriptor() ([]byte, []int) { func (*ForwardRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_19f98672016720b5, []int{0} return fileDescriptor_19f98672016720b5, []int{0}
} }
func (m *PublishRequest) XXX_Unmarshal(b []byte) error { func (m *ForwardRequest) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b) return m.Unmarshal(b)
} }
func (m *PublishRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { func (m *ForwardRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic { if deterministic {
return xxx_messageInfo_PublishRequest.Marshal(b, m, deterministic) return xxx_messageInfo_ForwardRequest.Marshal(b, m, deterministic)
} else { } else {
b = b[:cap(b)] b = b[:cap(b)]
n, err := m.MarshalTo(b) n, err := m.MarshalTo(b)
@ -54,20 +57,63 @@ func (m *PublishRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, erro
return b[:n], nil return b[:n], nil
} }
} }
func (m *PublishRequest) XXX_Merge(src proto.Message) { func (m *ForwardRequest) XXX_Merge(src proto.Message) {
xxx_messageInfo_PublishRequest.Merge(m, src) xxx_messageInfo_ForwardRequest.Merge(m, src)
} }
func (m *PublishRequest) XXX_Size() int { func (m *ForwardRequest) XXX_Size() int {
return m.Size() return m.Size()
} }
func (m *PublishRequest) XXX_DiscardUnknown() { func (m *ForwardRequest) XXX_DiscardUnknown() {
xxx_messageInfo_PublishRequest.DiscardUnknown(m) xxx_messageInfo_ForwardRequest.DiscardUnknown(m)
} }
var xxx_messageInfo_PublishRequest proto.InternalMessageInfo var xxx_messageInfo_ForwardRequest proto.InternalMessageInfo
type Envelope struct {
Timestamp time.Time `protobuf:"bytes,1,opt,name=timestamp,proto3,stdtime" json:"timestamp"`
Namespace string `protobuf:"bytes,2,opt,name=namespace,proto3" json:"namespace,omitempty"`
Topic string `protobuf:"bytes,3,opt,name=topic,proto3" json:"topic,omitempty"`
Event *types.Any `protobuf:"bytes,4,opt,name=event,proto3" json:"event,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *Envelope) Reset() { *m = Envelope{} }
func (*Envelope) ProtoMessage() {}
func (*Envelope) Descriptor() ([]byte, []int) {
return fileDescriptor_19f98672016720b5, []int{1}
}
func (m *Envelope) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *Envelope) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_Envelope.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalTo(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
}
func (m *Envelope) XXX_Merge(src proto.Message) {
xxx_messageInfo_Envelope.Merge(m, src)
}
func (m *Envelope) XXX_Size() int {
return m.Size()
}
func (m *Envelope) XXX_DiscardUnknown() {
xxx_messageInfo_Envelope.DiscardUnknown(m)
}
var xxx_messageInfo_Envelope proto.InternalMessageInfo
func init() { func init() {
proto.RegisterType((*PublishRequest)(nil), "containerd.services.events.ttrpc.v1.PublishRequest") proto.RegisterType((*ForwardRequest)(nil), "containerd.services.events.ttrpc.v1.ForwardRequest")
proto.RegisterType((*Envelope)(nil), "containerd.services.events.ttrpc.v1.Envelope")
} }
func init() { func init() {
@ -75,30 +121,62 @@ func init() {
} }
var fileDescriptor_19f98672016720b5 = []byte{ var fileDescriptor_19f98672016720b5 = []byte{
// 311 bytes of a gzipped FileDescriptorProto // 396 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0x51, 0x3f, 0x4b, 0x33, 0x31, 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0x52, 0xc1, 0x8e, 0xd3, 0x30,
0x18, 0x6f, 0x5e, 0x68, 0x5f, 0x8c, 0xe0, 0x70, 0x14, 0xa9, 0x15, 0x62, 0xd1, 0xa5, 0x38, 0x24, 0x10, 0x8d, 0x61, 0x77, 0x69, 0x8d, 0xc4, 0xc1, 0xaa, 0x50, 0x08, 0x28, 0x59, 0x2d, 0x97, 0x15,
0xb4, 0x1d, 0x5d, 0x54, 0xec, 0xe2, 0x24, 0x37, 0x48, 0x71, 0x10, 0xef, 0xae, 0x69, 0x1a, 0xb8, 0x12, 0xb6, 0x76, 0xf7, 0x06, 0x17, 0xa8, 0x28, 0x12, 0x1c, 0x23, 0x84, 0x2a, 0x90, 0x10, 0x6e,
0x4b, 0xe2, 0xe5, 0xb9, 0x83, 0x6e, 0x7e, 0xbc, 0x8e, 0x8e, 0x8e, 0xf6, 0x3e, 0x89, 0x34, 0xb9, 0x3a, 0x4d, 0x2d, 0x25, 0xb6, 0x49, 0x9c, 0xa0, 0xde, 0xfa, 0x09, 0x7c, 0x0c, 0x17, 0xfe, 0xa0,
0x52, 0xad, 0x83, 0x82, 0xdb, 0x2f, 0xf9, 0xfd, 0xcb, 0xf3, 0x04, 0xdf, 0x0a, 0x09, 0xf3, 0x22, 0x47, 0x8e, 0x9c, 0x80, 0xe6, 0x4b, 0x50, 0x9d, 0xa4, 0x81, 0xf6, 0x40, 0xa5, 0xbd, 0xbd, 0xcc,
0xa6, 0x89, 0xce, 0x58, 0xa2, 0x15, 0x44, 0x52, 0xf1, 0x7c, 0xfa, 0x19, 0x46, 0x46, 0x32, 0xcb, 0x7b, 0x6f, 0xde, 0xcc, 0xc4, 0xf8, 0x75, 0x2c, 0xcc, 0xbc, 0x98, 0xd0, 0x48, 0xa5, 0x2c, 0x52,
0xf3, 0x52, 0x26, 0xdc, 0x32, 0x80, 0xdc, 0x24, 0x8c, 0x97, 0x5c, 0x81, 0x65, 0xe5, 0xa0, 0x46, 0xd2, 0x70, 0x21, 0x21, 0x9b, 0xfe, 0x0d, 0xb9, 0x16, 0x2c, 0x87, 0xac, 0x14, 0x11, 0xe4, 0xcc,
0xd4, 0xe4, 0x1a, 0x74, 0x70, 0xb6, 0x75, 0xd1, 0x8d, 0x83, 0xd6, 0x0a, 0x67, 0xa4, 0xe5, 0xa0, 0x98, 0x4c, 0x47, 0x0c, 0x4a, 0x90, 0x26, 0x67, 0xe5, 0x45, 0x83, 0xa8, 0xce, 0x94, 0x51, 0xe4,
0x7b, 0xf9, 0x63, 0xa1, 0x0b, 0x8b, 0x8b, 0x19, 0x33, 0x69, 0x21, 0xa4, 0x62, 0x33, 0xc9, 0xd3, 0x61, 0xe7, 0xa2, 0xad, 0x83, 0x36, 0x0a, 0x6b, 0xa4, 0xe5, 0x85, 0xf7, 0xec, 0xbf, 0x81, 0xb6,
0xa9, 0x89, 0x60, 0xee, 0x6b, 0xba, 0x6d, 0xa1, 0x85, 0x76, 0x90, 0xad, 0x51, 0x7d, 0x7b, 0x24, 0xd9, 0xa4, 0x98, 0x31, 0x9d, 0x14, 0xb1, 0x90, 0x6c, 0x26, 0x20, 0x99, 0x6a, 0x6e, 0xe6, 0x75,
0xb4, 0x16, 0x29, 0xdf, 0xba, 0x23, 0xb5, 0xa8, 0xa9, 0xe3, 0x5d, 0x8a, 0x67, 0x06, 0x36, 0xe4, 0x8c, 0x37, 0x88, 0x55, 0xac, 0x2c, 0x64, 0x1b, 0xd4, 0x54, 0xef, 0xc5, 0x4a, 0xc5, 0x09, 0x74,
0xc9, 0x2e, 0x09, 0x32, 0xe3, 0x16, 0xa2, 0xcc, 0x78, 0xc1, 0x69, 0x88, 0x0f, 0xee, 0x8a, 0x38, 0x6e, 0x2e, 0x17, 0x0d, 0x75, 0x7f, 0x97, 0x82, 0x54, 0x9b, 0x96, 0x0c, 0x76, 0x49, 0x23, 0x52,
0x95, 0x76, 0x1e, 0xf2, 0xe7, 0x82, 0x5b, 0x08, 0xda, 0xb8, 0x09, 0xda, 0xc8, 0xa4, 0x83, 0x7a, 0xc8, 0x0d, 0x4f, 0x75, 0x2d, 0x38, 0x7b, 0x8f, 0xef, 0xbc, 0x54, 0xd9, 0x67, 0x9e, 0x4d, 0x43,
0xa8, 0xbf, 0x17, 0xfa, 0x43, 0x70, 0x8e, 0x9b, 0x6e, 0xd6, 0xce, 0xbf, 0x1e, 0xea, 0xef, 0x0f, 0xf8, 0x54, 0x40, 0x6e, 0xc8, 0x2b, 0xdc, 0x03, 0x59, 0x42, 0xa2, 0x34, 0xb8, 0xe8, 0x14, 0x9d,
0xdb, 0xd4, 0x07, 0xd3, 0x4d, 0x30, 0xbd, 0x52, 0x8b, 0xd0, 0x4b, 0x86, 0x4f, 0xb8, 0x35, 0x76, 0xdf, 0xbe, 0x7c, 0x4c, 0x0f, 0x58, 0x9d, 0x8e, 0x1a, 0x53, 0xb8, 0xb5, 0x9f, 0x7d, 0x45, 0xb8,
0x7b, 0x09, 0xee, 0xf1, 0xff, 0x3a, 0x3d, 0x18, 0xd1, 0x5f, 0xec, 0x8f, 0x7e, 0x7d, 0x4b, 0xf7, 0xd7, 0x96, 0xc9, 0x10, 0xf7, 0xb7, 0xe1, 0x4d, 0x63, 0x8f, 0xd6, 0xe3, 0xd1, 0x76, 0x3c, 0xfa,
0xf0, 0x5b, 0xcd, 0x78, 0x3d, 0xdc, 0xf5, 0xe3, 0x72, 0x45, 0x1a, 0x6f, 0x2b, 0xd2, 0x78, 0xa9, 0xa6, 0x55, 0x0c, 0x7b, 0xab, 0x9f, 0x81, 0xf3, 0xe5, 0x57, 0x80, 0xc2, 0xce, 0x46, 0x1e, 0xe0,
0x08, 0x5a, 0x56, 0x04, 0xbd, 0x56, 0x04, 0xbd, 0x57, 0x04, 0x3d, 0xdc, 0xfc, 0xe9, 0xc7, 0x2f, 0xbe, 0xe4, 0x29, 0xe4, 0x9a, 0x47, 0xe0, 0xde, 0x38, 0x45, 0xe7, 0xfd, 0xb0, 0x2b, 0x90, 0x01,
0x3c, 0x9a, 0x34, 0x26, 0x28, 0x6e, 0xb9, 0xce, 0xd1, 0x47, 0x00, 0x00, 0x00, 0xff, 0xff, 0x8d, 0x3e, 0x36, 0x4a, 0x8b, 0xc8, 0xbd, 0x69, 0x99, 0xfa, 0x83, 0x3c, 0xc2, 0xc7, 0x76, 0x54, 0xf7,
0x47, 0xe0, 0xf5, 0x44, 0x02, 0x00, 0x00, 0xc8, 0x66, 0x0e, 0xf6, 0x32, 0x9f, 0xcb, 0x45, 0x58, 0x4b, 0x9e, 0x1c, 0x2d, 0xbf, 0x05, 0xe8,
0xf2, 0x23, 0x3e, 0x19, 0xd9, 0xe5, 0xc8, 0x5b, 0x7c, 0xab, 0xb9, 0x0e, 0xb9, 0x3a, 0xe8, 0x08,
0xff, 0xde, 0xd2, 0xbb, 0xbb, 0x17, 0x36, 0xda, 0xfc, 0x9c, 0xe1, 0x87, 0xd5, 0xda, 0x77, 0x7e,
0xac, 0x7d, 0x67, 0x59, 0xf9, 0x68, 0x55, 0xf9, 0xe8, 0x7b, 0xe5, 0xa3, 0xdf, 0x95, 0x8f, 0xde,
0xbd, 0xb8, 0xd6, 0x8b, 0x7d, 0x5a, 0xa3, 0xb1, 0x33, 0x46, 0x93, 0x13, 0x9b, 0x79, 0xf5, 0x27,
0x00, 0x00, 0xff, 0xff, 0xd4, 0x90, 0xbd, 0x09, 0x04, 0x03, 0x00, 0x00,
} }
func (m *PublishRequest) Marshal() (dAtA []byte, err error) { // Field returns the value for the given fieldpath as a string, if defined.
// If the value is not defined, the second value will be false.
func (m *Envelope) Field(fieldpath []string) (string, bool) {
if len(fieldpath) == 0 {
return "", false
}
switch fieldpath[0] {
// unhandled: timestamp
case "namespace":
return string(m.Namespace), len(m.Namespace) > 0
case "topic":
return string(m.Topic), len(m.Topic) > 0
case "event":
decoded, err := github_com_containerd_typeurl.UnmarshalAny(m.Event)
if err != nil {
return "", false
}
adaptor, ok := decoded.(interface{ Field([]string) (string, bool) })
if !ok {
return "", false
}
return adaptor.Field(fieldpath[1:])
}
return "", false
}
func (m *ForwardRequest) Marshal() (dAtA []byte, err error) {
size := m.Size() size := m.Size()
dAtA = make([]byte, size) dAtA = make([]byte, size)
n, err := m.MarshalTo(dAtA) n, err := m.MarshalTo(dAtA)
@ -108,26 +186,71 @@ func (m *PublishRequest) Marshal() (dAtA []byte, err error) {
return dAtA[:n], nil return dAtA[:n], nil
} }
func (m *PublishRequest) MarshalTo(dAtA []byte) (int, error) { func (m *ForwardRequest) MarshalTo(dAtA []byte) (int, error) {
var i int var i int
_ = i _ = i
var l int var l int
_ = l _ = l
if len(m.Topic) > 0 { if m.Envelope != nil {
dAtA[i] = 0xa dAtA[i] = 0xa
i++ i++
i = encodeVarintEvents(dAtA, i, uint64(m.Envelope.Size()))
n1, err := m.Envelope.MarshalTo(dAtA[i:])
if err != nil {
return 0, err
}
i += n1
}
if m.XXX_unrecognized != nil {
i += copy(dAtA[i:], m.XXX_unrecognized)
}
return i, nil
}
func (m *Envelope) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalTo(dAtA)
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *Envelope) MarshalTo(dAtA []byte) (int, error) {
var i int
_ = i
var l int
_ = l
dAtA[i] = 0xa
i++
i = encodeVarintEvents(dAtA, i, uint64(github_com_gogo_protobuf_types.SizeOfStdTime(m.Timestamp)))
n2, err := github_com_gogo_protobuf_types.StdTimeMarshalTo(m.Timestamp, dAtA[i:])
if err != nil {
return 0, err
}
i += n2
if len(m.Namespace) > 0 {
dAtA[i] = 0x12
i++
i = encodeVarintEvents(dAtA, i, uint64(len(m.Namespace)))
i += copy(dAtA[i:], m.Namespace)
}
if len(m.Topic) > 0 {
dAtA[i] = 0x1a
i++
i = encodeVarintEvents(dAtA, i, uint64(len(m.Topic))) i = encodeVarintEvents(dAtA, i, uint64(len(m.Topic)))
i += copy(dAtA[i:], m.Topic) i += copy(dAtA[i:], m.Topic)
} }
if m.Event != nil { if m.Event != nil {
dAtA[i] = 0x12 dAtA[i] = 0x22
i++ i++
i = encodeVarintEvents(dAtA, i, uint64(m.Event.Size())) i = encodeVarintEvents(dAtA, i, uint64(m.Event.Size()))
n1, err := m.Event.MarshalTo(dAtA[i:]) n3, err := m.Event.MarshalTo(dAtA[i:])
if err != nil { if err != nil {
return 0, err return 0, err
} }
i += n1 i += n3
} }
if m.XXX_unrecognized != nil { if m.XXX_unrecognized != nil {
i += copy(dAtA[i:], m.XXX_unrecognized) i += copy(dAtA[i:], m.XXX_unrecognized)
@ -144,12 +267,34 @@ func encodeVarintEvents(dAtA []byte, offset int, v uint64) int {
dAtA[offset] = uint8(v) dAtA[offset] = uint8(v)
return offset + 1 return offset + 1
} }
func (m *PublishRequest) Size() (n int) { func (m *ForwardRequest) Size() (n int) {
if m == nil { if m == nil {
return 0 return 0
} }
var l int var l int
_ = l _ = l
if m.Envelope != nil {
l = m.Envelope.Size()
n += 1 + l + sovEvents(uint64(l))
}
if m.XXX_unrecognized != nil {
n += len(m.XXX_unrecognized)
}
return n
}
func (m *Envelope) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
l = github_com_gogo_protobuf_types.SizeOfStdTime(m.Timestamp)
n += 1 + l + sovEvents(uint64(l))
l = len(m.Namespace)
if l > 0 {
n += 1 + l + sovEvents(uint64(l))
}
l = len(m.Topic) l = len(m.Topic)
if l > 0 { if l > 0 {
n += 1 + l + sovEvents(uint64(l)) n += 1 + l + sovEvents(uint64(l))
@ -177,11 +322,24 @@ func sovEvents(x uint64) (n int) {
func sozEvents(x uint64) (n int) { func sozEvents(x uint64) (n int) {
return sovEvents(uint64((x << 1) ^ uint64((int64(x) >> 63)))) return sovEvents(uint64((x << 1) ^ uint64((int64(x) >> 63))))
} }
func (this *PublishRequest) String() string { func (this *ForwardRequest) String() string {
if this == nil { if this == nil {
return "nil" return "nil"
} }
s := strings.Join([]string{`&PublishRequest{`, s := strings.Join([]string{`&ForwardRequest{`,
`Envelope:` + strings.Replace(fmt.Sprintf("%v", this.Envelope), "Envelope", "Envelope", 1) + `,`,
`XXX_unrecognized:` + fmt.Sprintf("%v", this.XXX_unrecognized) + `,`,
`}`,
}, "")
return s
}
func (this *Envelope) String() string {
if this == nil {
return "nil"
}
s := strings.Join([]string{`&Envelope{`,
`Timestamp:` + strings.Replace(strings.Replace(this.Timestamp.String(), "Timestamp", "types.Timestamp", 1), `&`, ``, 1) + `,`,
`Namespace:` + fmt.Sprintf("%v", this.Namespace) + `,`,
`Topic:` + fmt.Sprintf("%v", this.Topic) + `,`, `Topic:` + fmt.Sprintf("%v", this.Topic) + `,`,
`Event:` + strings.Replace(fmt.Sprintf("%v", this.Event), "Any", "types.Any", 1) + `,`, `Event:` + strings.Replace(fmt.Sprintf("%v", this.Event), "Any", "types.Any", 1) + `,`,
`XXX_unrecognized:` + fmt.Sprintf("%v", this.XXX_unrecognized) + `,`, `XXX_unrecognized:` + fmt.Sprintf("%v", this.XXX_unrecognized) + `,`,
@ -199,17 +357,17 @@ func valueToStringEvents(v interface{}) string {
} }
type EventsService interface { type EventsService interface {
Publish(ctx context.Context, req *PublishRequest) (*types.Empty, error) Forward(ctx context.Context, req *ForwardRequest) (*types.Empty, error)
} }
func RegisterEventsService(srv *github_com_containerd_ttrpc.Server, svc EventsService) { func RegisterEventsService(srv *github_com_containerd_ttrpc.Server, svc EventsService) {
srv.Register("containerd.services.events.ttrpc.v1.Events", map[string]github_com_containerd_ttrpc.Method{ srv.Register("containerd.services.events.ttrpc.v1.Events", map[string]github_com_containerd_ttrpc.Method{
"Publish": func(ctx context.Context, unmarshal func(interface{}) error) (interface{}, error) { "Forward": func(ctx context.Context, unmarshal func(interface{}) error) (interface{}, error) {
var req PublishRequest var req ForwardRequest
if err := unmarshal(&req); err != nil { if err := unmarshal(&req); err != nil {
return nil, err return nil, err
} }
return svc.Publish(ctx, &req) return svc.Forward(ctx, &req)
}, },
}) })
} }
@ -224,14 +382,14 @@ func NewEventsClient(client *github_com_containerd_ttrpc.Client) EventsService {
} }
} }
func (c *eventsClient) Publish(ctx context.Context, req *PublishRequest) (*types.Empty, error) { func (c *eventsClient) Forward(ctx context.Context, req *ForwardRequest) (*types.Empty, error) {
var resp types.Empty var resp types.Empty
if err := c.client.Call(ctx, "containerd.services.events.ttrpc.v1.Events", "Publish", req, &resp); err != nil { if err := c.client.Call(ctx, "containerd.services.events.ttrpc.v1.Events", "Forward", req, &resp); err != nil {
return nil, err return nil, err
} }
return &resp, nil return &resp, nil
} }
func (m *PublishRequest) Unmarshal(dAtA []byte) error { func (m *ForwardRequest) Unmarshal(dAtA []byte) error {
l := len(dAtA) l := len(dAtA)
iNdEx := 0 iNdEx := 0
for iNdEx < l { for iNdEx < l {
@ -254,13 +412,168 @@ func (m *PublishRequest) Unmarshal(dAtA []byte) error {
fieldNum := int32(wire >> 3) fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7) wireType := int(wire & 0x7)
if wireType == 4 { if wireType == 4 {
return fmt.Errorf("proto: PublishRequest: wiretype end group for non-group") return fmt.Errorf("proto: ForwardRequest: wiretype end group for non-group")
} }
if fieldNum <= 0 { if fieldNum <= 0 {
return fmt.Errorf("proto: PublishRequest: illegal tag %d (wire type %d)", fieldNum, wire) return fmt.Errorf("proto: ForwardRequest: illegal tag %d (wire type %d)", fieldNum, wire)
} }
switch fieldNum { switch fieldNum {
case 1: case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Envelope", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowEvents
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthEvents
}
postIndex := iNdEx + msglen
if postIndex < 0 {
return ErrInvalidLengthEvents
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
if m.Envelope == nil {
m.Envelope = &Envelope{}
}
if err := m.Envelope.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipEvents(dAtA[iNdEx:])
if err != nil {
return err
}
if skippy < 0 {
return ErrInvalidLengthEvents
}
if (iNdEx + skippy) < 0 {
return ErrInvalidLengthEvents
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...)
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func (m *Envelope) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowEvents
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: Envelope: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: Envelope: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Timestamp", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowEvents
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthEvents
}
postIndex := iNdEx + msglen
if postIndex < 0 {
return ErrInvalidLengthEvents
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
if err := github_com_gogo_protobuf_types.StdTimeUnmarshal(&m.Timestamp, dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
case 2:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Namespace", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowEvents
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthEvents
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLengthEvents
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Namespace = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
case 3:
if wireType != 2 { if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Topic", wireType) return fmt.Errorf("proto: wrong wireType = %d for field Topic", wireType)
} }
@ -292,7 +605,7 @@ func (m *PublishRequest) Unmarshal(dAtA []byte) error {
} }
m.Topic = string(dAtA[iNdEx:postIndex]) m.Topic = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex iNdEx = postIndex
case 2: case 4:
if wireType != 2 { if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Event", wireType) return fmt.Errorf("proto: wrong wireType = %d for field Event", wireType)
} }

View File

@ -11,15 +11,22 @@ import "google/protobuf/timestamp.proto";
option go_package = "github.com/containerd/containerd/api/services/ttrpc/events/v1;events"; option go_package = "github.com/containerd/containerd/api/services/ttrpc/events/v1;events";
service Events { service Events {
// Publish an event to a topic. // Forward sends an event that has already been packaged into an envelope
// with a timestamp and namespace.
// //
// The event will be packed into a timestamp envelope with the namespace // This is useful if earlier timestamping is required or when forwarding on
// introspected from the context. The envelope will then be dispatched. // behalf of another component, namespace or publisher.
rpc Publish(PublishRequest) returns (google.protobuf.Empty); rpc Forward(ForwardRequest) returns (google.protobuf.Empty);
} }
message PublishRequest { message ForwardRequest {
string topic = 1; Envelope envelope = 1;
google.protobuf.Any event = 2; }
message Envelope {
option (containerd.plugin.fieldpath) = true;
google.protobuf.Timestamp timestamp = 1 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
string namespace = 2;
string topic = 3;
google.protobuf.Any event = 4;
} }

View File

@ -23,7 +23,6 @@ import (
"os" "os"
"github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/runtime/v2/shim" "github.com/containerd/containerd/runtime/v2/shim"
taskAPI "github.com/containerd/containerd/runtime/v2/task" taskAPI "github.com/containerd/containerd/runtime/v2/task"
ptypes "github.com/gogo/protobuf/types" ptypes "github.com/gogo/protobuf/types"
@ -37,7 +36,7 @@ var (
) )
// New returns a new shim service // New returns a new shim service
func New(ctx context.Context, id string, publisher events.Publisher) (shim.Shim, error) { func New(ctx context.Context, id string, publisher shim.Publisher, shutdown func()) (shim.Shim, error) {
return &service{}, nil return &service{}, nil
} }

View File

@ -24,15 +24,15 @@ import (
"github.com/containerd/cgroups" "github.com/containerd/cgroups"
eventstypes "github.com/containerd/containerd/api/events" eventstypes "github.com/containerd/containerd/api/events"
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/runtime" "github.com/containerd/containerd/runtime"
"github.com/containerd/containerd/runtime/v2/shim"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"golang.org/x/sys/unix" "golang.org/x/sys/unix"
) )
// NewOOMEpoller returns an epoll implementation that listens to OOM events // NewOOMEpoller returns an epoll implementation that listens to OOM events
// from a container's cgroups. // from a container's cgroups.
func NewOOMEpoller(publisher events.Publisher) (*Epoller, error) { func NewOOMEpoller(publisher shim.Publisher) (*Epoller, error) {
fd, err := unix.EpollCreate1(unix.EPOLL_CLOEXEC) fd, err := unix.EpollCreate1(unix.EPOLL_CLOEXEC)
if err != nil { if err != nil {
return nil, err return nil, err
@ -49,7 +49,7 @@ type Epoller struct {
mu sync.Mutex mu sync.Mutex
fd int fd int
publisher events.Publisher publisher shim.Publisher
set map[uintptr]*item set map[uintptr]*item
} }

View File

@ -33,7 +33,6 @@ import (
eventstypes "github.com/containerd/containerd/api/events" eventstypes "github.com/containerd/containerd/api/events"
"github.com/containerd/containerd/api/types/task" "github.com/containerd/containerd/api/types/task"
"github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/log" "github.com/containerd/containerd/log"
"github.com/containerd/containerd/mount" "github.com/containerd/containerd/mount"
"github.com/containerd/containerd/namespaces" "github.com/containerd/containerd/namespaces"
@ -58,12 +57,11 @@ var (
) )
// New returns a new shim service that can be used via GRPC // New returns a new shim service that can be used via GRPC
func New(ctx context.Context, id string, publisher events.Publisher) (shim.Shim, error) { func New(ctx context.Context, id string, publisher shim.Publisher, shutdown func()) (shim.Shim, error) {
ep, err := runc.NewOOMEpoller(publisher) ep, err := runc.NewOOMEpoller(publisher)
if err != nil { if err != nil {
return nil, err return nil, err
} }
ctx, cancel := context.WithCancel(ctx)
go ep.Run(ctx) go ep.Run(ctx)
s := &service{ s := &service{
id: id, id: id,
@ -71,15 +69,15 @@ func New(ctx context.Context, id string, publisher events.Publisher) (shim.Shim,
events: make(chan interface{}, 128), events: make(chan interface{}, 128),
ec: shim.Default.Subscribe(), ec: shim.Default.Subscribe(),
ep: ep, ep: ep,
cancel: cancel, cancel: shutdown,
} }
go s.processExits() go s.processExits()
runcC.Monitor = shim.Default runcC.Monitor = shim.Default
if err := s.initPlatform(); err != nil { if err := s.initPlatform(); err != nil {
cancel() shutdown()
return nil, errors.Wrap(err, "failed to initialized platform behavior") return nil, errors.Wrap(err, "failed to initialized platform behavior")
} }
go s.forward(publisher) go s.forward(ctx, publisher)
return s, nil return s, nil
} }
@ -511,7 +509,7 @@ func (s *service) Connect(ctx context.Context, r *taskAPI.ConnectRequest) (*task
func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (*ptypes.Empty, error) { func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (*ptypes.Empty, error) {
s.cancel() s.cancel()
os.Exit(0) close(s.events)
return empty, nil return empty, nil
} }
@ -619,15 +617,18 @@ func (s *service) getContainerPids(ctx context.Context, id string) ([]uint32, er
return pids, nil return pids, nil
} }
func (s *service) forward(publisher events.Publisher) { func (s *service) forward(ctx context.Context, publisher shim.Publisher) {
ns, _ := namespaces.Namespace(ctx)
ctx = namespaces.WithNamespace(context.Background(), ns)
for e := range s.events { for e := range s.events {
ctx, cancel := context.WithTimeout(s.context, 5*time.Second) ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
err := publisher.Publish(ctx, runc.GetTopic(e), e) err := publisher.Publish(ctx, runc.GetTopic(e), e)
cancel() cancel()
if err != nil { if err != nil {
logrus.WithError(err).Error("post event") logrus.WithError(err).Error("post event")
} }
} }
publisher.Close()
} }
func (s *service) getContainer() (*runc.Container, error) { func (s *service) getContainer() (*runc.Container, error) {

View File

@ -34,7 +34,6 @@ import (
eventstypes "github.com/containerd/containerd/api/events" eventstypes "github.com/containerd/containerd/api/events"
"github.com/containerd/containerd/api/types/task" "github.com/containerd/containerd/api/types/task"
"github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/log" "github.com/containerd/containerd/log"
"github.com/containerd/containerd/mount" "github.com/containerd/containerd/mount"
"github.com/containerd/containerd/namespaces" "github.com/containerd/containerd/namespaces"
@ -71,12 +70,11 @@ type spec struct {
} }
// New returns a new shim service that can be used via GRPC // New returns a new shim service that can be used via GRPC
func New(ctx context.Context, id string, publisher events.Publisher) (shim.Shim, error) { func New(ctx context.Context, id string, publisher shim.Publisher, shutdown func()) (shim.Shim, error) {
ep, err := runc.NewOOMEpoller(publisher) ep, err := runc.NewOOMEpoller(publisher)
if err != nil { if err != nil {
return nil, err return nil, err
} }
ctx, cancel := context.WithCancel(ctx)
go ep.Run(ctx) go ep.Run(ctx)
s := &service{ s := &service{
id: id, id: id,
@ -84,16 +82,16 @@ func New(ctx context.Context, id string, publisher events.Publisher) (shim.Shim,
events: make(chan interface{}, 128), events: make(chan interface{}, 128),
ec: shim.Default.Subscribe(), ec: shim.Default.Subscribe(),
ep: ep, ep: ep,
cancel: cancel, cancel: shutdown,
containers: make(map[string]*runc.Container), containers: make(map[string]*runc.Container),
} }
go s.processExits() go s.processExits()
runcC.Monitor = shim.Default runcC.Monitor = shim.Default
if err := s.initPlatform(); err != nil { if err := s.initPlatform(); err != nil {
cancel() shutdown()
return nil, errors.Wrap(err, "failed to initialized platform behavior") return nil, errors.Wrap(err, "failed to initialized platform behavior")
} }
go s.forward(publisher) go s.forward(ctx, publisher)
return s, nil return s, nil
} }
@ -570,7 +568,7 @@ func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (*pt
return empty, nil return empty, nil
} }
s.cancel() s.cancel()
os.Exit(0) close(s.events)
return empty, nil return empty, nil
} }
@ -689,15 +687,18 @@ func (s *service) getContainerPids(ctx context.Context, id string) ([]uint32, er
return pids, nil return pids, nil
} }
func (s *service) forward(publisher events.Publisher) { func (s *service) forward(ctx context.Context, publisher shim.Publisher) {
ns, _ := namespaces.Namespace(ctx)
ctx = namespaces.WithNamespace(context.Background(), ns)
for e := range s.events { for e := range s.events {
ctx, cancel := context.WithTimeout(s.context, 5*time.Second) ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
err := publisher.Publish(ctx, runc.GetTopic(e), e) err := publisher.Publish(ctx, runc.GetTopic(e), e)
cancel() cancel()
if err != nil { if err != nil {
logrus.WithError(err).Error("post event") logrus.WithError(err).Error("post event")
} }
} }
publisher.Close()
} }
func (s *service) getContainer(id string) (*runc.Container, error) { func (s *service) getContainer(id string) (*runc.Container, error) {

View File

@ -41,7 +41,6 @@ import (
containerd_types "github.com/containerd/containerd/api/types" containerd_types "github.com/containerd/containerd/api/types"
"github.com/containerd/containerd/api/types/task" "github.com/containerd/containerd/api/types/task"
"github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/log" "github.com/containerd/containerd/log"
"github.com/containerd/containerd/mount" "github.com/containerd/containerd/mount"
"github.com/containerd/containerd/namespaces" "github.com/containerd/containerd/namespaces"
@ -129,12 +128,13 @@ func forwardRunhcsLogs(ctx context.Context, c net.Conn, fields logrus.Fields) {
} }
// New returns a new runhcs shim service that can be used via GRPC // New returns a new runhcs shim service that can be used via GRPC
func New(ctx context.Context, id string, publisher events.Publisher) (shim.Shim, error) { func New(ctx context.Context, id string, publisher shim.Publisher, shutdown func()) (shim.Shim, error) {
return &service{ return &service{
context: ctx, context: ctx,
id: id, id: id,
processes: make(map[string]*process), processes: make(map[string]*process),
publisher: publisher, publisher: publisher,
shutdown: shutdown,
}, nil }, nil
} }
@ -159,7 +159,8 @@ type service struct {
id string id string
processes map[string]*process processes map[string]*process
publisher events.Publisher publisher shim.Publisher
shutdown func()
} }
func (s *service) newRunhcs() *runhcs.Runhcs { func (s *service) newRunhcs() *runhcs.Runhcs {
@ -1068,7 +1069,8 @@ func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (*pt
if s.debugListener != nil { if s.debugListener != nil {
s.debugListener.Close() s.debugListener.Close()
} }
s.publisher.Close()
s.shutdown()
os.Exit(0)
return empty, nil return empty, nil
} }

87
runtime/v2/shim/dialer.go Normal file
View File

@ -0,0 +1,87 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package shim
import (
"net"
"sync"
v1 "github.com/containerd/containerd/api/services/ttrpc/events/v1"
"github.com/containerd/ttrpc"
"github.com/pkg/errors"
)
type dialConnect func() (net.Conn, error)
var errDialerClosed = errors.New("events dialer is closed")
func newDialier(newFn dialConnect) *dialer {
return &dialer{
newFn: newFn,
}
}
type dialer struct {
mu sync.Mutex
newFn dialConnect
service v1.EventsService
conn net.Conn
closed bool
}
func (d *dialer) Get() (v1.EventsService, error) {
d.mu.Lock()
defer d.mu.Unlock()
if d.closed {
return nil, errDialerClosed
}
if d.service == nil {
conn, err := d.newFn()
if err != nil {
return nil, err
}
d.conn = conn
d.service = v1.NewEventsClient(ttrpc.NewClient(conn))
}
return d.service, nil
}
func (d *dialer) Put(err error) {
if err != nil {
d.mu.Lock()
d.conn.Close()
d.service = nil
d.mu.Unlock()
}
}
func (d *dialer) Close() (err error) {
d.mu.Lock()
if d.closed {
return errDialerClosed
}
if d.conn != nil {
err = d.conn.Close()
}
d.service = nil
d.closed = true
d.mu.Unlock()
return err
}

View File

@ -0,0 +1,87 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package shim
import (
"context"
"net"
"sync"
"time"
v1 "github.com/containerd/containerd/api/services/ttrpc/events/v1"
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/typeurl"
)
func newPublisher(address string) *remoteEventsPublisher {
return &remoteEventsPublisher{
dialer: newDialier(func() (net.Conn, error) {
return connect(address, dial)
}),
closed: make(chan struct{}),
}
}
type remoteEventsPublisher struct {
dialer *dialer
closed chan struct{}
closer sync.Once
}
func (l *remoteEventsPublisher) Done() <-chan struct{} {
return l.closed
}
func (l *remoteEventsPublisher) Close() (err error) {
err = l.dialer.Close()
l.closer.Do(func() {
close(l.closed)
})
return err
}
func (l *remoteEventsPublisher) Publish(ctx context.Context, topic string, event events.Event) error {
client, err := l.dialer.Get()
if err != nil {
return err
}
ns, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return err
}
any, err := typeurl.MarshalAny(event)
if err != nil {
return err
}
if _, err := client.Forward(ctx, &v1.ForwardRequest{
Envelope: &v1.Envelope{
Timestamp: time.Now(),
Namespace: ns,
Topic: topic,
Event: any,
},
}); err != nil {
l.dialer.Put(err)
return err
}
return nil
}
func connect(address string, d func(string, time.Duration) (net.Conn, error)) (net.Conn, error) {
return d(address, 5*time.Second)
}

View File

@ -20,20 +20,18 @@ import (
"context" "context"
"flag" "flag"
"fmt" "fmt"
"net" "io"
"os" "os"
"runtime" "runtime"
"runtime/debug" "runtime/debug"
"strings" "strings"
"time" "time"
v1 "github.com/containerd/containerd/api/services/ttrpc/events/v1"
"github.com/containerd/containerd/events" "github.com/containerd/containerd/events"
"github.com/containerd/containerd/log" "github.com/containerd/containerd/log"
"github.com/containerd/containerd/namespaces" "github.com/containerd/containerd/namespaces"
shimapi "github.com/containerd/containerd/runtime/v2/task" shimapi "github.com/containerd/containerd/runtime/v2/task"
"github.com/containerd/ttrpc" "github.com/containerd/ttrpc"
"github.com/containerd/typeurl"
"github.com/gogo/protobuf/proto" "github.com/gogo/protobuf/proto"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
@ -46,8 +44,14 @@ type Client struct {
signals chan os.Signal signals chan os.Signal
} }
// Publisher for events
type Publisher interface {
events.Publisher
io.Closer
}
// Init func for the creation of a shim server // Init func for the creation of a shim server
type Init func(context.Context, string, events.Publisher) (Shim, error) type Init func(context.Context, string, Publisher, func()) (Shim, error)
// Shim server interface // Shim server interface
type Shim interface { type Shim interface {
@ -156,24 +160,20 @@ func run(id string, initFunc Init, config Config) error {
return err return err
} }
} }
address := fmt.Sprintf("%s.ttrpc", addressFlag)
publisher := newPublisher(address)
defer publisher.Close()
publisher := &remoteEventsPublisher{
address: fmt.Sprintf("%s.ttrpc", addressFlag),
}
conn, err := connect(publisher.address, dialer)
if err != nil {
return err
}
defer conn.Close()
publisher.client = v1.NewEventsClient(ttrpc.NewClient(conn))
if namespaceFlag == "" { if namespaceFlag == "" {
return fmt.Errorf("shim namespace cannot be empty") return fmt.Errorf("shim namespace cannot be empty")
} }
ctx := namespaces.WithNamespace(context.Background(), namespaceFlag) ctx := namespaces.WithNamespace(context.Background(), namespaceFlag)
ctx = context.WithValue(ctx, OptsKey{}, Opts{BundlePath: bundlePath, Debug: debugFlag}) ctx = context.WithValue(ctx, OptsKey{}, Opts{BundlePath: bundlePath, Debug: debugFlag})
ctx = log.WithLogger(ctx, log.G(ctx).WithField("runtime", id)) ctx = log.WithLogger(ctx, log.G(ctx).WithField("runtime", id))
ctx, cancel := context.WithCancel(ctx)
service, err := initFunc(ctx, idFlag, publisher) service, err := initFunc(ctx, idFlag, publisher, cancel)
if err != nil { if err != nil {
return err return err
} }
@ -183,7 +183,7 @@ func run(id string, initFunc Init, config Config) error {
"pid": os.Getpid(), "pid": os.Getpid(),
"namespace": namespaceFlag, "namespace": namespaceFlag,
}) })
go handleSignals(logger, signals) go handleSignals(ctx, logger, signals)
response, err := service.Cleanup(ctx) response, err := service.Cleanup(ctx)
if err != nil { if err != nil {
return err return err
@ -210,7 +210,17 @@ func run(id string, initFunc Init, config Config) error {
return err return err
} }
client := NewShimClient(ctx, service, signals) client := NewShimClient(ctx, service, signals)
return client.Serve() if err := client.Serve(); err != nil {
if err != context.Canceled {
return err
}
}
select {
case <-publisher.Done():
return nil
case <-time.After(5 * time.Second):
return errors.New("publisher not closed")
}
} }
} }
@ -254,7 +264,7 @@ func (s *Client) Serve() error {
dumpStacks(logger) dumpStacks(logger)
} }
}() }()
return handleSignals(logger, s.signals) return handleSignals(s.context, logger, s.signals)
} }
// serve serves the ttrpc API over a unix socket at the provided path // serve serves the ttrpc API over a unix socket at the provided path
@ -288,24 +298,3 @@ func dumpStacks(logger *logrus.Entry) {
buf = buf[:stackSize] buf = buf[:stackSize]
logger.Infof("=== BEGIN goroutine stack dump ===\n%s\n=== END goroutine stack dump ===", buf) logger.Infof("=== BEGIN goroutine stack dump ===\n%s\n=== END goroutine stack dump ===", buf)
} }
type remoteEventsPublisher struct {
address string
client v1.EventsService
}
func (l *remoteEventsPublisher) Publish(ctx context.Context, topic string, event events.Event) error {
any, err := typeurl.MarshalAny(event)
if err != nil {
return err
}
_, err = l.client.Publish(ctx, &v1.PublishRequest{
Topic: topic,
Event: any,
})
return err
}
func connect(address string, d func(string, time.Duration) (net.Conn, error)) (net.Conn, error) {
return d(address, 100*time.Second)
}

View File

@ -71,11 +71,14 @@ func serveListener(path string) (net.Listener, error) {
return l, nil return l, nil
} }
func handleSignals(logger *logrus.Entry, signals chan os.Signal) error { func handleSignals(ctx context.Context, logger *logrus.Entry, signals chan os.Signal) error {
logger.Info("starting signal loop") logger.Info("starting signal loop")
for { for {
for s := range signals { select {
case <-ctx.Done():
return ctx.Err()
case s := <-signals:
switch s { switch s {
case unix.SIGCHLD: case unix.SIGCHLD:
if err := Reap(); err != nil { if err := Reap(); err != nil {
@ -91,7 +94,7 @@ func openLog(ctx context.Context, _ string) (io.Writer, error) {
return fifo.OpenFifo(ctx, "log", unix.O_WRONLY, 0700) return fifo.OpenFifo(ctx, "log", unix.O_WRONLY, 0700)
} }
func dialer(address string, timeout time.Duration) (net.Conn, error) { func dial(address string, timeout time.Duration) (net.Conn, error) {
address = strings.TrimPrefix(address, "unix://") address = strings.TrimPrefix(address, "unix://")
return net.DialTimeout("unix", address, timeout) return net.DialTimeout("unix", address, timeout)
} }

View File

@ -104,11 +104,14 @@ func serveListener(path string) (net.Listener, error) {
return l, nil return l, nil
} }
func handleSignals(logger *logrus.Entry, signals chan os.Signal) error { func handleSignals(ctx context.Context, logger *logrus.Entry, signals chan os.Signal) error {
logger.Info("starting signal loop") logger.Info("starting signal loop")
for { for {
for s := range signals { select {
case <-ctx.Done():
return ctx.Err()
case s := <-signals:
switch s { switch s {
case os.Interrupt: case os.Interrupt:
return nil return nil
@ -284,7 +287,7 @@ func openLog(ctx context.Context, id string) (io.Writer, error) {
return dswl, nil return dswl, nil
} }
func dialer(address string, timeout time.Duration) (net.Conn, error) { func dial(address string, timeout time.Duration) (net.Conn, error) {
var c net.Conn var c net.Conn
var lastError error var lastError error
timedOutError := errors.Errorf("timed out waiting for npipe %s", address) timedOutError := errors.Errorf("timed out waiting for npipe %s", address)

View File

@ -21,6 +21,7 @@ import (
api "github.com/containerd/containerd/api/services/ttrpc/events/v1" api "github.com/containerd/containerd/api/services/ttrpc/events/v1"
"github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/events/exchange" "github.com/containerd/containerd/events/exchange"
ptypes "github.com/gogo/protobuf/types" ptypes "github.com/gogo/protobuf/types"
) )
@ -29,10 +30,19 @@ type ttrpcService struct {
events *exchange.Exchange events *exchange.Exchange
} }
func (s *ttrpcService) Publish(ctx context.Context, r *api.PublishRequest) (*ptypes.Empty, error) { func (s *ttrpcService) Forward(ctx context.Context, r *api.ForwardRequest) (*ptypes.Empty, error) {
if err := s.events.Publish(ctx, r.Topic, r.Event); err != nil { if err := s.events.Forward(ctx, fromTProto(r.Envelope)); err != nil {
return nil, errdefs.ToGRPC(err) return nil, errdefs.ToGRPC(err)
} }
return &ptypes.Empty{}, nil return &ptypes.Empty{}, nil
} }
func fromTProto(env *api.Envelope) *events.Envelope {
return &events.Envelope{
Timestamp: env.Timestamp,
Namespace: env.Namespace,
Topic: env.Topic,
Event: env.Event,
}
}