From e963fd5a12652485dcff3a952b88700e6fbc2875 Mon Sep 17 00:00:00 2001 From: Stephen J Day Date: Fri, 5 Jan 2018 14:20:51 -0800 Subject: [PATCH] ttrpc: return ErrClosed when client is shutdown To gracefully handle scenarios where the connection is closed or the client is closed, we now set the final error to be `ErrClosed`. Callers can resolve it through using `errors.Cause` to detect this condition. Signed-off-by: Stephen J Day --- client.go | 47 ++++++++++++++++++++++++++++++++++++++++++++++- server.go | 2 +- server_test.go | 5 ++++- 3 files changed, 51 insertions(+), 3 deletions(-) diff --git a/client.go b/client.go index ca76afe..ae367f9 100644 --- a/client.go +++ b/client.go @@ -2,8 +2,12 @@ package ttrpc import ( "context" + "io" "net" + "os" + "strings" "sync" + "syscall" "github.com/containerd/containerd/log" "github.com/gogo/protobuf/proto" @@ -11,6 +15,10 @@ import ( "google.golang.org/grpc/status" ) +// ErrClosed is returned by client methods when the underlying connection is +// closed. +var ErrClosed = errors.New("ttrpc: closed") + type Client struct { codec codec conn net.Conn @@ -91,7 +99,7 @@ func (c *Client) dispatch(ctx context.Context, req *Request, resp *Response) err select { case err := <-errs: - return err + return filterCloseErr(err) case <-c.done: return c.err } @@ -171,7 +179,14 @@ func (c *Client) run() { call.errs <- c.recv(call.resp, msg) delete(waiters, msg.StreamID) case <-shutdown: + if shutdownErr != nil { + shutdownErr = filterCloseErr(shutdownErr) + } else { + shutdownErr = ErrClosed + } + shutdownErr = errors.Wrapf(shutdownErr, "ttrpc: client shutting down") + c.err = shutdownErr for _, waiter := range waiters { waiter.errs <- shutdownErr @@ -179,6 +194,9 @@ func (c *Client) run() { c.Close() return case <-c.closed: + if c.err == nil { + c.err = ErrClosed + } // broadcast the shutdown error to the remaining waiters. for _, waiter := range waiters { waiter.errs <- shutdownErr @@ -209,3 +227,30 @@ func (c *Client) recv(resp *Response, msg *message) error { defer c.channel.putmbuf(msg.p) return proto.Unmarshal(msg.p, resp) } + +// filterCloseErr rewrites EOF and EPIPE errors to ErrClosed. Use when +// returning from call or handling errors from main read loop. +// +// This purposely ignores errors with a wrapped cause. +func filterCloseErr(err error) error { + if err == nil { + return nil + } + + if err == io.EOF { + return ErrClosed + } + + if strings.Contains(err.Error(), "use of closed network connection") { + return ErrClosed + } + + // if we have an epipe on a write, we cast to errclosed + if oerr, ok := err.(*net.OpError); ok && oerr.Op == "write" { + if serr, ok := oerr.Err.(*os.SyscallError); ok && serr.Err == syscall.EPIPE { + return ErrClosed + } + } + + return err +} diff --git a/server.go b/server.go index edfca0c..5d688e5 100644 --- a/server.go +++ b/server.go @@ -16,7 +16,7 @@ import ( ) var ( - ErrServerClosed = errors.New("ttrpc: server close") + ErrServerClosed = errors.New("ttrpc: server closed") ) type Server struct { diff --git a/server_test.go b/server_test.go index 8be008b..a2c773e 100644 --- a/server_test.go +++ b/server_test.go @@ -10,6 +10,7 @@ import ( "testing" "github.com/gogo/protobuf/proto" + "github.com/pkg/errors" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) @@ -220,7 +221,7 @@ func TestServerShutdown(t *testing.T) { <-shutdownFinished for i := 0; i < ncalls; i++ { - if err := <-callErrs; err != nil { + if err := <-callErrs; err != nil && err != ErrClosed { t.Fatal(err) } } @@ -329,6 +330,8 @@ func TestClientEOF(t *testing.T) { // server shutdown, but we still make a call. if err := client.Call(ctx, serviceName, "Test", tp, tp); err == nil { t.Fatalf("expected error when calling against shutdown server") + } else if errors.Cause(err) != ErrClosed { + t.Fatalf("expected to have a cause of ErrClosed, got %v", errors.Cause(err)) } }