Add ttrpc server to containerd

Signed-off-by: Michael Crosby <crosbymichael@gmail.com>
This commit is contained in:
Michael Crosby
2019-04-08 17:15:58 -04:00
parent db3a8637c1
commit a8a805cad3
12 changed files with 1569 additions and 20 deletions

View File

@@ -20,10 +20,12 @@ import (
"context"
api "github.com/containerd/containerd/api/services/events/v1"
apittrpc "github.com/containerd/containerd/api/services/ttrpc/events/v1"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/events/exchange"
"github.com/containerd/containerd/plugin"
"github.com/containerd/ttrpc"
ptypes "github.com/gogo/protobuf/types"
"github.com/pkg/errors"
"google.golang.org/grpc"
@@ -40,12 +42,18 @@ func init() {
}
type service struct {
events *exchange.Exchange
ttService *ttrpcService
events *exchange.Exchange
}
// NewService returns the GRPC events server
func NewService(events *exchange.Exchange) api.EventsServer {
return &service{events: events}
return &service{
ttService: &ttrpcService{
events: events,
},
events: events,
}
}
func (s *service) Register(server *grpc.Server) error {
@@ -53,6 +61,11 @@ func (s *service) Register(server *grpc.Server) error {
return nil
}
func (s *service) RegisterTTRPC(server *ttrpc.Server) error {
apittrpc.RegisterEventsService(server, s.ttService)
return nil
}
func (s *service) Publish(ctx context.Context, r *api.PublishRequest) (*ptypes.Empty, error) {
if err := s.events.Publish(ctx, r.Topic, r.Event); err != nil {
return nil, errdefs.ToGRPC(err)

60
services/events/ttrpc.go Normal file
View File

@@ -0,0 +1,60 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package events
import (
"context"
api "github.com/containerd/containerd/api/services/ttrpc/events/v1"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/events/exchange"
ptypes "github.com/gogo/protobuf/types"
)
type ttrpcService struct {
events *exchange.Exchange
}
func (s *ttrpcService) Publish(ctx context.Context, r *api.PublishRequest) (*ptypes.Empty, error) {
if err := s.events.Publish(ctx, r.Topic, r.Event); err != nil {
return nil, errdefs.ToGRPC(err)
}
return &ptypes.Empty{}, nil
}
func (s *ttrpcService) Forward(ctx context.Context, r *api.ForwardRequest) (*ptypes.Empty, error) {
if err := s.events.Forward(ctx, fromTTProto(r.Envelope)); err != nil {
return nil, errdefs.ToGRPC(err)
}
return &ptypes.Empty{}, nil
}
func (s *ttrpcService) Subscribe(ctx context.Context, req *api.SubscribeRequest) (*api.Envelope, error) {
return nil, errdefs.ToGRPCf(errdefs.ErrNotImplemented, "ttrpc does not support streaming")
}
func fromTTProto(env *api.Envelope) *events.Envelope {
return &events.Envelope{
Timestamp: env.Timestamp,
Namespace: env.Namespace,
Topic: env.Topic,
Event: env.Event,
}
}

View File

@@ -44,6 +44,7 @@ import (
"github.com/containerd/containerd/snapshots"
ssproxy "github.com/containerd/containerd/snapshots/proxy"
"github.com/containerd/containerd/sys"
"github.com/containerd/ttrpc"
metrics "github.com/docker/go-metrics"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/pkg/errors"
@@ -91,13 +92,19 @@ func New(ctx context.Context, config *srvconfig.Config) (*Server, error) {
if config.GRPC.MaxSendMsgSize > 0 {
serverOpts = append(serverOpts, grpc.MaxSendMsgSize(config.GRPC.MaxSendMsgSize))
}
rpc := grpc.NewServer(serverOpts...)
ttrpcServer, err := newTTRPCServer()
if err != nil {
return nil, err
}
grpcServer := grpc.NewServer(serverOpts...)
var (
services []plugin.Service
s = &Server{
rpc: rpc,
events: exchange.NewExchange(),
config: config,
grpcServices []plugin.Service
ttrpcServices []plugin.TTRPCService
s = &Server{
grpcServer: grpcServer,
ttrpcServer: ttrpcServer,
events: exchange.NewExchange(),
config: config,
}
initialized = plugin.NewPluginSet()
)
@@ -138,14 +145,22 @@ func New(ctx context.Context, config *srvconfig.Config) (*Server, error) {
continue
}
// check for grpc services that should be registered with the server
if service, ok := instance.(plugin.Service); ok {
services = append(services, service)
if src, ok := instance.(plugin.Service); ok {
grpcServices = append(grpcServices, src)
}
if src, ok := instance.(plugin.TTRPCService); ok {
ttrpcServices = append(ttrpcServices, src)
}
s.plugins = append(s.plugins, result)
}
// register services after all plugins have been initialized
for _, service := range services {
if err := service.Register(rpc); err != nil {
for _, service := range grpcServices {
if err := service.Register(grpcServer); err != nil {
return nil, err
}
}
for _, service := range ttrpcServices {
if err := service.RegisterTTRPC(ttrpcServer); err != nil {
return nil, err
}
}
@@ -154,10 +169,11 @@ func New(ctx context.Context, config *srvconfig.Config) (*Server, error) {
// Server is the containerd main daemon
type Server struct {
rpc *grpc.Server
events *exchange.Exchange
config *srvconfig.Config
plugins []*plugin.Plugin
grpcServer *grpc.Server
ttrpcServer *ttrpc.Server
events *exchange.Exchange
config *srvconfig.Config
plugins []*plugin.Plugin
}
// ServeGRPC provides the containerd grpc APIs on the provided listener
@@ -169,8 +185,13 @@ func (s *Server) ServeGRPC(l net.Listener) error {
// before we start serving the grpc API register the grpc_prometheus metrics
// handler. This needs to be the last service registered so that it can collect
// metrics for every other service
grpc_prometheus.Register(s.rpc)
return trapClosedConnErr(s.rpc.Serve(l))
grpc_prometheus.Register(s.grpcServer)
return trapClosedConnErr(s.grpcServer.Serve(l))
}
// ServeTTRPC provides the containerd ttrpc APIs on the provided listener
func (s *Server) ServeTTRPC(l net.Listener) error {
return trapClosedConnErr(s.ttrpcServer.Serve(context.Background(), l))
}
// ServeMetrics provides a prometheus endpoint for exposing metrics
@@ -196,7 +217,7 @@ func (s *Server) ServeDebug(l net.Listener) error {
// Stop the containerd server canceling any open connections
func (s *Server) Stop() {
s.rpc.Stop()
s.grpcServer.Stop()
for i := len(s.plugins) - 1; i >= 0; i-- {
p := s.plugins[i]
instance, err := p.Instance()

View File

@@ -24,6 +24,7 @@ import (
"github.com/containerd/containerd/log"
srvconfig "github.com/containerd/containerd/services/server/config"
"github.com/containerd/containerd/sys"
"github.com/containerd/ttrpc"
specs "github.com/opencontainers/runtime-spec/specs-go"
)
@@ -53,3 +54,7 @@ func apply(ctx context.Context, config *srvconfig.Config) error {
}
return nil
}
func newTTRPCServer() (*ttrpc.Server, error) {
return ttrpc.NewServer(ttrpc.WithServerHandshaker(ttrpc.UnixSocketRequireSameUser()))
}

View File

@@ -22,8 +22,13 @@ import (
"context"
srvconfig "github.com/containerd/containerd/services/server/config"
"github.com/containerd/ttrpc"
)
func apply(_ context.Context, _ *srvconfig.Config) error {
return nil
}
func newTTRPCServer() (*ttrpc.Server, error) {
return ttrpc.NewServer()
}