From 0b7abc02b22309a648a8ba5ebe2a2d189c474cbe Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Thu, 13 Jun 2019 19:09:07 +0000 Subject: [PATCH] ttrpc updates for interceptors, close, and metadata Signed-off-by: Michael Crosby --- namespaces/ttrpc.go | 2 +- vendor.conf | 2 +- vendor/github.com/containerd/ttrpc/channel.go | 5 +- vendor/github.com/containerd/ttrpc/client.go | 207 +++++++++++------- vendor/github.com/containerd/ttrpc/config.go | 15 +- .../containerd/ttrpc/interceptor.go | 50 +++++ .../github.com/containerd/ttrpc/metadata.go | 55 +++-- vendor/github.com/containerd/ttrpc/server.go | 15 +- .../github.com/containerd/ttrpc/services.go | 14 +- vendor/github.com/containerd/ttrpc/types.go | 19 +- 10 files changed, 268 insertions(+), 116 deletions(-) create mode 100644 vendor/github.com/containerd/ttrpc/interceptor.go diff --git a/namespaces/ttrpc.go b/namespaces/ttrpc.go index 28e039417..2224334d1 100644 --- a/namespaces/ttrpc.go +++ b/namespaces/ttrpc.go @@ -30,7 +30,7 @@ const ( func withTTRPCNamespaceHeader(ctx context.Context, namespace string) context.Context { md, ok := ttrpc.GetMetadata(ctx) if !ok { - md = ttrpc.Metadata{} + md = ttrpc.MD{} } md.Set(TTRPCHeader, namespace) return ttrpc.WithMetadata(ctx, md) diff --git a/vendor.conf b/vendor.conf index 8bf450510..73212389a 100644 --- a/vendor.conf +++ b/vendor.conf @@ -37,7 +37,7 @@ github.com/Microsoft/go-winio 84b4ab48a50763fe7b3abcef38e5205c12027fac github.com/Microsoft/hcsshim 8abdbb8205e4192c68b5f84c31197156f31be517 google.golang.org/genproto d80a6e20e776b0b17a324d0ba1ab50a39c8e8944 golang.org/x/text 19e51611da83d6be54ddafce4a4af510cb3e9ea4 -github.com/containerd/ttrpc a5bd8ce9e40bc7c065a11c6936f4d032ce6bfa2b +github.com/containerd/ttrpc 1fb3814edf44a76e0ccf503decf726d994919a9a github.com/syndtr/gocapability d98352740cb2c55f81556b63d4a1ec64c5a319c2 gotest.tools v2.3.0 github.com/google/go-cmp v0.2.0 diff --git a/vendor/github.com/containerd/ttrpc/channel.go b/vendor/github.com/containerd/ttrpc/channel.go index 22f5496b4..aa8c9541c 100644 --- a/vendor/github.com/containerd/ttrpc/channel.go +++ b/vendor/github.com/containerd/ttrpc/channel.go @@ -18,7 +18,6 @@ package ttrpc import ( "bufio" - "context" "encoding/binary" "io" "net" @@ -98,7 +97,7 @@ func newChannel(conn net.Conn) *channel { // returned will be valid and caller should send that along to // the correct consumer. The bytes on the underlying channel // will be discarded. -func (ch *channel) recv(ctx context.Context) (messageHeader, []byte, error) { +func (ch *channel) recv() (messageHeader, []byte, error) { mh, err := readMessageHeader(ch.hrbuf[:], ch.br) if err != nil { return messageHeader{}, nil, err @@ -120,7 +119,7 @@ func (ch *channel) recv(ctx context.Context) (messageHeader, []byte, error) { return mh, p, nil } -func (ch *channel) send(ctx context.Context, streamID uint32, t messageType, p []byte) error { +func (ch *channel) send(streamID uint32, t messageType, p []byte) error { if err := writeMessageHeader(ch.bw, ch.hwbuf[:], messageHeader{Length: uint32(len(p)), StreamID: streamID, Type: t}); err != nil { return err } diff --git a/vendor/github.com/containerd/ttrpc/client.go b/vendor/github.com/containerd/ttrpc/client.go index 748a0073a..9db15fe69 100644 --- a/vendor/github.com/containerd/ttrpc/client.go +++ b/vendor/github.com/containerd/ttrpc/client.go @@ -36,36 +36,52 @@ import ( // closed. var ErrClosed = errors.New("ttrpc: closed") +// Client for a ttrpc server type Client struct { codec codec conn net.Conn channel *channel calls chan *callRequest - closed chan struct{} - closeOnce sync.Once - closeFunc func() - done chan struct{} - err error + ctx context.Context + closed func() + + closeOnce sync.Once + userCloseFunc func() + + errOnce sync.Once + err error + interceptor UnaryClientInterceptor } +// ClientOpts configures a client type ClientOpts func(c *Client) +// WithOnClose sets the close func whenever the client's Close() method is called func WithOnClose(onClose func()) ClientOpts { return func(c *Client) { - c.closeFunc = onClose + c.userCloseFunc = onClose + } +} + +// WithUnaryClientInterceptor sets the provided client interceptor +func WithUnaryClientInterceptor(i UnaryClientInterceptor) ClientOpts { + return func(c *Client) { + c.interceptor = i } } func NewClient(conn net.Conn, opts ...ClientOpts) *Client { + ctx, cancel := context.WithCancel(context.Background()) c := &Client{ - codec: codec{}, - conn: conn, - channel: newChannel(conn), - calls: make(chan *callRequest), - closed: make(chan struct{}), - done: make(chan struct{}), - closeFunc: func() {}, + codec: codec{}, + conn: conn, + channel: newChannel(conn), + calls: make(chan *callRequest), + closed: cancel, + ctx: ctx, + userCloseFunc: func() {}, + interceptor: defaultClientInterceptor, } for _, o := range opts { @@ -100,14 +116,17 @@ func (c *Client) Call(ctx context.Context, service, method string, req, resp int ) if metadata, ok := GetMetadata(ctx); ok { - creq.Metadata = metadata + metadata.setRequest(creq) } if dl, ok := ctx.Deadline(); ok { creq.TimeoutNano = dl.Sub(time.Now()).Nanoseconds() } - if err := c.dispatch(ctx, creq, cresp); err != nil { + info := &UnaryClientInfo{ + FullMethod: fullPath(service, method), + } + if err := c.interceptor(ctx, creq, cresp, info, c.dispatch); err != nil { return err } @@ -135,8 +154,8 @@ func (c *Client) dispatch(ctx context.Context, req *Request, resp *Response) err case <-ctx.Done(): return ctx.Err() case c.calls <- call: - case <-c.done: - return c.err + case <-c.ctx.Done(): + return c.error() } select { @@ -144,16 +163,15 @@ func (c *Client) dispatch(ctx context.Context, req *Request, resp *Response) err return ctx.Err() case err := <-errs: return filterCloseErr(err) - case <-c.done: - return c.err + case <-c.ctx.Done(): + return c.error() } } func (c *Client) Close() error { c.closeOnce.Do(func() { - close(c.closed) + c.closed() }) - return nil } @@ -163,51 +181,82 @@ type message struct { err error } -func (c *Client) run() { - var ( - streamID uint32 = 1 - waiters = make(map[uint32]*callRequest) - calls = c.calls - incoming = make(chan *message) - shutdown = make(chan struct{}) - shutdownErr error - ) +type receiver struct { + wg *sync.WaitGroup + messages chan *message + err error +} - go func() { - defer close(shutdown) +func (r *receiver) run(ctx context.Context, c *channel) { + defer r.wg.Done() - // start one more goroutine to recv messages without blocking. - for { - mh, p, err := c.channel.recv(context.TODO()) + for { + select { + case <-ctx.Done(): + r.err = ctx.Err() + return + default: + mh, p, err := c.recv() if err != nil { _, ok := status.FromError(err) if !ok { // treat all errors that are not an rpc status as terminal. // all others poison the connection. - shutdownErr = err + r.err = filterCloseErr(err) return } } select { - case incoming <- &message{ + case r.messages <- &message{ messageHeader: mh, p: p[:mh.Length], err: err, }: - case <-c.done: + case <-ctx.Done(): + r.err = ctx.Err() return } } - }() + } +} - defer c.conn.Close() - defer close(c.done) - defer c.closeFunc() +func (c *Client) run() { + var ( + streamID uint32 = 1 + waiters = make(map[uint32]*callRequest) + calls = c.calls + incoming = make(chan *message) + receiversDone = make(chan struct{}) + wg sync.WaitGroup + ) + + // broadcast the shutdown error to the remaining waiters. + abortWaiters := func(wErr error) { + for _, waiter := range waiters { + waiter.errs <- wErr + } + } + recv := &receiver{ + wg: &wg, + messages: incoming, + } + wg.Add(1) + + go func() { + wg.Wait() + close(receiversDone) + }() + go recv.run(c.ctx, c.channel) + + defer func() { + c.conn.Close() + c.userCloseFunc() + }() for { select { case call := <-calls: - if err := c.send(call.ctx, streamID, messageTypeRequest, call.req); err != nil { + if err := c.send(streamID, messageTypeRequest, call.req); err != nil { call.errs <- err continue } @@ -223,41 +272,42 @@ func (c *Client) run() { call.errs <- c.recv(call.resp, msg) delete(waiters, msg.StreamID) - case <-shutdown: - if shutdownErr != nil { - shutdownErr = filterCloseErr(shutdownErr) - } else { - shutdownErr = ErrClosed - } - - shutdownErr = errors.Wrapf(shutdownErr, "ttrpc: client shutting down") - - c.err = shutdownErr - for _, waiter := range waiters { - waiter.errs <- shutdownErr + case <-receiversDone: + // all the receivers have exited + if recv.err != nil { + c.setError(recv.err) } + // don't return out, let the close of the context trigger the abort of waiters c.Close() - return - case <-c.closed: - if c.err == nil { - c.err = ErrClosed - } - // broadcast the shutdown error to the remaining waiters. - for _, waiter := range waiters { - waiter.errs <- c.err - } + case <-c.ctx.Done(): + abortWaiters(c.error()) return } } } -func (c *Client) send(ctx context.Context, streamID uint32, mtype messageType, msg interface{}) error { +func (c *Client) error() error { + c.errOnce.Do(func() { + if c.err == nil { + c.err = ErrClosed + } + }) + return c.err +} + +func (c *Client) setError(err error) { + c.errOnce.Do(func() { + c.err = err + }) +} + +func (c *Client) send(streamID uint32, mtype messageType, msg interface{}) error { p, err := c.codec.Marshal(msg) if err != nil { return err } - return c.channel.send(ctx, streamID, mtype, p) + return c.channel.send(streamID, mtype, p) } func (c *Client) recv(resp *Response, msg *message) error { @@ -278,22 +328,21 @@ func (c *Client) recv(resp *Response, msg *message) error { // // This purposely ignores errors with a wrapped cause. func filterCloseErr(err error) error { - if err == nil { + switch { + case err == nil: return nil - } - - if err == io.EOF { + case err == io.EOF: return ErrClosed - } - - if strings.Contains(err.Error(), "use of closed network connection") { + case errors.Cause(err) == io.EOF: return ErrClosed - } - - // if we have an epipe on a write, we cast to errclosed - if oerr, ok := err.(*net.OpError); ok && oerr.Op == "write" { - if serr, ok := oerr.Err.(*os.SyscallError); ok && serr.Err == syscall.EPIPE { - return ErrClosed + case strings.Contains(err.Error(), "use of closed network connection"): + return ErrClosed + default: + // if we have an epipe on a write, we cast to errclosed + if oerr, ok := err.(*net.OpError); ok && oerr.Op == "write" { + if serr, ok := oerr.Err.(*os.SyscallError); ok && serr.Err == syscall.EPIPE { + return ErrClosed + } } } diff --git a/vendor/github.com/containerd/ttrpc/config.go b/vendor/github.com/containerd/ttrpc/config.go index 019b7a09d..6a53c112b 100644 --- a/vendor/github.com/containerd/ttrpc/config.go +++ b/vendor/github.com/containerd/ttrpc/config.go @@ -19,9 +19,11 @@ package ttrpc import "github.com/pkg/errors" type serverConfig struct { - handshaker Handshaker + handshaker Handshaker + interceptor UnaryServerInterceptor } +// ServerOpt for configuring a ttrpc server type ServerOpt func(*serverConfig) error // WithServerHandshaker can be passed to NewServer to ensure that the @@ -37,3 +39,14 @@ func WithServerHandshaker(handshaker Handshaker) ServerOpt { return nil } } + +// WithUnaryServerInterceptor sets the provided interceptor on the server +func WithUnaryServerInterceptor(i UnaryServerInterceptor) ServerOpt { + return func(c *serverConfig) error { + if c.interceptor != nil { + return errors.New("only one interceptor allowed per server") + } + c.interceptor = i + return nil + } +} diff --git a/vendor/github.com/containerd/ttrpc/interceptor.go b/vendor/github.com/containerd/ttrpc/interceptor.go new file mode 100644 index 000000000..c1219dac6 --- /dev/null +++ b/vendor/github.com/containerd/ttrpc/interceptor.go @@ -0,0 +1,50 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package ttrpc + +import "context" + +// UnaryServerInfo provides information about the server request +type UnaryServerInfo struct { + FullMethod string +} + +// UnaryClientInfo provides information about the client request +type UnaryClientInfo struct { + FullMethod string +} + +// Unmarshaler contains the server request data and allows it to be unmarshaled +// into a concrete type +type Unmarshaler func(interface{}) error + +// Invoker invokes the client's request and response from the ttrpc server +type Invoker func(context.Context, *Request, *Response) error + +// UnaryServerInterceptor specifies the interceptor function for server request/response +type UnaryServerInterceptor func(context.Context, Unmarshaler, *UnaryServerInfo, Method) (interface{}, error) + +// UnaryClientInterceptor specifies the interceptor function for client request/response +type UnaryClientInterceptor func(context.Context, *Request, *Response, *UnaryClientInfo, Invoker) error + +func defaultServerInterceptor(ctx context.Context, unmarshal Unmarshaler, info *UnaryServerInfo, method Method) (interface{}, error) { + return method(ctx, unmarshal) +} + +func defaultClientInterceptor(ctx context.Context, req *Request, resp *Response, _ *UnaryClientInfo, invoker Invoker) error { + return invoker(ctx, req, resp) +} diff --git a/vendor/github.com/containerd/ttrpc/metadata.go b/vendor/github.com/containerd/ttrpc/metadata.go index d03626321..ce8c0d13c 100644 --- a/vendor/github.com/containerd/ttrpc/metadata.go +++ b/vendor/github.com/containerd/ttrpc/metadata.go @@ -16,53 +16,74 @@ package ttrpc -import "context" +import ( + "context" + "strings" +) -// Metadata represents the key-value pairs (similar to http.Header) to be passed to ttrpc server from a client. -type Metadata map[string]StringList +// MD is the user type for ttrpc metadata +type MD map[string][]string // Get returns the metadata for a given key when they exist. // If there is no metadata, a nil slice and false are returned. -func (m Metadata) Get(key string) ([]string, bool) { +func (m MD) Get(key string) ([]string, bool) { + key = strings.ToLower(key) list, ok := m[key] - if !ok || len(list.List) == 0 { + if !ok || len(list) == 0 { return nil, false } - return list.List, true + return list, true } // Set sets the provided values for a given key. // The values will overwrite any existing values. // If no values provided, a key will be deleted. -func (m Metadata) Set(key string, values ...string) { +func (m MD) Set(key string, values ...string) { + key = strings.ToLower(key) if len(values) == 0 { delete(m, key) return } - - m[key] = StringList{List: values} + m[key] = values } // Append appends additional values to the given key. -func (m Metadata) Append(key string, values ...string) { +func (m MD) Append(key string, values ...string) { + key = strings.ToLower(key) if len(values) == 0 { return } - - list, ok := m[key] + current, ok := m[key] if ok { - m.Set(key, append(list.List, values...)...) + m.Set(key, append(current, values...)...) } else { m.Set(key, values...) } } +func (m MD) setRequest(r *Request) { + for k, values := range m { + for _, v := range values { + r.Metadata = append(r.Metadata, &KeyValue{ + Key: k, + Value: v, + }) + } + } +} + +func (m MD) fromRequest(r *Request) { + for _, kv := range r.Metadata { + m[kv.Key] = append(m[kv.Key], kv.Value) + } +} + type metadataKey struct{} // GetMetadata retrieves metadata from context.Context (previously attached with WithMetadata) -func GetMetadata(ctx context.Context) (Metadata, bool) { - metadata, ok := ctx.Value(metadataKey{}).(Metadata) +func GetMetadata(ctx context.Context) (MD, bool) { + metadata, ok := ctx.Value(metadataKey{}).(MD) return metadata, ok } @@ -81,6 +102,6 @@ func GetMetadataValue(ctx context.Context, name string) (string, bool) { } // WithMetadata attaches metadata map to a context.Context -func WithMetadata(ctx context.Context, headers Metadata) context.Context { - return context.WithValue(ctx, metadataKey{}, headers) +func WithMetadata(ctx context.Context, md MD) context.Context { + return context.WithValue(ctx, metadataKey{}, md) } diff --git a/vendor/github.com/containerd/ttrpc/server.go b/vendor/github.com/containerd/ttrpc/server.go index 595a69a0d..1d4f1df65 100644 --- a/vendor/github.com/containerd/ttrpc/server.go +++ b/vendor/github.com/containerd/ttrpc/server.go @@ -53,10 +53,13 @@ func NewServer(opts ...ServerOpt) (*Server, error) { return nil, err } } + if config.interceptor == nil { + config.interceptor = defaultServerInterceptor + } return &Server{ config: config, - services: newServiceSet(), + services: newServiceSet(config.interceptor), done: make(chan struct{}), listeners: make(map[net.Listener]struct{}), connections: make(map[*serverConn]struct{}), @@ -341,7 +344,7 @@ func (c *serverConn) run(sctx context.Context) { default: // proceed } - mh, p, err := ch.recv(ctx) + mh, p, err := ch.recv() if err != nil { status, ok := status.FromError(err) if !ok { @@ -438,7 +441,7 @@ func (c *serverConn) run(sctx context.Context) { return } - if err := ch.send(ctx, response.id, messageTypeResponse, p); err != nil { + if err := ch.send(response.id, messageTypeResponse, p); err != nil { logrus.WithError(err).Error("failed sending message on channel") return } @@ -466,8 +469,10 @@ func (c *serverConn) run(sctx context.Context) { var noopFunc = func() {} func getRequestContext(ctx context.Context, req *Request) (retCtx context.Context, cancel func()) { - if req.Metadata != nil { - ctx = WithMetadata(ctx, req.Metadata) + if len(req.Metadata) > 0 { + md := MD{} + md.fromRequest(req) + ctx = WithMetadata(ctx, md) } cancel = noopFunc diff --git a/vendor/github.com/containerd/ttrpc/services.go b/vendor/github.com/containerd/ttrpc/services.go index fe1cade5a..655b2caea 100644 --- a/vendor/github.com/containerd/ttrpc/services.go +++ b/vendor/github.com/containerd/ttrpc/services.go @@ -37,12 +37,14 @@ type ServiceDesc struct { } type serviceSet struct { - services map[string]ServiceDesc + services map[string]ServiceDesc + interceptor UnaryServerInterceptor } -func newServiceSet() *serviceSet { +func newServiceSet(interceptor UnaryServerInterceptor) *serviceSet { return &serviceSet{ - services: make(map[string]ServiceDesc), + services: make(map[string]ServiceDesc), + interceptor: interceptor, } } @@ -84,7 +86,11 @@ func (s *serviceSet) dispatch(ctx context.Context, serviceName, methodName strin return nil } - resp, err := method(ctx, unmarshal) + info := &UnaryServerInfo{ + FullMethod: fullPath(serviceName, methodName), + } + + resp, err := s.interceptor(ctx, unmarshal, info, method) if err != nil { return nil, err } diff --git a/vendor/github.com/containerd/ttrpc/types.go b/vendor/github.com/containerd/ttrpc/types.go index c8ecb3868..9a1c19a72 100644 --- a/vendor/github.com/containerd/ttrpc/types.go +++ b/vendor/github.com/containerd/ttrpc/types.go @@ -23,11 +23,11 @@ import ( ) type Request struct { - Service string `protobuf:"bytes,1,opt,name=service,proto3"` - Method string `protobuf:"bytes,2,opt,name=method,proto3"` - Payload []byte `protobuf:"bytes,3,opt,name=payload,proto3"` - TimeoutNano int64 `protobuf:"varint,4,opt,name=timeout_nano,proto3"` - Metadata Metadata `protobuf:"bytes,5,opt,name=metadata,proto3" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` + Service string `protobuf:"bytes,1,opt,name=service,proto3"` + Method string `protobuf:"bytes,2,opt,name=method,proto3"` + Payload []byte `protobuf:"bytes,3,opt,name=payload,proto3"` + TimeoutNano int64 `protobuf:"varint,4,opt,name=timeout_nano,proto3"` + Metadata []*KeyValue `protobuf:"bytes,5,rep,name=metadata,proto3"` } func (r *Request) Reset() { *r = Request{} } @@ -52,3 +52,12 @@ func (r *StringList) String() string { return fmt.Sprintf("%+#v", r) } func (r *StringList) ProtoMessage() {} func makeStringList(item ...string) StringList { return StringList{List: item} } + +type KeyValue struct { + Key string `protobuf:"bytes,1,opt,name=key,proto3"` + Value string `protobuf:"bytes,2,opt,name=value,proto3"` +} + +func (m *KeyValue) Reset() { *m = KeyValue{} } +func (*KeyValue) ProtoMessage() {} +func (m *KeyValue) String() string { return fmt.Sprintf("%+#v", m) }