Merge pull request #31 from cpuguy83/support_context_deadlines

Add support for request timeout propgation.
This commit is contained in:
Stephen Day 2019-01-07 13:10:05 -08:00 committed by GitHub
commit f02858b145
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 61 additions and 5 deletions

View File

@ -24,6 +24,7 @@ import (
"strings" "strings"
"sync" "sync"
"syscall" "syscall"
"time"
"github.com/gogo/protobuf/proto" "github.com/gogo/protobuf/proto"
"github.com/pkg/errors" "github.com/pkg/errors"
@ -86,6 +87,10 @@ func (c *Client) Call(ctx context.Context, service, method string, req, resp int
cresp = &Response{} cresp = &Response{}
) )
if dl, ok := ctx.Deadline(); ok {
creq.TimeoutNano = dl.Sub(time.Now()).Nanoseconds()
}
if err := c.dispatch(ctx, creq, cresp); err != nil { if err := c.dispatch(ctx, creq, cresp); err != nil {
return err return err
} }
@ -104,6 +109,7 @@ func (c *Client) Call(ctx context.Context, service, method string, req, resp int
func (c *Client) dispatch(ctx context.Context, req *Request, resp *Response) error { func (c *Client) dispatch(ctx context.Context, req *Request, resp *Response) error {
errs := make(chan error, 1) errs := make(chan error, 1)
call := &callRequest{ call := &callRequest{
ctx: ctx,
req: req, req: req,
resp: resp, resp: resp,
errs: errs, errs: errs,

View File

@ -414,6 +414,9 @@ func (c *serverConn) run(sctx context.Context) {
case request := <-requests: case request := <-requests:
active++ active++
go func(id uint32) { go func(id uint32) {
ctx, cancel := getRequestContext(ctx, request.req)
defer cancel()
p, status := c.server.services.call(ctx, request.req.Service, request.req.Method, request.req.Payload) p, status := c.server.services.call(ctx, request.req.Service, request.req.Method, request.req.Payload)
resp := &Response{ resp := &Response{
Status: status.Proto(), Status: status.Proto(),
@ -454,3 +457,15 @@ func (c *serverConn) run(sctx context.Context) {
} }
} }
} }
var noopFunc = func() {}
func getRequestContext(ctx context.Context, req *Request) (retCtx context.Context, cancel func()) {
cancel = noopFunc
if req.TimeoutNano == 0 {
return ctx, cancel
}
ctx, cancel = context.WithTimeout(ctx, time.Duration(req.TimeoutNano))
return ctx, cancel
}

View File

@ -24,6 +24,7 @@ import (
"strings" "strings"
"sync" "sync"
"testing" "testing"
"time"
"github.com/gogo/protobuf/proto" "github.com/gogo/protobuf/proto"
"github.com/pkg/errors" "github.com/pkg/errors"
@ -57,7 +58,8 @@ func (tc *testingClient) Test(ctx context.Context, req *testPayload) (*testPaylo
} }
type testPayload struct { type testPayload struct {
Foo string `protobuf:"bytes,1,opt,name=foo,proto3"` Foo string `protobuf:"bytes,1,opt,name=foo,proto3"`
Deadline int64 `protobuf:"varint,2,opt,name=deadline,proto3"`
} }
func (r *testPayload) Reset() { *r = testPayload{} } func (r *testPayload) Reset() { *r = testPayload{} }
@ -68,7 +70,11 @@ func (r *testPayload) ProtoMessage() {}
type testingServer struct{} type testingServer struct{}
func (s *testingServer) Test(ctx context.Context, req *testPayload) (*testPayload, error) { func (s *testingServer) Test(ctx context.Context, req *testPayload) (*testPayload, error) {
return &testPayload{Foo: strings.Repeat(req.Foo, 2)}, nil tp := &testPayload{Foo: strings.Repeat(req.Foo, 2)}
if dl, ok := ctx.Deadline(); ok {
tp.Deadline = dl.UnixNano()
}
return tp, nil
} }
// registerTestingService mocks more of what is generated code. Unlike grpc, we // registerTestingService mocks more of what is generated code. Unlike grpc, we
@ -376,6 +382,34 @@ func TestUnixSocketHandshake(t *testing.T) {
} }
} }
func TestServerRequestTimeout(t *testing.T) {
var (
ctx, cancel = context.WithDeadline(context.Background(), time.Now().Add(10*time.Minute))
server = mustServer(t)(NewServer())
addr, listener = newTestListener(t)
testImpl = &testingServer{}
client, cleanup = newTestClient(t, addr)
result testPayload
)
defer cancel()
defer cleanup()
defer listener.Close()
registerTestingService(server, testImpl)
go server.Serve(ctx, listener)
defer server.Shutdown(ctx)
if err := client.Call(ctx, serviceName, "Test", &testPayload{}, &result); err != nil {
t.Fatalf("unexpected error making call: %v", err)
}
dl, _ := ctx.Deadline()
if result.Deadline != dl.UnixNano() {
t.Fatalf("expected deadline %v, actual: %v", dl, result.Deadline)
}
}
func BenchmarkRoundTrip(b *testing.B) { func BenchmarkRoundTrip(b *testing.B) {
var ( var (
ctx = context.Background() ctx = context.Background()

View File

@ -23,9 +23,10 @@ import (
) )
type Request struct { type Request struct {
Service string `protobuf:"bytes,1,opt,name=service,proto3"` Service string `protobuf:"bytes,1,opt,name=service,proto3"`
Method string `protobuf:"bytes,2,opt,name=method,proto3"` Method string `protobuf:"bytes,2,opt,name=method,proto3"`
Payload []byte `protobuf:"bytes,3,opt,name=payload,proto3"` Payload []byte `protobuf:"bytes,3,opt,name=payload,proto3"`
TimeoutNano int64 `protobuf:"varint,4,opt,name=timeout_nano,proto3"`
} }
func (r *Request) Reset() { *r = Request{} } func (r *Request) Reset() { *r = Request{} }