Merge pull request #3195 from crosbymichael/ttrpc-love

Use ttrpc in the daemon for event publishing from shims
This commit is contained in:
Phil Estes 2019-04-10 10:29:01 +02:00 committed by GitHub
commit 475619c29e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 740 additions and 92 deletions

View File

@ -35,6 +35,10 @@ plugins = ["grpc", "fieldpath"]
prefixes = ["github.com/containerd/containerd/api/events"]
plugins = ["fieldpath"] # disable grpc for this package
[[overrides]]
prefixes = ["github.com/containerd/containerd/api/services/ttrpc/events/v1"]
plugins = ["ttrpc", "fieldpath"]
[[overrides]]
# enable ttrpc and disable fieldpath and grpc for the shim
prefixes = ["github.com/containerd/containerd/runtime/v1/shim/v1", "github.com/containerd/containerd/runtime/v2/task"]

View File

@ -4202,6 +4202,47 @@ file {
weak_dependency: 2
syntax: "proto3"
}
file {
name: "github.com/containerd/containerd/api/services/ttrpc/events/v1/events.proto"
package: "containerd.services.events.ttrpc.v1"
dependency: "github.com/containerd/containerd/protobuf/plugin/fieldpath.proto"
dependency: "gogoproto/gogo.proto"
dependency: "google/protobuf/any.proto"
dependency: "google/protobuf/empty.proto"
dependency: "google/protobuf/timestamp.proto"
message_type {
name: "PublishRequest"
field {
name: "topic"
number: 1
label: LABEL_OPTIONAL
type: TYPE_STRING
json_name: "topic"
}
field {
name: "event"
number: 2
label: LABEL_OPTIONAL
type: TYPE_MESSAGE
type_name: ".google.protobuf.Any"
json_name: "event"
}
}
service {
name: "Events"
method {
name: "Publish"
input_type: ".containerd.services.events.ttrpc.v1.PublishRequest"
output_type: ".google.protobuf.Empty"
}
}
options {
go_package: "github.com/containerd/containerd/api/services/ttrpc/events/v1;events"
}
weak_dependency: 0
weak_dependency: 1
syntax: "proto3"
}
file {
name: "github.com/containerd/containerd/api/services/version/v1/version.proto"
package: "containerd.services.version.v1"

View File

@ -0,0 +1,18 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package events defines the event pushing and subscription service.
package events

View File

@ -0,0 +1,465 @@
// Code generated by protoc-gen-gogo. DO NOT EDIT.
// source: github.com/containerd/containerd/api/services/ttrpc/events/v1/events.proto
package events
import (
context "context"
fmt "fmt"
github_com_containerd_ttrpc "github.com/containerd/ttrpc"
proto "github.com/gogo/protobuf/proto"
types "github.com/gogo/protobuf/types"
io "io"
math "math"
reflect "reflect"
strings "strings"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package
type PublishRequest struct {
Topic string `protobuf:"bytes,1,opt,name=topic,proto3" json:"topic,omitempty"`
Event *types.Any `protobuf:"bytes,2,opt,name=event,proto3" json:"event,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *PublishRequest) Reset() { *m = PublishRequest{} }
func (*PublishRequest) ProtoMessage() {}
func (*PublishRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_19f98672016720b5, []int{0}
}
func (m *PublishRequest) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *PublishRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_PublishRequest.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalTo(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
}
func (m *PublishRequest) XXX_Merge(src proto.Message) {
xxx_messageInfo_PublishRequest.Merge(m, src)
}
func (m *PublishRequest) XXX_Size() int {
return m.Size()
}
func (m *PublishRequest) XXX_DiscardUnknown() {
xxx_messageInfo_PublishRequest.DiscardUnknown(m)
}
var xxx_messageInfo_PublishRequest proto.InternalMessageInfo
func init() {
proto.RegisterType((*PublishRequest)(nil), "containerd.services.events.ttrpc.v1.PublishRequest")
}
func init() {
proto.RegisterFile("github.com/containerd/containerd/api/services/ttrpc/events/v1/events.proto", fileDescriptor_19f98672016720b5)
}
var fileDescriptor_19f98672016720b5 = []byte{
// 311 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0x51, 0x3f, 0x4b, 0x33, 0x31,
0x18, 0x6f, 0x5e, 0x68, 0x5f, 0x8c, 0xe0, 0x70, 0x14, 0xa9, 0x15, 0x62, 0xd1, 0xa5, 0x38, 0x24,
0xb4, 0x1d, 0x5d, 0x54, 0xec, 0xe2, 0x24, 0x37, 0x48, 0x71, 0x10, 0xef, 0xae, 0x69, 0x1a, 0xb8,
0x4b, 0xe2, 0xe5, 0xb9, 0x83, 0x6e, 0x7e, 0xbc, 0x8e, 0x8e, 0x8e, 0xf6, 0x3e, 0x89, 0x34, 0xb9,
0x52, 0xad, 0x83, 0x82, 0xdb, 0x2f, 0xf9, 0xfd, 0xcb, 0xf3, 0x04, 0xdf, 0x0a, 0x09, 0xf3, 0x22,
0xa6, 0x89, 0xce, 0x58, 0xa2, 0x15, 0x44, 0x52, 0xf1, 0x7c, 0xfa, 0x19, 0x46, 0x46, 0x32, 0xcb,
0xf3, 0x52, 0x26, 0xdc, 0x32, 0x80, 0xdc, 0x24, 0x8c, 0x97, 0x5c, 0x81, 0x65, 0xe5, 0xa0, 0x46,
0xd4, 0xe4, 0x1a, 0x74, 0x70, 0xb6, 0x75, 0xd1, 0x8d, 0x83, 0xd6, 0x0a, 0x67, 0xa4, 0xe5, 0xa0,
0x7b, 0xf9, 0x63, 0xa1, 0x0b, 0x8b, 0x8b, 0x19, 0x33, 0x69, 0x21, 0xa4, 0x62, 0x33, 0xc9, 0xd3,
0xa9, 0x89, 0x60, 0xee, 0x6b, 0xba, 0x6d, 0xa1, 0x85, 0x76, 0x90, 0xad, 0x51, 0x7d, 0x7b, 0x24,
0xb4, 0x16, 0x29, 0xdf, 0xba, 0x23, 0xb5, 0xa8, 0xa9, 0xe3, 0x5d, 0x8a, 0x67, 0x06, 0x36, 0xe4,
0xc9, 0x2e, 0x09, 0x32, 0xe3, 0x16, 0xa2, 0xcc, 0x78, 0xc1, 0x69, 0x88, 0x0f, 0xee, 0x8a, 0x38,
0x95, 0x76, 0x1e, 0xf2, 0xe7, 0x82, 0x5b, 0x08, 0xda, 0xb8, 0x09, 0xda, 0xc8, 0xa4, 0x83, 0x7a,
0xa8, 0xbf, 0x17, 0xfa, 0x43, 0x70, 0x8e, 0x9b, 0x6e, 0xd6, 0xce, 0xbf, 0x1e, 0xea, 0xef, 0x0f,
0xdb, 0xd4, 0x07, 0xd3, 0x4d, 0x30, 0xbd, 0x52, 0x8b, 0xd0, 0x4b, 0x86, 0x4f, 0xb8, 0x35, 0x76,
0x7b, 0x09, 0xee, 0xf1, 0xff, 0x3a, 0x3d, 0x18, 0xd1, 0x5f, 0xec, 0x8f, 0x7e, 0x7d, 0x4b, 0xf7,
0xf0, 0x5b, 0xcd, 0x78, 0x3d, 0xdc, 0xf5, 0xe3, 0x72, 0x45, 0x1a, 0x6f, 0x2b, 0xd2, 0x78, 0xa9,
0x08, 0x5a, 0x56, 0x04, 0xbd, 0x56, 0x04, 0xbd, 0x57, 0x04, 0x3d, 0xdc, 0xfc, 0xe9, 0xc7, 0x2f,
0x3c, 0x9a, 0x34, 0x26, 0x28, 0x6e, 0xb9, 0xce, 0xd1, 0x47, 0x00, 0x00, 0x00, 0xff, 0xff, 0x8d,
0x47, 0xe0, 0xf5, 0x44, 0x02, 0x00, 0x00,
}
func (m *PublishRequest) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalTo(dAtA)
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *PublishRequest) MarshalTo(dAtA []byte) (int, error) {
var i int
_ = i
var l int
_ = l
if len(m.Topic) > 0 {
dAtA[i] = 0xa
i++
i = encodeVarintEvents(dAtA, i, uint64(len(m.Topic)))
i += copy(dAtA[i:], m.Topic)
}
if m.Event != nil {
dAtA[i] = 0x12
i++
i = encodeVarintEvents(dAtA, i, uint64(m.Event.Size()))
n1, err := m.Event.MarshalTo(dAtA[i:])
if err != nil {
return 0, err
}
i += n1
}
if m.XXX_unrecognized != nil {
i += copy(dAtA[i:], m.XXX_unrecognized)
}
return i, nil
}
func encodeVarintEvents(dAtA []byte, offset int, v uint64) int {
for v >= 1<<7 {
dAtA[offset] = uint8(v&0x7f | 0x80)
v >>= 7
offset++
}
dAtA[offset] = uint8(v)
return offset + 1
}
func (m *PublishRequest) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
l = len(m.Topic)
if l > 0 {
n += 1 + l + sovEvents(uint64(l))
}
if m.Event != nil {
l = m.Event.Size()
n += 1 + l + sovEvents(uint64(l))
}
if m.XXX_unrecognized != nil {
n += len(m.XXX_unrecognized)
}
return n
}
func sovEvents(x uint64) (n int) {
for {
n++
x >>= 7
if x == 0 {
break
}
}
return n
}
func sozEvents(x uint64) (n int) {
return sovEvents(uint64((x << 1) ^ uint64((int64(x) >> 63))))
}
func (this *PublishRequest) String() string {
if this == nil {
return "nil"
}
s := strings.Join([]string{`&PublishRequest{`,
`Topic:` + fmt.Sprintf("%v", this.Topic) + `,`,
`Event:` + strings.Replace(fmt.Sprintf("%v", this.Event), "Any", "types.Any", 1) + `,`,
`XXX_unrecognized:` + fmt.Sprintf("%v", this.XXX_unrecognized) + `,`,
`}`,
}, "")
return s
}
func valueToStringEvents(v interface{}) string {
rv := reflect.ValueOf(v)
if rv.IsNil() {
return "nil"
}
pv := reflect.Indirect(rv).Interface()
return fmt.Sprintf("*%v", pv)
}
type EventsService interface {
Publish(ctx context.Context, req *PublishRequest) (*types.Empty, error)
}
func RegisterEventsService(srv *github_com_containerd_ttrpc.Server, svc EventsService) {
srv.Register("containerd.services.events.ttrpc.v1.Events", map[string]github_com_containerd_ttrpc.Method{
"Publish": func(ctx context.Context, unmarshal func(interface{}) error) (interface{}, error) {
var req PublishRequest
if err := unmarshal(&req); err != nil {
return nil, err
}
return svc.Publish(ctx, &req)
},
})
}
type eventsClient struct {
client *github_com_containerd_ttrpc.Client
}
func NewEventsClient(client *github_com_containerd_ttrpc.Client) EventsService {
return &eventsClient{
client: client,
}
}
func (c *eventsClient) Publish(ctx context.Context, req *PublishRequest) (*types.Empty, error) {
var resp types.Empty
if err := c.client.Call(ctx, "containerd.services.events.ttrpc.v1.Events", "Publish", req, &resp); err != nil {
return nil, err
}
return &resp, nil
}
func (m *PublishRequest) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowEvents
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: PublishRequest: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: PublishRequest: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Topic", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowEvents
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthEvents
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLengthEvents
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Topic = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
case 2:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Event", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowEvents
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthEvents
}
postIndex := iNdEx + msglen
if postIndex < 0 {
return ErrInvalidLengthEvents
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
if m.Event == nil {
m.Event = &types.Any{}
}
if err := m.Event.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipEvents(dAtA[iNdEx:])
if err != nil {
return err
}
if skippy < 0 {
return ErrInvalidLengthEvents
}
if (iNdEx + skippy) < 0 {
return ErrInvalidLengthEvents
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...)
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func skipEvents(dAtA []byte) (n int, err error) {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowEvents
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
wireType := int(wire & 0x7)
switch wireType {
case 0:
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowEvents
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
iNdEx++
if dAtA[iNdEx-1] < 0x80 {
break
}
}
return iNdEx, nil
case 1:
iNdEx += 8
return iNdEx, nil
case 2:
var length int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowEvents
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
length |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
if length < 0 {
return 0, ErrInvalidLengthEvents
}
iNdEx += length
if iNdEx < 0 {
return 0, ErrInvalidLengthEvents
}
return iNdEx, nil
case 3:
for {
var innerWire uint64
var start int = iNdEx
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowEvents
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
innerWire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
innerWireType := int(innerWire & 0x7)
if innerWireType == 4 {
break
}
next, err := skipEvents(dAtA[start:])
if err != nil {
return 0, err
}
iNdEx = start + next
if iNdEx < 0 {
return 0, ErrInvalidLengthEvents
}
}
return iNdEx, nil
case 4:
return iNdEx, nil
case 5:
iNdEx += 4
return iNdEx, nil
default:
return 0, fmt.Errorf("proto: illegal wireType %d", wireType)
}
}
panic("unreachable")
}
var (
ErrInvalidLengthEvents = fmt.Errorf("proto: negative length found during unmarshaling")
ErrIntOverflowEvents = fmt.Errorf("proto: integer overflow")
)

View File

@ -0,0 +1,25 @@
syntax = "proto3";
package containerd.services.events.ttrpc.v1;
import weak "github.com/containerd/containerd/protobuf/plugin/fieldpath.proto";
import weak "gogoproto/gogo.proto";
import "google/protobuf/any.proto";
import "google/protobuf/empty.proto";
import "google/protobuf/timestamp.proto";
option go_package = "github.com/containerd/containerd/api/services/ttrpc/events/v1;events";
service Events {
// Publish an event to a topic.
//
// The event will be packed into a timestamp envelope with the namespace
// introspected from the context. The envelope will then be dispatched.
rpc Publish(PublishRequest) returns (google.protobuf.Empty);
}
message PublishRequest {
string topic = 1;
google.protobuf.Any event = 2;
}

View File

@ -147,7 +147,10 @@ func App() *cli.App {
for _, w := range warnings {
log.G(ctx).WithError(w).Warn("cleanup temp mount")
}
address := config.GRPC.Address
var (
address = config.GRPC.Address
ttrpcAddress = fmt.Sprintf("%s.ttrpc", config.GRPC.Address)
)
if address == "" {
return errors.New("grpc address cannot be empty")
}
@ -188,7 +191,14 @@ func App() *cli.App {
}
serve(ctx, l, server.ServeMetrics)
}
// setup the ttrpc endpoint
tl, err := sys.GetLocalListener(ttrpcAddress, config.GRPC.UID, config.GRPC.GID)
if err != nil {
return errors.Wrapf(err, "failed to get listener for main ttrpc endpoint")
}
serve(ctx, tl, server.ServeTTRPC)
// setup the main grpc endpoint
l, err := sys.GetLocalListener(address, config.GRPC.UID, config.GRPC.GID)
if err != nil {
return errors.Wrapf(err, "failed to get listener for main endpoint")

View File

@ -20,6 +20,7 @@ import (
"fmt"
"sync"
"github.com/containerd/ttrpc"
"github.com/pkg/errors"
"google.golang.org/grpc"
)
@ -123,6 +124,11 @@ type Service interface {
Register(*grpc.Server) error
}
// TTRPCService allows TTRPC services to be registered with the underlying server
type TTRPCService interface {
RegisterTTRPC(*ttrpc.Server) error
}
var register = struct {
sync.RWMutex
r []*Registration

View File

@ -117,7 +117,6 @@ func newCommand(ctx context.Context, id, containerdBinary, containerdAddress str
"-namespace", ns,
"-id", id,
"-address", containerdAddress,
"-publish-binary", containerdBinary,
}
cmd := exec.Command(self, args...)
cmd.Dir = cwd

View File

@ -133,7 +133,6 @@ func newCommand(ctx context.Context, id, containerdBinary, containerdAddress str
"-namespace", ns,
"-id", id,
"-address", containerdAddress,
"-publish-binary", containerdBinary,
}
cmd := exec.Command(self, args...)
cmd.Dir = cwd

View File

@ -20,17 +20,20 @@ import (
"context"
"flag"
"fmt"
"net"
"os"
"runtime"
"runtime/debug"
"strings"
"time"
v1 "github.com/containerd/containerd/api/services/ttrpc/events/v1"
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/namespaces"
shimapi "github.com/containerd/containerd/runtime/v2/task"
"github.com/containerd/ttrpc"
"github.com/containerd/typeurl"
"github.com/gogo/protobuf/proto"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
@ -153,11 +156,16 @@ func run(id string, initFunc Init, config Config) error {
return err
}
}
publisher := &remoteEventsPublisher{
address: addressFlag,
containerdBinaryPath: containerdBinaryFlag,
noReaper: config.NoReaper,
address: fmt.Sprintf("%s.ttrpc", addressFlag),
}
conn, err := connect(publisher.address, dialer)
if err != nil {
return err
}
defer conn.Close()
publisher.client = v1.NewEventsClient(ttrpc.NewClient(conn))
if namespaceFlag == "" {
return fmt.Errorf("shim namespace cannot be empty")
}
@ -282,7 +290,22 @@ func dumpStacks(logger *logrus.Entry) {
}
type remoteEventsPublisher struct {
address string
containerdBinaryPath string
noReaper bool
address string
client v1.EventsService
}
func (l *remoteEventsPublisher) Publish(ctx context.Context, topic string, event events.Event) error {
any, err := typeurl.MarshalAny(event)
if err != nil {
return err
}
_, err = l.client.Publish(ctx, &v1.PublishRequest{
Topic: topic,
Event: any,
})
return err
}
func connect(address string, d func(string, time.Duration) (net.Conn, error)) (net.Conn, error) {
return d(address, 100*time.Second)
}

View File

@ -19,31 +19,21 @@
package shim
import (
"bytes"
"context"
"io"
"net"
"os"
"os/exec"
"os/signal"
"sync"
"strings"
"syscall"
"time"
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/fifo"
"github.com/containerd/typeurl"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"golang.org/x/sys/unix"
)
var bufPool = sync.Pool{
New: func() interface{} {
return bytes.NewBuffer(nil)
},
}
// setupSignals creates a new signal handler for all signals and sets the shim as a
// sub-reaper so that the container processes are reparented
func setupSignals(config Config) (chan os.Signal, error) {
@ -101,41 +91,7 @@ func openLog(ctx context.Context, _ string) (io.Writer, error) {
return fifo.OpenFifo(ctx, "log", unix.O_WRONLY, 0700)
}
func (l *remoteEventsPublisher) Publish(ctx context.Context, topic string, event events.Event) error {
ns, _ := namespaces.Namespace(ctx)
encoded, err := typeurl.MarshalAny(event)
if err != nil {
return err
}
data, err := encoded.Marshal()
if err != nil {
return err
}
cmd := exec.CommandContext(ctx, l.containerdBinaryPath, "--address", l.address, "publish", "--topic", topic, "--namespace", ns)
cmd.Stdin = bytes.NewReader(data)
b := bufPool.Get().(*bytes.Buffer)
defer bufPool.Put(b)
cmd.Stdout = b
cmd.Stderr = b
if l.noReaper {
if err := cmd.Start(); err != nil {
return err
}
if err := cmd.Wait(); err != nil {
return errors.Wrapf(err, "failed to publish event: %s", b.String())
}
return nil
}
c, err := Default.Start(cmd)
if err != nil {
return err
}
status, err := Default.Wait(cmd, c)
if err != nil {
return errors.Wrapf(err, "failed to publish event: %s", b.String())
}
if status != 0 {
return errors.Errorf("failed to publish event: %s", b.String())
}
return nil
func dialer(address string, timeout time.Duration) (net.Conn, error) {
address = strings.TrimPrefix(address, "unix://")
return net.DialTimeout("unix", address, timeout)
}

View File

@ -25,15 +25,13 @@ import (
"io"
"net"
"os"
"os/exec"
"sync"
"time"
"unsafe"
winio "github.com/Microsoft/go-winio"
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/ttrpc"
"github.com/containerd/typeurl"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"golang.org/x/sys/windows"
@ -286,17 +284,34 @@ func openLog(ctx context.Context, id string) (io.Writer, error) {
return dswl, nil
}
func (l *remoteEventsPublisher) Publish(ctx context.Context, topic string, event events.Event) error {
ns, _ := namespaces.Namespace(ctx)
encoded, err := typeurl.MarshalAny(event)
if err != nil {
return err
func dialer(address string, timeout time.Duration) (net.Conn, error) {
var c net.Conn
var lastError error
timedOutError := errors.Errorf("timed out waiting for npipe %s", address)
start := time.Now()
for {
remaining := timeout - time.Since(start)
if remaining <= 0 {
lastError = timedOutError
break
}
c, lastError = winio.DialPipe(address, &remaining)
if lastError == nil {
break
}
if !os.IsNotExist(lastError) {
break
}
// There is nobody serving the pipe. We limit the timeout for this case
// to 5 seconds because any shim that would serve this endpoint should
// serve it within 5 seconds. We use the passed in timeout for the
// `DialPipe` timeout if the pipe exists however to give the pipe time
// to `Accept` the connection.
if time.Since(start) >= 5*time.Second {
lastError = timedOutError
break
}
time.Sleep(10 * time.Millisecond)
}
data, err := encoded.Marshal()
if err != nil {
return err
}
cmd := exec.CommandContext(ctx, l.containerdBinaryPath, "--address", l.address, "publish", "--topic", topic, "--namespace", ns)
cmd.Stdin = bytes.NewReader(data)
return cmd.Run()
return c, lastError
}

View File

@ -20,10 +20,12 @@ import (
"context"
api "github.com/containerd/containerd/api/services/events/v1"
apittrpc "github.com/containerd/containerd/api/services/ttrpc/events/v1"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/events/exchange"
"github.com/containerd/containerd/plugin"
"github.com/containerd/ttrpc"
ptypes "github.com/gogo/protobuf/types"
"github.com/pkg/errors"
"google.golang.org/grpc"
@ -40,12 +42,18 @@ func init() {
}
type service struct {
events *exchange.Exchange
ttService *ttrpcService
events *exchange.Exchange
}
// NewService returns the GRPC events server
func NewService(events *exchange.Exchange) api.EventsServer {
return &service{events: events}
return &service{
ttService: &ttrpcService{
events: events,
},
events: events,
}
}
func (s *service) Register(server *grpc.Server) error {
@ -53,6 +61,11 @@ func (s *service) Register(server *grpc.Server) error {
return nil
}
func (s *service) RegisterTTRPC(server *ttrpc.Server) error {
apittrpc.RegisterEventsService(server, s.ttService)
return nil
}
func (s *service) Publish(ctx context.Context, r *api.PublishRequest) (*ptypes.Empty, error) {
if err := s.events.Publish(ctx, r.Topic, r.Event); err != nil {
return nil, errdefs.ToGRPC(err)

38
services/events/ttrpc.go Normal file
View File

@ -0,0 +1,38 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package events
import (
"context"
api "github.com/containerd/containerd/api/services/ttrpc/events/v1"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/events/exchange"
ptypes "github.com/gogo/protobuf/types"
)
type ttrpcService struct {
events *exchange.Exchange
}
func (s *ttrpcService) Publish(ctx context.Context, r *api.PublishRequest) (*ptypes.Empty, error) {
if err := s.events.Publish(ctx, r.Topic, r.Event); err != nil {
return nil, errdefs.ToGRPC(err)
}
return &ptypes.Empty{}, nil
}

View File

@ -44,6 +44,7 @@ import (
"github.com/containerd/containerd/snapshots"
ssproxy "github.com/containerd/containerd/snapshots/proxy"
"github.com/containerd/containerd/sys"
"github.com/containerd/ttrpc"
metrics "github.com/docker/go-metrics"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/pkg/errors"
@ -91,13 +92,19 @@ func New(ctx context.Context, config *srvconfig.Config) (*Server, error) {
if config.GRPC.MaxSendMsgSize > 0 {
serverOpts = append(serverOpts, grpc.MaxSendMsgSize(config.GRPC.MaxSendMsgSize))
}
rpc := grpc.NewServer(serverOpts...)
ttrpcServer, err := newTTRPCServer()
if err != nil {
return nil, err
}
grpcServer := grpc.NewServer(serverOpts...)
var (
services []plugin.Service
s = &Server{
rpc: rpc,
events: exchange.NewExchange(),
config: config,
grpcServices []plugin.Service
ttrpcServices []plugin.TTRPCService
s = &Server{
grpcServer: grpcServer,
ttrpcServer: ttrpcServer,
events: exchange.NewExchange(),
config: config,
}
initialized = plugin.NewPluginSet()
)
@ -138,14 +145,22 @@ func New(ctx context.Context, config *srvconfig.Config) (*Server, error) {
continue
}
// check for grpc services that should be registered with the server
if service, ok := instance.(plugin.Service); ok {
services = append(services, service)
if src, ok := instance.(plugin.Service); ok {
grpcServices = append(grpcServices, src)
}
if src, ok := instance.(plugin.TTRPCService); ok {
ttrpcServices = append(ttrpcServices, src)
}
s.plugins = append(s.plugins, result)
}
// register services after all plugins have been initialized
for _, service := range services {
if err := service.Register(rpc); err != nil {
for _, service := range grpcServices {
if err := service.Register(grpcServer); err != nil {
return nil, err
}
}
for _, service := range ttrpcServices {
if err := service.RegisterTTRPC(ttrpcServer); err != nil {
return nil, err
}
}
@ -154,10 +169,11 @@ func New(ctx context.Context, config *srvconfig.Config) (*Server, error) {
// Server is the containerd main daemon
type Server struct {
rpc *grpc.Server
events *exchange.Exchange
config *srvconfig.Config
plugins []*plugin.Plugin
grpcServer *grpc.Server
ttrpcServer *ttrpc.Server
events *exchange.Exchange
config *srvconfig.Config
plugins []*plugin.Plugin
}
// ServeGRPC provides the containerd grpc APIs on the provided listener
@ -169,8 +185,13 @@ func (s *Server) ServeGRPC(l net.Listener) error {
// before we start serving the grpc API register the grpc_prometheus metrics
// handler. This needs to be the last service registered so that it can collect
// metrics for every other service
grpc_prometheus.Register(s.rpc)
return trapClosedConnErr(s.rpc.Serve(l))
grpc_prometheus.Register(s.grpcServer)
return trapClosedConnErr(s.grpcServer.Serve(l))
}
// ServeTTRPC provides the containerd ttrpc APIs on the provided listener
func (s *Server) ServeTTRPC(l net.Listener) error {
return trapClosedConnErr(s.ttrpcServer.Serve(context.Background(), l))
}
// ServeMetrics provides a prometheus endpoint for exposing metrics
@ -196,7 +217,7 @@ func (s *Server) ServeDebug(l net.Listener) error {
// Stop the containerd server canceling any open connections
func (s *Server) Stop() {
s.rpc.Stop()
s.grpcServer.Stop()
for i := len(s.plugins) - 1; i >= 0; i-- {
p := s.plugins[i]
instance, err := p.Instance()

View File

@ -24,6 +24,7 @@ import (
"github.com/containerd/containerd/log"
srvconfig "github.com/containerd/containerd/services/server/config"
"github.com/containerd/containerd/sys"
"github.com/containerd/ttrpc"
specs "github.com/opencontainers/runtime-spec/specs-go"
)
@ -53,3 +54,7 @@ func apply(ctx context.Context, config *srvconfig.Config) error {
}
return nil
}
func newTTRPCServer() (*ttrpc.Server, error) {
return ttrpc.NewServer(ttrpc.WithServerHandshaker(ttrpc.UnixSocketRequireSameUser()))
}

View File

@ -22,8 +22,13 @@ import (
"context"
srvconfig "github.com/containerd/containerd/services/server/config"
"github.com/containerd/ttrpc"
)
func apply(_ context.Context, _ *srvconfig.Config) error {
return nil
}
func newTTRPCServer() (*ttrpc.Server, error) {
return ttrpc.NewServer()
}

View File

@ -22,8 +22,13 @@ import (
"context"
srvconfig "github.com/containerd/containerd/services/server/config"
"github.com/containerd/ttrpc"
)
func apply(_ context.Context, _ *srvconfig.Config) error {
return nil
}
func newTTRPCServer() (*ttrpc.Server, error) {
return ttrpc.NewServer()
}