Merge pull request #2930 from cpuguy83/update_ttrpc

Update ttrpc to support context timeout.
This commit is contained in:
Phil Estes 2019-01-15 11:23:00 -05:00 committed by GitHub
commit 804faeacdb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 36 additions and 4 deletions

View File

@ -36,7 +36,7 @@ github.com/Microsoft/go-winio v0.4.11
github.com/Microsoft/hcsshim v0.8.3 github.com/Microsoft/hcsshim v0.8.3
google.golang.org/genproto d80a6e20e776b0b17a324d0ba1ab50a39c8e8944 google.golang.org/genproto d80a6e20e776b0b17a324d0ba1ab50a39c8e8944
golang.org/x/text 19e51611da83d6be54ddafce4a4af510cb3e9ea4 golang.org/x/text 19e51611da83d6be54ddafce4a4af510cb3e9ea4
github.com/containerd/ttrpc 2a805f71863501300ae1976d29f0454ae003e85a github.com/containerd/ttrpc f02858b1457c5ca3aaec3a0803eb0d59f96e41d6
github.com/syndtr/gocapability db04d3cc01c8b54962a58ec7e491717d06cfcc16 github.com/syndtr/gocapability db04d3cc01c8b54962a58ec7e491717d06cfcc16
gotest.tools v2.1.0 gotest.tools v2.1.0
github.com/google/go-cmp v0.1.0 github.com/google/go-cmp v0.1.0

View File

@ -50,3 +50,13 @@ TODO:
- [ ] Document protocol layout - [ ] Document protocol layout
- [ ] Add testing under concurrent load to ensure - [ ] Add testing under concurrent load to ensure
- [ ] Verify connection error handling - [ ] Verify connection error handling
# Project details
ttrpc is a containerd sub-project, licensed under the [Apache 2.0 license](./LICENSE).
As a containerd sub-project, you will find the:
* [Project governance](https://github.com/containerd/project/blob/master/GOVERNANCE.md),
* [Maintainers](https://github.com/containerd/project/blob/master/MAINTAINERS),
* and [Contributing guidelines](https://github.com/containerd/project/blob/master/CONTRIBUTING.md)
information in our [`containerd/project`](https://github.com/containerd/project) repository.

View File

@ -24,6 +24,7 @@ import (
"strings" "strings"
"sync" "sync"
"syscall" "syscall"
"time"
"github.com/gogo/protobuf/proto" "github.com/gogo/protobuf/proto"
"github.com/pkg/errors" "github.com/pkg/errors"
@ -86,6 +87,10 @@ func (c *Client) Call(ctx context.Context, service, method string, req, resp int
cresp = &Response{} cresp = &Response{}
) )
if dl, ok := ctx.Deadline(); ok {
creq.TimeoutNano = dl.Sub(time.Now()).Nanoseconds()
}
if err := c.dispatch(ctx, creq, cresp); err != nil { if err := c.dispatch(ctx, creq, cresp); err != nil {
return err return err
} }
@ -104,6 +109,7 @@ func (c *Client) Call(ctx context.Context, service, method string, req, resp int
func (c *Client) dispatch(ctx context.Context, req *Request, resp *Response) error { func (c *Client) dispatch(ctx context.Context, req *Request, resp *Response) error {
errs := make(chan error, 1) errs := make(chan error, 1)
call := &callRequest{ call := &callRequest{
ctx: ctx,
req: req, req: req,
resp: resp, resp: resp,
errs: errs, errs: errs,

View File

@ -414,6 +414,9 @@ func (c *serverConn) run(sctx context.Context) {
case request := <-requests: case request := <-requests:
active++ active++
go func(id uint32) { go func(id uint32) {
ctx, cancel := getRequestContext(ctx, request.req)
defer cancel()
p, status := c.server.services.call(ctx, request.req.Service, request.req.Method, request.req.Payload) p, status := c.server.services.call(ctx, request.req.Service, request.req.Method, request.req.Payload)
resp := &Response{ resp := &Response{
Status: status.Proto(), Status: status.Proto(),
@ -454,3 +457,15 @@ func (c *serverConn) run(sctx context.Context) {
} }
} }
} }
var noopFunc = func() {}
func getRequestContext(ctx context.Context, req *Request) (retCtx context.Context, cancel func()) {
cancel = noopFunc
if req.TimeoutNano == 0 {
return ctx, cancel
}
ctx, cancel = context.WithTimeout(ctx, time.Duration(req.TimeoutNano))
return ctx, cancel
}

View File

@ -26,6 +26,7 @@ type Request struct {
Service string `protobuf:"bytes,1,opt,name=service,proto3"` Service string `protobuf:"bytes,1,opt,name=service,proto3"`
Method string `protobuf:"bytes,2,opt,name=method,proto3"` Method string `protobuf:"bytes,2,opt,name=method,proto3"`
Payload []byte `protobuf:"bytes,3,opt,name=payload,proto3"` Payload []byte `protobuf:"bytes,3,opt,name=payload,proto3"`
TimeoutNano int64 `protobuf:"varint,4,opt,name=timeout_nano,proto3"`
} }
func (r *Request) Reset() { *r = Request{} } func (r *Request) Reset() { *r = Request{} }