Merge pull request #110731 from jkh52/update-netproxy

Bump konnectivity-client to 0.0.32
This commit is contained in:
Kubernetes Prow Robot
2022-06-23 12:41:20 -07:00
committed by GitHub
21 changed files with 70 additions and 40 deletions

4
vendor/modules.txt vendored
View File

@@ -2449,7 +2449,7 @@ k8s.io/utils/pointer
k8s.io/utils/strings
k8s.io/utils/strings/slices
k8s.io/utils/trace
# sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.0.30 => sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.0.30
# sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.0.32 => sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.0.32
## explicit; go 1.17
sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/client
sigs.k8s.io/apiserver-network-proxy/konnectivity-client/proto/client
@@ -2893,7 +2893,7 @@ sigs.k8s.io/yaml
# modernc.org/strutil => modernc.org/strutil v1.0.0
# modernc.org/xc => modernc.org/xc v1.0.0
# rsc.io/pdf => rsc.io/pdf v0.1.1
# sigs.k8s.io/apiserver-network-proxy/konnectivity-client => sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.0.30
# sigs.k8s.io/apiserver-network-proxy/konnectivity-client => sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.0.32
# sigs.k8s.io/json => sigs.k8s.io/json v0.0.0-20211208200746-9f7c6b3444d2
# sigs.k8s.io/kustomize/api => sigs.k8s.io/kustomize/api v0.11.4
# sigs.k8s.io/kustomize/cmd/config => sigs.k8s.io/kustomize/cmd/config v0.10.6

View File

@@ -35,7 +35,7 @@ import (
type Tunnel interface {
// Dial connects to the address on the named network, similar to
// what net.Dial does. The only supported protocol is tcp.
DialContext(ctx context.Context, protocol, address string) (net.Conn, error)
DialContext(requestCtx context.Context, protocol, address string) (net.Conn, error)
}
type dialResult struct {
@@ -73,15 +73,27 @@ var _ clientConn = &grpc.ClientConn{}
// gRPC based proxy service.
// Currently, a single tunnel supports a single connection, and the tunnel is closed when the connection is terminated
// The Dial() method of the returned tunnel should only be called once
func CreateSingleUseGrpcTunnel(ctx context.Context, address string, opts ...grpc.DialOption) (Tunnel, error) {
c, err := grpc.DialContext(ctx, address, opts...)
// Deprecated 2022-06-07: use CreateSingleUseGrpcTunnelWithContext
func CreateSingleUseGrpcTunnel(tunnelCtx context.Context, address string, opts ...grpc.DialOption) (Tunnel, error) {
return CreateSingleUseGrpcTunnelWithContext(context.TODO(), tunnelCtx, address, opts...)
}
// CreateSingleUseGrpcTunnelWithContext creates a Tunnel to dial to a remote server through a
// gRPC based proxy service.
// Currently, a single tunnel supports a single connection.
// The tunnel is normally closed when the connection is terminated.
// If createCtx is cancelled before tunnel creation, an error will be returned.
// If tunnelCtx is cancelled while the tunnel is still in use, the tunnel (and any in flight connections) will be closed.
// The Dial() method of the returned tunnel should only be called once
func CreateSingleUseGrpcTunnelWithContext(createCtx, tunnelCtx context.Context, address string, opts ...grpc.DialOption) (Tunnel, error) {
c, err := grpc.DialContext(createCtx, address, opts...)
if err != nil {
return nil, err
}
grpcClient := client.NewProxyServiceClient(c)
stream, err := grpcClient.Proxy(ctx)
stream, err := grpcClient.Proxy(tunnelCtx)
if err != nil {
c.Close()
return nil, err
@@ -94,13 +106,24 @@ func CreateSingleUseGrpcTunnel(ctx context.Context, address string, opts ...grpc
readTimeoutSeconds: 10,
}
go tunnel.serve(c)
go tunnel.serve(tunnelCtx, c)
return tunnel, nil
}
func (t *grpcTunnel) serve(c clientConn) {
defer c.Close()
func (t *grpcTunnel) serve(tunnelCtx context.Context, c clientConn) {
defer func() {
c.Close()
// A connection in t.conns after serve() returns means
// we never received a CLOSE_RSP for it, so we need to
// close any channels remaining for these connections.
t.connsLock.Lock()
for _, conn := range t.conns {
close(conn.readCh)
}
t.connsLock.Unlock()
}()
for {
pkt, err := t.stream.Recv()
@@ -141,6 +164,9 @@ func (t *grpcTunnel) serve(c clientConn) {
// In either scenario, we should return here as this tunnel is no longer needed.
klog.V(1).InfoS("Pending dial has been cancelled; dropped", "connectionID", resp.ConnectID, "dialID", resp.Random)
return
case <-tunnelCtx.Done():
klog.V(1).InfoS("Tunnel has been closed; dropped", "connectionID", resp.ConnectID, "dialID", resp.Random)
return
}
}
@@ -164,6 +190,8 @@ func (t *grpcTunnel) serve(c clientConn) {
case <-timer.C:
klog.ErrorS(fmt.Errorf("timeout"), "readTimeout has been reached, the grpc connection to the proxy server will be closed", "connectionID", conn.connID, "readTimeoutSeconds", t.readTimeoutSeconds)
return
case <-tunnelCtx.Done():
klog.V(1).InfoS("Tunnel has been closed, the grpc connection to the proxy server will be closed", "connectionID", conn.connID)
}
} else {
klog.V(1).InfoS("connection not recognized", "connectionID", resp.ConnectID)
@@ -190,7 +218,7 @@ func (t *grpcTunnel) serve(c clientConn) {
// Dial connects to the address on the named network, similar to
// what net.Dial does. The only supported protocol is tcp.
func (t *grpcTunnel) DialContext(ctx context.Context, protocol, address string) (net.Conn, error) {
func (t *grpcTunnel) DialContext(requestCtx context.Context, protocol, address string) (net.Conn, error) {
if protocol != "tcp" {
return nil, errors.New("protocol not supported")
}
@@ -248,8 +276,8 @@ func (t *grpcTunnel) DialContext(ctx context.Context, protocol, address string)
case <-time.After(30 * time.Second):
klog.V(5).InfoS("Timed out waiting for DialResp", "dialID", random)
return nil, errors.New("dial timeout, backstop")
case <-ctx.Done():
klog.V(5).InfoS("Context canceled waiting for DialResp", "ctxErr", ctx.Err(), "dialID", random)
case <-requestCtx.Done():
klog.V(5).InfoS("Context canceled waiting for DialResp", "ctxErr", requestCtx.Err(), "dialID", random)
return nil, errors.New("dial timeout, context")
}

View File

@@ -30,6 +30,8 @@ import (
// successful delivery of CLOSE_REQ.
const CloseTimeout = 10 * time.Second
var errConnCloseTimeout = errors.New("close timeout")
// conn is an implementation of net.Conn, where the data is transported
// over an established tunnel defined by a gRPC service ProxyService.
type conn struct {
@@ -151,5 +153,5 @@ func (c *conn) Close() error {
case <-time.After(CloseTimeout):
}
return errors.New("close timeout")
return errConnCloseTimeout
}