Update ttrpc to v1.2.1

Signed-off-by: Derek McGowan <derek@mcg.dev>
This commit is contained in:
Derek McGowan
2023-03-08 10:29:44 -08:00
parent 71a7234f67
commit 56354c7de5
6 changed files with 40 additions and 21 deletions

View File

@@ -121,12 +121,18 @@ func (s *Server) Serve(ctx context.Context, l net.Listener) error {
approved, handshake, err := handshaker.Handshake(ctx, conn)
if err != nil {
logrus.WithError(err).Errorf("ttrpc: refusing connection after handshake")
logrus.WithError(err).Error("ttrpc: refusing connection after handshake")
conn.Close()
continue
}
sc, err := s.newConn(approved, handshake)
if err != nil {
logrus.WithError(err).Error("ttrpc: create connection failed")
conn.Close()
continue
}
sc := s.newConn(approved, handshake)
go sc.run(ctx)
}
}
@@ -145,15 +151,20 @@ func (s *Server) Shutdown(ctx context.Context) error {
ticker := time.NewTicker(200 * time.Millisecond)
defer ticker.Stop()
for {
if s.closeIdleConns() {
return lnerr
s.closeIdleConns()
if s.countConnection() == 0 {
break
}
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
}
}
return lnerr
}
// Close the server without waiting for active connections.
@@ -205,11 +216,18 @@ func (s *Server) closeListeners() error {
return err
}
func (s *Server) addConnection(c *serverConn) {
func (s *Server) addConnection(c *serverConn) error {
s.mu.Lock()
defer s.mu.Unlock()
select {
case <-s.done:
return ErrServerClosed
default:
}
s.connections[c] = struct{}{}
return nil
}
func (s *Server) delConnection(c *serverConn) {
@@ -226,20 +244,17 @@ func (s *Server) countConnection() int {
return len(s.connections)
}
func (s *Server) closeIdleConns() bool {
func (s *Server) closeIdleConns() {
s.mu.Lock()
defer s.mu.Unlock()
quiescent := true
for c := range s.connections {
st, ok := c.getState()
if !ok || st != connStateIdle {
quiescent = false
if st, ok := c.getState(); !ok || st == connStateActive {
continue
}
c.close()
delete(s.connections, c)
}
return quiescent
}
type connState int
@@ -263,7 +278,7 @@ func (cs connState) String() string {
}
}
func (s *Server) newConn(conn net.Conn, handshake interface{}) *serverConn {
func (s *Server) newConn(conn net.Conn, handshake interface{}) (*serverConn, error) {
c := &serverConn{
server: s,
conn: conn,
@@ -271,8 +286,11 @@ func (s *Server) newConn(conn net.Conn, handshake interface{}) *serverConn {
shutdown: make(chan struct{}),
}
c.setState(connStateIdle)
s.addConnection(c)
return c
if err := s.addConnection(c); err != nil {
c.close()
return nil, err
}
return c, nil
}
type serverConn struct {

2
vendor/modules.txt vendored
View File

@@ -137,7 +137,7 @@ github.com/containerd/nri/pkg/net/multiplex
github.com/containerd/nri/pkg/runtime-tools/generate
github.com/containerd/nri/pkg/stub
github.com/containerd/nri/types/v1
# github.com/containerd/ttrpc v1.2.0
# github.com/containerd/ttrpc v1.2.1
## explicit; go 1.13
github.com/containerd/ttrpc
# github.com/containerd/typeurl v1.0.2