Update TTRPC and Protobuild dependencies

Signed-off-by: Maksym Pavlenko <pavlenko.maksym@gmail.com>
This commit is contained in:
Maksym Pavlenko
2023-01-19 10:52:41 -08:00
parent 60363db5bc
commit 99580e0aad
17 changed files with 278 additions and 231 deletions

View File

@@ -57,7 +57,7 @@ TESTFLAGS_PARALLEL ?= 8
# Use this to replace `go test` with, for instance, `gotestsum`
GOTEST ?= $(GO) test
.PHONY: clean all AUTHORS build binaries test integration generate protos checkprotos coverage ci check help install vendor install-protobuf install-protobuild
.PHONY: clean all AUTHORS build binaries test integration generate protos check-protos coverage ci check help install vendor install-protobuf install-protobuild
.DEFAULT: default
# Forcibly set the default goal to all, in case an include above brought in a rule definition.
@@ -69,7 +69,7 @@ check: proto-fmt ## run all linters
@echo "$(WHALE) $@"
GOGC=75 golangci-lint run
ci: check binaries checkprotos coverage # coverage-integration ## to be used by the CI
ci: check binaries check-protos coverage # coverage-integration ## to be used by the CI
AUTHORS: .mailmap .git/HEAD
git log --format='%aN <%aE>' | sort -fu > $@
@@ -145,8 +145,8 @@ install-protobuf:
install-protobuild:
@echo "$(WHALE) $@"
@$(GO) install google.golang.org/protobuf/cmd/protoc-gen-go@v1.27.1
@$(GO) install github.com/containerd/protobuild@7e5ee24bc1f70e9e289fef15e2631eb3491320bf
@$(GO) install google.golang.org/protobuf/cmd/protoc-gen-go@v1.28.1
@$(GO) install github.com/containerd/protobuild@14832ccc41429f5c4f81028e5af08aa233a219cf
coverage: ## generate coverprofiles from the unit tests, except tests that require root
@echo "$(WHALE) $@"

View File

@@ -23,3 +23,6 @@ generators = ["go"]
# enable ttrpc and disable fieldpath and grpc for the shim
prefixes = ["github.com/containerd/ttrpc/integration/streaming"]
generators = ["go", "go-ttrpc"]
[overrides.parameters.go-ttrpc]
prefix = "TTRPC"

View File

@@ -1,7 +1,6 @@
# ttrpc
[![Build Status](https://github.com/containerd/ttrpc/workflows/CI/badge.svg)](https://github.com/containerd/ttrpc/actions?query=workflow%3ACI)
[![codecov](https://codecov.io/gh/containerd/ttrpc/branch/main/graph/badge.svg)](https://codecov.io/gh/containerd/ttrpc)
GRPC for low-memory environments.
@@ -30,7 +29,7 @@ Create a gogo vanity binary (see
[`cmd/protoc-gen-gogottrpc/main.go`](cmd/protoc-gen-gogottrpc/main.go) for an
example with the ttrpc plugin enabled.
It's recommended to use [`protobuild`](https://github.com//stevvooe/protobuild)
It's recommended to use [`protobuild`](https://github.com/containerd/protobuild)
to build the protobufs for this project, but this will work with protoc
directly, if required.
@@ -41,7 +40,6 @@ directly, if required.
- The client and server interface are identical whereas in GRPC there is a
client and server interface that are different.
- The Go stdlib context package is used instead.
- No support for streams yet.
# Status

View File

@@ -1,7 +1,7 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.27.1
// protoc v3.11.4
// protoc-gen-go v1.28.1
// protoc v3.20.1
// source: github.com/containerd/ttrpc/request.proto
package ttrpc

View File

@@ -18,11 +18,13 @@ package ttrpc
import (
"context"
"errors"
"io"
"math/rand"
"net"
"sync"
"sync/atomic"
"syscall"
"time"
"github.com/sirupsen/logrus"
@@ -318,6 +320,7 @@ func (c *serverConn) run(sctx context.Context) {
responses = make(chan response)
recvErr = make(chan error, 1)
done = make(chan struct{})
streams = sync.Map{}
active int32
lastStreamID uint32
)
@@ -347,7 +350,6 @@ func (c *serverConn) run(sctx context.Context) {
go func(recvErr chan error) {
defer close(recvErr)
streams := map[uint32]*streamHandler{}
for {
select {
case <-c.shutdown:
@@ -383,12 +385,13 @@ func (c *serverConn) run(sctx context.Context) {
}
if mh.Type == messageTypeData {
sh, ok := streams[mh.StreamID]
i, ok := streams.Load(mh.StreamID)
if !ok {
if !sendStatus(mh.StreamID, status.Newf(codes.InvalidArgument, "StreamID is no longer active")) {
return
}
}
sh := i.(*streamHandler)
if mh.Flags&flagNoData != flagNoData {
unmarshal := func(obj interface{}) error {
err := protoUnmarshal(p, obj)
@@ -458,7 +461,7 @@ func (c *serverConn) run(sctx context.Context) {
continue
}
streams[id] = sh
streams.Store(id, sh)
atomic.AddInt32(&active, 1)
}
// TODO: else we must ignore this for future compat. log this?
@@ -518,6 +521,7 @@ func (c *serverConn) run(sctx context.Context) {
// The ttrpc protocol currently does not support the case where
// the server is localClosed but not remoteClosed. Once the server
// is closing, the whole stream may be considered finished
streams.Delete(response.id)
atomic.AddInt32(&active, -1)
}
case err := <-recvErr:
@@ -525,14 +529,12 @@ func (c *serverConn) run(sctx context.Context) {
// branch. Basically, it means that we are no longer receiving
// requests due to a terminal error.
recvErr = nil // connection is now "closing"
if err == io.EOF || err == io.ErrUnexpectedEOF {
if err == io.EOF || err == io.ErrUnexpectedEOF || errors.Is(err, syscall.ECONNRESET) {
// The client went away and we should stop processing
// requests, so that the client connection is closed
return
}
if err != nil {
logrus.WithError(err).Error("error receiving message")
}
logrus.WithError(err).Error("error receiving message")
// else, initiate shutdown
case <-shutdown:
return