From d7ef6cbfa3e45f8b19b388d167af1e9924b7f01d Mon Sep 17 00:00:00 2001 From: Derek McGowan Date: Fri, 30 Dec 2022 21:56:25 -0800 Subject: [PATCH 1/2] [streaming] move response packet after registration Prevent a race where a client may attempt to use a stream before it is registered. Signed-off-by: Derek McGowan --- services/streaming/service.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/services/streaming/service.go b/services/streaming/service.go index 30a11c6a5..f9159e43a 100644 --- a/services/streaming/service.go +++ b/services/streaming/service.go @@ -74,20 +74,23 @@ func (s *service) Stream(srv api.Streaming_StreamServer) error { if err != nil { return err } - if err := srv.Send(protobuf.FromAny(response)); err != nil { - return err - } cc := make(chan struct{}) ss := &serviceStream{ s: srv, cc: cc, } + log.G(srv.Context()).WithField("stream", i.ID).Debug("registering stream") if err := s.manager.Register(srv.Context(), i.ID, ss); err != nil { return err } + // Send response packet after registering stream + if err := srv.Send(protobuf.FromAny(response)); err != nil { + return err + } + select { case <-srv.Context().Done(): // TODO: Should return error if not cancelled? From a5b979623e65cae9a0d3595a4e16eec95c013ea8 Mon Sep 17 00:00:00 2001 From: Derek McGowan Date: Fri, 30 Dec 2022 16:33:15 -0800 Subject: [PATCH 2/2] Add lease to transfer to preserve streams during transfer Signed-off-by: Derek McGowan --- transfer.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/transfer.go b/transfer.go index 1e70a69a2..669f90117 100644 --- a/transfer.go +++ b/transfer.go @@ -32,6 +32,12 @@ import ( ) func (c *Client) Transfer(ctx context.Context, src interface{}, dest interface{}, opts ...transfer.Opt) error { + ctx, done, err := c.WithLease(ctx) + if err != nil { + return err + } + defer done(ctx) + return proxy.NewTransferrer(transferapi.NewTransferClient(c.conn), c.streamCreator()).Transfer(ctx, src, dest, opts...) }