Add support for request timeout propgation.
Adds a new field to the `Request` type which specifies a timeout (in nanoseconds) for the request. This is propagated on method dispatch as a context timeout. There was some discussion here on supporting a broader "metadata" field (similar to grpc) that can be used for other things, but we ended up with a dedicated field because it is lighter weight and expect it to be used pretty heavily as is.... metadata may be added in the future, but is not necessary for timeouts. Also discussed using a deadline vs a timeout in the request and decided to go with a timeout in order to deal with potential clock skew between the client and server. This also has the side-effect of eliminating the protocol/wire overhead from the request timeout. Signed-off-by: Brian Goff <cpuguy83@gmail.com>
This commit is contained in:
parent
f51df4475b
commit
a364f44e55
@ -24,6 +24,7 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"syscall"
|
"syscall"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/gogo/protobuf/proto"
|
"github.com/gogo/protobuf/proto"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
@ -86,6 +87,10 @@ func (c *Client) Call(ctx context.Context, service, method string, req, resp int
|
|||||||
cresp = &Response{}
|
cresp = &Response{}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
if dl, ok := ctx.Deadline(); ok {
|
||||||
|
creq.TimeoutNano = dl.Sub(time.Now()).Nanoseconds()
|
||||||
|
}
|
||||||
|
|
||||||
if err := c.dispatch(ctx, creq, cresp); err != nil {
|
if err := c.dispatch(ctx, creq, cresp); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -104,6 +109,7 @@ func (c *Client) Call(ctx context.Context, service, method string, req, resp int
|
|||||||
func (c *Client) dispatch(ctx context.Context, req *Request, resp *Response) error {
|
func (c *Client) dispatch(ctx context.Context, req *Request, resp *Response) error {
|
||||||
errs := make(chan error, 1)
|
errs := make(chan error, 1)
|
||||||
call := &callRequest{
|
call := &callRequest{
|
||||||
|
ctx: ctx,
|
||||||
req: req,
|
req: req,
|
||||||
resp: resp,
|
resp: resp,
|
||||||
errs: errs,
|
errs: errs,
|
||||||
|
15
server.go
15
server.go
@ -414,6 +414,9 @@ func (c *serverConn) run(sctx context.Context) {
|
|||||||
case request := <-requests:
|
case request := <-requests:
|
||||||
active++
|
active++
|
||||||
go func(id uint32) {
|
go func(id uint32) {
|
||||||
|
ctx, cancel := getRequestContext(ctx, request.req)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
p, status := c.server.services.call(ctx, request.req.Service, request.req.Method, request.req.Payload)
|
p, status := c.server.services.call(ctx, request.req.Service, request.req.Method, request.req.Payload)
|
||||||
resp := &Response{
|
resp := &Response{
|
||||||
Status: status.Proto(),
|
Status: status.Proto(),
|
||||||
@ -454,3 +457,15 @@ func (c *serverConn) run(sctx context.Context) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var noopFunc = func() {}
|
||||||
|
|
||||||
|
func getRequestContext(ctx context.Context, req *Request) (retCtx context.Context, cancel func()) {
|
||||||
|
cancel = noopFunc
|
||||||
|
if req.TimeoutNano == 0 {
|
||||||
|
return ctx, cancel
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx, cancel = context.WithTimeout(ctx, time.Duration(req.TimeoutNano))
|
||||||
|
return ctx, cancel
|
||||||
|
}
|
||||||
|
@ -24,6 +24,7 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/gogo/protobuf/proto"
|
"github.com/gogo/protobuf/proto"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
@ -57,7 +58,8 @@ func (tc *testingClient) Test(ctx context.Context, req *testPayload) (*testPaylo
|
|||||||
}
|
}
|
||||||
|
|
||||||
type testPayload struct {
|
type testPayload struct {
|
||||||
Foo string `protobuf:"bytes,1,opt,name=foo,proto3"`
|
Foo string `protobuf:"bytes,1,opt,name=foo,proto3"`
|
||||||
|
Deadline int64 `protobuf:"varint,2,opt,name=deadline,proto3"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *testPayload) Reset() { *r = testPayload{} }
|
func (r *testPayload) Reset() { *r = testPayload{} }
|
||||||
@ -68,7 +70,11 @@ func (r *testPayload) ProtoMessage() {}
|
|||||||
type testingServer struct{}
|
type testingServer struct{}
|
||||||
|
|
||||||
func (s *testingServer) Test(ctx context.Context, req *testPayload) (*testPayload, error) {
|
func (s *testingServer) Test(ctx context.Context, req *testPayload) (*testPayload, error) {
|
||||||
return &testPayload{Foo: strings.Repeat(req.Foo, 2)}, nil
|
tp := &testPayload{Foo: strings.Repeat(req.Foo, 2)}
|
||||||
|
if dl, ok := ctx.Deadline(); ok {
|
||||||
|
tp.Deadline = dl.UnixNano()
|
||||||
|
}
|
||||||
|
return tp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// registerTestingService mocks more of what is generated code. Unlike grpc, we
|
// registerTestingService mocks more of what is generated code. Unlike grpc, we
|
||||||
@ -376,6 +382,34 @@ func TestUnixSocketHandshake(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestServerRequestTimeout(t *testing.T) {
|
||||||
|
var (
|
||||||
|
ctx, cancel = context.WithDeadline(context.Background(), time.Now().Add(10*time.Minute))
|
||||||
|
server = mustServer(t)(NewServer())
|
||||||
|
addr, listener = newTestListener(t)
|
||||||
|
testImpl = &testingServer{}
|
||||||
|
client, cleanup = newTestClient(t, addr)
|
||||||
|
result testPayload
|
||||||
|
)
|
||||||
|
defer cancel()
|
||||||
|
defer cleanup()
|
||||||
|
defer listener.Close()
|
||||||
|
|
||||||
|
registerTestingService(server, testImpl)
|
||||||
|
|
||||||
|
go server.Serve(ctx, listener)
|
||||||
|
defer server.Shutdown(ctx)
|
||||||
|
|
||||||
|
if err := client.Call(ctx, serviceName, "Test", &testPayload{}, &result); err != nil {
|
||||||
|
t.Fatalf("unexpected error making call: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
dl, _ := ctx.Deadline()
|
||||||
|
if result.Deadline != dl.UnixNano() {
|
||||||
|
t.Fatalf("expected deadline %v, actual: %v", dl, result.Deadline)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func BenchmarkRoundTrip(b *testing.B) {
|
func BenchmarkRoundTrip(b *testing.B) {
|
||||||
var (
|
var (
|
||||||
ctx = context.Background()
|
ctx = context.Background()
|
||||||
|
7
types.go
7
types.go
@ -23,9 +23,10 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type Request struct {
|
type Request struct {
|
||||||
Service string `protobuf:"bytes,1,opt,name=service,proto3"`
|
Service string `protobuf:"bytes,1,opt,name=service,proto3"`
|
||||||
Method string `protobuf:"bytes,2,opt,name=method,proto3"`
|
Method string `protobuf:"bytes,2,opt,name=method,proto3"`
|
||||||
Payload []byte `protobuf:"bytes,3,opt,name=payload,proto3"`
|
Payload []byte `protobuf:"bytes,3,opt,name=payload,proto3"`
|
||||||
|
TimeoutNano int64 `protobuf:"varint,4,opt,name=timeout_nano,proto3"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Request) Reset() { *r = Request{} }
|
func (r *Request) Reset() { *r = Request{} }
|
||||||
|
Loading…
Reference in New Issue
Block a user