diff --git a/vendor.conf b/vendor.conf index 6b8144421..29f3c8828 100644 --- a/vendor.conf +++ b/vendor.conf @@ -41,3 +41,4 @@ github.com/boltdb/bolt e9cf4fae01b5a8ff89d0ec6b32f0d9c9f79aefdd google.golang.org/genproto d80a6e20e776b0b17a324d0ba1ab50a39c8e8944 golang.org/x/text 19e51611da83d6be54ddafce4a4af510cb3e9ea4 github.com/dmcgowan/go-tar 2e2c51242e8993c50445dab7c03c8e7febddd0cf +github.com/stevvooe/ttrpc bdb2ab7a8169e485e39421e666e15a505e575fd2 diff --git a/vendor/github.com/stevvooe/ttrpc/LICENSE b/vendor/github.com/stevvooe/ttrpc/LICENSE new file mode 100644 index 000000000..261eeb9e9 --- /dev/null +++ b/vendor/github.com/stevvooe/ttrpc/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/vendor/github.com/stevvooe/ttrpc/README.md b/vendor/github.com/stevvooe/ttrpc/README.md new file mode 100644 index 000000000..6e2159a84 --- /dev/null +++ b/vendor/github.com/stevvooe/ttrpc/README.md @@ -0,0 +1,50 @@ +# ttrpc + +GRPC for low-memory environments. + +The existing grpc-go project requires a lot of memory overhead for importing +packages and at runtime. While this is great for many services with low density +requirements, this can be a problem when running a large number of services on +a single machine or on a machine with a small amount of memory. + +Using the same GRPC definitions, this project reduces the binary size and +protocol overhead required. We do this by eliding the `net/http`, `net/http2` +and `grpc` package used by grpc replacing it with a lightweight framing +protocol. The result are smaller binaries that use less resident memory with +the same ease of use as GRPC. + +Please note that while this project supports generating either end of the +protocol, the generated service definitions will be incompatible with regular +GRPC services, as they do not speak the same protocol. + +# Usage + +Create a gogo vanity binary (see +[`cmd/protoc-gen-gogottrpc/main.go`](cmd/protoc-gen-gogottrpc/main.go) for an +example with the ttrpc plugin enabled. + +It's recommended to use [`protobuild`](https://github.com/stevvooe/protobuild) +to build the protobufs for this project, but this will work with protoc +directly, if required. + +# Differences from GRPC + +- The protocol stack has been replaced with a lighter protocol that doesn't + require http, http2 and tls. +- The client and server interface are identical whereas in GRPC there is a + client and server interface that are different. +- The Go stdlib context package is used instead. +- No support for streams yet. + +# Status + +Very new. YMMV. + +TODO: + +- [X] Plumb error codes and GRPC status +- [X] Remove use of any type and dependency on typeurl package +- [X] Ensure that protocol can support streaming in the future +- [ ] Document protocol layout +- [ ] Add testing under concurrent load to ensure +- [ ] Verify connection error handling diff --git a/vendor/github.com/stevvooe/ttrpc/channel.go b/vendor/github.com/stevvooe/ttrpc/channel.go new file mode 100644 index 000000000..a71260bcc --- /dev/null +++ b/vendor/github.com/stevvooe/ttrpc/channel.go @@ -0,0 +1,99 @@ +package ttrpc + +import ( + "bufio" + "context" + "encoding/binary" + "io" + + "github.com/pkg/errors" +) + +const ( + messageHeaderLength = 10 + messageLengthMax = 8 << 10 +) + +type messageType uint8 + +const ( + messageTypeRequest messageType = 0x1 + messageTypeResponse messageType = 0x2 +) + +// messageHeader represents the fixed-length message header of 10 bytes sent +// with every request. +type messageHeader struct { + Length uint32 // length excluding this header. b[:4] + StreamID uint32 // identifies which request stream message is a part of. b[4:8] + Type messageType // message type b[8] + Flags uint8 // reserved b[9] +} + +func readMessageHeader(p []byte, r io.Reader) (messageHeader, error) { + _, err := io.ReadFull(r, p[:messageHeaderLength]) + if err != nil { + return messageHeader{}, err + } + + return messageHeader{ + Length: binary.BigEndian.Uint32(p[:4]), + StreamID: binary.BigEndian.Uint32(p[4:8]), + Type: messageType(p[8]), + Flags: p[9], + }, nil +} + +func writeMessageHeader(w io.Writer, p []byte, mh messageHeader) error { + binary.BigEndian.PutUint32(p[:4], mh.Length) + binary.BigEndian.PutUint32(p[4:8], mh.StreamID) + p[8] = byte(mh.Type) + p[9] = mh.Flags + + _, err := w.Write(p[:]) + return err +} + +type channel struct { + bw *bufio.Writer + br *bufio.Reader + hrbuf [messageHeaderLength]byte // avoid alloc when reading header + hwbuf [messageHeaderLength]byte +} + +func newChannel(w io.Writer, r io.Reader) *channel { + return &channel{ + bw: bufio.NewWriter(w), + br: bufio.NewReader(r), + } +} + +func (ch *channel) recv(ctx context.Context, p []byte) (messageHeader, error) { + mh, err := readMessageHeader(ch.hrbuf[:], ch.br) + if err != nil { + return messageHeader{}, err + } + + if mh.Length > uint32(len(p)) { + return messageHeader{}, errors.Wrapf(io.ErrShortBuffer, "message length %v over buffer size %v", mh.Length, len(p)) + } + + if _, err := io.ReadFull(ch.br, p[:mh.Length]); err != nil { + return messageHeader{}, errors.Wrapf(err, "failed reading message") + } + + return mh, nil +} + +func (ch *channel) send(ctx context.Context, streamID uint32, t messageType, p []byte) error { + if err := writeMessageHeader(ch.bw, ch.hwbuf[:], messageHeader{Length: uint32(len(p)), StreamID: streamID, Type: t}); err != nil { + return err + } + + _, err := ch.bw.Write(p) + if err != nil { + return err + } + + return ch.bw.Flush() +} diff --git a/vendor/github.com/stevvooe/ttrpc/client.go b/vendor/github.com/stevvooe/ttrpc/client.go new file mode 100644 index 000000000..9dbb3d3b4 --- /dev/null +++ b/vendor/github.com/stevvooe/ttrpc/client.go @@ -0,0 +1,204 @@ +package ttrpc + +import ( + "context" + "net" + "sync" + "sync/atomic" + + "github.com/gogo/protobuf/proto" + "github.com/pkg/errors" + "google.golang.org/grpc/status" +) + +type Client struct { + codec codec + channel *channel + requestID uint32 + sendRequests chan sendRequest + recvRequests chan recvRequest + + closed chan struct{} + closeOnce sync.Once + done chan struct{} + err error +} + +func NewClient(conn net.Conn) *Client { + c := &Client{ + codec: codec{}, + requestID: 1, + channel: newChannel(conn, conn), + sendRequests: make(chan sendRequest), + recvRequests: make(chan recvRequest), + closed: make(chan struct{}), + done: make(chan struct{}), + } + + go c.run() + return c +} + +func (c *Client) Call(ctx context.Context, service, method string, req, resp interface{}) error { + payload, err := c.codec.Marshal(req) + if err != nil { + return err + } + + requestID := atomic.AddUint32(&c.requestID, 2) + request := Request{ + Service: service, + Method: method, + Payload: payload, + } + + if err := c.send(ctx, requestID, &request); err != nil { + return err + } + + var response Response + if err := c.recv(ctx, requestID, &response); err != nil { + return err + } + + if err := c.codec.Unmarshal(response.Payload, resp); err != nil { + return err + } + + if response.Status == nil { + return errors.New("no status provided on response") + } + + return status.ErrorProto(response.Status) +} + +func (c *Client) Close() error { + c.closeOnce.Do(func() { + close(c.closed) + }) + + return nil +} + +type sendRequest struct { + ctx context.Context + id uint32 + msg interface{} + err chan error +} + +func (c *Client) send(ctx context.Context, id uint32, msg interface{}) error { + errs := make(chan error, 1) + select { + case c.sendRequests <- sendRequest{ + ctx: ctx, + id: id, + msg: msg, + err: errs, + }: + case <-ctx.Done(): + return ctx.Err() + case <-c.done: + return c.err + } + + select { + case err := <-errs: + return err + case <-ctx.Done(): + return ctx.Err() + case <-c.done: + return c.err + } +} + +type recvRequest struct { + id uint32 + msg interface{} + err chan error +} + +func (c *Client) recv(ctx context.Context, id uint32, msg interface{}) error { + errs := make(chan error, 1) + select { + case c.recvRequests <- recvRequest{ + id: id, + msg: msg, + err: errs, + }: + case <-c.done: + return c.err + case <-ctx.Done(): + return ctx.Err() + } + + select { + case err := <-errs: + return err + case <-c.done: + return c.err + case <-ctx.Done(): + return ctx.Err() + } +} + +type received struct { + mh messageHeader + p []byte + err error +} + +func (c *Client) run() { + defer close(c.done) + var ( + waiters = map[uint32]recvRequest{} + queued = map[uint32]received{} // messages unmatched by waiter + incoming = make(chan received) + ) + + go func() { + // start one more goroutine to recv messages without blocking. + for { + var p [messageLengthMax]byte + mh, err := c.channel.recv(context.TODO(), p[:]) + select { + case incoming <- received{ + mh: mh, + p: p[:mh.Length], + err: err, + }: + case <-c.done: + return + } + } + }() + + for { + select { + case req := <-c.sendRequests: + if p, err := proto.Marshal(req.msg.(proto.Message)); err != nil { + req.err <- err + } else { + req.err <- c.channel.send(req.ctx, req.id, messageTypeRequest, p) + } + case req := <-c.recvRequests: + if r, ok := queued[req.id]; ok { + req.err <- proto.Unmarshal(r.p, req.msg.(proto.Message)) + } + waiters[req.id] = req + case r := <-incoming: + if r.err != nil { + c.err = r.err + return + } + + if waiter, ok := waiters[r.mh.StreamID]; ok { + waiter.err <- proto.Unmarshal(r.p, waiter.msg.(proto.Message)) + } else { + queued[r.mh.StreamID] = r + } + case <-c.closed: + return + } + } +} diff --git a/vendor/github.com/stevvooe/ttrpc/codec.go b/vendor/github.com/stevvooe/ttrpc/codec.go new file mode 100644 index 000000000..7956a7222 --- /dev/null +++ b/vendor/github.com/stevvooe/ttrpc/codec.go @@ -0,0 +1,26 @@ +package ttrpc + +import ( + "github.com/gogo/protobuf/proto" + "github.com/pkg/errors" +) + +type codec struct{} + +func (c codec) Marshal(msg interface{}) ([]byte, error) { + switch v := msg.(type) { + case proto.Message: + return proto.Marshal(v) + default: + return nil, errors.Errorf("ttrpc: cannot marshal unknown type: %T", msg) + } +} + +func (c codec) Unmarshal(p []byte, msg interface{}) error { + switch v := msg.(type) { + case proto.Message: + return proto.Unmarshal(p, v) + default: + return errors.Errorf("ttrpc: cannot unmarshal into unknown type: %T", msg) + } +} diff --git a/vendor/github.com/stevvooe/ttrpc/plugin/generator.go b/vendor/github.com/stevvooe/ttrpc/plugin/generator.go new file mode 100644 index 000000000..5603e57f4 --- /dev/null +++ b/vendor/github.com/stevvooe/ttrpc/plugin/generator.go @@ -0,0 +1,131 @@ +package plugin + +import ( + "strings" + + "github.com/gogo/protobuf/protoc-gen-gogo/descriptor" + "github.com/gogo/protobuf/protoc-gen-gogo/generator" +) + +type ttrpcGenerator struct { + *generator.Generator + generator.PluginImports + + typeurlPkg generator.Single + ttrpcPkg generator.Single + contextPkg generator.Single +} + +func init() { + generator.RegisterPlugin(new(ttrpcGenerator)) +} + +func (p *ttrpcGenerator) Name() string { + return "ttrpc" +} + +func (p *ttrpcGenerator) Init(g *generator.Generator) { + p.Generator = g +} + +func (p *ttrpcGenerator) Generate(file *generator.FileDescriptor) { + p.PluginImports = generator.NewPluginImports(p.Generator) + p.contextPkg = p.NewImport("context") + p.typeurlPkg = p.NewImport("github.com/containerd/typeurl") + p.ttrpcPkg = p.NewImport("github.com/stevvooe/ttrpc") + + for _, service := range file.GetService() { + serviceName := service.GetName() + if pkg := file.GetPackage(); pkg != "" { + serviceName = pkg + "." + serviceName + } + + p.genService(serviceName, service) + } +} + +func (p *ttrpcGenerator) genService(fullName string, service *descriptor.ServiceDescriptorProto) { + serviceName := service.GetName() + "Service" + p.P() + p.P("type ", serviceName, " interface{") + p.In() + for _, method := range service.Method { + p.P(method.GetName(), + "(ctx ", p.contextPkg.Use(), ".Context, ", + "req *", p.typeName(method.GetInputType()), ") ", + "(*", p.typeName(method.GetOutputType()), ", error)") + + } + p.Out() + p.P("}") + + p.P() + // registration method + p.P("func Register", serviceName, "(srv *", p.ttrpcPkg.Use(), ".Server, svc ", serviceName, ") {") + p.In() + p.P(`srv.Register("`, fullName, `", map[string]`, p.ttrpcPkg.Use(), ".Method{") + p.In() + for _, method := range service.Method { + p.P(`"`, method.GetName(), `": `, `func(ctx context.Context, unmarshal func(interface{}) error) (interface{}, error) {`) + p.In() + p.P("var req ", p.typeName(method.GetInputType())) + p.P(`if err := unmarshal(&req); err != nil {`) + p.In() + p.P(`return nil, err`) + p.Out() + p.P(`}`) + p.P("return svc.", method.GetName(), "(ctx, &req)") + p.Out() + p.P("},") + } + p.Out() + p.P("})") + p.Out() + p.P("}") + + clientType := service.GetName() + "Client" + clientStructType := strings.ToLower(clientType[:1]) + clientType[1:] + p.P() + p.P("type ", clientStructType, " struct{") + p.In() + p.P("client *", p.ttrpcPkg.Use(), ".Client") + p.Out() + p.P("}") + p.P() + p.P("func New", clientType, "(client *", p.ttrpcPkg.Use(), ".Client)", serviceName, "{") + p.In() + p.P("return &", clientStructType, "{") + p.In() + p.P("client: client,") + p.Out() + p.P("}") + p.Out() + p.P("}") + p.P() + for _, method := range service.Method { + p.P() + p.P("func (c *", clientStructType, ") ", method.GetName(), + "(ctx ", p.contextPkg.Use(), ".Context, ", + "req *", p.typeName(method.GetInputType()), ") ", + "(*", p.typeName(method.GetOutputType()), ", error) {") + p.In() + p.P("var resp ", p.typeName(method.GetOutputType())) + p.P("if err := c.client.Call(ctx, ", `"`+fullName+`", `, `"`+method.GetName()+`"`, ", req, &resp); err != nil {") + p.In() + p.P("return nil, err") + p.Out() + p.P("}") + p.P("return &resp, nil") + p.Out() + p.P("}") + } +} + +func (p *ttrpcGenerator) objectNamed(name string) generator.Object { + p.Generator.RecordTypeUse(name) + return p.Generator.ObjectNamed(name) +} + +func (p *ttrpcGenerator) typeName(str string) string { + return p.Generator.TypeName(p.objectNamed(str)) +} diff --git a/vendor/github.com/stevvooe/ttrpc/server.go b/vendor/github.com/stevvooe/ttrpc/server.go new file mode 100644 index 000000000..407068fc3 --- /dev/null +++ b/vendor/github.com/stevvooe/ttrpc/server.go @@ -0,0 +1,155 @@ +package ttrpc + +import ( + "context" + "net" + + "github.com/containerd/containerd/log" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +type Server struct { + services *serviceSet + codec codec +} + +func NewServer() *Server { + return &Server{ + services: newServiceSet(), + } +} + +func (s *Server) Register(name string, methods map[string]Method) { + s.services.register(name, methods) +} + +func (s *Server) Shutdown(ctx context.Context) error { + // TODO(stevvooe): Wait on connection shutdown. + return nil +} + +func (s *Server) Serve(l net.Listener) error { + for { + conn, err := l.Accept() + if err != nil { + log.L.WithError(err).Error("failed accept") + continue + } + + go s.handleConn(conn) + } + + return nil +} + +func (s *Server) handleConn(conn net.Conn) { + defer conn.Close() + + type ( + request struct { + id uint32 + req *Request + } + + response struct { + id uint32 + resp *Response + } + ) + + var ( + ch = newChannel(conn, conn) + ctx, cancel = context.WithCancel(context.Background()) + responses = make(chan response) + requests = make(chan request) + recvErr = make(chan error, 1) + done = make(chan struct{}) + ) + + defer cancel() + defer close(done) + + go func() { + defer close(recvErr) + var p [messageLengthMax]byte + for { + mh, err := ch.recv(ctx, p[:]) + if err != nil { + recvErr <- err + return + } + + if mh.Type != messageTypeRequest { + // we must ignore this for future compat. + continue + } + + var req Request + if err := s.codec.Unmarshal(p[:mh.Length], &req); err != nil { + recvErr <- err + return + } + + if mh.StreamID%2 != 1 { + // enforce odd client initiated identifiers. + select { + case responses <- response{ + // even though we've had an invalid stream id, we send it + // back on the same stream id so the client knows which + // stream id was bad. + id: mh.StreamID, + resp: &Response{ + Status: status.New(codes.InvalidArgument, "StreamID must be odd for client initiated streams").Proto(), + }, + }: + case <-done: + } + + continue + } + + select { + case requests <- request{ + id: mh.StreamID, + req: &req, + }: + case <-done: + } + } + }() + + for { + select { + case request := <-requests: + go func(id uint32) { + p, status := s.services.call(ctx, request.req.Service, request.req.Method, request.req.Payload) + resp := &Response{ + Status: status.Proto(), + Payload: p, + } + + select { + case responses <- response{ + id: id, + resp: resp, + }: + case <-done: + } + }(request.id) + case response := <-responses: + p, err := s.codec.Marshal(response.resp) + if err != nil { + log.L.WithError(err).Error("failed marshaling response") + return + } + if err := ch.send(ctx, response.id, messageTypeResponse, p); err != nil { + log.L.WithError(err).Error("failed sending message on channel") + return + } + case err := <-recvErr: + log.L.WithError(err).Error("error receiving message") + return + } + } +} diff --git a/vendor/github.com/stevvooe/ttrpc/services.go b/vendor/github.com/stevvooe/ttrpc/services.go new file mode 100644 index 000000000..b9a749e3d --- /dev/null +++ b/vendor/github.com/stevvooe/ttrpc/services.go @@ -0,0 +1,134 @@ +package ttrpc + +import ( + "context" + "io" + "os" + "path" + + "github.com/gogo/protobuf/proto" + "github.com/pkg/errors" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +type Method func(ctx context.Context, unmarshal func(interface{}) error) (interface{}, error) + +type ServiceDesc struct { + Methods map[string]Method + + // TODO(stevvooe): Add stream support. +} + +type serviceSet struct { + services map[string]ServiceDesc +} + +func newServiceSet() *serviceSet { + return &serviceSet{ + services: make(map[string]ServiceDesc), + } +} + +func (s *serviceSet) register(name string, methods map[string]Method) { + if _, ok := s.services[name]; ok { + panic(errors.Errorf("duplicate service %v registered", name)) + } + + s.services[name] = ServiceDesc{ + Methods: methods, + } +} + +func (s *serviceSet) call(ctx context.Context, serviceName, methodName string, p []byte) ([]byte, *status.Status) { + p, err := s.dispatch(ctx, serviceName, methodName, p) + st, ok := status.FromError(err) + if !ok { + st = status.New(convertCode(err), err.Error()) + } + + return p, st +} + +func (s *serviceSet) dispatch(ctx context.Context, serviceName, methodName string, p []byte) ([]byte, error) { + method, err := s.resolve(serviceName, methodName) + if err != nil { + return nil, err + } + + unmarshal := func(obj interface{}) error { + switch v := obj.(type) { + case proto.Message: + if err := proto.Unmarshal(p, v); err != nil { + return status.Errorf(codes.Internal, "ttrpc: error unmarshaling payload: %v", err.Error()) + } + default: + return status.Errorf(codes.Internal, "ttrpc: error unsupported request type: %T", v) + } + return nil + } + + resp, err := method(ctx, unmarshal) + if err != nil { + return nil, err + } + + switch v := resp.(type) { + case proto.Message: + r, err := proto.Marshal(v) + if err != nil { + return nil, status.Errorf(codes.Internal, "ttrpc: error marshaling payload: %v", err.Error()) + } + + return r, nil + default: + return nil, status.Errorf(codes.Internal, "ttrpc: error unsupported response type: %T", v) + } +} + +func (s *serviceSet) resolve(service, method string) (Method, error) { + srv, ok := s.services[service] + if !ok { + return nil, status.Errorf(codes.NotFound, "service %v", service) + } + + mthd, ok := srv.Methods[method] + if !ok { + return nil, status.Errorf(codes.NotFound, "method %v", method) + } + + return mthd, nil +} + +// convertCode maps stdlib go errors into grpc space. +// +// This is ripped from the grpc-go code base. +func convertCode(err error) codes.Code { + switch err { + case nil: + return codes.OK + case io.EOF: + return codes.OutOfRange + case io.ErrClosedPipe, io.ErrNoProgress, io.ErrShortBuffer, io.ErrShortWrite, io.ErrUnexpectedEOF: + return codes.FailedPrecondition + case os.ErrInvalid: + return codes.InvalidArgument + case context.Canceled: + return codes.Canceled + case context.DeadlineExceeded: + return codes.DeadlineExceeded + } + switch { + case os.IsExist(err): + return codes.AlreadyExists + case os.IsNotExist(err): + return codes.NotFound + case os.IsPermission(err): + return codes.PermissionDenied + } + return codes.Unknown +} + +func fullPath(service, method string) string { + return "/" + path.Join("/", service, method) +} diff --git a/vendor/github.com/stevvooe/ttrpc/types.go b/vendor/github.com/stevvooe/ttrpc/types.go new file mode 100644 index 000000000..a522b0cf2 --- /dev/null +++ b/vendor/github.com/stevvooe/ttrpc/types.go @@ -0,0 +1,26 @@ +package ttrpc + +import ( + "fmt" + + spb "google.golang.org/genproto/googleapis/rpc/status" +) + +type Request struct { + Service string `protobuf:"bytes,1,opt,name=service,proto3"` + Method string `protobuf:"bytes,2,opt,name=method,proto3"` + Payload []byte `protobuf:"bytes,3,opt,name=payload,proto3"` +} + +func (r *Request) Reset() { *r = Request{} } +func (r *Request) String() string { return fmt.Sprintf("%+#v", r) } +func (r *Request) ProtoMessage() {} + +type Response struct { + Status *spb.Status `protobuf:"bytes,1,opt,name=status,proto3"` + Payload []byte `protobuf:"bytes,2,opt,name=payload,proto3"` +} + +func (r *Response) Reset() { *r = Response{} } +func (r *Response) String() string { return fmt.Sprintf("%+#v", r) } +func (r *Response) ProtoMessage() {}