From 8a25fa584f0a0f116db82c3cedcc38c629e9effa Mon Sep 17 00:00:00 2001 From: Derek McGowan Date: Fri, 2 Dec 2022 11:24:25 -0800 Subject: [PATCH] Unwrap proto errors in streaming client Allows clients to properly detect context cancellation Signed-off-by: Derek McGowan --- pkg/transfer/streaming/stream.go | 4 ++-- services/streaming/service.go | 8 ++++++-- transfer.go | 25 +++++++++++++++++++++---- 3 files changed, 29 insertions(+), 8 deletions(-) diff --git a/pkg/transfer/streaming/stream.go b/pkg/transfer/streaming/stream.go index 5b4890cdd..d6dcf78bb 100644 --- a/pkg/transfer/streaming/stream.go +++ b/pkg/transfer/streaming/stream.go @@ -157,14 +157,14 @@ func ReceiveStream(ctx context.Context, stream streaming.Stream) io.Reader { // check window update error after recv, stream may be complete if werr = stream.Send(any); werr == nil { window += windowSize - } else if werr == io.EOF { + } else if errors.Is(werr, io.EOF) { // TODO: Why does send return EOF here werr = nil } } any, err := stream.Recv() if err != nil { - if err == io.EOF { + if errors.Is(err, io.EOF) { err = nil } else { err = fmt.Errorf("received failed: %w", err) diff --git a/services/streaming/service.go b/services/streaming/service.go index 07d0e5771..30a11c6a5 100644 --- a/services/streaming/service.go +++ b/services/streaming/service.go @@ -102,8 +102,12 @@ type serviceStream struct { cc chan struct{} } -func (ss *serviceStream) Send(a typeurl.Any) error { - return errdefs.FromGRPC(ss.s.Send(protobuf.FromAny(a))) +func (ss *serviceStream) Send(a typeurl.Any) (err error) { + err = errdefs.FromGRPC(ss.s.Send(protobuf.FromAny(a))) + if !errors.Is(err, io.EOF) { + err = errdefs.FromGRPC(err) + } + return } func (ss *serviceStream) Recv() (a typeurl.Any, err error) { diff --git a/transfer.go b/transfer.go index 27a88b9f6..1e70a69a2 100644 --- a/transfer.go +++ b/transfer.go @@ -18,9 +18,12 @@ package containerd import ( "context" + "errors" + "io" streamingapi "github.com/containerd/containerd/api/services/streaming/v1" transferapi "github.com/containerd/containerd/api/services/transfer/v1" + "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/pkg/streaming" "github.com/containerd/containerd/pkg/transfer" "github.com/containerd/containerd/pkg/transfer/proxy" @@ -56,11 +59,17 @@ func (sc *streamCreator) Create(ctx context.Context, id string) (streaming.Strea } err = stream.Send(protobuf.FromAny(a)) if err != nil { + if !errors.Is(err, io.EOF) { + err = errdefs.FromGRPC(err) + } return nil, err } // Receive an ack that stream is init and ready if _, err = stream.Recv(); err != nil { + if !errors.Is(err, io.EOF) { + err = errdefs.FromGRPC(err) + } return nil, err } @@ -73,12 +82,20 @@ type clientStream struct { s streamingapi.Streaming_StreamClient } -func (cs *clientStream) Send(a typeurl.Any) error { - return cs.s.Send(protobuf.FromAny(a)) +func (cs *clientStream) Send(a typeurl.Any) (err error) { + err = cs.s.Send(protobuf.FromAny(a)) + if !errors.Is(err, io.EOF) { + err = errdefs.FromGRPC(err) + } + return } -func (cs *clientStream) Recv() (typeurl.Any, error) { - return cs.s.Recv() +func (cs *clientStream) Recv() (a typeurl.Any, err error) { + a, err = cs.s.Recv() + if !errors.Is(err, io.EOF) { + err = errdefs.FromGRPC(err) + } + return } func (cs *clientStream) Close() error {