diff --git a/cmd/containerd/command/main.go b/cmd/containerd/command/main.go index 91e3e5a3b..098a6f5a3 100644 --- a/cmd/containerd/command/main.go +++ b/cmd/containerd/command/main.go @@ -198,6 +198,13 @@ func App() *cli.App { } serve(ctx, tl, server.ServeTTRPC) + if config.GRPC.TCPAddress != "" { + l, err := net.Listen("tcp", config.GRPC.TCPAddress) + if err != nil { + return errors.Wrapf(err, "failed to get listener for TCP grpc endpoint") + } + serve(ctx, l, server.ServeTCP) + } // setup the main grpc endpoint l, err := sys.GetLocalListener(address, config.GRPC.UID, config.GRPC.GID) if err != nil { diff --git a/plugin/plugin.go b/plugin/plugin.go index e727dd5b8..9ae8bbeb5 100644 --- a/plugin/plugin.go +++ b/plugin/plugin.go @@ -129,6 +129,11 @@ type TTRPCService interface { RegisterTTRPC(*ttrpc.Server) error } +// TCPService allows GRPC services to be registered with the underlying tcp server +type TCPService interface { + RegisterTCP(*grpc.Server) error +} + var register = struct { sync.RWMutex r []*Registration diff --git a/services/server/config/config.go b/services/server/config/config.go index 3b1a4fd25..26fb92599 100644 --- a/services/server/config/config.go +++ b/services/server/config/config.go @@ -57,6 +57,9 @@ type Config struct { // GRPCConfig provides GRPC configuration for the socket type GRPCConfig struct { Address string `toml:"address"` + TCPAddress string `toml:"tcp_address"` + TCPTLSCert string `toml:"tcp_tls_cert"` + TCPTLSKey string `toml:"tcp_tls_key"` UID int `toml:"uid"` GID int `toml:"gid"` MaxRecvMsgSize int `toml:"max_recv_message_size"` diff --git a/services/server/server.go b/services/server/server.go index 4d6e3a058..c5f30d924 100644 --- a/services/server/server.go +++ b/services/server/server.go @@ -50,6 +50,7 @@ import ( "github.com/pkg/errors" bolt "go.etcd.io/bbolt" "google.golang.org/grpc" + "google.golang.org/grpc/credentials" ) // CreateTopLevelDirectories creates the top-level root and state directories. @@ -81,7 +82,6 @@ func New(ctx context.Context, config *srvconfig.Config) (*Server, error) { if err != nil { return nil, err } - serverOpts := []grpc.ServerOption{ grpc.UnaryInterceptor(grpc_prometheus.UnaryServerInterceptor), grpc.StreamInterceptor(grpc_prometheus.StreamServerInterceptor), @@ -96,12 +96,26 @@ func New(ctx context.Context, config *srvconfig.Config) (*Server, error) { if err != nil { return nil, err } - grpcServer := grpc.NewServer(serverOpts...) + tcpServerOpts := serverOpts + if config.GRPC.TCPTLSCert != "" { + log.G(ctx).Info("setting up tls on tcp GRPC services...") + creds, err := credentials.NewServerTLSFromFile(config.GRPC.TCPTLSCert, config.GRPC.TCPTLSKey) + if err != nil { + return nil, err + } + tcpServerOpts = append(tcpServerOpts, grpc.Creds(creds)) + } var ( + grpcServer = grpc.NewServer(serverOpts...) + hrpc = grpc.NewServer(tcpServerOpts...) + grpcServices []plugin.Service + tcpServices []plugin.TCPService ttrpcServices []plugin.TTRPCService - s = &Server{ + + s = &Server{ grpcServer: grpcServer, + hrpc: hrpc, ttrpcServer: ttrpcServer, events: exchange.NewExchange(), config: config, @@ -159,6 +173,10 @@ func New(ctx context.Context, config *srvconfig.Config) (*Server, error) { if src, ok := instance.(plugin.TTRPCService); ok { ttrpcServices = append(ttrpcServices, src) } + if service, ok := instance.(plugin.TCPService); ok { + tcpServices = append(tcpServices, service) + } + s.plugins = append(s.plugins, result) } if len(required) != 0 { @@ -180,6 +198,11 @@ func New(ctx context.Context, config *srvconfig.Config) (*Server, error) { return nil, err } } + for _, service := range tcpServices { + if err := service.RegisterTCP(hrpc); err != nil { + return nil, err + } + } return s, nil } @@ -187,6 +210,7 @@ func New(ctx context.Context, config *srvconfig.Config) (*Server, error) { type Server struct { grpcServer *grpc.Server ttrpcServer *ttrpc.Server + hrpc *grpc.Server events *exchange.Exchange config *srvconfig.Config plugins []*plugin.Plugin @@ -217,6 +241,12 @@ func (s *Server) ServeMetrics(l net.Listener) error { return trapClosedConnErr(http.Serve(l, m)) } +// ServeTCP allows services to serve over tcp +func (s *Server) ServeTCP(l net.Listener) error { + grpc_prometheus.Register(s.hrpc) + return trapClosedConnErr(s.hrpc.Serve(l)) +} + // ServeDebug provides a debug endpoint func (s *Server) ServeDebug(l net.Listener) error { // don't use the default http server mux to make sure nothing gets registered