diff --git a/Protobuild.toml b/Protobuild.toml index 005625f38..3bb2e6287 100644 --- a/Protobuild.toml +++ b/Protobuild.toml @@ -35,6 +35,10 @@ plugins = ["grpc", "fieldpath"] prefixes = ["github.com/containerd/containerd/api/events"] plugins = ["fieldpath"] # disable grpc for this package +[[overrides]] +prefixes = ["github.com/containerd/containerd/api/services/ttrpc/events/v1"] +plugins = ["ttrpc", "fieldpath"] + [[overrides]] # enable ttrpc and disable fieldpath and grpc for the shim prefixes = ["github.com/containerd/containerd/runtime/v1/shim/v1", "github.com/containerd/containerd/runtime/v2/task"] diff --git a/api/next.pb.txt b/api/next.pb.txt index 7a0ed2347..423e05945 100755 --- a/api/next.pb.txt +++ b/api/next.pb.txt @@ -4202,6 +4202,119 @@ file { weak_dependency: 2 syntax: "proto3" } +file { + name: "github.com/containerd/containerd/api/services/ttrpc/events/v1/events.proto" + package: "containerd.services.events.ttrpc.v1" + dependency: "github.com/containerd/containerd/protobuf/plugin/fieldpath.proto" + dependency: "gogoproto/gogo.proto" + dependency: "google/protobuf/any.proto" + dependency: "google/protobuf/empty.proto" + dependency: "google/protobuf/timestamp.proto" + message_type { + name: "PublishRequest" + field { + name: "topic" + number: 1 + label: LABEL_OPTIONAL + type: TYPE_STRING + json_name: "topic" + } + field { + name: "event" + number: 2 + label: LABEL_OPTIONAL + type: TYPE_MESSAGE + type_name: ".google.protobuf.Any" + json_name: "event" + } + } + message_type { + name: "ForwardRequest" + field { + name: "envelope" + number: 1 + label: LABEL_OPTIONAL + type: TYPE_MESSAGE + type_name: ".containerd.services.events.ttrpc.v1.Envelope" + json_name: "envelope" + } + } + message_type { + name: "SubscribeRequest" + field { + name: "filters" + number: 1 + label: LABEL_REPEATED + type: TYPE_STRING + json_name: "filters" + } + } + message_type { + name: "Envelope" + field { + name: "timestamp" + number: 1 + label: LABEL_OPTIONAL + type: TYPE_MESSAGE + type_name: ".google.protobuf.Timestamp" + options { + 65001: 0 + 65010: 1 + } + json_name: "timestamp" + } + field { + name: "namespace" + number: 2 + label: LABEL_OPTIONAL + type: TYPE_STRING + json_name: "namespace" + } + field { + name: "topic" + number: 3 + label: LABEL_OPTIONAL + type: TYPE_STRING + json_name: "topic" + } + field { + name: "event" + number: 4 + label: LABEL_OPTIONAL + type: TYPE_MESSAGE + type_name: ".google.protobuf.Any" + json_name: "event" + } + options { + 64400: 1 + } + } + service { + name: "Events" + method { + name: "Publish" + input_type: ".containerd.services.events.ttrpc.v1.PublishRequest" + output_type: ".google.protobuf.Empty" + } + method { + name: "Forward" + input_type: ".containerd.services.events.ttrpc.v1.ForwardRequest" + output_type: ".google.protobuf.Empty" + } + method { + name: "Subscribe" + input_type: ".containerd.services.events.ttrpc.v1.SubscribeRequest" + output_type: ".containerd.services.events.ttrpc.v1.Envelope" + server_streaming: true + } + } + options { + go_package: "github.com/containerd/containerd/api/services/ttrpc/events/v1;events" + } + weak_dependency: 0 + weak_dependency: 1 + syntax: "proto3" +} file { name: "github.com/containerd/containerd/api/services/version/v1/version.proto" package: "containerd.services.version.v1" diff --git a/api/services/ttrpc/events/v1/doc.go b/api/services/ttrpc/events/v1/doc.go new file mode 100644 index 000000000..b7f86da86 --- /dev/null +++ b/api/services/ttrpc/events/v1/doc.go @@ -0,0 +1,18 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +// Package events defines the event pushing and subscription service. +package events diff --git a/api/services/ttrpc/events/v1/events.pb.go b/api/services/ttrpc/events/v1/events.pb.go new file mode 100644 index 000000000..3f9356cb7 --- /dev/null +++ b/api/services/ttrpc/events/v1/events.pb.go @@ -0,0 +1,1238 @@ +// Code generated by protoc-gen-gogo. DO NOT EDIT. +// source: github.com/containerd/containerd/api/services/ttrpc/events/v1/events.proto + +package events + +import ( + context "context" + fmt "fmt" + github_com_containerd_ttrpc "github.com/containerd/ttrpc" + github_com_containerd_typeurl "github.com/containerd/typeurl" + proto "github.com/gogo/protobuf/proto" + github_com_gogo_protobuf_types "github.com/gogo/protobuf/types" + types "github.com/gogo/protobuf/types" + io "io" + math "math" + reflect "reflect" + strings "strings" + time "time" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf +var _ = time.Kitchen + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package + +type PublishRequest struct { + Topic string `protobuf:"bytes,1,opt,name=topic,proto3" json:"topic,omitempty"` + Event *types.Any `protobuf:"bytes,2,opt,name=event,proto3" json:"event,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *PublishRequest) Reset() { *m = PublishRequest{} } +func (*PublishRequest) ProtoMessage() {} +func (*PublishRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_19f98672016720b5, []int{0} +} +func (m *PublishRequest) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *PublishRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_PublishRequest.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalTo(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *PublishRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_PublishRequest.Merge(m, src) +} +func (m *PublishRequest) XXX_Size() int { + return m.Size() +} +func (m *PublishRequest) XXX_DiscardUnknown() { + xxx_messageInfo_PublishRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_PublishRequest proto.InternalMessageInfo + +type ForwardRequest struct { + Envelope *Envelope `protobuf:"bytes,1,opt,name=envelope,proto3" json:"envelope,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *ForwardRequest) Reset() { *m = ForwardRequest{} } +func (*ForwardRequest) ProtoMessage() {} +func (*ForwardRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_19f98672016720b5, []int{1} +} +func (m *ForwardRequest) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *ForwardRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_ForwardRequest.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalTo(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *ForwardRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_ForwardRequest.Merge(m, src) +} +func (m *ForwardRequest) XXX_Size() int { + return m.Size() +} +func (m *ForwardRequest) XXX_DiscardUnknown() { + xxx_messageInfo_ForwardRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_ForwardRequest proto.InternalMessageInfo + +type SubscribeRequest struct { + Filters []string `protobuf:"bytes,1,rep,name=filters,proto3" json:"filters,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *SubscribeRequest) Reset() { *m = SubscribeRequest{} } +func (*SubscribeRequest) ProtoMessage() {} +func (*SubscribeRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_19f98672016720b5, []int{2} +} +func (m *SubscribeRequest) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *SubscribeRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_SubscribeRequest.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalTo(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *SubscribeRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_SubscribeRequest.Merge(m, src) +} +func (m *SubscribeRequest) XXX_Size() int { + return m.Size() +} +func (m *SubscribeRequest) XXX_DiscardUnknown() { + xxx_messageInfo_SubscribeRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_SubscribeRequest proto.InternalMessageInfo + +type Envelope struct { + Timestamp time.Time `protobuf:"bytes,1,opt,name=timestamp,proto3,stdtime" json:"timestamp"` + Namespace string `protobuf:"bytes,2,opt,name=namespace,proto3" json:"namespace,omitempty"` + Topic string `protobuf:"bytes,3,opt,name=topic,proto3" json:"topic,omitempty"` + Event *types.Any `protobuf:"bytes,4,opt,name=event,proto3" json:"event,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *Envelope) Reset() { *m = Envelope{} } +func (*Envelope) ProtoMessage() {} +func (*Envelope) Descriptor() ([]byte, []int) { + return fileDescriptor_19f98672016720b5, []int{3} +} +func (m *Envelope) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *Envelope) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_Envelope.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalTo(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *Envelope) XXX_Merge(src proto.Message) { + xxx_messageInfo_Envelope.Merge(m, src) +} +func (m *Envelope) XXX_Size() int { + return m.Size() +} +func (m *Envelope) XXX_DiscardUnknown() { + xxx_messageInfo_Envelope.DiscardUnknown(m) +} + +var xxx_messageInfo_Envelope proto.InternalMessageInfo + +func init() { + proto.RegisterType((*PublishRequest)(nil), "containerd.services.events.ttrpc.v1.PublishRequest") + proto.RegisterType((*ForwardRequest)(nil), "containerd.services.events.ttrpc.v1.ForwardRequest") + proto.RegisterType((*SubscribeRequest)(nil), "containerd.services.events.ttrpc.v1.SubscribeRequest") + proto.RegisterType((*Envelope)(nil), "containerd.services.events.ttrpc.v1.Envelope") +} + +func init() { + proto.RegisterFile("github.com/containerd/containerd/api/services/ttrpc/events/v1/events.proto", fileDescriptor_19f98672016720b5) +} + +var fileDescriptor_19f98672016720b5 = []byte{ + // 478 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0x94, 0xcf, 0x8e, 0xd3, 0x30, + 0x10, 0xc6, 0xeb, 0xfd, 0xdb, 0x78, 0xa5, 0x15, 0x8a, 0x2a, 0x54, 0x02, 0x4a, 0xab, 0x72, 0xa9, + 0x10, 0x38, 0x6c, 0x57, 0x5c, 0xe0, 0x02, 0x15, 0x45, 0x82, 0x13, 0x0a, 0x08, 0xad, 0x40, 0x42, + 0x4a, 0xd2, 0x69, 0x6a, 0x29, 0x89, 0x4d, 0xec, 0x04, 0xed, 0x6d, 0x1f, 0x81, 0x17, 0xe1, 0xc6, + 0x85, 0x37, 0xe8, 0x91, 0x23, 0x27, 0x60, 0xfb, 0x24, 0xa8, 0x8e, 0xdd, 0xb0, 0x5d, 0x50, 0x8b, + 0xf6, 0x36, 0xf6, 0xcc, 0xf7, 0x9b, 0xcc, 0x37, 0x56, 0xf0, 0x8b, 0x98, 0xca, 0x69, 0x11, 0x92, + 0x88, 0xa5, 0x5e, 0xc4, 0x32, 0x19, 0xd0, 0x0c, 0xf2, 0xf1, 0x9f, 0x61, 0xc0, 0xa9, 0x27, 0x20, + 0x2f, 0x69, 0x04, 0xc2, 0x93, 0x32, 0xe7, 0x91, 0x07, 0x25, 0x64, 0x52, 0x78, 0xe5, 0x91, 0x8e, + 0x08, 0xcf, 0x99, 0x64, 0xf6, 0xed, 0x5a, 0x45, 0x8c, 0x82, 0xe8, 0x0a, 0x25, 0x24, 0xe5, 0x91, + 0xf3, 0x78, 0x6d, 0x43, 0x05, 0x0b, 0x8b, 0x89, 0xc7, 0x93, 0x22, 0xa6, 0x99, 0x37, 0xa1, 0x90, + 0x8c, 0x79, 0x20, 0xa7, 0x55, 0x1b, 0xa7, 0x15, 0xb3, 0x98, 0xa9, 0xd0, 0x5b, 0x44, 0xfa, 0xf6, + 0x46, 0xcc, 0x58, 0x9c, 0x40, 0xad, 0x0e, 0xb2, 0x53, 0x9d, 0xba, 0xb9, 0x9a, 0x82, 0x94, 0x4b, + 0x93, 0xec, 0xac, 0x26, 0x25, 0x4d, 0x41, 0xc8, 0x20, 0xe5, 0x55, 0x41, 0xcf, 0xc7, 0x87, 0x2f, + 0x8b, 0x30, 0xa1, 0x62, 0xea, 0xc3, 0x87, 0x02, 0x84, 0xb4, 0x5b, 0x78, 0x57, 0x32, 0x4e, 0xa3, + 0x36, 0xea, 0xa2, 0xbe, 0xe5, 0x57, 0x07, 0xfb, 0x0e, 0xde, 0x55, 0xb3, 0xb6, 0xb7, 0xba, 0xa8, + 0x7f, 0x30, 0x68, 0x91, 0x0a, 0x4c, 0x0c, 0x98, 0x3c, 0xc9, 0x4e, 0xfd, 0xaa, 0xa4, 0xf7, 0x0e, + 0x1f, 0x3e, 0x63, 0xf9, 0xc7, 0x20, 0x1f, 0x1b, 0xe6, 0x73, 0xdc, 0x84, 0xac, 0x84, 0x84, 0x71, + 0x50, 0xd8, 0x83, 0xc1, 0x3d, 0xb2, 0x81, 0x9d, 0x64, 0xa4, 0x45, 0xfe, 0x52, 0xde, 0xbb, 0x8b, + 0xaf, 0xbd, 0x2a, 0x42, 0x11, 0xe5, 0x34, 0x04, 0x83, 0x6f, 0xe3, 0xfd, 0x09, 0x4d, 0x24, 0xe4, + 0xa2, 0x8d, 0xba, 0xdb, 0x7d, 0xcb, 0x37, 0xc7, 0xde, 0x17, 0x84, 0x9b, 0x06, 0x62, 0x0f, 0xb1, + 0xb5, 0x1c, 0x5f, 0x7f, 0x86, 0x73, 0x69, 0x8e, 0xd7, 0xa6, 0x62, 0xd8, 0x9c, 0xfd, 0xe8, 0x34, + 0x3e, 0xfd, 0xec, 0x20, 0xbf, 0x96, 0xd9, 0xb7, 0xb0, 0x95, 0x05, 0x29, 0x08, 0x1e, 0x44, 0xa0, + 0xbc, 0xb0, 0xfc, 0xfa, 0xa2, 0xf6, 0x6e, 0xfb, 0xaf, 0xde, 0xed, 0xac, 0xf5, 0xee, 0xe1, 0xce, + 0xd9, 0xd7, 0x0e, 0x1a, 0x7c, 0xde, 0xc2, 0x7b, 0x23, 0xe5, 0x85, 0xfd, 0x06, 0xef, 0xeb, 0x05, + 0xd9, 0xc7, 0x1b, 0x79, 0x76, 0x71, 0x9d, 0xce, 0xf5, 0x4b, 0xdd, 0x46, 0x8b, 0xf7, 0xb1, 0xe0, + 0xea, 0x25, 0x6d, 0xc8, 0xbd, 0xb8, 0xd2, 0x7f, 0x72, 0x05, 0xb6, 0x96, 0xfb, 0xb1, 0x1f, 0x6c, + 0x44, 0x5e, 0xdd, 0xa7, 0xf3, 0x7f, 0x8f, 0xe3, 0x3e, 0x1a, 0xbe, 0x9f, 0x9d, 0xbb, 0x8d, 0xef, + 0xe7, 0x6e, 0xe3, 0x6c, 0xee, 0xa2, 0xd9, 0xdc, 0x45, 0xdf, 0xe6, 0x2e, 0xfa, 0x35, 0x77, 0xd1, + 0xdb, 0xa7, 0x57, 0xfa, 0x03, 0x3c, 0xaa, 0xa2, 0x93, 0xc6, 0x09, 0x0a, 0xf7, 0xd4, 0xa0, 0xc7, + 0xbf, 0x03, 0x00, 0x00, 0xff, 0xff, 0x8c, 0x5b, 0x8a, 0x72, 0x54, 0x04, 0x00, 0x00, +} + +// Field returns the value for the given fieldpath as a string, if defined. +// If the value is not defined, the second value will be false. +func (m *Envelope) Field(fieldpath []string) (string, bool) { + if len(fieldpath) == 0 { + return "", false + } + + switch fieldpath[0] { + // unhandled: timestamp + case "namespace": + return string(m.Namespace), len(m.Namespace) > 0 + case "topic": + return string(m.Topic), len(m.Topic) > 0 + case "event": + decoded, err := github_com_containerd_typeurl.UnmarshalAny(m.Event) + if err != nil { + return "", false + } + + adaptor, ok := decoded.(interface{ Field([]string) (string, bool) }) + if !ok { + return "", false + } + return adaptor.Field(fieldpath[1:]) + } + return "", false +} +func (m *PublishRequest) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *PublishRequest) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if len(m.Topic) > 0 { + dAtA[i] = 0xa + i++ + i = encodeVarintEvents(dAtA, i, uint64(len(m.Topic))) + i += copy(dAtA[i:], m.Topic) + } + if m.Event != nil { + dAtA[i] = 0x12 + i++ + i = encodeVarintEvents(dAtA, i, uint64(m.Event.Size())) + n1, err := m.Event.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n1 + } + if m.XXX_unrecognized != nil { + i += copy(dAtA[i:], m.XXX_unrecognized) + } + return i, nil +} + +func (m *ForwardRequest) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *ForwardRequest) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if m.Envelope != nil { + dAtA[i] = 0xa + i++ + i = encodeVarintEvents(dAtA, i, uint64(m.Envelope.Size())) + n2, err := m.Envelope.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n2 + } + if m.XXX_unrecognized != nil { + i += copy(dAtA[i:], m.XXX_unrecognized) + } + return i, nil +} + +func (m *SubscribeRequest) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *SubscribeRequest) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if len(m.Filters) > 0 { + for _, s := range m.Filters { + dAtA[i] = 0xa + i++ + l = len(s) + for l >= 1<<7 { + dAtA[i] = uint8(uint64(l)&0x7f | 0x80) + l >>= 7 + i++ + } + dAtA[i] = uint8(l) + i++ + i += copy(dAtA[i:], s) + } + } + if m.XXX_unrecognized != nil { + i += copy(dAtA[i:], m.XXX_unrecognized) + } + return i, nil +} + +func (m *Envelope) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *Envelope) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + dAtA[i] = 0xa + i++ + i = encodeVarintEvents(dAtA, i, uint64(github_com_gogo_protobuf_types.SizeOfStdTime(m.Timestamp))) + n3, err := github_com_gogo_protobuf_types.StdTimeMarshalTo(m.Timestamp, dAtA[i:]) + if err != nil { + return 0, err + } + i += n3 + if len(m.Namespace) > 0 { + dAtA[i] = 0x12 + i++ + i = encodeVarintEvents(dAtA, i, uint64(len(m.Namespace))) + i += copy(dAtA[i:], m.Namespace) + } + if len(m.Topic) > 0 { + dAtA[i] = 0x1a + i++ + i = encodeVarintEvents(dAtA, i, uint64(len(m.Topic))) + i += copy(dAtA[i:], m.Topic) + } + if m.Event != nil { + dAtA[i] = 0x22 + i++ + i = encodeVarintEvents(dAtA, i, uint64(m.Event.Size())) + n4, err := m.Event.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n4 + } + if m.XXX_unrecognized != nil { + i += copy(dAtA[i:], m.XXX_unrecognized) + } + return i, nil +} + +func encodeVarintEvents(dAtA []byte, offset int, v uint64) int { + for v >= 1<<7 { + dAtA[offset] = uint8(v&0x7f | 0x80) + v >>= 7 + offset++ + } + dAtA[offset] = uint8(v) + return offset + 1 +} +func (m *PublishRequest) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.Topic) + if l > 0 { + n += 1 + l + sovEvents(uint64(l)) + } + if m.Event != nil { + l = m.Event.Size() + n += 1 + l + sovEvents(uint64(l)) + } + if m.XXX_unrecognized != nil { + n += len(m.XXX_unrecognized) + } + return n +} + +func (m *ForwardRequest) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.Envelope != nil { + l = m.Envelope.Size() + n += 1 + l + sovEvents(uint64(l)) + } + if m.XXX_unrecognized != nil { + n += len(m.XXX_unrecognized) + } + return n +} + +func (m *SubscribeRequest) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if len(m.Filters) > 0 { + for _, s := range m.Filters { + l = len(s) + n += 1 + l + sovEvents(uint64(l)) + } + } + if m.XXX_unrecognized != nil { + n += len(m.XXX_unrecognized) + } + return n +} + +func (m *Envelope) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = github_com_gogo_protobuf_types.SizeOfStdTime(m.Timestamp) + n += 1 + l + sovEvents(uint64(l)) + l = len(m.Namespace) + if l > 0 { + n += 1 + l + sovEvents(uint64(l)) + } + l = len(m.Topic) + if l > 0 { + n += 1 + l + sovEvents(uint64(l)) + } + if m.Event != nil { + l = m.Event.Size() + n += 1 + l + sovEvents(uint64(l)) + } + if m.XXX_unrecognized != nil { + n += len(m.XXX_unrecognized) + } + return n +} + +func sovEvents(x uint64) (n int) { + for { + n++ + x >>= 7 + if x == 0 { + break + } + } + return n +} +func sozEvents(x uint64) (n int) { + return sovEvents(uint64((x << 1) ^ uint64((int64(x) >> 63)))) +} +func (this *PublishRequest) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&PublishRequest{`, + `Topic:` + fmt.Sprintf("%v", this.Topic) + `,`, + `Event:` + strings.Replace(fmt.Sprintf("%v", this.Event), "Any", "types.Any", 1) + `,`, + `XXX_unrecognized:` + fmt.Sprintf("%v", this.XXX_unrecognized) + `,`, + `}`, + }, "") + return s +} +func (this *ForwardRequest) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&ForwardRequest{`, + `Envelope:` + strings.Replace(fmt.Sprintf("%v", this.Envelope), "Envelope", "Envelope", 1) + `,`, + `XXX_unrecognized:` + fmt.Sprintf("%v", this.XXX_unrecognized) + `,`, + `}`, + }, "") + return s +} +func (this *SubscribeRequest) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&SubscribeRequest{`, + `Filters:` + fmt.Sprintf("%v", this.Filters) + `,`, + `XXX_unrecognized:` + fmt.Sprintf("%v", this.XXX_unrecognized) + `,`, + `}`, + }, "") + return s +} +func (this *Envelope) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&Envelope{`, + `Timestamp:` + strings.Replace(strings.Replace(this.Timestamp.String(), "Timestamp", "types.Timestamp", 1), `&`, ``, 1) + `,`, + `Namespace:` + fmt.Sprintf("%v", this.Namespace) + `,`, + `Topic:` + fmt.Sprintf("%v", this.Topic) + `,`, + `Event:` + strings.Replace(fmt.Sprintf("%v", this.Event), "Any", "types.Any", 1) + `,`, + `XXX_unrecognized:` + fmt.Sprintf("%v", this.XXX_unrecognized) + `,`, + `}`, + }, "") + return s +} +func valueToStringEvents(v interface{}) string { + rv := reflect.ValueOf(v) + if rv.IsNil() { + return "nil" + } + pv := reflect.Indirect(rv).Interface() + return fmt.Sprintf("*%v", pv) +} + +type EventsService interface { + Publish(ctx context.Context, req *PublishRequest) (*types.Empty, error) + Forward(ctx context.Context, req *ForwardRequest) (*types.Empty, error) + Subscribe(ctx context.Context, req *SubscribeRequest) (*Envelope, error) +} + +func RegisterEventsService(srv *github_com_containerd_ttrpc.Server, svc EventsService) { + srv.Register("containerd.services.events.ttrpc.v1.Events", map[string]github_com_containerd_ttrpc.Method{ + "Publish": func(ctx context.Context, unmarshal func(interface{}) error) (interface{}, error) { + var req PublishRequest + if err := unmarshal(&req); err != nil { + return nil, err + } + return svc.Publish(ctx, &req) + }, + "Forward": func(ctx context.Context, unmarshal func(interface{}) error) (interface{}, error) { + var req ForwardRequest + if err := unmarshal(&req); err != nil { + return nil, err + } + return svc.Forward(ctx, &req) + }, + "Subscribe": func(ctx context.Context, unmarshal func(interface{}) error) (interface{}, error) { + var req SubscribeRequest + if err := unmarshal(&req); err != nil { + return nil, err + } + return svc.Subscribe(ctx, &req) + }, + }) +} + +type eventsClient struct { + client *github_com_containerd_ttrpc.Client +} + +func NewEventsClient(client *github_com_containerd_ttrpc.Client) EventsService { + return &eventsClient{ + client: client, + } +} + +func (c *eventsClient) Publish(ctx context.Context, req *PublishRequest) (*types.Empty, error) { + var resp types.Empty + if err := c.client.Call(ctx, "containerd.services.events.ttrpc.v1.Events", "Publish", req, &resp); err != nil { + return nil, err + } + return &resp, nil +} + +func (c *eventsClient) Forward(ctx context.Context, req *ForwardRequest) (*types.Empty, error) { + var resp types.Empty + if err := c.client.Call(ctx, "containerd.services.events.ttrpc.v1.Events", "Forward", req, &resp); err != nil { + return nil, err + } + return &resp, nil +} + +func (c *eventsClient) Subscribe(ctx context.Context, req *SubscribeRequest) (*Envelope, error) { + var resp Envelope + if err := c.client.Call(ctx, "containerd.services.events.ttrpc.v1.Events", "Subscribe", req, &resp); err != nil { + return nil, err + } + return &resp, nil +} +func (m *PublishRequest) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowEvents + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: PublishRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: PublishRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Topic", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowEvents + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthEvents + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthEvents + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Topic = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Event", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowEvents + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthEvents + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthEvents + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Event == nil { + m.Event = &types.Any{} + } + if err := m.Event.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipEvents(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthEvents + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthEvents + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...) + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *ForwardRequest) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowEvents + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: ForwardRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: ForwardRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Envelope", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowEvents + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthEvents + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthEvents + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Envelope == nil { + m.Envelope = &Envelope{} + } + if err := m.Envelope.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipEvents(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthEvents + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthEvents + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...) + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *SubscribeRequest) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowEvents + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: SubscribeRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: SubscribeRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Filters", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowEvents + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthEvents + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthEvents + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Filters = append(m.Filters, string(dAtA[iNdEx:postIndex])) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipEvents(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthEvents + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthEvents + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...) + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *Envelope) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowEvents + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: Envelope: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: Envelope: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Timestamp", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowEvents + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthEvents + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthEvents + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if err := github_com_gogo_protobuf_types.StdTimeUnmarshal(&m.Timestamp, dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Namespace", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowEvents + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthEvents + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthEvents + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Namespace = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Topic", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowEvents + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthEvents + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthEvents + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Topic = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 4: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Event", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowEvents + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthEvents + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthEvents + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Event == nil { + m.Event = &types.Any{} + } + if err := m.Event.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipEvents(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthEvents + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthEvents + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...) + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func skipEvents(dAtA []byte) (n int, err error) { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowEvents + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + wireType := int(wire & 0x7) + switch wireType { + case 0: + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowEvents + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + iNdEx++ + if dAtA[iNdEx-1] < 0x80 { + break + } + } + return iNdEx, nil + case 1: + iNdEx += 8 + return iNdEx, nil + case 2: + var length int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowEvents + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + length |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if length < 0 { + return 0, ErrInvalidLengthEvents + } + iNdEx += length + if iNdEx < 0 { + return 0, ErrInvalidLengthEvents + } + return iNdEx, nil + case 3: + for { + var innerWire uint64 + var start int = iNdEx + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowEvents + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + innerWire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + innerWireType := int(innerWire & 0x7) + if innerWireType == 4 { + break + } + next, err := skipEvents(dAtA[start:]) + if err != nil { + return 0, err + } + iNdEx = start + next + if iNdEx < 0 { + return 0, ErrInvalidLengthEvents + } + } + return iNdEx, nil + case 4: + return iNdEx, nil + case 5: + iNdEx += 4 + return iNdEx, nil + default: + return 0, fmt.Errorf("proto: illegal wireType %d", wireType) + } + } + panic("unreachable") +} + +var ( + ErrInvalidLengthEvents = fmt.Errorf("proto: negative length found during unmarshaling") + ErrIntOverflowEvents = fmt.Errorf("proto: integer overflow") +) diff --git a/api/services/ttrpc/events/v1/events.proto b/api/services/ttrpc/events/v1/events.proto new file mode 100644 index 000000000..fb3543e63 --- /dev/null +++ b/api/services/ttrpc/events/v1/events.proto @@ -0,0 +1,56 @@ +syntax = "proto3"; + +package containerd.services.events.ttrpc.v1; + +import weak "github.com/containerd/containerd/protobuf/plugin/fieldpath.proto"; +import weak "gogoproto/gogo.proto"; +import "google/protobuf/any.proto"; +import "google/protobuf/empty.proto"; +import "google/protobuf/timestamp.proto"; + +option go_package = "github.com/containerd/containerd/api/services/ttrpc/events/v1;events"; + +service Events { + // Publish an event to a topic. + // + // The event will be packed into a timestamp envelope with the namespace + // introspected from the context. The envelope will then be dispatched. + rpc Publish(PublishRequest) returns (google.protobuf.Empty); + + // Forward sends an event that has already been packaged into an envelope + // with a timestamp and namespace. + // + // This is useful if earlier timestamping is required or when forwarding on + // behalf of another component, namespace or publisher. + rpc Forward(ForwardRequest) returns (google.protobuf.Empty); + + // Subscribe to a stream of events, possibly returning only that match any + // of the provided filters. + // + // Unlike many other methods in containerd, subscribers will get messages + // from all namespaces unless otherwise specified. If this is not desired, + // a filter can be provided in the format 'namespace==' to + // restrict the received events. + rpc Subscribe(SubscribeRequest) returns (stream Envelope); +} + +message PublishRequest { + string topic = 1; + google.protobuf.Any event = 2; +} + +message ForwardRequest { + Envelope envelope = 1; +} + +message SubscribeRequest { + repeated string filters = 1; +} + +message Envelope { + option (containerd.plugin.fieldpath) = true; + google.protobuf.Timestamp timestamp = 1 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false]; + string namespace = 2; + string topic = 3; + google.protobuf.Any event = 4; +} diff --git a/cmd/containerd/command/main.go b/cmd/containerd/command/main.go index 1c469c040..91e3e5a3b 100644 --- a/cmd/containerd/command/main.go +++ b/cmd/containerd/command/main.go @@ -147,7 +147,10 @@ func App() *cli.App { for _, w := range warnings { log.G(ctx).WithError(w).Warn("cleanup temp mount") } - address := config.GRPC.Address + var ( + address = config.GRPC.Address + ttrpcAddress = fmt.Sprintf("%s.ttrpc", config.GRPC.Address) + ) if address == "" { return errors.New("grpc address cannot be empty") } @@ -188,7 +191,14 @@ func App() *cli.App { } serve(ctx, l, server.ServeMetrics) } + // setup the ttrpc endpoint + tl, err := sys.GetLocalListener(ttrpcAddress, config.GRPC.UID, config.GRPC.GID) + if err != nil { + return errors.Wrapf(err, "failed to get listener for main ttrpc endpoint") + } + serve(ctx, tl, server.ServeTTRPC) + // setup the main grpc endpoint l, err := sys.GetLocalListener(address, config.GRPC.UID, config.GRPC.GID) if err != nil { return errors.Wrapf(err, "failed to get listener for main endpoint") diff --git a/plugin/plugin.go b/plugin/plugin.go index 5e69145ef..e727dd5b8 100644 --- a/plugin/plugin.go +++ b/plugin/plugin.go @@ -20,6 +20,7 @@ import ( "fmt" "sync" + "github.com/containerd/ttrpc" "github.com/pkg/errors" "google.golang.org/grpc" ) @@ -123,6 +124,11 @@ type Service interface { Register(*grpc.Server) error } +// TTRPCService allows TTRPC services to be registered with the underlying server +type TTRPCService interface { + RegisterTTRPC(*ttrpc.Server) error +} + var register = struct { sync.RWMutex r []*Registration diff --git a/services/events/service.go b/services/events/service.go index d620cbf02..fc1684862 100644 --- a/services/events/service.go +++ b/services/events/service.go @@ -20,10 +20,12 @@ import ( "context" api "github.com/containerd/containerd/api/services/events/v1" + apittrpc "github.com/containerd/containerd/api/services/ttrpc/events/v1" "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/events" "github.com/containerd/containerd/events/exchange" "github.com/containerd/containerd/plugin" + "github.com/containerd/ttrpc" ptypes "github.com/gogo/protobuf/types" "github.com/pkg/errors" "google.golang.org/grpc" @@ -40,12 +42,18 @@ func init() { } type service struct { - events *exchange.Exchange + ttService *ttrpcService + events *exchange.Exchange } // NewService returns the GRPC events server func NewService(events *exchange.Exchange) api.EventsServer { - return &service{events: events} + return &service{ + ttService: &ttrpcService{ + events: events, + }, + events: events, + } } func (s *service) Register(server *grpc.Server) error { @@ -53,6 +61,11 @@ func (s *service) Register(server *grpc.Server) error { return nil } +func (s *service) RegisterTTRPC(server *ttrpc.Server) error { + apittrpc.RegisterEventsService(server, s.ttService) + return nil +} + func (s *service) Publish(ctx context.Context, r *api.PublishRequest) (*ptypes.Empty, error) { if err := s.events.Publish(ctx, r.Topic, r.Event); err != nil { return nil, errdefs.ToGRPC(err) diff --git a/services/events/ttrpc.go b/services/events/ttrpc.go new file mode 100644 index 000000000..f3767a0bf --- /dev/null +++ b/services/events/ttrpc.go @@ -0,0 +1,60 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package events + +import ( + "context" + + api "github.com/containerd/containerd/api/services/ttrpc/events/v1" + "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/events" + "github.com/containerd/containerd/events/exchange" + ptypes "github.com/gogo/protobuf/types" +) + +type ttrpcService struct { + events *exchange.Exchange +} + +func (s *ttrpcService) Publish(ctx context.Context, r *api.PublishRequest) (*ptypes.Empty, error) { + if err := s.events.Publish(ctx, r.Topic, r.Event); err != nil { + return nil, errdefs.ToGRPC(err) + } + + return &ptypes.Empty{}, nil +} + +func (s *ttrpcService) Forward(ctx context.Context, r *api.ForwardRequest) (*ptypes.Empty, error) { + if err := s.events.Forward(ctx, fromTTProto(r.Envelope)); err != nil { + return nil, errdefs.ToGRPC(err) + } + + return &ptypes.Empty{}, nil +} + +func (s *ttrpcService) Subscribe(ctx context.Context, req *api.SubscribeRequest) (*api.Envelope, error) { + return nil, errdefs.ToGRPCf(errdefs.ErrNotImplemented, "ttrpc does not support streaming") +} + +func fromTTProto(env *api.Envelope) *events.Envelope { + return &events.Envelope{ + Timestamp: env.Timestamp, + Namespace: env.Namespace, + Topic: env.Topic, + Event: env.Event, + } +} diff --git a/services/server/server.go b/services/server/server.go index 140f54f6e..38ffb0756 100644 --- a/services/server/server.go +++ b/services/server/server.go @@ -44,6 +44,7 @@ import ( "github.com/containerd/containerd/snapshots" ssproxy "github.com/containerd/containerd/snapshots/proxy" "github.com/containerd/containerd/sys" + "github.com/containerd/ttrpc" metrics "github.com/docker/go-metrics" grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" "github.com/pkg/errors" @@ -91,13 +92,19 @@ func New(ctx context.Context, config *srvconfig.Config) (*Server, error) { if config.GRPC.MaxSendMsgSize > 0 { serverOpts = append(serverOpts, grpc.MaxSendMsgSize(config.GRPC.MaxSendMsgSize)) } - rpc := grpc.NewServer(serverOpts...) + ttrpcServer, err := newTTRPCServer() + if err != nil { + return nil, err + } + grpcServer := grpc.NewServer(serverOpts...) var ( - services []plugin.Service - s = &Server{ - rpc: rpc, - events: exchange.NewExchange(), - config: config, + grpcServices []plugin.Service + ttrpcServices []plugin.TTRPCService + s = &Server{ + grpcServer: grpcServer, + ttrpcServer: ttrpcServer, + events: exchange.NewExchange(), + config: config, } initialized = plugin.NewPluginSet() ) @@ -138,14 +145,22 @@ func New(ctx context.Context, config *srvconfig.Config) (*Server, error) { continue } // check for grpc services that should be registered with the server - if service, ok := instance.(plugin.Service); ok { - services = append(services, service) + if src, ok := instance.(plugin.Service); ok { + grpcServices = append(grpcServices, src) + } + if src, ok := instance.(plugin.TTRPCService); ok { + ttrpcServices = append(ttrpcServices, src) } s.plugins = append(s.plugins, result) } // register services after all plugins have been initialized - for _, service := range services { - if err := service.Register(rpc); err != nil { + for _, service := range grpcServices { + if err := service.Register(grpcServer); err != nil { + return nil, err + } + } + for _, service := range ttrpcServices { + if err := service.RegisterTTRPC(ttrpcServer); err != nil { return nil, err } } @@ -154,10 +169,11 @@ func New(ctx context.Context, config *srvconfig.Config) (*Server, error) { // Server is the containerd main daemon type Server struct { - rpc *grpc.Server - events *exchange.Exchange - config *srvconfig.Config - plugins []*plugin.Plugin + grpcServer *grpc.Server + ttrpcServer *ttrpc.Server + events *exchange.Exchange + config *srvconfig.Config + plugins []*plugin.Plugin } // ServeGRPC provides the containerd grpc APIs on the provided listener @@ -169,8 +185,13 @@ func (s *Server) ServeGRPC(l net.Listener) error { // before we start serving the grpc API register the grpc_prometheus metrics // handler. This needs to be the last service registered so that it can collect // metrics for every other service - grpc_prometheus.Register(s.rpc) - return trapClosedConnErr(s.rpc.Serve(l)) + grpc_prometheus.Register(s.grpcServer) + return trapClosedConnErr(s.grpcServer.Serve(l)) +} + +// ServeTTRPC provides the containerd ttrpc APIs on the provided listener +func (s *Server) ServeTTRPC(l net.Listener) error { + return trapClosedConnErr(s.ttrpcServer.Serve(context.Background(), l)) } // ServeMetrics provides a prometheus endpoint for exposing metrics @@ -196,7 +217,7 @@ func (s *Server) ServeDebug(l net.Listener) error { // Stop the containerd server canceling any open connections func (s *Server) Stop() { - s.rpc.Stop() + s.grpcServer.Stop() for i := len(s.plugins) - 1; i >= 0; i-- { p := s.plugins[i] instance, err := p.Instance() diff --git a/services/server/server_linux.go b/services/server/server_linux.go index 96b28a572..47692fed2 100644 --- a/services/server/server_linux.go +++ b/services/server/server_linux.go @@ -24,6 +24,7 @@ import ( "github.com/containerd/containerd/log" srvconfig "github.com/containerd/containerd/services/server/config" "github.com/containerd/containerd/sys" + "github.com/containerd/ttrpc" specs "github.com/opencontainers/runtime-spec/specs-go" ) @@ -53,3 +54,7 @@ func apply(ctx context.Context, config *srvconfig.Config) error { } return nil } + +func newTTRPCServer() (*ttrpc.Server, error) { + return ttrpc.NewServer(ttrpc.WithServerHandshaker(ttrpc.UnixSocketRequireSameUser())) +} diff --git a/services/server/server_windows.go b/services/server/server_windows.go index e0dd19b1d..8b569eb45 100644 --- a/services/server/server_windows.go +++ b/services/server/server_windows.go @@ -22,8 +22,13 @@ import ( "context" srvconfig "github.com/containerd/containerd/services/server/config" + "github.com/containerd/ttrpc" ) func apply(_ context.Context, _ *srvconfig.Config) error { return nil } + +func newTTRPCServer() (*ttrpc.Server, error) { + return ttrpc.NewServer() +}