ttrpc/server.go
Stephen J Day 42ff40f1f1
mgrpc: initial implementation of server
With this change, we define a simple server and client framework to
start generating code against. We define a simple handler system with
back registration into the server definition.

From here, we can start generating code against the handlers.

Signed-off-by: Stephen J Day <stephen.day@docker.com>
2017-11-14 12:18:06 -08:00

132 lines
2.9 KiB
Go

package mgrpc
import (
"context"
"net"
"path"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/log"
"github.com/containerd/typeurl"
"github.com/pkg/errors"
)
type Server struct {
handlers map[string]map[string]Handler
}
func NewServer() *Server {
return &Server{handlers: make(map[string]map[string]Handler)}
}
func (s *Server) Register(name string, methods map[string]Handler) error {
if _, ok := s.handlers[name]; ok {
return errors.Errorf("duplicate service %v registered", name)
}
s.handlers[name] = methods
return nil
}
func (s *Server) Shutdown(ctx context.Context) error {
// TODO(stevvooe): Wait on connection shutdown.
return nil
}
func (s *Server) Serve(l net.Listener) error {
for {
conn, err := l.Accept()
if err != nil {
log.L.WithError(err).Error("failed accept")
}
go s.handleConn(conn)
}
return nil
}
const maxMessageSize = 1 << 20 // TODO(stevvooe): Cut these down, since they are pre-alloced.
func (s *Server) handleConn(conn net.Conn) {
defer conn.Close()
var (
ch = newChannel(conn)
req Request
ctx, cancel = context.WithCancel(context.Background())
)
defer cancel()
// TODO(stevvooe): Recover here or in dispatch to handle panics in service
// methods.
// every connection is just a simple in/out request loop. No complexity for
// multiplexing streams or dealing with head of line blocking, as this
// isn't necessary for shim control.
for {
if err := ch.recv(ctx, &req); err != nil {
log.L.WithError(err).Error("failed receiving message on channel")
return
}
resp, err := s.dispatch(ctx, &req)
if err != nil {
log.L.WithError(err).Error("failed to dispatch request")
return
}
if err := ch.send(ctx, resp); err != nil {
log.L.WithError(err).Error("failed sending message on channel")
return
}
}
}
func (s *Server) dispatch(ctx context.Context, req *Request) (*Response, error) {
ctx = log.WithLogger(ctx, log.G(ctx).WithField("method", path.Join("/", req.Service, req.Method)))
handler, err := s.resolve(req.Service, req.Method)
if err != nil {
log.L.WithError(err).Error("failed to resolve handler")
return nil, err
}
payload, err := typeurl.UnmarshalAny(req.Payload)
if err != nil {
return nil, err
}
resp, err := handler.Handle(ctx, payload)
if err != nil {
log.L.WithError(err).Error("handler returned an error")
return nil, err
}
apayload, err := typeurl.MarshalAny(resp)
if err != nil {
return nil, err
}
rresp := &Response{
// Status: *st,
Payload: apayload,
}
return rresp, nil
}
func (s *Server) resolve(service, method string) (Handler, error) {
srv, ok := s.handlers[service]
if !ok {
return nil, errors.Wrapf(errdefs.ErrNotFound, "could not resolve service %v", service)
}
handler, ok := srv[method]
if !ok {
return nil, errors.Wrapf(errdefs.ErrNotFound, "could not resolve method %v", method)
}
return handler, nil
}