Add tcp service for grpc listeners

Signed-off-by: Michael Crosby <crosbymichael@gmail.com>
This commit is contained in:
Michael Crosby 2019-03-04 16:23:38 -05:00
parent 475619c29e
commit b911ae3428
4 changed files with 48 additions and 3 deletions

View File

@ -198,6 +198,13 @@ func App() *cli.App {
}
serve(ctx, tl, server.ServeTTRPC)
if config.GRPC.TCPAddress != "" {
l, err := net.Listen("tcp", config.GRPC.TCPAddress)
if err != nil {
return errors.Wrapf(err, "failed to get listener for TCP grpc endpoint")
}
serve(ctx, l, server.ServeTCP)
}
// setup the main grpc endpoint
l, err := sys.GetLocalListener(address, config.GRPC.UID, config.GRPC.GID)
if err != nil {

View File

@ -129,6 +129,11 @@ type TTRPCService interface {
RegisterTTRPC(*ttrpc.Server) error
}
// TCPService allows GRPC services to be registered with the underlying tcp server
type TCPService interface {
RegisterTCP(*grpc.Server) error
}
var register = struct {
sync.RWMutex
r []*Registration

View File

@ -54,6 +54,9 @@ type Config struct {
// GRPCConfig provides GRPC configuration for the socket
type GRPCConfig struct {
Address string `toml:"address"`
TCPAddress string `toml:"tcp_address"`
TCPTLSCert string `toml:"tcp_tls_cert"`
TCPTLSKey string `toml:"tcp_tls_key"`
UID int `toml:"uid"`
GID int `toml:"gid"`
MaxRecvMsgSize int `toml:"max_recv_message_size"`

View File

@ -50,6 +50,7 @@ import (
"github.com/pkg/errors"
bolt "go.etcd.io/bbolt"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)
// CreateTopLevelDirectories creates the top-level root and state directories.
@ -81,7 +82,6 @@ func New(ctx context.Context, config *srvconfig.Config) (*Server, error) {
if err != nil {
return nil, err
}
serverOpts := []grpc.ServerOption{
grpc.UnaryInterceptor(grpc_prometheus.UnaryServerInterceptor),
grpc.StreamInterceptor(grpc_prometheus.StreamServerInterceptor),
@ -96,12 +96,26 @@ func New(ctx context.Context, config *srvconfig.Config) (*Server, error) {
if err != nil {
return nil, err
}
grpcServer := grpc.NewServer(serverOpts...)
tcpServerOpts := serverOpts
if config.GRPC.TCPTLSCert != "" {
log.G(ctx).Info("setting up tls on tcp GRPC services...")
creds, err := credentials.NewServerTLSFromFile(config.GRPC.TCPTLSCert, config.GRPC.TCPTLSKey)
if err != nil {
return nil, err
}
tcpServerOpts = append(tcpServerOpts, grpc.Creds(creds))
}
var (
grpcServer = grpc.NewServer(serverOpts...)
hrpc = grpc.NewServer(tcpServerOpts...)
grpcServices []plugin.Service
tcpServices []plugin.TCPService
ttrpcServices []plugin.TTRPCService
s = &Server{
s = &Server{
grpcServer: grpcServer,
hrpc: hrpc,
ttrpcServer: ttrpcServer,
events: exchange.NewExchange(),
config: config,
@ -151,6 +165,10 @@ func New(ctx context.Context, config *srvconfig.Config) (*Server, error) {
if src, ok := instance.(plugin.TTRPCService); ok {
ttrpcServices = append(ttrpcServices, src)
}
if service, ok := instance.(plugin.TCPService); ok {
tcpServices = append(tcpServices, service)
}
s.plugins = append(s.plugins, result)
}
// register services after all plugins have been initialized
@ -164,6 +182,11 @@ func New(ctx context.Context, config *srvconfig.Config) (*Server, error) {
return nil, err
}
}
for _, service := range tcpServices {
if err := service.RegisterTCP(hrpc); err != nil {
return nil, err
}
}
return s, nil
}
@ -171,6 +194,7 @@ func New(ctx context.Context, config *srvconfig.Config) (*Server, error) {
type Server struct {
grpcServer *grpc.Server
ttrpcServer *ttrpc.Server
hrpc *grpc.Server
events *exchange.Exchange
config *srvconfig.Config
plugins []*plugin.Plugin
@ -201,6 +225,12 @@ func (s *Server) ServeMetrics(l net.Listener) error {
return trapClosedConnErr(http.Serve(l, m))
}
// ServeTCP allows services to serve over tcp
func (s *Server) ServeTCP(l net.Listener) error {
grpc_prometheus.Register(s.hrpc)
return trapClosedConnErr(s.hrpc.Serve(l))
}
// ServeDebug provides a debug endpoint
func (s *Server) ServeDebug(l net.Listener) error {
// don't use the default http server mux to make sure nothing gets registered