Merge pull request #125528 from seans3/port-forward-beta
PortForward over Websockets Graduates to Beta
This commit is contained in:
		@@ -600,6 +600,7 @@ const (
 | 
			
		||||
	// owner: @seans3
 | 
			
		||||
	// kep: http://kep.k8s.io/4006
 | 
			
		||||
	// alpha: v1.30
 | 
			
		||||
	// beta: v1.31
 | 
			
		||||
	//
 | 
			
		||||
	// Enables PortForward to be proxied with a websocket client
 | 
			
		||||
	PortForwardWebsockets featuregate.Feature = "PortForwardWebsockets"
 | 
			
		||||
@@ -805,6 +806,7 @@ const (
 | 
			
		||||
 | 
			
		||||
	// owner: @seans3
 | 
			
		||||
	// kep: http://kep.k8s.io/4006
 | 
			
		||||
	// alpha: v1.29
 | 
			
		||||
	// beta: v1.30
 | 
			
		||||
	//
 | 
			
		||||
	// Enables StreamTranslator proxy to handle WebSockets upgrade requests for the
 | 
			
		||||
@@ -1104,7 +1106,7 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
 | 
			
		||||
 | 
			
		||||
	PodSchedulingReadiness: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // GA in 1.30; remove in 1.32
 | 
			
		||||
 | 
			
		||||
	PortForwardWebsockets: {Default: false, PreRelease: featuregate.Alpha},
 | 
			
		||||
	PortForwardWebsockets: {Default: true, PreRelease: featuregate.Beta},
 | 
			
		||||
 | 
			
		||||
	ProcMountType: {Default: false, PreRelease: featuregate.Alpha},
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -21,21 +21,21 @@ import (
 | 
			
		||||
	"k8s.io/klog/v2"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
var _ httpstream.Dialer = &fallbackDialer{}
 | 
			
		||||
var _ httpstream.Dialer = &FallbackDialer{}
 | 
			
		||||
 | 
			
		||||
// fallbackDialer encapsulates a primary and secondary dialer, including
 | 
			
		||||
// FallbackDialer encapsulates a primary and secondary dialer, including
 | 
			
		||||
// the boolean function to determine if the primary dialer failed. Implements
 | 
			
		||||
// the httpstream.Dialer interface.
 | 
			
		||||
type fallbackDialer struct {
 | 
			
		||||
type FallbackDialer struct {
 | 
			
		||||
	primary        httpstream.Dialer
 | 
			
		||||
	secondary      httpstream.Dialer
 | 
			
		||||
	shouldFallback func(error) bool
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// NewFallbackDialer creates the fallbackDialer with the primary and secondary dialers,
 | 
			
		||||
// NewFallbackDialer creates the FallbackDialer with the primary and secondary dialers,
 | 
			
		||||
// as well as the boolean function to determine if the primary dialer failed.
 | 
			
		||||
func NewFallbackDialer(primary, secondary httpstream.Dialer, shouldFallback func(error) bool) httpstream.Dialer {
 | 
			
		||||
	return &fallbackDialer{
 | 
			
		||||
	return &FallbackDialer{
 | 
			
		||||
		primary:        primary,
 | 
			
		||||
		secondary:      secondary,
 | 
			
		||||
		shouldFallback: shouldFallback,
 | 
			
		||||
@@ -47,7 +47,7 @@ func NewFallbackDialer(primary, secondary httpstream.Dialer, shouldFallback func
 | 
			
		||||
// httstream.Connection and the negotiated protocol version accepted. If the initial
 | 
			
		||||
// primary dialer fails, this function attempts the secondary dialer. Returns an error
 | 
			
		||||
// if one occurs.
 | 
			
		||||
func (f *fallbackDialer) Dial(protocols ...string) (httpstream.Connection, string, error) {
 | 
			
		||||
func (f *FallbackDialer) Dial(protocols ...string) (httpstream.Connection, string, error) {
 | 
			
		||||
	conn, version, err := f.primary.Dial(protocols...)
 | 
			
		||||
	if err != nil && f.shouldFallback(err) {
 | 
			
		||||
		klog.V(4).Infof("fallback to secondary dialer from primary dialer err: %v", err)
 | 
			
		||||
 
 | 
			
		||||
@@ -136,20 +136,28 @@ type defaultPortForwarder struct {
 | 
			
		||||
	genericiooptions.IOStreams
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (f *defaultPortForwarder) ForwardPorts(method string, url *url.URL, opts PortForwardOptions) error {
 | 
			
		||||
func createDialer(method string, url *url.URL, opts PortForwardOptions) (httpstream.Dialer, error) {
 | 
			
		||||
	transport, upgrader, err := spdy.RoundTripperFor(opts.Config)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
	dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, method, url)
 | 
			
		||||
	if cmdutil.PortForwardWebsockets.IsEnabled() {
 | 
			
		||||
	if !cmdutil.PortForwardWebsockets.IsDisabled() {
 | 
			
		||||
		tunnelingDialer, err := portforward.NewSPDYOverWebsocketDialer(url, opts.Config)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return err
 | 
			
		||||
			return nil, err
 | 
			
		||||
		}
 | 
			
		||||
		// First attempt tunneling (websocket) dialer, then fallback to spdy dialer.
 | 
			
		||||
		dialer = portforward.NewFallbackDialer(tunnelingDialer, dialer, httpstream.IsUpgradeFailure)
 | 
			
		||||
	}
 | 
			
		||||
	return dialer, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (f *defaultPortForwarder) ForwardPorts(method string, url *url.URL, opts PortForwardOptions) error {
 | 
			
		||||
	dialer, err := createDialer(method, url, opts)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	fw, err := portforward.NewOnAddresses(dialer, opts.Address, opts.Ports, opts.StopChannel, opts.ReadyChannel, f.Out, f.ErrOut)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
 
 | 
			
		||||
@@ -32,7 +32,9 @@ import (
 | 
			
		||||
	"k8s.io/apimachinery/pkg/util/intstr"
 | 
			
		||||
	"k8s.io/cli-runtime/pkg/genericiooptions"
 | 
			
		||||
	"k8s.io/client-go/rest/fake"
 | 
			
		||||
	"k8s.io/client-go/tools/portforward"
 | 
			
		||||
	cmdtesting "k8s.io/kubectl/pkg/cmd/testing"
 | 
			
		||||
	cmdutil "k8s.io/kubectl/pkg/cmd/util"
 | 
			
		||||
	"k8s.io/kubectl/pkg/scheme"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
@@ -983,3 +985,38 @@ func TestCheckUDPPort(t *testing.T) {
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestCreateDialer(t *testing.T) {
 | 
			
		||||
	url, err := url.Parse("http://localhost:8080/index.html")
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatalf("unable to parse test url: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
	config := cmdtesting.DefaultClientConfig()
 | 
			
		||||
	opts := PortForwardOptions{Config: config}
 | 
			
		||||
	// First, ensure that no environment variable creates the fallback dialer.
 | 
			
		||||
	dialer, err := createDialer("GET", url, opts)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatalf("unable to create dialer: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
	if _, isFallback := dialer.(*portforward.FallbackDialer); !isFallback {
 | 
			
		||||
		t.Errorf("expected fallback dialer, got %#v", dialer)
 | 
			
		||||
	}
 | 
			
		||||
	// Next, check turning on feature flag explicitly also creates fallback dialer.
 | 
			
		||||
	t.Setenv(string(cmdutil.PortForwardWebsockets), "true")
 | 
			
		||||
	dialer, err = createDialer("GET", url, opts)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatalf("unable to create dialer: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
	if _, isFallback := dialer.(*portforward.FallbackDialer); !isFallback {
 | 
			
		||||
		t.Errorf("expected fallback dialer, got %#v", dialer)
 | 
			
		||||
	}
 | 
			
		||||
	// Finally, check explicit disabling does NOT create the fallback dialer.
 | 
			
		||||
	t.Setenv(string(cmdutil.PortForwardWebsockets), "false")
 | 
			
		||||
	dialer, err = createDialer("GET", url, opts)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatalf("unable to create dialer: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
	if _, isFallback := dialer.(*portforward.FallbackDialer); isFallback {
 | 
			
		||||
		t.Errorf("expected fallback dialer, got %#v", dialer)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user