From 4ba756edda18c0afac99a32ba89bc6070c27a0f1 Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Wed, 10 Apr 2019 13:34:34 -0400 Subject: [PATCH 1/3] Fix API forward events for shims Signed-off-by: Michael Crosby --- api/next.pb.txt | 43 ++- api/services/ttrpc/events/v1/events.pb.go | 431 +++++++++++++++++++--- api/services/ttrpc/events/v1/events.proto | 23 +- runtime/v2/shim/shim.go | 14 +- services/events/ttrpc.go | 14 +- 5 files changed, 448 insertions(+), 77 deletions(-) diff --git a/api/next.pb.txt b/api/next.pb.txt index 0120a081a..bd201ad71 100755 --- a/api/next.pb.txt +++ b/api/next.pb.txt @@ -4211,28 +4211,61 @@ file { dependency: "google/protobuf/empty.proto" dependency: "google/protobuf/timestamp.proto" message_type { - name: "PublishRequest" + name: "ForwardRequest" + field { + name: "envelope" + number: 1 + label: LABEL_OPTIONAL + type: TYPE_MESSAGE + type_name: ".containerd.services.events.ttrpc.v1.Envelope" + json_name: "envelope" + } + } + message_type { + name: "Envelope" + field { + name: "timestamp" + number: 1 + label: LABEL_OPTIONAL + type: TYPE_MESSAGE + type_name: ".google.protobuf.Timestamp" + options { + 65001: 0 + 65010: 1 + } + json_name: "timestamp" + } + field { + name: "namespace" + number: 2 + label: LABEL_OPTIONAL + type: TYPE_STRING + json_name: "namespace" + } field { name: "topic" - number: 1 + number: 3 label: LABEL_OPTIONAL type: TYPE_STRING json_name: "topic" } field { name: "event" - number: 2 + number: 4 label: LABEL_OPTIONAL type: TYPE_MESSAGE type_name: ".google.protobuf.Any" json_name: "event" } + options { + 64400: 1 + } } service { name: "Events" method { - name: "Publish" - input_type: ".containerd.services.events.ttrpc.v1.PublishRequest" + name: "Forward" + input_type: ".containerd.services.events.ttrpc.v1.ForwardRequest" output_type: ".google.protobuf.Empty" } } diff --git a/api/services/ttrpc/events/v1/events.pb.go b/api/services/ttrpc/events/v1/events.pb.go index 4c73aba93..4bd5828a4 100644 --- a/api/services/ttrpc/events/v1/events.pb.go +++ b/api/services/ttrpc/events/v1/events.pb.go @@ -7,18 +7,22 @@ import ( context "context" fmt "fmt" github_com_containerd_ttrpc "github.com/containerd/ttrpc" + github_com_containerd_typeurl "github.com/containerd/typeurl" proto "github.com/gogo/protobuf/proto" + github_com_gogo_protobuf_types "github.com/gogo/protobuf/types" types "github.com/gogo/protobuf/types" io "io" math "math" reflect "reflect" strings "strings" + time "time" ) // Reference imports to suppress errors if they are not otherwise used. var _ = proto.Marshal var _ = fmt.Errorf var _ = math.Inf +var _ = time.Kitchen // This is a compile-time assertion to ensure that this generated file // is compatible with the proto package it is being compiled against. @@ -26,25 +30,24 @@ var _ = math.Inf // proto package needs to be updated. const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package -type PublishRequest struct { - Topic string `protobuf:"bytes,1,opt,name=topic,proto3" json:"topic,omitempty"` - Event *types.Any `protobuf:"bytes,2,opt,name=event,proto3" json:"event,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` +type ForwardRequest struct { + Envelope *Envelope `protobuf:"bytes,1,opt,name=envelope,proto3" json:"envelope,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` } -func (m *PublishRequest) Reset() { *m = PublishRequest{} } -func (*PublishRequest) ProtoMessage() {} -func (*PublishRequest) Descriptor() ([]byte, []int) { +func (m *ForwardRequest) Reset() { *m = ForwardRequest{} } +func (*ForwardRequest) ProtoMessage() {} +func (*ForwardRequest) Descriptor() ([]byte, []int) { return fileDescriptor_19f98672016720b5, []int{0} } -func (m *PublishRequest) XXX_Unmarshal(b []byte) error { +func (m *ForwardRequest) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) } -func (m *PublishRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { +func (m *ForwardRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { if deterministic { - return xxx_messageInfo_PublishRequest.Marshal(b, m, deterministic) + return xxx_messageInfo_ForwardRequest.Marshal(b, m, deterministic) } else { b = b[:cap(b)] n, err := m.MarshalTo(b) @@ -54,20 +57,63 @@ func (m *PublishRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, erro return b[:n], nil } } -func (m *PublishRequest) XXX_Merge(src proto.Message) { - xxx_messageInfo_PublishRequest.Merge(m, src) +func (m *ForwardRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_ForwardRequest.Merge(m, src) } -func (m *PublishRequest) XXX_Size() int { +func (m *ForwardRequest) XXX_Size() int { return m.Size() } -func (m *PublishRequest) XXX_DiscardUnknown() { - xxx_messageInfo_PublishRequest.DiscardUnknown(m) +func (m *ForwardRequest) XXX_DiscardUnknown() { + xxx_messageInfo_ForwardRequest.DiscardUnknown(m) } -var xxx_messageInfo_PublishRequest proto.InternalMessageInfo +var xxx_messageInfo_ForwardRequest proto.InternalMessageInfo + +type Envelope struct { + Timestamp time.Time `protobuf:"bytes,1,opt,name=timestamp,proto3,stdtime" json:"timestamp"` + Namespace string `protobuf:"bytes,2,opt,name=namespace,proto3" json:"namespace,omitempty"` + Topic string `protobuf:"bytes,3,opt,name=topic,proto3" json:"topic,omitempty"` + Event *types.Any `protobuf:"bytes,4,opt,name=event,proto3" json:"event,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *Envelope) Reset() { *m = Envelope{} } +func (*Envelope) ProtoMessage() {} +func (*Envelope) Descriptor() ([]byte, []int) { + return fileDescriptor_19f98672016720b5, []int{1} +} +func (m *Envelope) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *Envelope) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_Envelope.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalTo(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *Envelope) XXX_Merge(src proto.Message) { + xxx_messageInfo_Envelope.Merge(m, src) +} +func (m *Envelope) XXX_Size() int { + return m.Size() +} +func (m *Envelope) XXX_DiscardUnknown() { + xxx_messageInfo_Envelope.DiscardUnknown(m) +} + +var xxx_messageInfo_Envelope proto.InternalMessageInfo func init() { - proto.RegisterType((*PublishRequest)(nil), "containerd.services.events.ttrpc.v1.PublishRequest") + proto.RegisterType((*ForwardRequest)(nil), "containerd.services.events.ttrpc.v1.ForwardRequest") + proto.RegisterType((*Envelope)(nil), "containerd.services.events.ttrpc.v1.Envelope") } func init() { @@ -75,30 +121,62 @@ func init() { } var fileDescriptor_19f98672016720b5 = []byte{ - // 311 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0x51, 0x3f, 0x4b, 0x33, 0x31, - 0x18, 0x6f, 0x5e, 0x68, 0x5f, 0x8c, 0xe0, 0x70, 0x14, 0xa9, 0x15, 0x62, 0xd1, 0xa5, 0x38, 0x24, - 0xb4, 0x1d, 0x5d, 0x54, 0xec, 0xe2, 0x24, 0x37, 0x48, 0x71, 0x10, 0xef, 0xae, 0x69, 0x1a, 0xb8, - 0x4b, 0xe2, 0xe5, 0xb9, 0x83, 0x6e, 0x7e, 0xbc, 0x8e, 0x8e, 0x8e, 0xf6, 0x3e, 0x89, 0x34, 0xb9, - 0x52, 0xad, 0x83, 0x82, 0xdb, 0x2f, 0xf9, 0xfd, 0xcb, 0xf3, 0x04, 0xdf, 0x0a, 0x09, 0xf3, 0x22, - 0xa6, 0x89, 0xce, 0x58, 0xa2, 0x15, 0x44, 0x52, 0xf1, 0x7c, 0xfa, 0x19, 0x46, 0x46, 0x32, 0xcb, - 0xf3, 0x52, 0x26, 0xdc, 0x32, 0x80, 0xdc, 0x24, 0x8c, 0x97, 0x5c, 0x81, 0x65, 0xe5, 0xa0, 0x46, - 0xd4, 0xe4, 0x1a, 0x74, 0x70, 0xb6, 0x75, 0xd1, 0x8d, 0x83, 0xd6, 0x0a, 0x67, 0xa4, 0xe5, 0xa0, - 0x7b, 0xf9, 0x63, 0xa1, 0x0b, 0x8b, 0x8b, 0x19, 0x33, 0x69, 0x21, 0xa4, 0x62, 0x33, 0xc9, 0xd3, - 0xa9, 0x89, 0x60, 0xee, 0x6b, 0xba, 0x6d, 0xa1, 0x85, 0x76, 0x90, 0xad, 0x51, 0x7d, 0x7b, 0x24, - 0xb4, 0x16, 0x29, 0xdf, 0xba, 0x23, 0xb5, 0xa8, 0xa9, 0xe3, 0x5d, 0x8a, 0x67, 0x06, 0x36, 0xe4, - 0xc9, 0x2e, 0x09, 0x32, 0xe3, 0x16, 0xa2, 0xcc, 0x78, 0xc1, 0x69, 0x88, 0x0f, 0xee, 0x8a, 0x38, - 0x95, 0x76, 0x1e, 0xf2, 0xe7, 0x82, 0x5b, 0x08, 0xda, 0xb8, 0x09, 0xda, 0xc8, 0xa4, 0x83, 0x7a, - 0xa8, 0xbf, 0x17, 0xfa, 0x43, 0x70, 0x8e, 0x9b, 0x6e, 0xd6, 0xce, 0xbf, 0x1e, 0xea, 0xef, 0x0f, - 0xdb, 0xd4, 0x07, 0xd3, 0x4d, 0x30, 0xbd, 0x52, 0x8b, 0xd0, 0x4b, 0x86, 0x4f, 0xb8, 0x35, 0x76, - 0x7b, 0x09, 0xee, 0xf1, 0xff, 0x3a, 0x3d, 0x18, 0xd1, 0x5f, 0xec, 0x8f, 0x7e, 0x7d, 0x4b, 0xf7, - 0xf0, 0x5b, 0xcd, 0x78, 0x3d, 0xdc, 0xf5, 0xe3, 0x72, 0x45, 0x1a, 0x6f, 0x2b, 0xd2, 0x78, 0xa9, - 0x08, 0x5a, 0x56, 0x04, 0xbd, 0x56, 0x04, 0xbd, 0x57, 0x04, 0x3d, 0xdc, 0xfc, 0xe9, 0xc7, 0x2f, - 0x3c, 0x9a, 0x34, 0x26, 0x28, 0x6e, 0xb9, 0xce, 0xd1, 0x47, 0x00, 0x00, 0x00, 0xff, 0xff, 0x8d, - 0x47, 0xe0, 0xf5, 0x44, 0x02, 0x00, 0x00, + // 396 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0x52, 0xc1, 0x8e, 0xd3, 0x30, + 0x10, 0x8d, 0x61, 0x77, 0x69, 0x8d, 0xc4, 0xc1, 0xaa, 0x50, 0x08, 0x28, 0x59, 0x2d, 0x97, 0x15, + 0x12, 0xb6, 0x76, 0xf7, 0x06, 0x17, 0xa8, 0x28, 0x12, 0x1c, 0x23, 0x84, 0x2a, 0x90, 0x10, 0x6e, + 0x3a, 0x4d, 0x2d, 0x25, 0xb6, 0x49, 0x9c, 0xa0, 0xde, 0xfa, 0x09, 0x7c, 0x0c, 0x17, 0xfe, 0xa0, + 0x47, 0x8e, 0x9c, 0x80, 0xe6, 0x4b, 0x50, 0x9d, 0xa4, 0x81, 0xf6, 0x40, 0xa5, 0xbd, 0xbd, 0xcc, + 0x7b, 0x6f, 0xde, 0xcc, 0xc4, 0xf8, 0x75, 0x2c, 0xcc, 0xbc, 0x98, 0xd0, 0x48, 0xa5, 0x2c, 0x52, + 0xd2, 0x70, 0x21, 0x21, 0x9b, 0xfe, 0x0d, 0xb9, 0x16, 0x2c, 0x87, 0xac, 0x14, 0x11, 0xe4, 0xcc, + 0x98, 0x4c, 0x47, 0x0c, 0x4a, 0x90, 0x26, 0x67, 0xe5, 0x45, 0x83, 0xa8, 0xce, 0x94, 0x51, 0xe4, + 0x61, 0xe7, 0xa2, 0xad, 0x83, 0x36, 0x0a, 0x6b, 0xa4, 0xe5, 0x85, 0xf7, 0xec, 0xbf, 0x81, 0xb6, + 0xd9, 0xa4, 0x98, 0x31, 0x9d, 0x14, 0xb1, 0x90, 0x6c, 0x26, 0x20, 0x99, 0x6a, 0x6e, 0xe6, 0x75, + 0x8c, 0x37, 0x88, 0x55, 0xac, 0x2c, 0x64, 0x1b, 0xd4, 0x54, 0xef, 0xc5, 0x4a, 0xc5, 0x09, 0x74, + 0x6e, 0x2e, 0x17, 0x0d, 0x75, 0x7f, 0x97, 0x82, 0x54, 0x9b, 0x96, 0x0c, 0x76, 0x49, 0x23, 0x52, + 0xc8, 0x0d, 0x4f, 0x75, 0x2d, 0x38, 0x7b, 0x8f, 0xef, 0xbc, 0x54, 0xd9, 0x67, 0x9e, 0x4d, 0x43, + 0xf8, 0x54, 0x40, 0x6e, 0xc8, 0x2b, 0xdc, 0x03, 0x59, 0x42, 0xa2, 0x34, 0xb8, 0xe8, 0x14, 0x9d, + 0xdf, 0xbe, 0x7c, 0x4c, 0x0f, 0x58, 0x9d, 0x8e, 0x1a, 0x53, 0xb8, 0xb5, 0x9f, 0x7d, 0x45, 0xb8, + 0xd7, 0x96, 0xc9, 0x10, 0xf7, 0xb7, 0xe1, 0x4d, 0x63, 0x8f, 0xd6, 0xe3, 0xd1, 0x76, 0x3c, 0xfa, + 0xa6, 0x55, 0x0c, 0x7b, 0xab, 0x9f, 0x81, 0xf3, 0xe5, 0x57, 0x80, 0xc2, 0xce, 0x46, 0x1e, 0xe0, + 0xbe, 0xe4, 0x29, 0xe4, 0x9a, 0x47, 0xe0, 0xde, 0x38, 0x45, 0xe7, 0xfd, 0xb0, 0x2b, 0x90, 0x01, + 0x3e, 0x36, 0x4a, 0x8b, 0xc8, 0xbd, 0x69, 0x99, 0xfa, 0x83, 0x3c, 0xc2, 0xc7, 0x76, 0x54, 0xf7, + 0xc8, 0x66, 0x0e, 0xf6, 0x32, 0x9f, 0xcb, 0x45, 0x58, 0x4b, 0x9e, 0x1c, 0x2d, 0xbf, 0x05, 0xe8, + 0xf2, 0x23, 0x3e, 0x19, 0xd9, 0xe5, 0xc8, 0x5b, 0x7c, 0xab, 0xb9, 0x0e, 0xb9, 0x3a, 0xe8, 0x08, + 0xff, 0xde, 0xd2, 0xbb, 0xbb, 0x17, 0x36, 0xda, 0xfc, 0x9c, 0xe1, 0x87, 0xd5, 0xda, 0x77, 0x7e, + 0xac, 0x7d, 0x67, 0x59, 0xf9, 0x68, 0x55, 0xf9, 0xe8, 0x7b, 0xe5, 0xa3, 0xdf, 0x95, 0x8f, 0xde, + 0xbd, 0xb8, 0xd6, 0x8b, 0x7d, 0x5a, 0xa3, 0xb1, 0x33, 0x46, 0x93, 0x13, 0x9b, 0x79, 0xf5, 0x27, + 0x00, 0x00, 0xff, 0xff, 0xd4, 0x90, 0xbd, 0x09, 0x04, 0x03, 0x00, 0x00, } -func (m *PublishRequest) Marshal() (dAtA []byte, err error) { +// Field returns the value for the given fieldpath as a string, if defined. +// If the value is not defined, the second value will be false. +func (m *Envelope) Field(fieldpath []string) (string, bool) { + if len(fieldpath) == 0 { + return "", false + } + + switch fieldpath[0] { + // unhandled: timestamp + case "namespace": + return string(m.Namespace), len(m.Namespace) > 0 + case "topic": + return string(m.Topic), len(m.Topic) > 0 + case "event": + decoded, err := github_com_containerd_typeurl.UnmarshalAny(m.Event) + if err != nil { + return "", false + } + + adaptor, ok := decoded.(interface{ Field([]string) (string, bool) }) + if !ok { + return "", false + } + return adaptor.Field(fieldpath[1:]) + } + return "", false +} +func (m *ForwardRequest) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) n, err := m.MarshalTo(dAtA) @@ -108,26 +186,71 @@ func (m *PublishRequest) Marshal() (dAtA []byte, err error) { return dAtA[:n], nil } -func (m *PublishRequest) MarshalTo(dAtA []byte) (int, error) { +func (m *ForwardRequest) MarshalTo(dAtA []byte) (int, error) { var i int _ = i var l int _ = l - if len(m.Topic) > 0 { + if m.Envelope != nil { dAtA[i] = 0xa i++ + i = encodeVarintEvents(dAtA, i, uint64(m.Envelope.Size())) + n1, err := m.Envelope.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n1 + } + if m.XXX_unrecognized != nil { + i += copy(dAtA[i:], m.XXX_unrecognized) + } + return i, nil +} + +func (m *Envelope) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *Envelope) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + dAtA[i] = 0xa + i++ + i = encodeVarintEvents(dAtA, i, uint64(github_com_gogo_protobuf_types.SizeOfStdTime(m.Timestamp))) + n2, err := github_com_gogo_protobuf_types.StdTimeMarshalTo(m.Timestamp, dAtA[i:]) + if err != nil { + return 0, err + } + i += n2 + if len(m.Namespace) > 0 { + dAtA[i] = 0x12 + i++ + i = encodeVarintEvents(dAtA, i, uint64(len(m.Namespace))) + i += copy(dAtA[i:], m.Namespace) + } + if len(m.Topic) > 0 { + dAtA[i] = 0x1a + i++ i = encodeVarintEvents(dAtA, i, uint64(len(m.Topic))) i += copy(dAtA[i:], m.Topic) } if m.Event != nil { - dAtA[i] = 0x12 + dAtA[i] = 0x22 i++ i = encodeVarintEvents(dAtA, i, uint64(m.Event.Size())) - n1, err := m.Event.MarshalTo(dAtA[i:]) + n3, err := m.Event.MarshalTo(dAtA[i:]) if err != nil { return 0, err } - i += n1 + i += n3 } if m.XXX_unrecognized != nil { i += copy(dAtA[i:], m.XXX_unrecognized) @@ -144,12 +267,34 @@ func encodeVarintEvents(dAtA []byte, offset int, v uint64) int { dAtA[offset] = uint8(v) return offset + 1 } -func (m *PublishRequest) Size() (n int) { +func (m *ForwardRequest) Size() (n int) { if m == nil { return 0 } var l int _ = l + if m.Envelope != nil { + l = m.Envelope.Size() + n += 1 + l + sovEvents(uint64(l)) + } + if m.XXX_unrecognized != nil { + n += len(m.XXX_unrecognized) + } + return n +} + +func (m *Envelope) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = github_com_gogo_protobuf_types.SizeOfStdTime(m.Timestamp) + n += 1 + l + sovEvents(uint64(l)) + l = len(m.Namespace) + if l > 0 { + n += 1 + l + sovEvents(uint64(l)) + } l = len(m.Topic) if l > 0 { n += 1 + l + sovEvents(uint64(l)) @@ -177,11 +322,24 @@ func sovEvents(x uint64) (n int) { func sozEvents(x uint64) (n int) { return sovEvents(uint64((x << 1) ^ uint64((int64(x) >> 63)))) } -func (this *PublishRequest) String() string { +func (this *ForwardRequest) String() string { if this == nil { return "nil" } - s := strings.Join([]string{`&PublishRequest{`, + s := strings.Join([]string{`&ForwardRequest{`, + `Envelope:` + strings.Replace(fmt.Sprintf("%v", this.Envelope), "Envelope", "Envelope", 1) + `,`, + `XXX_unrecognized:` + fmt.Sprintf("%v", this.XXX_unrecognized) + `,`, + `}`, + }, "") + return s +} +func (this *Envelope) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&Envelope{`, + `Timestamp:` + strings.Replace(strings.Replace(this.Timestamp.String(), "Timestamp", "types.Timestamp", 1), `&`, ``, 1) + `,`, + `Namespace:` + fmt.Sprintf("%v", this.Namespace) + `,`, `Topic:` + fmt.Sprintf("%v", this.Topic) + `,`, `Event:` + strings.Replace(fmt.Sprintf("%v", this.Event), "Any", "types.Any", 1) + `,`, `XXX_unrecognized:` + fmt.Sprintf("%v", this.XXX_unrecognized) + `,`, @@ -199,17 +357,17 @@ func valueToStringEvents(v interface{}) string { } type EventsService interface { - Publish(ctx context.Context, req *PublishRequest) (*types.Empty, error) + Forward(ctx context.Context, req *ForwardRequest) (*types.Empty, error) } func RegisterEventsService(srv *github_com_containerd_ttrpc.Server, svc EventsService) { srv.Register("containerd.services.events.ttrpc.v1.Events", map[string]github_com_containerd_ttrpc.Method{ - "Publish": func(ctx context.Context, unmarshal func(interface{}) error) (interface{}, error) { - var req PublishRequest + "Forward": func(ctx context.Context, unmarshal func(interface{}) error) (interface{}, error) { + var req ForwardRequest if err := unmarshal(&req); err != nil { return nil, err } - return svc.Publish(ctx, &req) + return svc.Forward(ctx, &req) }, }) } @@ -224,14 +382,14 @@ func NewEventsClient(client *github_com_containerd_ttrpc.Client) EventsService { } } -func (c *eventsClient) Publish(ctx context.Context, req *PublishRequest) (*types.Empty, error) { +func (c *eventsClient) Forward(ctx context.Context, req *ForwardRequest) (*types.Empty, error) { var resp types.Empty - if err := c.client.Call(ctx, "containerd.services.events.ttrpc.v1.Events", "Publish", req, &resp); err != nil { + if err := c.client.Call(ctx, "containerd.services.events.ttrpc.v1.Events", "Forward", req, &resp); err != nil { return nil, err } return &resp, nil } -func (m *PublishRequest) Unmarshal(dAtA []byte) error { +func (m *ForwardRequest) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 for iNdEx < l { @@ -254,13 +412,168 @@ func (m *PublishRequest) Unmarshal(dAtA []byte) error { fieldNum := int32(wire >> 3) wireType := int(wire & 0x7) if wireType == 4 { - return fmt.Errorf("proto: PublishRequest: wiretype end group for non-group") + return fmt.Errorf("proto: ForwardRequest: wiretype end group for non-group") } if fieldNum <= 0 { - return fmt.Errorf("proto: PublishRequest: illegal tag %d (wire type %d)", fieldNum, wire) + return fmt.Errorf("proto: ForwardRequest: illegal tag %d (wire type %d)", fieldNum, wire) } switch fieldNum { case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Envelope", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowEvents + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthEvents + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthEvents + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Envelope == nil { + m.Envelope = &Envelope{} + } + if err := m.Envelope.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipEvents(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthEvents + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthEvents + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...) + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *Envelope) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowEvents + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: Envelope: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: Envelope: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Timestamp", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowEvents + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthEvents + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthEvents + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if err := github_com_gogo_protobuf_types.StdTimeUnmarshal(&m.Timestamp, dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Namespace", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowEvents + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthEvents + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthEvents + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Namespace = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 3: if wireType != 2 { return fmt.Errorf("proto: wrong wireType = %d for field Topic", wireType) } @@ -292,7 +605,7 @@ func (m *PublishRequest) Unmarshal(dAtA []byte) error { } m.Topic = string(dAtA[iNdEx:postIndex]) iNdEx = postIndex - case 2: + case 4: if wireType != 2 { return fmt.Errorf("proto: wrong wireType = %d for field Event", wireType) } diff --git a/api/services/ttrpc/events/v1/events.proto b/api/services/ttrpc/events/v1/events.proto index 9e5aff77b..e140c3439 100644 --- a/api/services/ttrpc/events/v1/events.proto +++ b/api/services/ttrpc/events/v1/events.proto @@ -11,15 +11,22 @@ import "google/protobuf/timestamp.proto"; option go_package = "github.com/containerd/containerd/api/services/ttrpc/events/v1;events"; service Events { - // Publish an event to a topic. + // Forward sends an event that has already been packaged into an envelope + // with a timestamp and namespace. // - // The event will be packed into a timestamp envelope with the namespace - // introspected from the context. The envelope will then be dispatched. - rpc Publish(PublishRequest) returns (google.protobuf.Empty); - + // This is useful if earlier timestamping is required or when forwarding on + // behalf of another component, namespace or publisher. + rpc Forward(ForwardRequest) returns (google.protobuf.Empty); } -message PublishRequest { - string topic = 1; - google.protobuf.Any event = 2; +message ForwardRequest { + Envelope envelope = 1; +} + +message Envelope { + option (containerd.plugin.fieldpath) = true; + google.protobuf.Timestamp timestamp = 1 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false]; + string namespace = 2; + string topic = 3; + google.protobuf.Any event = 4; } diff --git a/runtime/v2/shim/shim.go b/runtime/v2/shim/shim.go index 7e994d6a4..a33c73c90 100644 --- a/runtime/v2/shim/shim.go +++ b/runtime/v2/shim/shim.go @@ -295,13 +295,21 @@ type remoteEventsPublisher struct { } func (l *remoteEventsPublisher) Publish(ctx context.Context, topic string, event events.Event) error { + ns, err := namespaces.NamespaceRequired(ctx) + if err != nil { + return err + } any, err := typeurl.MarshalAny(event) if err != nil { return err } - _, err = l.client.Publish(ctx, &v1.PublishRequest{ - Topic: topic, - Event: any, + _, err = l.client.Forward(ctx, &v1.ForwardRequest{ + Envelope: &v1.Envelope{ + Timestamp: time.Now(), + Namespace: ns, + Topic: topic, + Event: any, + }, }) return err } diff --git a/services/events/ttrpc.go b/services/events/ttrpc.go index 42b476d74..b1f28df19 100644 --- a/services/events/ttrpc.go +++ b/services/events/ttrpc.go @@ -21,6 +21,7 @@ import ( api "github.com/containerd/containerd/api/services/ttrpc/events/v1" "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/events" "github.com/containerd/containerd/events/exchange" ptypes "github.com/gogo/protobuf/types" ) @@ -29,10 +30,19 @@ type ttrpcService struct { events *exchange.Exchange } -func (s *ttrpcService) Publish(ctx context.Context, r *api.PublishRequest) (*ptypes.Empty, error) { - if err := s.events.Publish(ctx, r.Topic, r.Event); err != nil { +func (s *ttrpcService) Forward(ctx context.Context, r *api.ForwardRequest) (*ptypes.Empty, error) { + if err := s.events.Forward(ctx, fromTProto(r.Envelope)); err != nil { return nil, errdefs.ToGRPC(err) } return &ptypes.Empty{}, nil } + +func fromTProto(env *api.Envelope) *events.Envelope { + return &events.Envelope{ + Timestamp: env.Timestamp, + Namespace: env.Namespace, + Topic: env.Topic, + Event: env.Event, + } +} From ae87730ad2511ae39239ac7aac75074768aa3c1b Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Wed, 10 Apr 2019 14:29:10 -0400 Subject: [PATCH 2/3] Improve shim shutdown logic Shims no longer call `os.Exit` but close the context on shutdown so that events and other resources have hit the `defer`s. Signed-off-by: Michael Crosby --- runtime/v2/example/example.go | 3 +- runtime/v2/runc/epoll.go | 6 ++-- runtime/v2/runc/v1/service.go | 19 ++++++----- runtime/v2/runc/v2/service.go | 19 ++++++----- runtime/v2/runhcs/service.go | 10 +++--- runtime/v2/shim/shim.go | 59 +++++++++++++++++++++++++++------ runtime/v2/shim/shim_unix.go | 7 ++-- runtime/v2/shim/shim_windows.go | 7 ++-- 8 files changed, 88 insertions(+), 42 deletions(-) diff --git a/runtime/v2/example/example.go b/runtime/v2/example/example.go index 3b5b73873..ba217403c 100644 --- a/runtime/v2/example/example.go +++ b/runtime/v2/example/example.go @@ -23,7 +23,6 @@ import ( "os" "github.com/containerd/containerd/errdefs" - "github.com/containerd/containerd/events" "github.com/containerd/containerd/runtime/v2/shim" taskAPI "github.com/containerd/containerd/runtime/v2/task" ptypes "github.com/gogo/protobuf/types" @@ -37,7 +36,7 @@ var ( ) // New returns a new shim service -func New(ctx context.Context, id string, publisher events.Publisher) (shim.Shim, error) { +func New(ctx context.Context, id string, publisher shim.Publisher, shutdown func()) (shim.Shim, error) { return &service{}, nil } diff --git a/runtime/v2/runc/epoll.go b/runtime/v2/runc/epoll.go index 5425655ff..9ff19876a 100644 --- a/runtime/v2/runc/epoll.go +++ b/runtime/v2/runc/epoll.go @@ -24,15 +24,15 @@ import ( "github.com/containerd/cgroups" eventstypes "github.com/containerd/containerd/api/events" - "github.com/containerd/containerd/events" "github.com/containerd/containerd/runtime" + "github.com/containerd/containerd/runtime/v2/shim" "github.com/sirupsen/logrus" "golang.org/x/sys/unix" ) // NewOOMEpoller returns an epoll implementation that listens to OOM events // from a container's cgroups. -func NewOOMEpoller(publisher events.Publisher) (*Epoller, error) { +func NewOOMEpoller(publisher shim.Publisher) (*Epoller, error) { fd, err := unix.EpollCreate1(unix.EPOLL_CLOEXEC) if err != nil { return nil, err @@ -49,7 +49,7 @@ type Epoller struct { mu sync.Mutex fd int - publisher events.Publisher + publisher shim.Publisher set map[uintptr]*item } diff --git a/runtime/v2/runc/v1/service.go b/runtime/v2/runc/v1/service.go index cba16ca53..269d26471 100644 --- a/runtime/v2/runc/v1/service.go +++ b/runtime/v2/runc/v1/service.go @@ -33,7 +33,6 @@ import ( eventstypes "github.com/containerd/containerd/api/events" "github.com/containerd/containerd/api/types/task" "github.com/containerd/containerd/errdefs" - "github.com/containerd/containerd/events" "github.com/containerd/containerd/log" "github.com/containerd/containerd/mount" "github.com/containerd/containerd/namespaces" @@ -58,12 +57,11 @@ var ( ) // New returns a new shim service that can be used via GRPC -func New(ctx context.Context, id string, publisher events.Publisher) (shim.Shim, error) { +func New(ctx context.Context, id string, publisher shim.Publisher, shutdown func()) (shim.Shim, error) { ep, err := runc.NewOOMEpoller(publisher) if err != nil { return nil, err } - ctx, cancel := context.WithCancel(ctx) go ep.Run(ctx) s := &service{ id: id, @@ -71,15 +69,15 @@ func New(ctx context.Context, id string, publisher events.Publisher) (shim.Shim, events: make(chan interface{}, 128), ec: shim.Default.Subscribe(), ep: ep, - cancel: cancel, + cancel: shutdown, } go s.processExits() runcC.Monitor = shim.Default if err := s.initPlatform(); err != nil { - cancel() + shutdown() return nil, errors.Wrap(err, "failed to initialized platform behavior") } - go s.forward(publisher) + go s.forward(ctx, publisher) return s, nil } @@ -511,7 +509,7 @@ func (s *service) Connect(ctx context.Context, r *taskAPI.ConnectRequest) (*task func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (*ptypes.Empty, error) { s.cancel() - os.Exit(0) + close(s.events) return empty, nil } @@ -619,15 +617,18 @@ func (s *service) getContainerPids(ctx context.Context, id string) ([]uint32, er return pids, nil } -func (s *service) forward(publisher events.Publisher) { +func (s *service) forward(ctx context.Context, publisher shim.Publisher) { + ns, _ := namespaces.Namespace(ctx) + ctx = namespaces.WithNamespace(context.Background(), ns) for e := range s.events { - ctx, cancel := context.WithTimeout(s.context, 5*time.Second) + ctx, cancel := context.WithTimeout(ctx, 5*time.Second) err := publisher.Publish(ctx, runc.GetTopic(e), e) cancel() if err != nil { logrus.WithError(err).Error("post event") } } + publisher.Close() } func (s *service) getContainer() (*runc.Container, error) { diff --git a/runtime/v2/runc/v2/service.go b/runtime/v2/runc/v2/service.go index 3a25d85ce..dd88954d5 100644 --- a/runtime/v2/runc/v2/service.go +++ b/runtime/v2/runc/v2/service.go @@ -34,7 +34,6 @@ import ( eventstypes "github.com/containerd/containerd/api/events" "github.com/containerd/containerd/api/types/task" "github.com/containerd/containerd/errdefs" - "github.com/containerd/containerd/events" "github.com/containerd/containerd/log" "github.com/containerd/containerd/mount" "github.com/containerd/containerd/namespaces" @@ -71,12 +70,11 @@ type spec struct { } // New returns a new shim service that can be used via GRPC -func New(ctx context.Context, id string, publisher events.Publisher) (shim.Shim, error) { +func New(ctx context.Context, id string, publisher shim.Publisher, shutdown func()) (shim.Shim, error) { ep, err := runc.NewOOMEpoller(publisher) if err != nil { return nil, err } - ctx, cancel := context.WithCancel(ctx) go ep.Run(ctx) s := &service{ id: id, @@ -84,16 +82,16 @@ func New(ctx context.Context, id string, publisher events.Publisher) (shim.Shim, events: make(chan interface{}, 128), ec: shim.Default.Subscribe(), ep: ep, - cancel: cancel, + cancel: shutdown, containers: make(map[string]*runc.Container), } go s.processExits() runcC.Monitor = shim.Default if err := s.initPlatform(); err != nil { - cancel() + shutdown() return nil, errors.Wrap(err, "failed to initialized platform behavior") } - go s.forward(publisher) + go s.forward(ctx, publisher) return s, nil } @@ -570,7 +568,7 @@ func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (*pt return empty, nil } s.cancel() - os.Exit(0) + close(s.events) return empty, nil } @@ -689,15 +687,18 @@ func (s *service) getContainerPids(ctx context.Context, id string) ([]uint32, er return pids, nil } -func (s *service) forward(publisher events.Publisher) { +func (s *service) forward(ctx context.Context, publisher shim.Publisher) { + ns, _ := namespaces.Namespace(ctx) + ctx = namespaces.WithNamespace(context.Background(), ns) for e := range s.events { - ctx, cancel := context.WithTimeout(s.context, 5*time.Second) + ctx, cancel := context.WithTimeout(ctx, 5*time.Second) err := publisher.Publish(ctx, runc.GetTopic(e), e) cancel() if err != nil { logrus.WithError(err).Error("post event") } } + publisher.Close() } func (s *service) getContainer(id string) (*runc.Container, error) { diff --git a/runtime/v2/runhcs/service.go b/runtime/v2/runhcs/service.go index d20cf7e01..ee6f9b72e 100644 --- a/runtime/v2/runhcs/service.go +++ b/runtime/v2/runhcs/service.go @@ -41,7 +41,6 @@ import ( containerd_types "github.com/containerd/containerd/api/types" "github.com/containerd/containerd/api/types/task" "github.com/containerd/containerd/errdefs" - "github.com/containerd/containerd/events" "github.com/containerd/containerd/log" "github.com/containerd/containerd/mount" "github.com/containerd/containerd/namespaces" @@ -129,12 +128,13 @@ func forwardRunhcsLogs(ctx context.Context, c net.Conn, fields logrus.Fields) { } // New returns a new runhcs shim service that can be used via GRPC -func New(ctx context.Context, id string, publisher events.Publisher) (shim.Shim, error) { +func New(ctx context.Context, id string, publisher shim.Publisher, shutdown func()) (shim.Shim, error) { return &service{ context: ctx, id: id, processes: make(map[string]*process), publisher: publisher, + shutdown: shutdown, }, nil } @@ -159,7 +159,8 @@ type service struct { id string processes map[string]*process - publisher events.Publisher + publisher shim.Publisher + shutdown func() } func (s *service) newRunhcs() *runhcs.Runhcs { @@ -1068,7 +1069,8 @@ func (s *service) Shutdown(ctx context.Context, r *taskAPI.ShutdownRequest) (*pt if s.debugListener != nil { s.debugListener.Close() } + s.publisher.Close() + s.shutdown() - os.Exit(0) return empty, nil } diff --git a/runtime/v2/shim/shim.go b/runtime/v2/shim/shim.go index a33c73c90..2bc8f1b07 100644 --- a/runtime/v2/shim/shim.go +++ b/runtime/v2/shim/shim.go @@ -20,11 +20,13 @@ import ( "context" "flag" "fmt" + "io" "net" "os" "runtime" "runtime/debug" "strings" + "sync" "time" v1 "github.com/containerd/containerd/api/services/ttrpc/events/v1" @@ -46,8 +48,14 @@ type Client struct { signals chan os.Signal } +// Publisher for events +type Publisher interface { + events.Publisher + io.Closer +} + // Init func for the creation of a shim server -type Init func(context.Context, string, events.Publisher) (Shim, error) +type Init func(context.Context, string, Publisher, func()) (Shim, error) // Shim server interface type Shim interface { @@ -156,15 +164,18 @@ func run(id string, initFunc Init, config Config) error { return err } } - - publisher := &remoteEventsPublisher{ - address: fmt.Sprintf("%s.ttrpc", addressFlag), - } - conn, err := connect(publisher.address, dialer) + address := fmt.Sprintf("%s.ttrpc", addressFlag) + conn, err := connect(address, dialer) if err != nil { return err } - defer conn.Close() + publisher := &remoteEventsPublisher{ + address: address, + conn: conn, + closed: make(chan struct{}), + } + defer publisher.Close() + publisher.client = v1.NewEventsClient(ttrpc.NewClient(conn)) if namespaceFlag == "" { return fmt.Errorf("shim namespace cannot be empty") @@ -172,8 +183,9 @@ func run(id string, initFunc Init, config Config) error { ctx := namespaces.WithNamespace(context.Background(), namespaceFlag) ctx = context.WithValue(ctx, OptsKey{}, Opts{BundlePath: bundlePath, Debug: debugFlag}) ctx = log.WithLogger(ctx, log.G(ctx).WithField("runtime", id)) + ctx, cancel := context.WithCancel(ctx) - service, err := initFunc(ctx, idFlag, publisher) + service, err := initFunc(ctx, idFlag, publisher, cancel) if err != nil { return err } @@ -183,7 +195,7 @@ func run(id string, initFunc Init, config Config) error { "pid": os.Getpid(), "namespace": namespaceFlag, }) - go handleSignals(logger, signals) + go handleSignals(ctx, logger, signals) response, err := service.Cleanup(ctx) if err != nil { return err @@ -210,7 +222,17 @@ func run(id string, initFunc Init, config Config) error { return err } client := NewShimClient(ctx, service, signals) - return client.Serve() + if err := client.Serve(); err != nil { + if err != context.Canceled { + return err + } + } + select { + case <-publisher.Done(): + return nil + case <-time.After(5 * time.Second): + return errors.New("publisher not closed") + } } } @@ -254,7 +276,7 @@ func (s *Client) Serve() error { dumpStacks(logger) } }() - return handleSignals(logger, s.signals) + return handleSignals(s.context, logger, s.signals) } // serve serves the ttrpc API over a unix socket at the provided path @@ -291,7 +313,22 @@ func dumpStacks(logger *logrus.Entry) { type remoteEventsPublisher struct { address string + conn net.Conn client v1.EventsService + closed chan struct{} + closer sync.Once +} + +func (l *remoteEventsPublisher) Done() <-chan struct{} { + return l.closed +} + +func (l *remoteEventsPublisher) Close() (err error) { + l.closer.Do(func() { + err = l.conn.Close() + close(l.closed) + }) + return err } func (l *remoteEventsPublisher) Publish(ctx context.Context, topic string, event events.Event) error { diff --git a/runtime/v2/shim/shim_unix.go b/runtime/v2/shim/shim_unix.go index a1ab23c79..d7de92a08 100644 --- a/runtime/v2/shim/shim_unix.go +++ b/runtime/v2/shim/shim_unix.go @@ -71,11 +71,14 @@ func serveListener(path string) (net.Listener, error) { return l, nil } -func handleSignals(logger *logrus.Entry, signals chan os.Signal) error { +func handleSignals(ctx context.Context, logger *logrus.Entry, signals chan os.Signal) error { logger.Info("starting signal loop") for { - for s := range signals { + select { + case <-ctx.Done(): + return ctx.Err() + case s := <-signals: switch s { case unix.SIGCHLD: if err := Reap(); err != nil { diff --git a/runtime/v2/shim/shim_windows.go b/runtime/v2/shim/shim_windows.go index ac5e3d719..a399da0cb 100644 --- a/runtime/v2/shim/shim_windows.go +++ b/runtime/v2/shim/shim_windows.go @@ -104,11 +104,14 @@ func serveListener(path string) (net.Listener, error) { return l, nil } -func handleSignals(logger *logrus.Entry, signals chan os.Signal) error { +func handleSignals(ctx context.Context, logger *logrus.Entry, signals chan os.Signal) error { logger.Info("starting signal loop") for { - for s := range signals { + select { + case <-ctx.Done(): + return ctx.Err() + case s := <-signals: switch s { case os.Interrupt: return nil From 047348e198cc17404e34717d00ece685bfe3fbfa Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Thu, 11 Apr 2019 11:37:01 -0400 Subject: [PATCH 3/3] Add dialer for events service Signed-off-by: Michael Crosby --- runtime/v2/shim/dialer.go | 87 +++++++++++++++++++++++++++++++++ runtime/v2/shim/publisher.go | 87 +++++++++++++++++++++++++++++++++ runtime/v2/shim/shim.go | 60 +---------------------- runtime/v2/shim/shim_unix.go | 2 +- runtime/v2/shim/shim_windows.go | 2 +- 5 files changed, 178 insertions(+), 60 deletions(-) create mode 100644 runtime/v2/shim/dialer.go create mode 100644 runtime/v2/shim/publisher.go diff --git a/runtime/v2/shim/dialer.go b/runtime/v2/shim/dialer.go new file mode 100644 index 000000000..5fdc06b23 --- /dev/null +++ b/runtime/v2/shim/dialer.go @@ -0,0 +1,87 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package shim + +import ( + "net" + "sync" + + v1 "github.com/containerd/containerd/api/services/ttrpc/events/v1" + "github.com/containerd/ttrpc" + "github.com/pkg/errors" +) + +type dialConnect func() (net.Conn, error) + +var errDialerClosed = errors.New("events dialer is closed") + +func newDialier(newFn dialConnect) *dialer { + return &dialer{ + newFn: newFn, + } +} + +type dialer struct { + mu sync.Mutex + + newFn dialConnect + service v1.EventsService + conn net.Conn + closed bool +} + +func (d *dialer) Get() (v1.EventsService, error) { + d.mu.Lock() + defer d.mu.Unlock() + + if d.closed { + return nil, errDialerClosed + } + if d.service == nil { + conn, err := d.newFn() + if err != nil { + return nil, err + } + d.conn = conn + d.service = v1.NewEventsClient(ttrpc.NewClient(conn)) + } + return d.service, nil +} + +func (d *dialer) Put(err error) { + if err != nil { + d.mu.Lock() + d.conn.Close() + d.service = nil + d.mu.Unlock() + } +} + +func (d *dialer) Close() (err error) { + d.mu.Lock() + if d.closed { + return errDialerClosed + } + if d.conn != nil { + err = d.conn.Close() + } + d.service = nil + d.closed = true + d.mu.Unlock() + + return err +} diff --git a/runtime/v2/shim/publisher.go b/runtime/v2/shim/publisher.go new file mode 100644 index 000000000..915d5cd4d --- /dev/null +++ b/runtime/v2/shim/publisher.go @@ -0,0 +1,87 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package shim + +import ( + "context" + "net" + "sync" + "time" + + v1 "github.com/containerd/containerd/api/services/ttrpc/events/v1" + "github.com/containerd/containerd/events" + "github.com/containerd/containerd/namespaces" + "github.com/containerd/typeurl" +) + +func newPublisher(address string) *remoteEventsPublisher { + return &remoteEventsPublisher{ + dialer: newDialier(func() (net.Conn, error) { + return connect(address, dial) + }), + closed: make(chan struct{}), + } +} + +type remoteEventsPublisher struct { + dialer *dialer + closed chan struct{} + closer sync.Once +} + +func (l *remoteEventsPublisher) Done() <-chan struct{} { + return l.closed +} + +func (l *remoteEventsPublisher) Close() (err error) { + err = l.dialer.Close() + l.closer.Do(func() { + close(l.closed) + }) + return err +} + +func (l *remoteEventsPublisher) Publish(ctx context.Context, topic string, event events.Event) error { + client, err := l.dialer.Get() + if err != nil { + return err + } + ns, err := namespaces.NamespaceRequired(ctx) + if err != nil { + return err + } + any, err := typeurl.MarshalAny(event) + if err != nil { + return err + } + if _, err := client.Forward(ctx, &v1.ForwardRequest{ + Envelope: &v1.Envelope{ + Timestamp: time.Now(), + Namespace: ns, + Topic: topic, + Event: any, + }, + }); err != nil { + l.dialer.Put(err) + return err + } + return nil +} + +func connect(address string, d func(string, time.Duration) (net.Conn, error)) (net.Conn, error) { + return d(address, 5*time.Second) +} diff --git a/runtime/v2/shim/shim.go b/runtime/v2/shim/shim.go index 2bc8f1b07..18937f8ef 100644 --- a/runtime/v2/shim/shim.go +++ b/runtime/v2/shim/shim.go @@ -21,21 +21,17 @@ import ( "flag" "fmt" "io" - "net" "os" "runtime" "runtime/debug" "strings" - "sync" "time" - v1 "github.com/containerd/containerd/api/services/ttrpc/events/v1" "github.com/containerd/containerd/events" "github.com/containerd/containerd/log" "github.com/containerd/containerd/namespaces" shimapi "github.com/containerd/containerd/runtime/v2/task" "github.com/containerd/ttrpc" - "github.com/containerd/typeurl" "github.com/gogo/protobuf/proto" "github.com/pkg/errors" "github.com/sirupsen/logrus" @@ -165,18 +161,10 @@ func run(id string, initFunc Init, config Config) error { } } address := fmt.Sprintf("%s.ttrpc", addressFlag) - conn, err := connect(address, dialer) - if err != nil { - return err - } - publisher := &remoteEventsPublisher{ - address: address, - conn: conn, - closed: make(chan struct{}), - } + + publisher := newPublisher(address) defer publisher.Close() - publisher.client = v1.NewEventsClient(ttrpc.NewClient(conn)) if namespaceFlag == "" { return fmt.Errorf("shim namespace cannot be empty") } @@ -310,47 +298,3 @@ func dumpStacks(logger *logrus.Entry) { buf = buf[:stackSize] logger.Infof("=== BEGIN goroutine stack dump ===\n%s\n=== END goroutine stack dump ===", buf) } - -type remoteEventsPublisher struct { - address string - conn net.Conn - client v1.EventsService - closed chan struct{} - closer sync.Once -} - -func (l *remoteEventsPublisher) Done() <-chan struct{} { - return l.closed -} - -func (l *remoteEventsPublisher) Close() (err error) { - l.closer.Do(func() { - err = l.conn.Close() - close(l.closed) - }) - return err -} - -func (l *remoteEventsPublisher) Publish(ctx context.Context, topic string, event events.Event) error { - ns, err := namespaces.NamespaceRequired(ctx) - if err != nil { - return err - } - any, err := typeurl.MarshalAny(event) - if err != nil { - return err - } - _, err = l.client.Forward(ctx, &v1.ForwardRequest{ - Envelope: &v1.Envelope{ - Timestamp: time.Now(), - Namespace: ns, - Topic: topic, - Event: any, - }, - }) - return err -} - -func connect(address string, d func(string, time.Duration) (net.Conn, error)) (net.Conn, error) { - return d(address, 100*time.Second) -} diff --git a/runtime/v2/shim/shim_unix.go b/runtime/v2/shim/shim_unix.go index d7de92a08..603e51bfb 100644 --- a/runtime/v2/shim/shim_unix.go +++ b/runtime/v2/shim/shim_unix.go @@ -94,7 +94,7 @@ func openLog(ctx context.Context, _ string) (io.Writer, error) { return fifo.OpenFifo(ctx, "log", unix.O_WRONLY, 0700) } -func dialer(address string, timeout time.Duration) (net.Conn, error) { +func dial(address string, timeout time.Duration) (net.Conn, error) { address = strings.TrimPrefix(address, "unix://") return net.DialTimeout("unix", address, timeout) } diff --git a/runtime/v2/shim/shim_windows.go b/runtime/v2/shim/shim_windows.go index a399da0cb..084be2b28 100644 --- a/runtime/v2/shim/shim_windows.go +++ b/runtime/v2/shim/shim_windows.go @@ -287,7 +287,7 @@ func openLog(ctx context.Context, id string) (io.Writer, error) { return dswl, nil } -func dialer(address string, timeout time.Duration) (net.Conn, error) { +func dial(address string, timeout time.Duration) (net.Conn, error) { var c net.Conn var lastError error timedOutError := errors.Errorf("timed out waiting for npipe %s", address)