diff --git a/Protobuild.toml b/Protobuild.toml index 005625f38..3bb2e6287 100644 --- a/Protobuild.toml +++ b/Protobuild.toml @@ -35,6 +35,10 @@ plugins = ["grpc", "fieldpath"] prefixes = ["github.com/containerd/containerd/api/events"] plugins = ["fieldpath"] # disable grpc for this package +[[overrides]] +prefixes = ["github.com/containerd/containerd/api/services/ttrpc/events/v1"] +plugins = ["ttrpc", "fieldpath"] + [[overrides]] # enable ttrpc and disable fieldpath and grpc for the shim prefixes = ["github.com/containerd/containerd/runtime/v1/shim/v1", "github.com/containerd/containerd/runtime/v2/task"] diff --git a/api/next.pb.txt b/api/next.pb.txt index 7a0ed2347..0120a081a 100755 --- a/api/next.pb.txt +++ b/api/next.pb.txt @@ -4202,6 +4202,47 @@ file { weak_dependency: 2 syntax: "proto3" } +file { + name: "github.com/containerd/containerd/api/services/ttrpc/events/v1/events.proto" + package: "containerd.services.events.ttrpc.v1" + dependency: "github.com/containerd/containerd/protobuf/plugin/fieldpath.proto" + dependency: "gogoproto/gogo.proto" + dependency: "google/protobuf/any.proto" + dependency: "google/protobuf/empty.proto" + dependency: "google/protobuf/timestamp.proto" + message_type { + name: "PublishRequest" + field { + name: "topic" + number: 1 + label: LABEL_OPTIONAL + type: TYPE_STRING + json_name: "topic" + } + field { + name: "event" + number: 2 + label: LABEL_OPTIONAL + type: TYPE_MESSAGE + type_name: ".google.protobuf.Any" + json_name: "event" + } + } + service { + name: "Events" + method { + name: "Publish" + input_type: ".containerd.services.events.ttrpc.v1.PublishRequest" + output_type: ".google.protobuf.Empty" + } + } + options { + go_package: "github.com/containerd/containerd/api/services/ttrpc/events/v1;events" + } + weak_dependency: 0 + weak_dependency: 1 + syntax: "proto3" +} file { name: "github.com/containerd/containerd/api/services/version/v1/version.proto" package: "containerd.services.version.v1" diff --git a/api/services/ttrpc/events/v1/doc.go b/api/services/ttrpc/events/v1/doc.go new file mode 100644 index 000000000..b7f86da86 --- /dev/null +++ b/api/services/ttrpc/events/v1/doc.go @@ -0,0 +1,18 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +// Package events defines the event pushing and subscription service. +package events diff --git a/api/services/ttrpc/events/v1/events.pb.go b/api/services/ttrpc/events/v1/events.pb.go new file mode 100644 index 000000000..4c73aba93 --- /dev/null +++ b/api/services/ttrpc/events/v1/events.pb.go @@ -0,0 +1,465 @@ +// Code generated by protoc-gen-gogo. DO NOT EDIT. +// source: github.com/containerd/containerd/api/services/ttrpc/events/v1/events.proto + +package events + +import ( + context "context" + fmt "fmt" + github_com_containerd_ttrpc "github.com/containerd/ttrpc" + proto "github.com/gogo/protobuf/proto" + types "github.com/gogo/protobuf/types" + io "io" + math "math" + reflect "reflect" + strings "strings" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package + +type PublishRequest struct { + Topic string `protobuf:"bytes,1,opt,name=topic,proto3" json:"topic,omitempty"` + Event *types.Any `protobuf:"bytes,2,opt,name=event,proto3" json:"event,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *PublishRequest) Reset() { *m = PublishRequest{} } +func (*PublishRequest) ProtoMessage() {} +func (*PublishRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_19f98672016720b5, []int{0} +} +func (m *PublishRequest) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *PublishRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_PublishRequest.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalTo(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *PublishRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_PublishRequest.Merge(m, src) +} +func (m *PublishRequest) XXX_Size() int { + return m.Size() +} +func (m *PublishRequest) XXX_DiscardUnknown() { + xxx_messageInfo_PublishRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_PublishRequest proto.InternalMessageInfo + +func init() { + proto.RegisterType((*PublishRequest)(nil), "containerd.services.events.ttrpc.v1.PublishRequest") +} + +func init() { + proto.RegisterFile("github.com/containerd/containerd/api/services/ttrpc/events/v1/events.proto", fileDescriptor_19f98672016720b5) +} + +var fileDescriptor_19f98672016720b5 = []byte{ + // 311 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0x51, 0x3f, 0x4b, 0x33, 0x31, + 0x18, 0x6f, 0x5e, 0x68, 0x5f, 0x8c, 0xe0, 0x70, 0x14, 0xa9, 0x15, 0x62, 0xd1, 0xa5, 0x38, 0x24, + 0xb4, 0x1d, 0x5d, 0x54, 0xec, 0xe2, 0x24, 0x37, 0x48, 0x71, 0x10, 0xef, 0xae, 0x69, 0x1a, 0xb8, + 0x4b, 0xe2, 0xe5, 0xb9, 0x83, 0x6e, 0x7e, 0xbc, 0x8e, 0x8e, 0x8e, 0xf6, 0x3e, 0x89, 0x34, 0xb9, + 0x52, 0xad, 0x83, 0x82, 0xdb, 0x2f, 0xf9, 0xfd, 0xcb, 0xf3, 0x04, 0xdf, 0x0a, 0x09, 0xf3, 0x22, + 0xa6, 0x89, 0xce, 0x58, 0xa2, 0x15, 0x44, 0x52, 0xf1, 0x7c, 0xfa, 0x19, 0x46, 0x46, 0x32, 0xcb, + 0xf3, 0x52, 0x26, 0xdc, 0x32, 0x80, 0xdc, 0x24, 0x8c, 0x97, 0x5c, 0x81, 0x65, 0xe5, 0xa0, 0x46, + 0xd4, 0xe4, 0x1a, 0x74, 0x70, 0xb6, 0x75, 0xd1, 0x8d, 0x83, 0xd6, 0x0a, 0x67, 0xa4, 0xe5, 0xa0, + 0x7b, 0xf9, 0x63, 0xa1, 0x0b, 0x8b, 0x8b, 0x19, 0x33, 0x69, 0x21, 0xa4, 0x62, 0x33, 0xc9, 0xd3, + 0xa9, 0x89, 0x60, 0xee, 0x6b, 0xba, 0x6d, 0xa1, 0x85, 0x76, 0x90, 0xad, 0x51, 0x7d, 0x7b, 0x24, + 0xb4, 0x16, 0x29, 0xdf, 0xba, 0x23, 0xb5, 0xa8, 0xa9, 0xe3, 0x5d, 0x8a, 0x67, 0x06, 0x36, 0xe4, + 0xc9, 0x2e, 0x09, 0x32, 0xe3, 0x16, 0xa2, 0xcc, 0x78, 0xc1, 0x69, 0x88, 0x0f, 0xee, 0x8a, 0x38, + 0x95, 0x76, 0x1e, 0xf2, 0xe7, 0x82, 0x5b, 0x08, 0xda, 0xb8, 0x09, 0xda, 0xc8, 0xa4, 0x83, 0x7a, + 0xa8, 0xbf, 0x17, 0xfa, 0x43, 0x70, 0x8e, 0x9b, 0x6e, 0xd6, 0xce, 0xbf, 0x1e, 0xea, 0xef, 0x0f, + 0xdb, 0xd4, 0x07, 0xd3, 0x4d, 0x30, 0xbd, 0x52, 0x8b, 0xd0, 0x4b, 0x86, 0x4f, 0xb8, 0x35, 0x76, + 0x7b, 0x09, 0xee, 0xf1, 0xff, 0x3a, 0x3d, 0x18, 0xd1, 0x5f, 0xec, 0x8f, 0x7e, 0x7d, 0x4b, 0xf7, + 0xf0, 0x5b, 0xcd, 0x78, 0x3d, 0xdc, 0xf5, 0xe3, 0x72, 0x45, 0x1a, 0x6f, 0x2b, 0xd2, 0x78, 0xa9, + 0x08, 0x5a, 0x56, 0x04, 0xbd, 0x56, 0x04, 0xbd, 0x57, 0x04, 0x3d, 0xdc, 0xfc, 0xe9, 0xc7, 0x2f, + 0x3c, 0x9a, 0x34, 0x26, 0x28, 0x6e, 0xb9, 0xce, 0xd1, 0x47, 0x00, 0x00, 0x00, 0xff, 0xff, 0x8d, + 0x47, 0xe0, 0xf5, 0x44, 0x02, 0x00, 0x00, +} + +func (m *PublishRequest) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *PublishRequest) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if len(m.Topic) > 0 { + dAtA[i] = 0xa + i++ + i = encodeVarintEvents(dAtA, i, uint64(len(m.Topic))) + i += copy(dAtA[i:], m.Topic) + } + if m.Event != nil { + dAtA[i] = 0x12 + i++ + i = encodeVarintEvents(dAtA, i, uint64(m.Event.Size())) + n1, err := m.Event.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n1 + } + if m.XXX_unrecognized != nil { + i += copy(dAtA[i:], m.XXX_unrecognized) + } + return i, nil +} + +func encodeVarintEvents(dAtA []byte, offset int, v uint64) int { + for v >= 1<<7 { + dAtA[offset] = uint8(v&0x7f | 0x80) + v >>= 7 + offset++ + } + dAtA[offset] = uint8(v) + return offset + 1 +} +func (m *PublishRequest) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.Topic) + if l > 0 { + n += 1 + l + sovEvents(uint64(l)) + } + if m.Event != nil { + l = m.Event.Size() + n += 1 + l + sovEvents(uint64(l)) + } + if m.XXX_unrecognized != nil { + n += len(m.XXX_unrecognized) + } + return n +} + +func sovEvents(x uint64) (n int) { + for { + n++ + x >>= 7 + if x == 0 { + break + } + } + return n +} +func sozEvents(x uint64) (n int) { + return sovEvents(uint64((x << 1) ^ uint64((int64(x) >> 63)))) +} +func (this *PublishRequest) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&PublishRequest{`, + `Topic:` + fmt.Sprintf("%v", this.Topic) + `,`, + `Event:` + strings.Replace(fmt.Sprintf("%v", this.Event), "Any", "types.Any", 1) + `,`, + `XXX_unrecognized:` + fmt.Sprintf("%v", this.XXX_unrecognized) + `,`, + `}`, + }, "") + return s +} +func valueToStringEvents(v interface{}) string { + rv := reflect.ValueOf(v) + if rv.IsNil() { + return "nil" + } + pv := reflect.Indirect(rv).Interface() + return fmt.Sprintf("*%v", pv) +} + +type EventsService interface { + Publish(ctx context.Context, req *PublishRequest) (*types.Empty, error) +} + +func RegisterEventsService(srv *github_com_containerd_ttrpc.Server, svc EventsService) { + srv.Register("containerd.services.events.ttrpc.v1.Events", map[string]github_com_containerd_ttrpc.Method{ + "Publish": func(ctx context.Context, unmarshal func(interface{}) error) (interface{}, error) { + var req PublishRequest + if err := unmarshal(&req); err != nil { + return nil, err + } + return svc.Publish(ctx, &req) + }, + }) +} + +type eventsClient struct { + client *github_com_containerd_ttrpc.Client +} + +func NewEventsClient(client *github_com_containerd_ttrpc.Client) EventsService { + return &eventsClient{ + client: client, + } +} + +func (c *eventsClient) Publish(ctx context.Context, req *PublishRequest) (*types.Empty, error) { + var resp types.Empty + if err := c.client.Call(ctx, "containerd.services.events.ttrpc.v1.Events", "Publish", req, &resp); err != nil { + return nil, err + } + return &resp, nil +} +func (m *PublishRequest) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowEvents + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: PublishRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: PublishRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Topic", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowEvents + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthEvents + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthEvents + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Topic = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Event", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowEvents + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthEvents + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthEvents + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Event == nil { + m.Event = &types.Any{} + } + if err := m.Event.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipEvents(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthEvents + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthEvents + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...) + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func skipEvents(dAtA []byte) (n int, err error) { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowEvents + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + wireType := int(wire & 0x7) + switch wireType { + case 0: + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowEvents + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + iNdEx++ + if dAtA[iNdEx-1] < 0x80 { + break + } + } + return iNdEx, nil + case 1: + iNdEx += 8 + return iNdEx, nil + case 2: + var length int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowEvents + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + length |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if length < 0 { + return 0, ErrInvalidLengthEvents + } + iNdEx += length + if iNdEx < 0 { + return 0, ErrInvalidLengthEvents + } + return iNdEx, nil + case 3: + for { + var innerWire uint64 + var start int = iNdEx + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowEvents + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + innerWire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + innerWireType := int(innerWire & 0x7) + if innerWireType == 4 { + break + } + next, err := skipEvents(dAtA[start:]) + if err != nil { + return 0, err + } + iNdEx = start + next + if iNdEx < 0 { + return 0, ErrInvalidLengthEvents + } + } + return iNdEx, nil + case 4: + return iNdEx, nil + case 5: + iNdEx += 4 + return iNdEx, nil + default: + return 0, fmt.Errorf("proto: illegal wireType %d", wireType) + } + } + panic("unreachable") +} + +var ( + ErrInvalidLengthEvents = fmt.Errorf("proto: negative length found during unmarshaling") + ErrIntOverflowEvents = fmt.Errorf("proto: integer overflow") +) diff --git a/api/services/ttrpc/events/v1/events.proto b/api/services/ttrpc/events/v1/events.proto new file mode 100644 index 000000000..9e5aff77b --- /dev/null +++ b/api/services/ttrpc/events/v1/events.proto @@ -0,0 +1,25 @@ +syntax = "proto3"; + +package containerd.services.events.ttrpc.v1; + +import weak "github.com/containerd/containerd/protobuf/plugin/fieldpath.proto"; +import weak "gogoproto/gogo.proto"; +import "google/protobuf/any.proto"; +import "google/protobuf/empty.proto"; +import "google/protobuf/timestamp.proto"; + +option go_package = "github.com/containerd/containerd/api/services/ttrpc/events/v1;events"; + +service Events { + // Publish an event to a topic. + // + // The event will be packed into a timestamp envelope with the namespace + // introspected from the context. The envelope will then be dispatched. + rpc Publish(PublishRequest) returns (google.protobuf.Empty); + +} + +message PublishRequest { + string topic = 1; + google.protobuf.Any event = 2; +} diff --git a/cmd/containerd/command/main.go b/cmd/containerd/command/main.go index 1c469c040..91e3e5a3b 100644 --- a/cmd/containerd/command/main.go +++ b/cmd/containerd/command/main.go @@ -147,7 +147,10 @@ func App() *cli.App { for _, w := range warnings { log.G(ctx).WithError(w).Warn("cleanup temp mount") } - address := config.GRPC.Address + var ( + address = config.GRPC.Address + ttrpcAddress = fmt.Sprintf("%s.ttrpc", config.GRPC.Address) + ) if address == "" { return errors.New("grpc address cannot be empty") } @@ -188,7 +191,14 @@ func App() *cli.App { } serve(ctx, l, server.ServeMetrics) } + // setup the ttrpc endpoint + tl, err := sys.GetLocalListener(ttrpcAddress, config.GRPC.UID, config.GRPC.GID) + if err != nil { + return errors.Wrapf(err, "failed to get listener for main ttrpc endpoint") + } + serve(ctx, tl, server.ServeTTRPC) + // setup the main grpc endpoint l, err := sys.GetLocalListener(address, config.GRPC.UID, config.GRPC.GID) if err != nil { return errors.Wrapf(err, "failed to get listener for main endpoint") diff --git a/plugin/plugin.go b/plugin/plugin.go index 5e69145ef..e727dd5b8 100644 --- a/plugin/plugin.go +++ b/plugin/plugin.go @@ -20,6 +20,7 @@ import ( "fmt" "sync" + "github.com/containerd/ttrpc" "github.com/pkg/errors" "google.golang.org/grpc" ) @@ -123,6 +124,11 @@ type Service interface { Register(*grpc.Server) error } +// TTRPCService allows TTRPC services to be registered with the underlying server +type TTRPCService interface { + RegisterTTRPC(*ttrpc.Server) error +} + var register = struct { sync.RWMutex r []*Registration diff --git a/runtime/v2/runc/v1/service.go b/runtime/v2/runc/v1/service.go index 62026893b..cba16ca53 100644 --- a/runtime/v2/runc/v1/service.go +++ b/runtime/v2/runc/v1/service.go @@ -117,7 +117,6 @@ func newCommand(ctx context.Context, id, containerdBinary, containerdAddress str "-namespace", ns, "-id", id, "-address", containerdAddress, - "-publish-binary", containerdBinary, } cmd := exec.Command(self, args...) cmd.Dir = cwd diff --git a/runtime/v2/runc/v2/service.go b/runtime/v2/runc/v2/service.go index 98ff279ee..3a25d85ce 100644 --- a/runtime/v2/runc/v2/service.go +++ b/runtime/v2/runc/v2/service.go @@ -133,7 +133,6 @@ func newCommand(ctx context.Context, id, containerdBinary, containerdAddress str "-namespace", ns, "-id", id, "-address", containerdAddress, - "-publish-binary", containerdBinary, } cmd := exec.Command(self, args...) cmd.Dir = cwd diff --git a/runtime/v2/shim/shim.go b/runtime/v2/shim/shim.go index d60d49663..7e994d6a4 100644 --- a/runtime/v2/shim/shim.go +++ b/runtime/v2/shim/shim.go @@ -20,17 +20,20 @@ import ( "context" "flag" "fmt" + "net" "os" "runtime" "runtime/debug" "strings" "time" + v1 "github.com/containerd/containerd/api/services/ttrpc/events/v1" "github.com/containerd/containerd/events" "github.com/containerd/containerd/log" "github.com/containerd/containerd/namespaces" shimapi "github.com/containerd/containerd/runtime/v2/task" "github.com/containerd/ttrpc" + "github.com/containerd/typeurl" "github.com/gogo/protobuf/proto" "github.com/pkg/errors" "github.com/sirupsen/logrus" @@ -153,11 +156,16 @@ func run(id string, initFunc Init, config Config) error { return err } } + publisher := &remoteEventsPublisher{ - address: addressFlag, - containerdBinaryPath: containerdBinaryFlag, - noReaper: config.NoReaper, + address: fmt.Sprintf("%s.ttrpc", addressFlag), } + conn, err := connect(publisher.address, dialer) + if err != nil { + return err + } + defer conn.Close() + publisher.client = v1.NewEventsClient(ttrpc.NewClient(conn)) if namespaceFlag == "" { return fmt.Errorf("shim namespace cannot be empty") } @@ -282,7 +290,22 @@ func dumpStacks(logger *logrus.Entry) { } type remoteEventsPublisher struct { - address string - containerdBinaryPath string - noReaper bool + address string + client v1.EventsService +} + +func (l *remoteEventsPublisher) Publish(ctx context.Context, topic string, event events.Event) error { + any, err := typeurl.MarshalAny(event) + if err != nil { + return err + } + _, err = l.client.Publish(ctx, &v1.PublishRequest{ + Topic: topic, + Event: any, + }) + return err +} + +func connect(address string, d func(string, time.Duration) (net.Conn, error)) (net.Conn, error) { + return d(address, 100*time.Second) } diff --git a/runtime/v2/shim/shim_unix.go b/runtime/v2/shim/shim_unix.go index 87b366f7b..a1ab23c79 100644 --- a/runtime/v2/shim/shim_unix.go +++ b/runtime/v2/shim/shim_unix.go @@ -19,31 +19,21 @@ package shim import ( - "bytes" "context" "io" "net" "os" - "os/exec" "os/signal" - "sync" + "strings" "syscall" + "time" - "github.com/containerd/containerd/events" - "github.com/containerd/containerd/namespaces" "github.com/containerd/fifo" - "github.com/containerd/typeurl" "github.com/pkg/errors" "github.com/sirupsen/logrus" "golang.org/x/sys/unix" ) -var bufPool = sync.Pool{ - New: func() interface{} { - return bytes.NewBuffer(nil) - }, -} - // setupSignals creates a new signal handler for all signals and sets the shim as a // sub-reaper so that the container processes are reparented func setupSignals(config Config) (chan os.Signal, error) { @@ -101,41 +91,7 @@ func openLog(ctx context.Context, _ string) (io.Writer, error) { return fifo.OpenFifo(ctx, "log", unix.O_WRONLY, 0700) } -func (l *remoteEventsPublisher) Publish(ctx context.Context, topic string, event events.Event) error { - ns, _ := namespaces.Namespace(ctx) - encoded, err := typeurl.MarshalAny(event) - if err != nil { - return err - } - data, err := encoded.Marshal() - if err != nil { - return err - } - cmd := exec.CommandContext(ctx, l.containerdBinaryPath, "--address", l.address, "publish", "--topic", topic, "--namespace", ns) - cmd.Stdin = bytes.NewReader(data) - b := bufPool.Get().(*bytes.Buffer) - defer bufPool.Put(b) - cmd.Stdout = b - cmd.Stderr = b - if l.noReaper { - if err := cmd.Start(); err != nil { - return err - } - if err := cmd.Wait(); err != nil { - return errors.Wrapf(err, "failed to publish event: %s", b.String()) - } - return nil - } - c, err := Default.Start(cmd) - if err != nil { - return err - } - status, err := Default.Wait(cmd, c) - if err != nil { - return errors.Wrapf(err, "failed to publish event: %s", b.String()) - } - if status != 0 { - return errors.Errorf("failed to publish event: %s", b.String()) - } - return nil +func dialer(address string, timeout time.Duration) (net.Conn, error) { + address = strings.TrimPrefix(address, "unix://") + return net.DialTimeout("unix", address, timeout) } diff --git a/runtime/v2/shim/shim_windows.go b/runtime/v2/shim/shim_windows.go index e9d9db7ba..ac5e3d719 100644 --- a/runtime/v2/shim/shim_windows.go +++ b/runtime/v2/shim/shim_windows.go @@ -25,15 +25,13 @@ import ( "io" "net" "os" - "os/exec" "sync" + "time" "unsafe" winio "github.com/Microsoft/go-winio" - "github.com/containerd/containerd/events" "github.com/containerd/containerd/namespaces" "github.com/containerd/ttrpc" - "github.com/containerd/typeurl" "github.com/pkg/errors" "github.com/sirupsen/logrus" "golang.org/x/sys/windows" @@ -286,17 +284,34 @@ func openLog(ctx context.Context, id string) (io.Writer, error) { return dswl, nil } -func (l *remoteEventsPublisher) Publish(ctx context.Context, topic string, event events.Event) error { - ns, _ := namespaces.Namespace(ctx) - encoded, err := typeurl.MarshalAny(event) - if err != nil { - return err +func dialer(address string, timeout time.Duration) (net.Conn, error) { + var c net.Conn + var lastError error + timedOutError := errors.Errorf("timed out waiting for npipe %s", address) + start := time.Now() + for { + remaining := timeout - time.Since(start) + if remaining <= 0 { + lastError = timedOutError + break + } + c, lastError = winio.DialPipe(address, &remaining) + if lastError == nil { + break + } + if !os.IsNotExist(lastError) { + break + } + // There is nobody serving the pipe. We limit the timeout for this case + // to 5 seconds because any shim that would serve this endpoint should + // serve it within 5 seconds. We use the passed in timeout for the + // `DialPipe` timeout if the pipe exists however to give the pipe time + // to `Accept` the connection. + if time.Since(start) >= 5*time.Second { + lastError = timedOutError + break + } + time.Sleep(10 * time.Millisecond) } - data, err := encoded.Marshal() - if err != nil { - return err - } - cmd := exec.CommandContext(ctx, l.containerdBinaryPath, "--address", l.address, "publish", "--topic", topic, "--namespace", ns) - cmd.Stdin = bytes.NewReader(data) - return cmd.Run() + return c, lastError } diff --git a/services/events/service.go b/services/events/service.go index d620cbf02..fc1684862 100644 --- a/services/events/service.go +++ b/services/events/service.go @@ -20,10 +20,12 @@ import ( "context" api "github.com/containerd/containerd/api/services/events/v1" + apittrpc "github.com/containerd/containerd/api/services/ttrpc/events/v1" "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/events" "github.com/containerd/containerd/events/exchange" "github.com/containerd/containerd/plugin" + "github.com/containerd/ttrpc" ptypes "github.com/gogo/protobuf/types" "github.com/pkg/errors" "google.golang.org/grpc" @@ -40,12 +42,18 @@ func init() { } type service struct { - events *exchange.Exchange + ttService *ttrpcService + events *exchange.Exchange } // NewService returns the GRPC events server func NewService(events *exchange.Exchange) api.EventsServer { - return &service{events: events} + return &service{ + ttService: &ttrpcService{ + events: events, + }, + events: events, + } } func (s *service) Register(server *grpc.Server) error { @@ -53,6 +61,11 @@ func (s *service) Register(server *grpc.Server) error { return nil } +func (s *service) RegisterTTRPC(server *ttrpc.Server) error { + apittrpc.RegisterEventsService(server, s.ttService) + return nil +} + func (s *service) Publish(ctx context.Context, r *api.PublishRequest) (*ptypes.Empty, error) { if err := s.events.Publish(ctx, r.Topic, r.Event); err != nil { return nil, errdefs.ToGRPC(err) diff --git a/services/events/ttrpc.go b/services/events/ttrpc.go new file mode 100644 index 000000000..42b476d74 --- /dev/null +++ b/services/events/ttrpc.go @@ -0,0 +1,38 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package events + +import ( + "context" + + api "github.com/containerd/containerd/api/services/ttrpc/events/v1" + "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/events/exchange" + ptypes "github.com/gogo/protobuf/types" +) + +type ttrpcService struct { + events *exchange.Exchange +} + +func (s *ttrpcService) Publish(ctx context.Context, r *api.PublishRequest) (*ptypes.Empty, error) { + if err := s.events.Publish(ctx, r.Topic, r.Event); err != nil { + return nil, errdefs.ToGRPC(err) + } + + return &ptypes.Empty{}, nil +} diff --git a/services/server/server.go b/services/server/server.go index 140f54f6e..38ffb0756 100644 --- a/services/server/server.go +++ b/services/server/server.go @@ -44,6 +44,7 @@ import ( "github.com/containerd/containerd/snapshots" ssproxy "github.com/containerd/containerd/snapshots/proxy" "github.com/containerd/containerd/sys" + "github.com/containerd/ttrpc" metrics "github.com/docker/go-metrics" grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" "github.com/pkg/errors" @@ -91,13 +92,19 @@ func New(ctx context.Context, config *srvconfig.Config) (*Server, error) { if config.GRPC.MaxSendMsgSize > 0 { serverOpts = append(serverOpts, grpc.MaxSendMsgSize(config.GRPC.MaxSendMsgSize)) } - rpc := grpc.NewServer(serverOpts...) + ttrpcServer, err := newTTRPCServer() + if err != nil { + return nil, err + } + grpcServer := grpc.NewServer(serverOpts...) var ( - services []plugin.Service - s = &Server{ - rpc: rpc, - events: exchange.NewExchange(), - config: config, + grpcServices []plugin.Service + ttrpcServices []plugin.TTRPCService + s = &Server{ + grpcServer: grpcServer, + ttrpcServer: ttrpcServer, + events: exchange.NewExchange(), + config: config, } initialized = plugin.NewPluginSet() ) @@ -138,14 +145,22 @@ func New(ctx context.Context, config *srvconfig.Config) (*Server, error) { continue } // check for grpc services that should be registered with the server - if service, ok := instance.(plugin.Service); ok { - services = append(services, service) + if src, ok := instance.(plugin.Service); ok { + grpcServices = append(grpcServices, src) + } + if src, ok := instance.(plugin.TTRPCService); ok { + ttrpcServices = append(ttrpcServices, src) } s.plugins = append(s.plugins, result) } // register services after all plugins have been initialized - for _, service := range services { - if err := service.Register(rpc); err != nil { + for _, service := range grpcServices { + if err := service.Register(grpcServer); err != nil { + return nil, err + } + } + for _, service := range ttrpcServices { + if err := service.RegisterTTRPC(ttrpcServer); err != nil { return nil, err } } @@ -154,10 +169,11 @@ func New(ctx context.Context, config *srvconfig.Config) (*Server, error) { // Server is the containerd main daemon type Server struct { - rpc *grpc.Server - events *exchange.Exchange - config *srvconfig.Config - plugins []*plugin.Plugin + grpcServer *grpc.Server + ttrpcServer *ttrpc.Server + events *exchange.Exchange + config *srvconfig.Config + plugins []*plugin.Plugin } // ServeGRPC provides the containerd grpc APIs on the provided listener @@ -169,8 +185,13 @@ func (s *Server) ServeGRPC(l net.Listener) error { // before we start serving the grpc API register the grpc_prometheus metrics // handler. This needs to be the last service registered so that it can collect // metrics for every other service - grpc_prometheus.Register(s.rpc) - return trapClosedConnErr(s.rpc.Serve(l)) + grpc_prometheus.Register(s.grpcServer) + return trapClosedConnErr(s.grpcServer.Serve(l)) +} + +// ServeTTRPC provides the containerd ttrpc APIs on the provided listener +func (s *Server) ServeTTRPC(l net.Listener) error { + return trapClosedConnErr(s.ttrpcServer.Serve(context.Background(), l)) } // ServeMetrics provides a prometheus endpoint for exposing metrics @@ -196,7 +217,7 @@ func (s *Server) ServeDebug(l net.Listener) error { // Stop the containerd server canceling any open connections func (s *Server) Stop() { - s.rpc.Stop() + s.grpcServer.Stop() for i := len(s.plugins) - 1; i >= 0; i-- { p := s.plugins[i] instance, err := p.Instance() diff --git a/services/server/server_linux.go b/services/server/server_linux.go index 96b28a572..47692fed2 100644 --- a/services/server/server_linux.go +++ b/services/server/server_linux.go @@ -24,6 +24,7 @@ import ( "github.com/containerd/containerd/log" srvconfig "github.com/containerd/containerd/services/server/config" "github.com/containerd/containerd/sys" + "github.com/containerd/ttrpc" specs "github.com/opencontainers/runtime-spec/specs-go" ) @@ -53,3 +54,7 @@ func apply(ctx context.Context, config *srvconfig.Config) error { } return nil } + +func newTTRPCServer() (*ttrpc.Server, error) { + return ttrpc.NewServer(ttrpc.WithServerHandshaker(ttrpc.UnixSocketRequireSameUser())) +} diff --git a/services/server/server_unsupported.go b/services/server/server_unsupported.go index a6f187651..80674e69e 100644 --- a/services/server/server_unsupported.go +++ b/services/server/server_unsupported.go @@ -22,8 +22,13 @@ import ( "context" srvconfig "github.com/containerd/containerd/services/server/config" + "github.com/containerd/ttrpc" ) func apply(_ context.Context, _ *srvconfig.Config) error { return nil } + +func newTTRPCServer() (*ttrpc.Server, error) { + return ttrpc.NewServer() +} diff --git a/services/server/server_windows.go b/services/server/server_windows.go index e0dd19b1d..8b569eb45 100644 --- a/services/server/server_windows.go +++ b/services/server/server_windows.go @@ -22,8 +22,13 @@ import ( "context" srvconfig "github.com/containerd/containerd/services/server/config" + "github.com/containerd/ttrpc" ) func apply(_ context.Context, _ *srvconfig.Config) error { return nil } + +func newTTRPCServer() (*ttrpc.Server, error) { + return ttrpc.NewServer() +}