Upgrade konnectivity-client for GRPC connection fixes
The v0.0.19 Konnectivity client includes several significant fixes to prevent the GRPC tunnel between the KAS and the APIServer Network Proxy from becoming blocked/wedged. Importantly it picks up the fix for kubernetes-sigs/apiserver-network-proxy#167. We believe this will also fix many of the failures currently seen on https://testgrid.k8s.io/sig-api-machinery-network-proxy#ci-kubernetes-e2e-gci-gce-network-proxy-grpc&width=5.
This commit is contained in:
36
vendor/sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/client/client.go
generated
vendored
36
vendor/sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/client/client.go
generated
vendored
@@ -19,6 +19,7 @@ package client
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"math/rand"
|
||||
"net"
|
||||
@@ -49,6 +50,10 @@ type grpcTunnel struct {
|
||||
conns map[int64]*conn
|
||||
pendingDialLock sync.RWMutex
|
||||
connsLock sync.RWMutex
|
||||
|
||||
// The tunnel will be closed if the caller fails to read via conn.Read()
|
||||
// more than readTimeoutSeconds after a packet has been received.
|
||||
readTimeoutSeconds int
|
||||
}
|
||||
|
||||
type clientConn interface {
|
||||
@@ -75,9 +80,10 @@ func CreateSingleUseGrpcTunnel(address string, opts ...grpc.DialOption) (Tunnel,
|
||||
}
|
||||
|
||||
tunnel := &grpcTunnel{
|
||||
stream: stream,
|
||||
pendingDial: make(map[int64]chan<- dialResult),
|
||||
conns: make(map[int64]*conn),
|
||||
stream: stream,
|
||||
pendingDial: make(map[int64]chan<- dialResult),
|
||||
conns: make(map[int64]*conn),
|
||||
readTimeoutSeconds: 10,
|
||||
}
|
||||
|
||||
go tunnel.serve(c)
|
||||
@@ -110,10 +116,17 @@ func (t *grpcTunnel) serve(c clientConn) {
|
||||
if !ok {
|
||||
klog.V(1).Infoln("DialResp not recognized; dropped")
|
||||
} else {
|
||||
ch <- dialResult{
|
||||
result := dialResult{
|
||||
err: resp.Error,
|
||||
connid: resp.ConnectID,
|
||||
}
|
||||
select {
|
||||
case ch <- result:
|
||||
default:
|
||||
klog.ErrorS(fmt.Errorf("blocked pending channel"), "Received second dial response for connection request", "connectionID", resp.ConnectID, "dialID", resp.Random)
|
||||
// On multiple dial responses, avoid leaking serve goroutine.
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
if resp.Error != "" {
|
||||
@@ -129,7 +142,14 @@ func (t *grpcTunnel) serve(c clientConn) {
|
||||
t.connsLock.RUnlock()
|
||||
|
||||
if ok {
|
||||
conn.readCh <- resp.Data
|
||||
timer := time.NewTimer((time.Duration)(t.readTimeoutSeconds) * time.Second)
|
||||
select {
|
||||
case conn.readCh <- resp.Data:
|
||||
timer.Stop()
|
||||
case <-timer.C:
|
||||
klog.ErrorS(fmt.Errorf("timeout"), "readTimeout has been reached, the grpc connection to the proxy server will be closed", "connectionID", conn.connID, "readTimeoutSeconds", t.readTimeoutSeconds)
|
||||
return
|
||||
}
|
||||
} else {
|
||||
klog.V(1).InfoS("connection not recognized", "connectionID", resp.ConnectID)
|
||||
}
|
||||
@@ -160,8 +180,8 @@ func (t *grpcTunnel) Dial(protocol, address string) (net.Conn, error) {
|
||||
return nil, errors.New("protocol not supported")
|
||||
}
|
||||
|
||||
random := rand.Int63()
|
||||
resCh := make(chan dialResult)
|
||||
random := rand.Int63() /* #nosec G404 */
|
||||
resCh := make(chan dialResult, 1)
|
||||
t.pendingDialLock.Lock()
|
||||
t.pendingDial[random] = resCh
|
||||
t.pendingDialLock.Unlock()
|
||||
@@ -199,7 +219,7 @@ func (t *grpcTunnel) Dial(protocol, address string) (net.Conn, error) {
|
||||
}
|
||||
c.connID = res.connid
|
||||
c.readCh = make(chan []byte, 10)
|
||||
c.closeCh = make(chan string)
|
||||
c.closeCh = make(chan string, 1)
|
||||
t.connsLock.Lock()
|
||||
t.conns[res.connid] = c
|
||||
t.connsLock.Unlock()
|
||||
|
Reference in New Issue
Block a user