Merge pull request #3162 from crosbymichael/tcpservice
Add tcp service for grpc listeners
This commit is contained in:
commit
5f4c977ba0
@ -198,6 +198,13 @@ func App() *cli.App {
|
|||||||
}
|
}
|
||||||
serve(ctx, tl, server.ServeTTRPC)
|
serve(ctx, tl, server.ServeTTRPC)
|
||||||
|
|
||||||
|
if config.GRPC.TCPAddress != "" {
|
||||||
|
l, err := net.Listen("tcp", config.GRPC.TCPAddress)
|
||||||
|
if err != nil {
|
||||||
|
return errors.Wrapf(err, "failed to get listener for TCP grpc endpoint")
|
||||||
|
}
|
||||||
|
serve(ctx, l, server.ServeTCP)
|
||||||
|
}
|
||||||
// setup the main grpc endpoint
|
// setup the main grpc endpoint
|
||||||
l, err := sys.GetLocalListener(address, config.GRPC.UID, config.GRPC.GID)
|
l, err := sys.GetLocalListener(address, config.GRPC.UID, config.GRPC.GID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -129,6 +129,11 @@ type TTRPCService interface {
|
|||||||
RegisterTTRPC(*ttrpc.Server) error
|
RegisterTTRPC(*ttrpc.Server) error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TCPService allows GRPC services to be registered with the underlying tcp server
|
||||||
|
type TCPService interface {
|
||||||
|
RegisterTCP(*grpc.Server) error
|
||||||
|
}
|
||||||
|
|
||||||
var register = struct {
|
var register = struct {
|
||||||
sync.RWMutex
|
sync.RWMutex
|
||||||
r []*Registration
|
r []*Registration
|
||||||
|
@ -57,6 +57,9 @@ type Config struct {
|
|||||||
// GRPCConfig provides GRPC configuration for the socket
|
// GRPCConfig provides GRPC configuration for the socket
|
||||||
type GRPCConfig struct {
|
type GRPCConfig struct {
|
||||||
Address string `toml:"address"`
|
Address string `toml:"address"`
|
||||||
|
TCPAddress string `toml:"tcp_address"`
|
||||||
|
TCPTLSCert string `toml:"tcp_tls_cert"`
|
||||||
|
TCPTLSKey string `toml:"tcp_tls_key"`
|
||||||
UID int `toml:"uid"`
|
UID int `toml:"uid"`
|
||||||
GID int `toml:"gid"`
|
GID int `toml:"gid"`
|
||||||
MaxRecvMsgSize int `toml:"max_recv_message_size"`
|
MaxRecvMsgSize int `toml:"max_recv_message_size"`
|
||||||
|
@ -50,6 +50,7 @@ import (
|
|||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
bolt "go.etcd.io/bbolt"
|
bolt "go.etcd.io/bbolt"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
|
"google.golang.org/grpc/credentials"
|
||||||
)
|
)
|
||||||
|
|
||||||
// CreateTopLevelDirectories creates the top-level root and state directories.
|
// CreateTopLevelDirectories creates the top-level root and state directories.
|
||||||
@ -81,7 +82,6 @@ func New(ctx context.Context, config *srvconfig.Config) (*Server, error) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
serverOpts := []grpc.ServerOption{
|
serverOpts := []grpc.ServerOption{
|
||||||
grpc.UnaryInterceptor(grpc_prometheus.UnaryServerInterceptor),
|
grpc.UnaryInterceptor(grpc_prometheus.UnaryServerInterceptor),
|
||||||
grpc.StreamInterceptor(grpc_prometheus.StreamServerInterceptor),
|
grpc.StreamInterceptor(grpc_prometheus.StreamServerInterceptor),
|
||||||
@ -96,12 +96,26 @@ func New(ctx context.Context, config *srvconfig.Config) (*Server, error) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
grpcServer := grpc.NewServer(serverOpts...)
|
tcpServerOpts := serverOpts
|
||||||
|
if config.GRPC.TCPTLSCert != "" {
|
||||||
|
log.G(ctx).Info("setting up tls on tcp GRPC services...")
|
||||||
|
creds, err := credentials.NewServerTLSFromFile(config.GRPC.TCPTLSCert, config.GRPC.TCPTLSKey)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
tcpServerOpts = append(tcpServerOpts, grpc.Creds(creds))
|
||||||
|
}
|
||||||
var (
|
var (
|
||||||
|
grpcServer = grpc.NewServer(serverOpts...)
|
||||||
|
hrpc = grpc.NewServer(tcpServerOpts...)
|
||||||
|
|
||||||
grpcServices []plugin.Service
|
grpcServices []plugin.Service
|
||||||
|
tcpServices []plugin.TCPService
|
||||||
ttrpcServices []plugin.TTRPCService
|
ttrpcServices []plugin.TTRPCService
|
||||||
|
|
||||||
s = &Server{
|
s = &Server{
|
||||||
grpcServer: grpcServer,
|
grpcServer: grpcServer,
|
||||||
|
hrpc: hrpc,
|
||||||
ttrpcServer: ttrpcServer,
|
ttrpcServer: ttrpcServer,
|
||||||
events: exchange.NewExchange(),
|
events: exchange.NewExchange(),
|
||||||
config: config,
|
config: config,
|
||||||
@ -159,6 +173,10 @@ func New(ctx context.Context, config *srvconfig.Config) (*Server, error) {
|
|||||||
if src, ok := instance.(plugin.TTRPCService); ok {
|
if src, ok := instance.(plugin.TTRPCService); ok {
|
||||||
ttrpcServices = append(ttrpcServices, src)
|
ttrpcServices = append(ttrpcServices, src)
|
||||||
}
|
}
|
||||||
|
if service, ok := instance.(plugin.TCPService); ok {
|
||||||
|
tcpServices = append(tcpServices, service)
|
||||||
|
}
|
||||||
|
|
||||||
s.plugins = append(s.plugins, result)
|
s.plugins = append(s.plugins, result)
|
||||||
}
|
}
|
||||||
if len(required) != 0 {
|
if len(required) != 0 {
|
||||||
@ -180,6 +198,11 @@ func New(ctx context.Context, config *srvconfig.Config) (*Server, error) {
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
for _, service := range tcpServices {
|
||||||
|
if err := service.RegisterTCP(hrpc); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
return s, nil
|
return s, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -187,6 +210,7 @@ func New(ctx context.Context, config *srvconfig.Config) (*Server, error) {
|
|||||||
type Server struct {
|
type Server struct {
|
||||||
grpcServer *grpc.Server
|
grpcServer *grpc.Server
|
||||||
ttrpcServer *ttrpc.Server
|
ttrpcServer *ttrpc.Server
|
||||||
|
hrpc *grpc.Server
|
||||||
events *exchange.Exchange
|
events *exchange.Exchange
|
||||||
config *srvconfig.Config
|
config *srvconfig.Config
|
||||||
plugins []*plugin.Plugin
|
plugins []*plugin.Plugin
|
||||||
@ -217,6 +241,12 @@ func (s *Server) ServeMetrics(l net.Listener) error {
|
|||||||
return trapClosedConnErr(http.Serve(l, m))
|
return trapClosedConnErr(http.Serve(l, m))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ServeTCP allows services to serve over tcp
|
||||||
|
func (s *Server) ServeTCP(l net.Listener) error {
|
||||||
|
grpc_prometheus.Register(s.hrpc)
|
||||||
|
return trapClosedConnErr(s.hrpc.Serve(l))
|
||||||
|
}
|
||||||
|
|
||||||
// ServeDebug provides a debug endpoint
|
// ServeDebug provides a debug endpoint
|
||||||
func (s *Server) ServeDebug(l net.Listener) error {
|
func (s *Server) ServeDebug(l net.Listener) error {
|
||||||
// don't use the default http server mux to make sure nothing gets registered
|
// don't use the default http server mux to make sure nothing gets registered
|
||||||
|
Loading…
Reference in New Issue
Block a user