Merge pull request #115191 from jkh52/zero-one-one

Bump konnectivity-client to v0.1.1
This commit is contained in:
Kubernetes Prow Robot
2023-01-20 17:56:02 -08:00
committed by GitHub
26 changed files with 946 additions and 667 deletions

2
vendor/modules.txt vendored
View File

@@ -2407,7 +2407,7 @@ k8s.io/utils/pointer
k8s.io/utils/strings
k8s.io/utils/strings/slices
k8s.io/utils/trace
# sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.0.35
# sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.1.1
## explicit; go 1.17
sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/client
sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/client/metrics

View File

@@ -118,6 +118,8 @@ func (cm *connectionManager) closeAll() {
// grpcTunnel implements Tunnel
type grpcTunnel struct {
stream client.ProxyService_ProxyClient
sendLock sync.Mutex
recvLock sync.Mutex
clientConn clientConn
pendingDial pendingDialManager
conns connectionManager
@@ -243,20 +245,17 @@ func (t *grpcTunnel) serve(tunnelCtx context.Context) {
}()
for {
pkt, err := t.stream.Recv()
pkt, err := t.Recv()
if err == io.EOF {
return
}
const segment = commonmetrics.SegmentToClient
isClosing := t.isClosing()
if err != nil || pkt == nil {
if !isClosing {
klog.ErrorS(err, "stream read failure")
}
metrics.Metrics.ObserveStreamErrorNoPacket(segment, err)
return
}
metrics.Metrics.ObservePacket(segment, pkt.Type)
if isClosing {
return
}
@@ -335,11 +334,23 @@ func (t *grpcTunnel) serve(tunnelCtx context.Context) {
case client.PacketType_DATA:
resp := pkt.GetData()
if resp.ConnectID == 0 {
klog.ErrorS(nil, "Received packet missing ConnectID", "packetType", "DATA")
continue
}
// TODO: flow control
conn, ok := t.conns.get(resp.ConnectID)
if !ok {
klog.V(1).InfoS("Connection not recognized", "connectionID", resp.ConnectID)
klog.ErrorS(nil, "Connection not recognized", "connectionID", resp.ConnectID, "packetType", "DATA")
t.Send(&client.Packet{
Type: client.PacketType_CLOSE_REQ,
Payload: &client.Packet_CloseRequest{
CloseRequest: &client.CloseRequest{
ConnectID: resp.ConnectID,
},
},
})
continue
}
timer := time.NewTimer((time.Duration)(t.readTimeoutSeconds) * time.Second)
@@ -358,7 +369,7 @@ func (t *grpcTunnel) serve(tunnelCtx context.Context) {
conn, ok := t.conns.get(resp.ConnectID)
if !ok {
klog.V(1).InfoS("Connection not recognized", "connectionID", resp.ConnectID)
klog.V(1).InfoS("Connection not recognized", "connectionID", resp.ConnectID, "packetType", "CLOSE_RSP")
continue
}
close(conn.readCh)
@@ -418,18 +429,15 @@ func (t *grpcTunnel) dialContext(requestCtx context.Context, protocol, address s
}
klog.V(5).InfoS("[tracing] send packet", "type", req.Type)
const segment = commonmetrics.SegmentFromClient
metrics.Metrics.ObservePacket(segment, req.Type)
err := t.stream.Send(req)
err := t.Send(req)
if err != nil {
metrics.Metrics.ObserveStreamError(segment, err, req.Type)
return nil, err
}
klog.V(5).Infoln("DIAL_REQ sent to proxy server")
c := &conn{
stream: t.stream,
tunnel: t,
random: random,
closeTunnel: t.closeTunnel,
}
@@ -473,10 +481,7 @@ func (t *grpcTunnel) closeDial(dialID int64) {
},
},
}
const segment = commonmetrics.SegmentFromClient
metrics.Metrics.ObservePacket(segment, req.Type)
if err := t.stream.Send(req); err != nil {
metrics.Metrics.ObserveStreamError(segment, err, req.Type)
if err := t.Send(req); err != nil {
klog.V(5).InfoS("Failed to send DIAL_CLS", "err", err, "dialID", dialID)
}
t.closeTunnel()
@@ -491,6 +496,35 @@ func (t *grpcTunnel) isClosing() bool {
return atomic.LoadUint32(&t.closing) != 0
}
func (t *grpcTunnel) Send(pkt *client.Packet) error {
t.sendLock.Lock()
defer t.sendLock.Unlock()
const segment = commonmetrics.SegmentFromClient
metrics.Metrics.ObservePacket(segment, pkt.Type)
err := t.stream.Send(pkt)
if err != nil && err != io.EOF {
metrics.Metrics.ObserveStreamError(segment, err, pkt.Type)
}
return err
}
func (t *grpcTunnel) Recv() (*client.Packet, error) {
t.recvLock.Lock()
defer t.recvLock.Unlock()
const segment = commonmetrics.SegmentToClient
pkt, err := t.stream.Recv()
if err != nil && err != io.EOF {
metrics.Metrics.ObserveStreamErrorNoPacket(segment, err)
}
if err != nil {
return pkt, err
}
metrics.Metrics.ObservePacket(segment, pkt.Type)
return pkt, nil
}
func GetDialFailureReason(err error) (isDialFailure bool, reason metrics.DialFailureReason) {
var df *dialFailure
if errors.As(err, &df) {

View File

@@ -24,8 +24,6 @@ import (
"k8s.io/klog/v2"
"sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/client/metrics"
commonmetrics "sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/common/metrics"
"sigs.k8s.io/apiserver-network-proxy/konnectivity-client/proto/client"
)
@@ -38,7 +36,7 @@ var errConnCloseTimeout = errors.New("close timeout")
// conn is an implementation of net.Conn, where the data is transported
// over an established tunnel defined by a gRPC service ProxyService.
type conn struct {
stream client.ProxyService_ProxyClient
tunnel *grpcTunnel
connID int64
random int64
readCh chan []byte
@@ -65,11 +63,8 @@ func (c *conn) Write(data []byte) (n int, err error) {
klog.V(5).InfoS("[tracing] send req", "type", req.Type)
const segment = commonmetrics.SegmentFromClient
metrics.Metrics.ObservePacket(segment, req.Type)
err = c.stream.Send(req)
err = c.tunnel.Send(req)
if err != nil {
metrics.Metrics.ObserveStreamError(segment, err, req.Type)
return 0, err
}
return len(data), err
@@ -153,10 +148,7 @@ func (c *conn) Close() error {
klog.V(5).InfoS("[tracing] send req", "type", req.Type)
const segment = commonmetrics.SegmentFromClient
metrics.Metrics.ObservePacket(segment, req.Type)
if err := c.stream.Send(req); err != nil {
metrics.Metrics.ObserveStreamError(segment, err, req.Type)
if err := c.tunnel.Send(req); err != nil {
return err
}

File diff suppressed because it is too large Load Diff

View File

@@ -32,11 +32,6 @@ enum PacketType {
DIAL_CLS = 5;
}
enum Error {
EOF = 0;
// ...
}
message Packet {
PacketType type = 1;

View File

@@ -0,0 +1,150 @@
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.2.0
// - protoc v3.12.4
// source: konnectivity-client/proto/client/client.proto
package client
import (
context "context"
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
)
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
// Requires gRPC-Go v1.32.0 or later.
const _ = grpc.SupportPackageIsVersion7
// ProxyServiceClient is the client API for ProxyService service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
type ProxyServiceClient interface {
Proxy(ctx context.Context, opts ...grpc.CallOption) (ProxyService_ProxyClient, error)
}
type proxyServiceClient struct {
cc grpc.ClientConnInterface
}
func NewProxyServiceClient(cc grpc.ClientConnInterface) ProxyServiceClient {
return &proxyServiceClient{cc}
}
func (c *proxyServiceClient) Proxy(ctx context.Context, opts ...grpc.CallOption) (ProxyService_ProxyClient, error) {
stream, err := c.cc.NewStream(ctx, &ProxyService_ServiceDesc.Streams[0], "/ProxyService/Proxy", opts...)
if err != nil {
return nil, err
}
x := &proxyServiceProxyClient{stream}
return x, nil
}
type ProxyService_ProxyClient interface {
Send(*Packet) error
Recv() (*Packet, error)
grpc.ClientStream
}
type proxyServiceProxyClient struct {
grpc.ClientStream
}
func (x *proxyServiceProxyClient) Send(m *Packet) error {
return x.ClientStream.SendMsg(m)
}
func (x *proxyServiceProxyClient) Recv() (*Packet, error) {
m := new(Packet)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
// ProxyServiceServer is the server API for ProxyService service.
// All implementations should embed UnimplementedProxyServiceServer
// for forward compatibility
type ProxyServiceServer interface {
Proxy(ProxyService_ProxyServer) error
}
// UnimplementedProxyServiceServer should be embedded to have forward compatible implementations.
type UnimplementedProxyServiceServer struct {
}
func (UnimplementedProxyServiceServer) Proxy(ProxyService_ProxyServer) error {
return status.Errorf(codes.Unimplemented, "method Proxy not implemented")
}
// UnsafeProxyServiceServer may be embedded to opt out of forward compatibility for this service.
// Use of this interface is not recommended, as added methods to ProxyServiceServer will
// result in compilation errors.
type UnsafeProxyServiceServer interface {
mustEmbedUnimplementedProxyServiceServer()
}
func RegisterProxyServiceServer(s grpc.ServiceRegistrar, srv ProxyServiceServer) {
s.RegisterService(&ProxyService_ServiceDesc, srv)
}
func _ProxyService_Proxy_Handler(srv interface{}, stream grpc.ServerStream) error {
return srv.(ProxyServiceServer).Proxy(&proxyServiceProxyServer{stream})
}
type ProxyService_ProxyServer interface {
Send(*Packet) error
Recv() (*Packet, error)
grpc.ServerStream
}
type proxyServiceProxyServer struct {
grpc.ServerStream
}
func (x *proxyServiceProxyServer) Send(m *Packet) error {
return x.ServerStream.SendMsg(m)
}
func (x *proxyServiceProxyServer) Recv() (*Packet, error) {
m := new(Packet)
if err := x.ServerStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
// ProxyService_ServiceDesc is the grpc.ServiceDesc for ProxyService service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
var ProxyService_ServiceDesc = grpc.ServiceDesc{
ServiceName: "ProxyService",
HandlerType: (*ProxyServiceServer)(nil),
Methods: []grpc.MethodDesc{},
Streams: []grpc.StreamDesc{
{
StreamName: "Proxy",
Handler: _ProxyService_Proxy_Handler,
ServerStreams: true,
ClientStreams: true,
},
},
Metadata: "konnectivity-client/proto/client/client.proto",
}