Merge pull request #7886 from dmcgowan/fix-transfer-register-ordering
Fix race between stream registration and use
This commit is contained in:
commit
72e5ddb516
@ -74,20 +74,23 @@ func (s *service) Stream(srv api.Streaming_StreamServer) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if err := srv.Send(protobuf.FromAny(response)); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
cc := make(chan struct{})
|
cc := make(chan struct{})
|
||||||
ss := &serviceStream{
|
ss := &serviceStream{
|
||||||
s: srv,
|
s: srv,
|
||||||
cc: cc,
|
cc: cc,
|
||||||
}
|
}
|
||||||
|
|
||||||
log.G(srv.Context()).WithField("stream", i.ID).Debug("registering stream")
|
log.G(srv.Context()).WithField("stream", i.ID).Debug("registering stream")
|
||||||
if err := s.manager.Register(srv.Context(), i.ID, ss); err != nil {
|
if err := s.manager.Register(srv.Context(), i.ID, ss); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Send response packet after registering stream
|
||||||
|
if err := srv.Send(protobuf.FromAny(response)); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-srv.Context().Done():
|
case <-srv.Context().Done():
|
||||||
// TODO: Should return error if not cancelled?
|
// TODO: Should return error if not cancelled?
|
||||||
|
@ -32,6 +32,12 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func (c *Client) Transfer(ctx context.Context, src interface{}, dest interface{}, opts ...transfer.Opt) error {
|
func (c *Client) Transfer(ctx context.Context, src interface{}, dest interface{}, opts ...transfer.Opt) error {
|
||||||
|
ctx, done, err := c.WithLease(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer done(ctx)
|
||||||
|
|
||||||
return proxy.NewTransferrer(transferapi.NewTransferClient(c.conn), c.streamCreator()).Transfer(ctx, src, dest, opts...)
|
return proxy.NewTransferrer(transferapi.NewTransferClient(c.conn), c.streamCreator()).Transfer(ctx, src, dest, opts...)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user