diff --git a/go.mod b/go.mod index 575e67d4a..7a3a18bb9 100644 --- a/go.mod +++ b/go.mod @@ -17,7 +17,7 @@ require ( github.com/containerd/go-runc v1.0.0 github.com/containerd/imgcrypt v1.1.7 github.com/containerd/nri v0.3.0 - github.com/containerd/ttrpc v1.2.1 + github.com/containerd/ttrpc v1.2.2 github.com/containerd/typeurl/v2 v2.1.1 github.com/containerd/zfs v1.0.0 github.com/containernetworking/cni v1.1.2 diff --git a/go.sum b/go.sum index a864517f6..8239cd482 100644 --- a/go.sum +++ b/go.sum @@ -275,8 +275,8 @@ github.com/containerd/ttrpc v0.0.0-20191028202541-4f1b8fe65a5c/go.mod h1:LPm1u0x github.com/containerd/ttrpc v1.0.1/go.mod h1:UAxOpgT9ziI0gJrmKvgcZivgxOp8iFPSk8httJEt98Y= github.com/containerd/ttrpc v1.0.2/go.mod h1:UAxOpgT9ziI0gJrmKvgcZivgxOp8iFPSk8httJEt98Y= github.com/containerd/ttrpc v1.1.0/go.mod h1:XX4ZTnoOId4HklF4edwc4DcqskFZuvXB1Evzy5KFQpQ= -github.com/containerd/ttrpc v1.2.1 h1:VWv/Rzx023TBLv4WQ+9WPXlBG/s3rsRjY3i9AJ2BJdE= -github.com/containerd/ttrpc v1.2.1/go.mod h1:sIT6l32Ph/H9cvnJsfXM5drIVzTr5A2flTf1G5tYZak= +github.com/containerd/ttrpc v1.2.2 h1:9vqZr0pxwOF5koz6N0N3kJ0zDHokrcPxIR/ZR2YFtOs= +github.com/containerd/ttrpc v1.2.2/go.mod h1:sIT6l32Ph/H9cvnJsfXM5drIVzTr5A2flTf1G5tYZak= github.com/containerd/typeurl v0.0.0-20180627222232-a93fcdb778cd/go.mod h1:Cm3kwCdlkCfMSHURc+r6fwoGH6/F1hH3S4sg0rLFWPc= github.com/containerd/typeurl v0.0.0-20190911142611-5eb25027c9fd/go.mod h1:GeKYzf2pQcqv7tJ0AoCuuhtnqhva5LNU3U+OyKxxJpk= github.com/containerd/typeurl v1.0.1/go.mod h1:TB1hUtrpaiO88KEK56ijojHS1+NeF0izUACaJW2mdXg= diff --git a/integration/client/go.mod b/integration/client/go.mod index c4baad42a..929882cfa 100644 --- a/integration/client/go.mod +++ b/integration/client/go.mod @@ -10,7 +10,7 @@ require ( github.com/containerd/containerd v1.7.0 // see replace; the actual version of containerd is replaced with the code at the root of this repository github.com/containerd/continuity v0.3.1-0.20230307035957-72c70feb3081 github.com/containerd/go-runc v1.0.0 - github.com/containerd/ttrpc v1.2.1 + github.com/containerd/ttrpc v1.2.2 github.com/containerd/typeurl/v2 v2.1.1 github.com/opencontainers/go-digest v1.0.0 github.com/opencontainers/image-spec v1.1.0-rc2.0.20221005185240-3a7f492d3f1b diff --git a/integration/client/go.sum b/integration/client/go.sum index 318d809b5..07000335d 100644 --- a/integration/client/go.sum +++ b/integration/client/go.sum @@ -672,8 +672,8 @@ github.com/containerd/stargz-snapshotter/estargz v0.12.1/go.mod h1:12VUuCq3qPq4y github.com/containerd/ttrpc v1.0.2/go.mod h1:UAxOpgT9ziI0gJrmKvgcZivgxOp8iFPSk8httJEt98Y= github.com/containerd/ttrpc v1.1.0/go.mod h1:XX4ZTnoOId4HklF4edwc4DcqskFZuvXB1Evzy5KFQpQ= github.com/containerd/ttrpc v1.1.1-0.20220420014843-944ef4a40df3/go.mod h1:YYyNVhZrTMiaf51Vj6WhAJqJw+vl/nzABhj8pWrzle4= -github.com/containerd/ttrpc v1.2.1 h1:VWv/Rzx023TBLv4WQ+9WPXlBG/s3rsRjY3i9AJ2BJdE= -github.com/containerd/ttrpc v1.2.1/go.mod h1:sIT6l32Ph/H9cvnJsfXM5drIVzTr5A2flTf1G5tYZak= +github.com/containerd/ttrpc v1.2.2 h1:9vqZr0pxwOF5koz6N0N3kJ0zDHokrcPxIR/ZR2YFtOs= +github.com/containerd/ttrpc v1.2.2/go.mod h1:sIT6l32Ph/H9cvnJsfXM5drIVzTr5A2flTf1G5tYZak= github.com/containerd/typeurl v1.0.1/go.mod h1:TB1hUtrpaiO88KEK56ijojHS1+NeF0izUACaJW2mdXg= github.com/containerd/typeurl v1.0.2/go.mod h1:9trJWW2sRlGub4wZJRTW83VtbOLS6hwcDZXTn6oPz9s= github.com/containerd/typeurl/v2 v2.1.1 h1:3Q4Pt7i8nYwy2KmQWIw2+1hTvwTE/6w9FqcttATPO/4= diff --git a/vendor/github.com/containerd/ttrpc/.golangci.yml b/vendor/github.com/containerd/ttrpc/.golangci.yml index c8be4980c..6462e52f6 100644 --- a/vendor/github.com/containerd/ttrpc/.golangci.yml +++ b/vendor/github.com/containerd/ttrpc/.golangci.yml @@ -1,7 +1,5 @@ linters: enable: - - structcheck - - varcheck - staticcheck - unconvert - gofmt diff --git a/vendor/github.com/containerd/ttrpc/Makefile b/vendor/github.com/containerd/ttrpc/Makefile index 474194239..c3a497dca 100644 --- a/vendor/github.com/containerd/ttrpc/Makefile +++ b/vendor/github.com/containerd/ttrpc/Makefile @@ -151,7 +151,7 @@ install-protobuild: coverage: ## generate coverprofiles from the unit tests, except tests that require root @echo "$(WHALE) $@" @rm -f coverage.txt - @$(GO) test -i ${TESTFLAGS} ${TESTPACKAGES} 2> /dev/null + @$(GO) test ${TESTFLAGS} ${TESTPACKAGES} 2> /dev/null @( for pkg in ${PACKAGES}; do \ $(GO) test ${TESTFLAGS} \ -cover \ diff --git a/vendor/github.com/containerd/ttrpc/client.go b/vendor/github.com/containerd/ttrpc/client.go index 0abc7025d..4b1e1e709 100644 --- a/vendor/github.com/containerd/ttrpc/client.go +++ b/vendor/github.com/containerd/ttrpc/client.go @@ -214,60 +214,66 @@ func (cs *clientStream) RecvMsg(m interface{}) error { if cs.remoteClosed { return io.EOF } + + var msg *streamMessage select { case <-cs.ctx.Done(): return cs.ctx.Err() - case msg, ok := <-cs.s.recv: - if !ok { + case <-cs.s.recvClose: + // If recv has a pending message, process that first + select { + case msg = <-cs.s.recv: + default: return cs.s.recvErr } + case msg = <-cs.s.recv: + } - if msg.header.Type == messageTypeResponse { - resp := &Response{} - err := proto.Unmarshal(msg.payload[:msg.header.Length], resp) - // return the payload buffer for reuse - cs.c.channel.putmbuf(msg.payload) - if err != nil { - return err - } + if msg.header.Type == messageTypeResponse { + resp := &Response{} + err := proto.Unmarshal(msg.payload[:msg.header.Length], resp) + // return the payload buffer for reuse + cs.c.channel.putmbuf(msg.payload) + if err != nil { + return err + } - if err := cs.c.codec.Unmarshal(resp.Payload, m); err != nil { - return err - } + if err := cs.c.codec.Unmarshal(resp.Payload, m); err != nil { + return err + } - if resp.Status != nil && resp.Status.Code != int32(codes.OK) { - return status.ErrorProto(resp.Status) - } + if resp.Status != nil && resp.Status.Code != int32(codes.OK) { + return status.ErrorProto(resp.Status) + } + cs.c.deleteStream(cs.s) + cs.remoteClosed = true + + return nil + } else if msg.header.Type == messageTypeData { + if !cs.desc.StreamingServer { + cs.c.deleteStream(cs.s) + cs.remoteClosed = true + return fmt.Errorf("received data from non-streaming server: %w", ErrProtocol) + } + if msg.header.Flags&flagRemoteClosed == flagRemoteClosed { cs.c.deleteStream(cs.s) cs.remoteClosed = true - return nil - } else if msg.header.Type == messageTypeData { - if !cs.desc.StreamingServer { - cs.c.deleteStream(cs.s) - cs.remoteClosed = true - return fmt.Errorf("received data from non-streaming server: %w", ErrProtocol) + if msg.header.Flags&flagNoData == flagNoData { + return io.EOF } - if msg.header.Flags&flagRemoteClosed == flagRemoteClosed { - cs.c.deleteStream(cs.s) - cs.remoteClosed = true - - if msg.header.Flags&flagNoData == flagNoData { - return io.EOF - } - } - - err := cs.c.codec.Unmarshal(msg.payload[:msg.header.Length], m) - cs.c.channel.putmbuf(msg.payload) - if err != nil { - return err - } - return nil } - return fmt.Errorf("unexpected %q message received: %w", msg.header.Type, ErrProtocol) + err := cs.c.codec.Unmarshal(msg.payload[:msg.header.Length], m) + cs.c.channel.putmbuf(msg.payload) + if err != nil { + return err + } + return nil } + + return fmt.Errorf("unexpected %q message received: %w", msg.header.Type, ErrProtocol) } // Close closes the ttrpc connection and underlying connection @@ -477,25 +483,30 @@ func (c *Client) dispatch(ctx context.Context, req *Request, resp *Response) err } defer c.deleteStream(s) + var msg *streamMessage select { case <-ctx.Done(): return ctx.Err() case <-c.ctx.Done(): return ErrClosed - case msg, ok := <-s.recv: - if !ok { + case <-s.recvClose: + // If recv has a pending message, process that first + select { + case msg = <-s.recv: + default: return s.recvErr } - - if msg.header.Type == messageTypeResponse { - err = proto.Unmarshal(msg.payload[:msg.header.Length], resp) - } else { - err = fmt.Errorf("unexpected %q message received: %w", msg.header.Type, ErrProtocol) - } - - // return the payload buffer for reuse - c.channel.putmbuf(msg.payload) - - return err + case msg = <-s.recv: } + + if msg.header.Type == messageTypeResponse { + err = proto.Unmarshal(msg.payload[:msg.header.Length], resp) + } else { + err = fmt.Errorf("unexpected %q message received: %w", msg.header.Type, ErrProtocol) + } + + // return the payload buffer for reuse + c.channel.putmbuf(msg.payload) + + return err } diff --git a/vendor/github.com/containerd/ttrpc/server.go b/vendor/github.com/containerd/ttrpc/server.go index 2efda2bc9..7af59f828 100644 --- a/vendor/github.com/containerd/ttrpc/server.go +++ b/vendor/github.com/containerd/ttrpc/server.go @@ -547,7 +547,7 @@ func (c *serverConn) run(sctx context.Context) { // branch. Basically, it means that we are no longer receiving // requests due to a terminal error. recvErr = nil // connection is now "closing" - if err == io.EOF || err == io.ErrUnexpectedEOF || errors.Is(err, syscall.ECONNRESET) { + if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) || errors.Is(err, syscall.ECONNRESET) { // The client went away and we should stop processing // requests, so that the client connection is closed return diff --git a/vendor/github.com/containerd/ttrpc/stream.go b/vendor/github.com/containerd/ttrpc/stream.go index 5f264fe67..739a4c967 100644 --- a/vendor/github.com/containerd/ttrpc/stream.go +++ b/vendor/github.com/containerd/ttrpc/stream.go @@ -35,27 +35,26 @@ type stream struct { closeOnce sync.Once recvErr error + recvClose chan struct{} } func newStream(id streamID, send sender) *stream { return &stream{ - id: id, - sender: send, - recv: make(chan *streamMessage, 1), + id: id, + sender: send, + recv: make(chan *streamMessage, 1), + recvClose: make(chan struct{}), } } func (s *stream) closeWithError(err error) error { s.closeOnce.Do(func() { - if s.recv != nil { - close(s.recv) - if err != nil { - s.recvErr = err - } else { - s.recvErr = ErrClosed - } - + if err != nil { + s.recvErr = err + } else { + s.recvErr = ErrClosed } + close(s.recvClose) }) return nil } @@ -65,10 +64,14 @@ func (s *stream) send(mt messageType, flags uint8, b []byte) error { } func (s *stream) receive(ctx context.Context, msg *streamMessage) error { - if s.recvErr != nil { + select { + case <-s.recvClose: return s.recvErr + default: } select { + case <-s.recvClose: + return s.recvErr case s.recv <- msg: return nil case <-ctx.Done(): diff --git a/vendor/modules.txt b/vendor/modules.txt index 8f693246b..b2182fe20 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -134,7 +134,7 @@ github.com/containerd/nri/pkg/net/multiplex github.com/containerd/nri/pkg/runtime-tools/generate github.com/containerd/nri/pkg/stub github.com/containerd/nri/types/v1 -# github.com/containerd/ttrpc v1.2.1 +# github.com/containerd/ttrpc v1.2.2 ## explicit; go 1.13 github.com/containerd/ttrpc # github.com/containerd/typeurl v1.0.2