Update ttrpc to v1.2.2

Signed-off-by: Derek McGowan <derek@mcg.dev>
This commit is contained in:
Derek McGowan 2023-05-08 23:09:52 -07:00
parent c6d7e45c14
commit 718250b6ba
No known key found for this signature in database
GPG Key ID: F58C5D0A4405ACDB
10 changed files with 86 additions and 74 deletions

2
go.mod
View File

@ -17,7 +17,7 @@ require (
github.com/containerd/go-runc v1.0.0 github.com/containerd/go-runc v1.0.0
github.com/containerd/imgcrypt v1.1.7 github.com/containerd/imgcrypt v1.1.7
github.com/containerd/nri v0.3.0 github.com/containerd/nri v0.3.0
github.com/containerd/ttrpc v1.2.1 github.com/containerd/ttrpc v1.2.2
github.com/containerd/typeurl/v2 v2.1.1 github.com/containerd/typeurl/v2 v2.1.1
github.com/containerd/zfs v1.0.0 github.com/containerd/zfs v1.0.0
github.com/containernetworking/cni v1.1.2 github.com/containernetworking/cni v1.1.2

4
go.sum
View File

@ -275,8 +275,8 @@ github.com/containerd/ttrpc v0.0.0-20191028202541-4f1b8fe65a5c/go.mod h1:LPm1u0x
github.com/containerd/ttrpc v1.0.1/go.mod h1:UAxOpgT9ziI0gJrmKvgcZivgxOp8iFPSk8httJEt98Y= github.com/containerd/ttrpc v1.0.1/go.mod h1:UAxOpgT9ziI0gJrmKvgcZivgxOp8iFPSk8httJEt98Y=
github.com/containerd/ttrpc v1.0.2/go.mod h1:UAxOpgT9ziI0gJrmKvgcZivgxOp8iFPSk8httJEt98Y= github.com/containerd/ttrpc v1.0.2/go.mod h1:UAxOpgT9ziI0gJrmKvgcZivgxOp8iFPSk8httJEt98Y=
github.com/containerd/ttrpc v1.1.0/go.mod h1:XX4ZTnoOId4HklF4edwc4DcqskFZuvXB1Evzy5KFQpQ= github.com/containerd/ttrpc v1.1.0/go.mod h1:XX4ZTnoOId4HklF4edwc4DcqskFZuvXB1Evzy5KFQpQ=
github.com/containerd/ttrpc v1.2.1 h1:VWv/Rzx023TBLv4WQ+9WPXlBG/s3rsRjY3i9AJ2BJdE= github.com/containerd/ttrpc v1.2.2 h1:9vqZr0pxwOF5koz6N0N3kJ0zDHokrcPxIR/ZR2YFtOs=
github.com/containerd/ttrpc v1.2.1/go.mod h1:sIT6l32Ph/H9cvnJsfXM5drIVzTr5A2flTf1G5tYZak= github.com/containerd/ttrpc v1.2.2/go.mod h1:sIT6l32Ph/H9cvnJsfXM5drIVzTr5A2flTf1G5tYZak=
github.com/containerd/typeurl v0.0.0-20180627222232-a93fcdb778cd/go.mod h1:Cm3kwCdlkCfMSHURc+r6fwoGH6/F1hH3S4sg0rLFWPc= github.com/containerd/typeurl v0.0.0-20180627222232-a93fcdb778cd/go.mod h1:Cm3kwCdlkCfMSHURc+r6fwoGH6/F1hH3S4sg0rLFWPc=
github.com/containerd/typeurl v0.0.0-20190911142611-5eb25027c9fd/go.mod h1:GeKYzf2pQcqv7tJ0AoCuuhtnqhva5LNU3U+OyKxxJpk= github.com/containerd/typeurl v0.0.0-20190911142611-5eb25027c9fd/go.mod h1:GeKYzf2pQcqv7tJ0AoCuuhtnqhva5LNU3U+OyKxxJpk=
github.com/containerd/typeurl v1.0.1/go.mod h1:TB1hUtrpaiO88KEK56ijojHS1+NeF0izUACaJW2mdXg= github.com/containerd/typeurl v1.0.1/go.mod h1:TB1hUtrpaiO88KEK56ijojHS1+NeF0izUACaJW2mdXg=

View File

@ -10,7 +10,7 @@ require (
github.com/containerd/containerd v1.7.0 // see replace; the actual version of containerd is replaced with the code at the root of this repository github.com/containerd/containerd v1.7.0 // see replace; the actual version of containerd is replaced with the code at the root of this repository
github.com/containerd/continuity v0.3.1-0.20230307035957-72c70feb3081 github.com/containerd/continuity v0.3.1-0.20230307035957-72c70feb3081
github.com/containerd/go-runc v1.0.0 github.com/containerd/go-runc v1.0.0
github.com/containerd/ttrpc v1.2.1 github.com/containerd/ttrpc v1.2.2
github.com/containerd/typeurl/v2 v2.1.1 github.com/containerd/typeurl/v2 v2.1.1
github.com/opencontainers/go-digest v1.0.0 github.com/opencontainers/go-digest v1.0.0
github.com/opencontainers/image-spec v1.1.0-rc2.0.20221005185240-3a7f492d3f1b github.com/opencontainers/image-spec v1.1.0-rc2.0.20221005185240-3a7f492d3f1b

View File

@ -672,8 +672,8 @@ github.com/containerd/stargz-snapshotter/estargz v0.12.1/go.mod h1:12VUuCq3qPq4y
github.com/containerd/ttrpc v1.0.2/go.mod h1:UAxOpgT9ziI0gJrmKvgcZivgxOp8iFPSk8httJEt98Y= github.com/containerd/ttrpc v1.0.2/go.mod h1:UAxOpgT9ziI0gJrmKvgcZivgxOp8iFPSk8httJEt98Y=
github.com/containerd/ttrpc v1.1.0/go.mod h1:XX4ZTnoOId4HklF4edwc4DcqskFZuvXB1Evzy5KFQpQ= github.com/containerd/ttrpc v1.1.0/go.mod h1:XX4ZTnoOId4HklF4edwc4DcqskFZuvXB1Evzy5KFQpQ=
github.com/containerd/ttrpc v1.1.1-0.20220420014843-944ef4a40df3/go.mod h1:YYyNVhZrTMiaf51Vj6WhAJqJw+vl/nzABhj8pWrzle4= github.com/containerd/ttrpc v1.1.1-0.20220420014843-944ef4a40df3/go.mod h1:YYyNVhZrTMiaf51Vj6WhAJqJw+vl/nzABhj8pWrzle4=
github.com/containerd/ttrpc v1.2.1 h1:VWv/Rzx023TBLv4WQ+9WPXlBG/s3rsRjY3i9AJ2BJdE= github.com/containerd/ttrpc v1.2.2 h1:9vqZr0pxwOF5koz6N0N3kJ0zDHokrcPxIR/ZR2YFtOs=
github.com/containerd/ttrpc v1.2.1/go.mod h1:sIT6l32Ph/H9cvnJsfXM5drIVzTr5A2flTf1G5tYZak= github.com/containerd/ttrpc v1.2.2/go.mod h1:sIT6l32Ph/H9cvnJsfXM5drIVzTr5A2flTf1G5tYZak=
github.com/containerd/typeurl v1.0.1/go.mod h1:TB1hUtrpaiO88KEK56ijojHS1+NeF0izUACaJW2mdXg= github.com/containerd/typeurl v1.0.1/go.mod h1:TB1hUtrpaiO88KEK56ijojHS1+NeF0izUACaJW2mdXg=
github.com/containerd/typeurl v1.0.2/go.mod h1:9trJWW2sRlGub4wZJRTW83VtbOLS6hwcDZXTn6oPz9s= github.com/containerd/typeurl v1.0.2/go.mod h1:9trJWW2sRlGub4wZJRTW83VtbOLS6hwcDZXTn6oPz9s=
github.com/containerd/typeurl/v2 v2.1.1 h1:3Q4Pt7i8nYwy2KmQWIw2+1hTvwTE/6w9FqcttATPO/4= github.com/containerd/typeurl/v2 v2.1.1 h1:3Q4Pt7i8nYwy2KmQWIw2+1hTvwTE/6w9FqcttATPO/4=

View File

@ -1,7 +1,5 @@
linters: linters:
enable: enable:
- structcheck
- varcheck
- staticcheck - staticcheck
- unconvert - unconvert
- gofmt - gofmt

View File

@ -151,7 +151,7 @@ install-protobuild:
coverage: ## generate coverprofiles from the unit tests, except tests that require root coverage: ## generate coverprofiles from the unit tests, except tests that require root
@echo "$(WHALE) $@" @echo "$(WHALE) $@"
@rm -f coverage.txt @rm -f coverage.txt
@$(GO) test -i ${TESTFLAGS} ${TESTPACKAGES} 2> /dev/null @$(GO) test ${TESTFLAGS} ${TESTPACKAGES} 2> /dev/null
@( for pkg in ${PACKAGES}; do \ @( for pkg in ${PACKAGES}; do \
$(GO) test ${TESTFLAGS} \ $(GO) test ${TESTFLAGS} \
-cover \ -cover \

View File

@ -214,60 +214,66 @@ func (cs *clientStream) RecvMsg(m interface{}) error {
if cs.remoteClosed { if cs.remoteClosed {
return io.EOF return io.EOF
} }
var msg *streamMessage
select { select {
case <-cs.ctx.Done(): case <-cs.ctx.Done():
return cs.ctx.Err() return cs.ctx.Err()
case msg, ok := <-cs.s.recv: case <-cs.s.recvClose:
if !ok { // If recv has a pending message, process that first
select {
case msg = <-cs.s.recv:
default:
return cs.s.recvErr return cs.s.recvErr
} }
case msg = <-cs.s.recv:
}
if msg.header.Type == messageTypeResponse { if msg.header.Type == messageTypeResponse {
resp := &Response{} resp := &Response{}
err := proto.Unmarshal(msg.payload[:msg.header.Length], resp) err := proto.Unmarshal(msg.payload[:msg.header.Length], resp)
// return the payload buffer for reuse // return the payload buffer for reuse
cs.c.channel.putmbuf(msg.payload) cs.c.channel.putmbuf(msg.payload)
if err != nil { if err != nil {
return err return err
} }
if err := cs.c.codec.Unmarshal(resp.Payload, m); err != nil { if err := cs.c.codec.Unmarshal(resp.Payload, m); err != nil {
return err return err
} }
if resp.Status != nil && resp.Status.Code != int32(codes.OK) { if resp.Status != nil && resp.Status.Code != int32(codes.OK) {
return status.ErrorProto(resp.Status) return status.ErrorProto(resp.Status)
} }
cs.c.deleteStream(cs.s)
cs.remoteClosed = true
return nil
} else if msg.header.Type == messageTypeData {
if !cs.desc.StreamingServer {
cs.c.deleteStream(cs.s)
cs.remoteClosed = true
return fmt.Errorf("received data from non-streaming server: %w", ErrProtocol)
}
if msg.header.Flags&flagRemoteClosed == flagRemoteClosed {
cs.c.deleteStream(cs.s) cs.c.deleteStream(cs.s)
cs.remoteClosed = true cs.remoteClosed = true
return nil if msg.header.Flags&flagNoData == flagNoData {
} else if msg.header.Type == messageTypeData { return io.EOF
if !cs.desc.StreamingServer {
cs.c.deleteStream(cs.s)
cs.remoteClosed = true
return fmt.Errorf("received data from non-streaming server: %w", ErrProtocol)
} }
if msg.header.Flags&flagRemoteClosed == flagRemoteClosed {
cs.c.deleteStream(cs.s)
cs.remoteClosed = true
if msg.header.Flags&flagNoData == flagNoData {
return io.EOF
}
}
err := cs.c.codec.Unmarshal(msg.payload[:msg.header.Length], m)
cs.c.channel.putmbuf(msg.payload)
if err != nil {
return err
}
return nil
} }
return fmt.Errorf("unexpected %q message received: %w", msg.header.Type, ErrProtocol) err := cs.c.codec.Unmarshal(msg.payload[:msg.header.Length], m)
cs.c.channel.putmbuf(msg.payload)
if err != nil {
return err
}
return nil
} }
return fmt.Errorf("unexpected %q message received: %w", msg.header.Type, ErrProtocol)
} }
// Close closes the ttrpc connection and underlying connection // Close closes the ttrpc connection and underlying connection
@ -477,25 +483,30 @@ func (c *Client) dispatch(ctx context.Context, req *Request, resp *Response) err
} }
defer c.deleteStream(s) defer c.deleteStream(s)
var msg *streamMessage
select { select {
case <-ctx.Done(): case <-ctx.Done():
return ctx.Err() return ctx.Err()
case <-c.ctx.Done(): case <-c.ctx.Done():
return ErrClosed return ErrClosed
case msg, ok := <-s.recv: case <-s.recvClose:
if !ok { // If recv has a pending message, process that first
select {
case msg = <-s.recv:
default:
return s.recvErr return s.recvErr
} }
case msg = <-s.recv:
if msg.header.Type == messageTypeResponse {
err = proto.Unmarshal(msg.payload[:msg.header.Length], resp)
} else {
err = fmt.Errorf("unexpected %q message received: %w", msg.header.Type, ErrProtocol)
}
// return the payload buffer for reuse
c.channel.putmbuf(msg.payload)
return err
} }
if msg.header.Type == messageTypeResponse {
err = proto.Unmarshal(msg.payload[:msg.header.Length], resp)
} else {
err = fmt.Errorf("unexpected %q message received: %w", msg.header.Type, ErrProtocol)
}
// return the payload buffer for reuse
c.channel.putmbuf(msg.payload)
return err
} }

View File

@ -547,7 +547,7 @@ func (c *serverConn) run(sctx context.Context) {
// branch. Basically, it means that we are no longer receiving // branch. Basically, it means that we are no longer receiving
// requests due to a terminal error. // requests due to a terminal error.
recvErr = nil // connection is now "closing" recvErr = nil // connection is now "closing"
if err == io.EOF || err == io.ErrUnexpectedEOF || errors.Is(err, syscall.ECONNRESET) { if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) || errors.Is(err, syscall.ECONNRESET) {
// The client went away and we should stop processing // The client went away and we should stop processing
// requests, so that the client connection is closed // requests, so that the client connection is closed
return return

View File

@ -35,27 +35,26 @@ type stream struct {
closeOnce sync.Once closeOnce sync.Once
recvErr error recvErr error
recvClose chan struct{}
} }
func newStream(id streamID, send sender) *stream { func newStream(id streamID, send sender) *stream {
return &stream{ return &stream{
id: id, id: id,
sender: send, sender: send,
recv: make(chan *streamMessage, 1), recv: make(chan *streamMessage, 1),
recvClose: make(chan struct{}),
} }
} }
func (s *stream) closeWithError(err error) error { func (s *stream) closeWithError(err error) error {
s.closeOnce.Do(func() { s.closeOnce.Do(func() {
if s.recv != nil { if err != nil {
close(s.recv) s.recvErr = err
if err != nil { } else {
s.recvErr = err s.recvErr = ErrClosed
} else {
s.recvErr = ErrClosed
}
} }
close(s.recvClose)
}) })
return nil return nil
} }
@ -65,10 +64,14 @@ func (s *stream) send(mt messageType, flags uint8, b []byte) error {
} }
func (s *stream) receive(ctx context.Context, msg *streamMessage) error { func (s *stream) receive(ctx context.Context, msg *streamMessage) error {
if s.recvErr != nil { select {
case <-s.recvClose:
return s.recvErr return s.recvErr
default:
} }
select { select {
case <-s.recvClose:
return s.recvErr
case s.recv <- msg: case s.recv <- msg:
return nil return nil
case <-ctx.Done(): case <-ctx.Done():

2
vendor/modules.txt vendored
View File

@ -134,7 +134,7 @@ github.com/containerd/nri/pkg/net/multiplex
github.com/containerd/nri/pkg/runtime-tools/generate github.com/containerd/nri/pkg/runtime-tools/generate
github.com/containerd/nri/pkg/stub github.com/containerd/nri/pkg/stub
github.com/containerd/nri/types/v1 github.com/containerd/nri/types/v1
# github.com/containerd/ttrpc v1.2.1 # github.com/containerd/ttrpc v1.2.2
## explicit; go 1.13 ## explicit; go 1.13
github.com/containerd/ttrpc github.com/containerd/ttrpc
# github.com/containerd/typeurl v1.0.2 # github.com/containerd/typeurl v1.0.2