update containerd/ttrpc to v1.2.2

https://github.com/containerd/ttrpc/compare/v1.2.1...v1.2.2

Signed-off-by: Humble Chirammal <humble.devassy@gmail.com>
This commit is contained in:
Humble Chirammal
2023-05-12 08:21:39 +05:30
parent 02659772cb
commit f03b5cdc08
8 changed files with 83 additions and 71 deletions

View File

@@ -214,60 +214,66 @@ func (cs *clientStream) RecvMsg(m interface{}) error {
if cs.remoteClosed {
return io.EOF
}
var msg *streamMessage
select {
case <-cs.ctx.Done():
return cs.ctx.Err()
case msg, ok := <-cs.s.recv:
if !ok {
case <-cs.s.recvClose:
// If recv has a pending message, process that first
select {
case msg = <-cs.s.recv:
default:
return cs.s.recvErr
}
case msg = <-cs.s.recv:
}
if msg.header.Type == messageTypeResponse {
resp := &Response{}
err := proto.Unmarshal(msg.payload[:msg.header.Length], resp)
// return the payload buffer for reuse
cs.c.channel.putmbuf(msg.payload)
if err != nil {
return err
}
if msg.header.Type == messageTypeResponse {
resp := &Response{}
err := proto.Unmarshal(msg.payload[:msg.header.Length], resp)
// return the payload buffer for reuse
cs.c.channel.putmbuf(msg.payload)
if err != nil {
return err
}
if err := cs.c.codec.Unmarshal(resp.Payload, m); err != nil {
return err
}
if err := cs.c.codec.Unmarshal(resp.Payload, m); err != nil {
return err
}
if resp.Status != nil && resp.Status.Code != int32(codes.OK) {
return status.ErrorProto(resp.Status)
}
if resp.Status != nil && resp.Status.Code != int32(codes.OK) {
return status.ErrorProto(resp.Status)
}
cs.c.deleteStream(cs.s)
cs.remoteClosed = true
return nil
} else if msg.header.Type == messageTypeData {
if !cs.desc.StreamingServer {
cs.c.deleteStream(cs.s)
cs.remoteClosed = true
return fmt.Errorf("received data from non-streaming server: %w", ErrProtocol)
}
if msg.header.Flags&flagRemoteClosed == flagRemoteClosed {
cs.c.deleteStream(cs.s)
cs.remoteClosed = true
return nil
} else if msg.header.Type == messageTypeData {
if !cs.desc.StreamingServer {
cs.c.deleteStream(cs.s)
cs.remoteClosed = true
return fmt.Errorf("received data from non-streaming server: %w", ErrProtocol)
if msg.header.Flags&flagNoData == flagNoData {
return io.EOF
}
if msg.header.Flags&flagRemoteClosed == flagRemoteClosed {
cs.c.deleteStream(cs.s)
cs.remoteClosed = true
if msg.header.Flags&flagNoData == flagNoData {
return io.EOF
}
}
err := cs.c.codec.Unmarshal(msg.payload[:msg.header.Length], m)
cs.c.channel.putmbuf(msg.payload)
if err != nil {
return err
}
return nil
}
return fmt.Errorf("unexpected %q message received: %w", msg.header.Type, ErrProtocol)
err := cs.c.codec.Unmarshal(msg.payload[:msg.header.Length], m)
cs.c.channel.putmbuf(msg.payload)
if err != nil {
return err
}
return nil
}
return fmt.Errorf("unexpected %q message received: %w", msg.header.Type, ErrProtocol)
}
// Close closes the ttrpc connection and underlying connection
@@ -477,25 +483,30 @@ func (c *Client) dispatch(ctx context.Context, req *Request, resp *Response) err
}
defer c.deleteStream(s)
var msg *streamMessage
select {
case <-ctx.Done():
return ctx.Err()
case <-c.ctx.Done():
return ErrClosed
case msg, ok := <-s.recv:
if !ok {
case <-s.recvClose:
// If recv has a pending message, process that first
select {
case msg = <-s.recv:
default:
return s.recvErr
}
if msg.header.Type == messageTypeResponse {
err = proto.Unmarshal(msg.payload[:msg.header.Length], resp)
} else {
err = fmt.Errorf("unexpected %q message received: %w", msg.header.Type, ErrProtocol)
}
// return the payload buffer for reuse
c.channel.putmbuf(msg.payload)
return err
case msg = <-s.recv:
}
if msg.header.Type == messageTypeResponse {
err = proto.Unmarshal(msg.payload[:msg.header.Length], resp)
} else {
err = fmt.Errorf("unexpected %q message received: %w", msg.header.Type, ErrProtocol)
}
// return the payload buffer for reuse
c.channel.putmbuf(msg.payload)
return err
}