Merge pull request #126231 from seans3/websocket-https-proxy-fix
Falls back to SPDY for gorilla/websocket https proxy error
This commit is contained in:
		@@ -116,6 +116,15 @@ func IsUpgradeFailure(err error) bool {
 | 
				
			|||||||
	return errors.As(err, &upgradeErr)
 | 
						return errors.As(err, &upgradeErr)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// isHTTPSProxyError returns true if error is Gorilla/Websockets HTTPS Proxy dial error;
 | 
				
			||||||
 | 
					// false otherwise (see https://github.com/kubernetes/kubernetes/issues/126134).
 | 
				
			||||||
 | 
					func IsHTTPSProxyError(err error) bool {
 | 
				
			||||||
 | 
						if err == nil {
 | 
				
			||||||
 | 
							return false
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return strings.Contains(err.Error(), "proxy: unknown scheme: https")
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// IsUpgradeRequest returns true if the given request is a connection upgrade request
 | 
					// IsUpgradeRequest returns true if the given request is a connection upgrade request
 | 
				
			||||||
func IsUpgradeRequest(req *http.Request) bool {
 | 
					func IsUpgradeRequest(req *http.Request) bool {
 | 
				
			||||||
	for _, h := range req.Header[http.CanonicalHeaderKey(HeaderConnection)] {
 | 
						for _, h := range req.Header[http.CanonicalHeaderKey(HeaderConnection)] {
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -168,3 +168,32 @@ func TestIsUpgradeFailureError(t *testing.T) {
 | 
				
			|||||||
		})
 | 
							})
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func TestIsHTTPSProxyError(t *testing.T) {
 | 
				
			||||||
 | 
						testCases := map[string]struct {
 | 
				
			||||||
 | 
							err      error
 | 
				
			||||||
 | 
							expected bool
 | 
				
			||||||
 | 
						}{
 | 
				
			||||||
 | 
							"nil error should return false": {
 | 
				
			||||||
 | 
								err:      nil,
 | 
				
			||||||
 | 
								expected: false,
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							"Not HTTPS proxy error should return false": {
 | 
				
			||||||
 | 
								err:      errors.New("this is not an upgrade error"),
 | 
				
			||||||
 | 
								expected: false,
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							"HTTPS proxy error should return true": {
 | 
				
			||||||
 | 
								err:      errors.New("proxy: unknown scheme: https"),
 | 
				
			||||||
 | 
								expected: true,
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						for name, test := range testCases {
 | 
				
			||||||
 | 
							t.Run(name, func(t *testing.T) {
 | 
				
			||||||
 | 
								actual := IsHTTPSProxyError(test.err)
 | 
				
			||||||
 | 
								if test.expected != actual {
 | 
				
			||||||
 | 
									t.Errorf("expected HTTPS proxy error %t, got %t", test.expected, actual)
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
							})
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -17,10 +17,12 @@ limitations under the License.
 | 
				
			|||||||
package portforward
 | 
					package portforward
 | 
				
			||||||
 | 
					
 | 
				
			||||||
import (
 | 
					import (
 | 
				
			||||||
 | 
						"errors"
 | 
				
			||||||
	"fmt"
 | 
						"fmt"
 | 
				
			||||||
	"testing"
 | 
						"testing"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	"github.com/stretchr/testify/assert"
 | 
						"github.com/stretchr/testify/assert"
 | 
				
			||||||
 | 
						"github.com/stretchr/testify/require"
 | 
				
			||||||
	"k8s.io/apimachinery/pkg/util/httpstream"
 | 
						"k8s.io/apimachinery/pkg/util/httpstream"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -36,7 +38,7 @@ func TestFallbackDialer(t *testing.T) {
 | 
				
			|||||||
	assert.True(t, primary.dialed, "no fallback; primary should have dialed")
 | 
						assert.True(t, primary.dialed, "no fallback; primary should have dialed")
 | 
				
			||||||
	assert.False(t, secondary.dialed, "no fallback; secondary should *not* have dialed")
 | 
						assert.False(t, secondary.dialed, "no fallback; secondary should *not* have dialed")
 | 
				
			||||||
	assert.Equal(t, primaryProtocol, negotiated, "primary negotiated protocol returned")
 | 
						assert.Equal(t, primaryProtocol, negotiated, "primary negotiated protocol returned")
 | 
				
			||||||
	assert.Nil(t, err, "error from primary dialer should be nil")
 | 
						require.NoError(t, err, "error from primary dialer should be nil")
 | 
				
			||||||
	// If primary dialer error is upgrade error, then fallback returning secondary dial response.
 | 
						// If primary dialer error is upgrade error, then fallback returning secondary dial response.
 | 
				
			||||||
	primary = &fakeDialer{dialed: false, negotiatedProtocol: primaryProtocol, err: &httpstream.UpgradeFailureError{}}
 | 
						primary = &fakeDialer{dialed: false, negotiatedProtocol: primaryProtocol, err: &httpstream.UpgradeFailureError{}}
 | 
				
			||||||
	secondary = &fakeDialer{dialed: false, negotiatedProtocol: secondaryProtocol}
 | 
						secondary = &fakeDialer{dialed: false, negotiatedProtocol: secondaryProtocol}
 | 
				
			||||||
@@ -45,7 +47,18 @@ func TestFallbackDialer(t *testing.T) {
 | 
				
			|||||||
	assert.True(t, primary.dialed, "fallback; primary should have dialed")
 | 
						assert.True(t, primary.dialed, "fallback; primary should have dialed")
 | 
				
			||||||
	assert.True(t, secondary.dialed, "fallback; secondary should have dialed")
 | 
						assert.True(t, secondary.dialed, "fallback; secondary should have dialed")
 | 
				
			||||||
	assert.Equal(t, secondaryProtocol, negotiated, "negotiated protocol is from secondary dialer")
 | 
						assert.Equal(t, secondaryProtocol, negotiated, "negotiated protocol is from secondary dialer")
 | 
				
			||||||
	assert.Nil(t, err, "error from secondary dialer should be nil")
 | 
						require.NoError(t, err, "error from secondary dialer should be nil")
 | 
				
			||||||
 | 
						// If primary dialer error is https proxy dialing error, then fallback returning secondary dial response.
 | 
				
			||||||
 | 
						primary = &fakeDialer{negotiatedProtocol: primaryProtocol, err: errors.New("proxy: unknown scheme: https")}
 | 
				
			||||||
 | 
						secondary = &fakeDialer{negotiatedProtocol: secondaryProtocol}
 | 
				
			||||||
 | 
						fallbackDialer = NewFallbackDialer(primary, secondary, func(err error) bool {
 | 
				
			||||||
 | 
							return httpstream.IsUpgradeFailure(err) || httpstream.IsHTTPSProxyError(err)
 | 
				
			||||||
 | 
						})
 | 
				
			||||||
 | 
						_, negotiated, err = fallbackDialer.Dial(protocols...)
 | 
				
			||||||
 | 
						assert.True(t, primary.dialed, "fallback; primary should have dialed")
 | 
				
			||||||
 | 
						assert.True(t, secondary.dialed, "fallback; secondary should have dialed")
 | 
				
			||||||
 | 
						assert.Equal(t, secondaryProtocol, negotiated, "negotiated protocol is from secondary dialer")
 | 
				
			||||||
 | 
						require.NoError(t, err, "error from secondary dialer should be nil")
 | 
				
			||||||
	// If primary dialer returns non-upgrade error, then primary error is returned.
 | 
						// If primary dialer returns non-upgrade error, then primary error is returned.
 | 
				
			||||||
	nonUpgradeErr := fmt.Errorf("This is a non-upgrade error")
 | 
						nonUpgradeErr := fmt.Errorf("This is a non-upgrade error")
 | 
				
			||||||
	primary = &fakeDialer{dialed: false, err: nonUpgradeErr}
 | 
						primary = &fakeDialer{dialed: false, err: nonUpgradeErr}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -20,15 +20,19 @@ import (
 | 
				
			|||||||
	"bytes"
 | 
						"bytes"
 | 
				
			||||||
	"context"
 | 
						"context"
 | 
				
			||||||
	"crypto/rand"
 | 
						"crypto/rand"
 | 
				
			||||||
 | 
						"crypto/tls"
 | 
				
			||||||
	"io"
 | 
						"io"
 | 
				
			||||||
	"net/http"
 | 
						"net/http"
 | 
				
			||||||
	"net/http/httptest"
 | 
						"net/http/httptest"
 | 
				
			||||||
	"net/url"
 | 
						"net/url"
 | 
				
			||||||
 | 
						"sync/atomic"
 | 
				
			||||||
	"testing"
 | 
						"testing"
 | 
				
			||||||
	"time"
 | 
						"time"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	"github.com/stretchr/testify/assert"
 | 
						"github.com/stretchr/testify/assert"
 | 
				
			||||||
	"github.com/stretchr/testify/require"
 | 
						"github.com/stretchr/testify/require"
 | 
				
			||||||
 | 
						"k8s.io/apimachinery/pkg/util/httpstream"
 | 
				
			||||||
 | 
						utilnettesting "k8s.io/apimachinery/pkg/util/net/testing"
 | 
				
			||||||
	"k8s.io/apimachinery/pkg/util/remotecommand"
 | 
						"k8s.io/apimachinery/pkg/util/remotecommand"
 | 
				
			||||||
	"k8s.io/apimachinery/pkg/util/wait"
 | 
						"k8s.io/apimachinery/pkg/util/wait"
 | 
				
			||||||
	"k8s.io/client-go/rest"
 | 
						"k8s.io/client-go/rest"
 | 
				
			||||||
@@ -225,3 +229,175 @@ func TestFallbackClient_PrimaryAndSecondaryFail(t *testing.T) {
 | 
				
			|||||||
		require.Error(t, err)
 | 
							require.Error(t, err)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// localhostCert was generated from crypto/tls/generate_cert.go with the following command:
 | 
				
			||||||
 | 
					//
 | 
				
			||||||
 | 
					//	go run generate_cert.go  --rsa-bits 2048 --host 127.0.0.1,::1,example.com --ca --start-date "Jan 1 00:00:00 1970" --duration=1000000h
 | 
				
			||||||
 | 
					var localhostCert = []byte(`-----BEGIN CERTIFICATE-----
 | 
				
			||||||
 | 
					MIIDGTCCAgGgAwIBAgIRALL5AZcefF4kkYV1SEG6YrMwDQYJKoZIhvcNAQELBQAw
 | 
				
			||||||
 | 
					EjEQMA4GA1UEChMHQWNtZSBDbzAgFw03MDAxMDEwMDAwMDBaGA8yMDg0MDEyOTE2
 | 
				
			||||||
 | 
					MDAwMFowEjEQMA4GA1UEChMHQWNtZSBDbzCCASIwDQYJKoZIhvcNAQEBBQADggEP
 | 
				
			||||||
 | 
					ADCCAQoCggEBALQ/FHcyVwdFHxARbbD2KBtDUT7Eni+8ioNdjtGcmtXqBv45EC1C
 | 
				
			||||||
 | 
					JOqqGJTroFGJ6Q9kQIZ9FqH5IJR2fOOJD9kOTueG4Vt1JY1rj1Kbpjefu8XleZ5L
 | 
				
			||||||
 | 
					SBwIWVnN/lEsEbuKmj7N2gLt5AH3zMZiBI1mg1u9Z5ZZHYbCiTpBrwsq6cTlvR9g
 | 
				
			||||||
 | 
					dyo1YkM5hRESCzsrL0aUByoo0qRMD8ZsgANJwgsiO0/M6idbxDwv1BnGwGmRYvOE
 | 
				
			||||||
 | 
					Hxpy3v0Jg7GJYrvnpnifJTs4nw91N5X9pXxR7FFzi/6HTYDWRljvTb0w6XciKYAz
 | 
				
			||||||
 | 
					bWZ0+cJr5F7wB7ovlbm7HrQIR7z7EIIu2d8CAwEAAaNoMGYwDgYDVR0PAQH/BAQD
 | 
				
			||||||
 | 
					AgKkMBMGA1UdJQQMMAoGCCsGAQUFBwMBMA8GA1UdEwEB/wQFMAMBAf8wLgYDVR0R
 | 
				
			||||||
 | 
					BCcwJYILZXhhbXBsZS5jb22HBH8AAAGHEAAAAAAAAAAAAAAAAAAAAAEwDQYJKoZI
 | 
				
			||||||
 | 
					hvcNAQELBQADggEBAFPPWopNEJtIA2VFAQcqN6uJK+JVFOnjGRoCrM6Xgzdm0wxY
 | 
				
			||||||
 | 
					XCGjsxY5dl+V7KzdGqu858rCaq5osEBqypBpYAnS9C38VyCDA1vPS1PsN8SYv48z
 | 
				
			||||||
 | 
					DyBwj+7R2qar0ADBhnhWxvYO9M72lN/wuCqFKYMeFSnJdQLv3AsrrHe9lYqOa36s
 | 
				
			||||||
 | 
					8wxSwVTFTYXBzljPEnSaaJMPqFD8JXaZK1ryJPkO5OsCNQNGtatNiWAf3DcmwHAT
 | 
				
			||||||
 | 
					MGYMzP0u4nw47aRz9shB8w+taPKHx2BVwE1m/yp3nHVioOjXqA1fwRQVGclCJSH1
 | 
				
			||||||
 | 
					D2iq3hWVHRENgjTjANBPICLo9AZ4JfN6PH19mnU=
 | 
				
			||||||
 | 
					-----END CERTIFICATE-----`)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// localhostKey is the private key for localhostCert.
 | 
				
			||||||
 | 
					var localhostKey = []byte(`-----BEGIN RSA PRIVATE KEY-----
 | 
				
			||||||
 | 
					MIIEogIBAAKCAQEAtD8UdzJXB0UfEBFtsPYoG0NRPsSeL7yKg12O0Zya1eoG/jkQ
 | 
				
			||||||
 | 
					LUIk6qoYlOugUYnpD2RAhn0WofkglHZ844kP2Q5O54bhW3UljWuPUpumN5+7xeV5
 | 
				
			||||||
 | 
					nktIHAhZWc3+USwRu4qaPs3aAu3kAffMxmIEjWaDW71nllkdhsKJOkGvCyrpxOW9
 | 
				
			||||||
 | 
					H2B3KjViQzmFERILOysvRpQHKijSpEwPxmyAA0nCCyI7T8zqJ1vEPC/UGcbAaZFi
 | 
				
			||||||
 | 
					84QfGnLe/QmDsYliu+emeJ8lOzifD3U3lf2lfFHsUXOL/odNgNZGWO9NvTDpdyIp
 | 
				
			||||||
 | 
					gDNtZnT5wmvkXvAHui+VubsetAhHvPsQgi7Z3wIDAQABAoIBAGmw93IxjYCQ0ncc
 | 
				
			||||||
 | 
					kSKMJNZfsdtJdaxuNRZ0nNNirhQzR2h403iGaZlEpmdkhzxozsWcto1l+gh+SdFk
 | 
				
			||||||
 | 
					bTUK4MUZM8FlgO2dEqkLYh5BcMT7ICMZvSfJ4v21E5eqR68XVUqQKoQbNvQyxFk3
 | 
				
			||||||
 | 
					EddeEGdNrkb0GDK8DKlBlzAW5ep4gjG85wSTjR+J+muUv3R0BgLBFSuQnIDM/IMB
 | 
				
			||||||
 | 
					LWqsja/QbtB7yppe7jL5u8UCFdZG8BBKT9fcvFIu5PRLO3MO0uOI7LTc8+W1Xm23
 | 
				
			||||||
 | 
					uv+j3SY0+v+6POjK0UlJFFi/wkSPTFIfrQO1qFBkTDQHhQ6q/7GnILYYOiGbIRg2
 | 
				
			||||||
 | 
					NNuP52ECgYEAzXEoy50wSYh8xfFaBuxbm3ruuG2W49jgop7ZfoFrPWwOQKAZS441
 | 
				
			||||||
 | 
					VIwV4+e5IcA6KkuYbtGSdTYqK1SMkgnUyD/VevwAqH5TJoEIGu0pDuKGwVuwqioZ
 | 
				
			||||||
 | 
					frCIAV5GllKyUJ55VZNbRr2vY2fCsWbaCSCHETn6C16DNuTCe5C0JBECgYEA4JqY
 | 
				
			||||||
 | 
					5GpNbMG8fOt4H7hU0Fbm2yd6SHJcQ3/9iimef7xG6ajxsYrIhg1ft+3IPHMjVI0+
 | 
				
			||||||
 | 
					9brwHDnWg4bOOx/VO4VJBt6Dm/F33bndnZRkuIjfSNpLM51P+EnRdaFVHOJHwKqx
 | 
				
			||||||
 | 
					uF69kihifCAG7YATgCveeXImzBUSyZUz9UrETu8CgYARNBimdFNG1RcdvEg9rC0/
 | 
				
			||||||
 | 
					p9u1tfecvNySwZqU7WF9kz7eSonTueTdX521qAHowaAdSpdJMGODTTXaywm6cPhQ
 | 
				
			||||||
 | 
					jIfj9JZZhbqQzt1O4+08Qdvm9TamCUB5S28YLjza+bHU7nBaqixKkDfPqzCyilpX
 | 
				
			||||||
 | 
					yVGGL8SwjwmN3zop/sQXAQKBgC0JMsESQ6YcDsRpnrOVjYQc+LtW5iEitTdfsaID
 | 
				
			||||||
 | 
					iGGKihmOI7B66IxgoCHMTws39wycKdSyADVYr5e97xpR3rrJlgQHmBIrz+Iow7Q2
 | 
				
			||||||
 | 
					LiAGaec8xjl6QK/DdXmFuQBKqyKJ14rljFODP4QuE9WJid94bGqjpf3j99ltznZP
 | 
				
			||||||
 | 
					4J8HAoGAJb4eb4lu4UGwifDzqfAPzLGCoi0fE1/hSx34lfuLcc1G+LEu9YDKoOVJ
 | 
				
			||||||
 | 
					9suOh0b5K/bfEy9KrVMBBriduvdaERSD8S3pkIQaitIz0B029AbE4FLFf9lKQpP2
 | 
				
			||||||
 | 
					KR8NJEkK99Vh/tew6jAMll70xFrE7aF8VLXJVE7w4sQzuvHxl9Q=
 | 
				
			||||||
 | 
					-----END RSA PRIVATE KEY-----
 | 
				
			||||||
 | 
					`)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// See (https://github.com/kubernetes/kubernetes/issues/126134).
 | 
				
			||||||
 | 
					func TestFallbackClient_WebSocketHTTPSProxyCausesSPDYFallback(t *testing.T) {
 | 
				
			||||||
 | 
						cert, err := tls.X509KeyPair(localhostCert, localhostKey)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							t.Errorf("https (valid hostname): proxy_test: %v", err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						var proxyCalled atomic.Int64
 | 
				
			||||||
 | 
						proxyHandler := utilnettesting.NewHTTPProxyHandler(t, func(req *http.Request) bool {
 | 
				
			||||||
 | 
							proxyCalled.Add(1)
 | 
				
			||||||
 | 
							return true
 | 
				
			||||||
 | 
						})
 | 
				
			||||||
 | 
						defer proxyHandler.Wait()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						proxyServer := httptest.NewUnstartedServer(proxyHandler)
 | 
				
			||||||
 | 
						proxyServer.TLS = &tls.Config{Certificates: []tls.Certificate{cert}}
 | 
				
			||||||
 | 
						proxyServer.StartTLS()
 | 
				
			||||||
 | 
						defer proxyServer.Close() //nolint:errcheck
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						proxyLocation, err := url.Parse(proxyServer.URL)
 | 
				
			||||||
 | 
						require.NoError(t, err)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// Create fake SPDY server. Copy received STDIN data back onto STDOUT stream.
 | 
				
			||||||
 | 
						spdyServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
 | 
				
			||||||
 | 
							var stdin, stdout bytes.Buffer
 | 
				
			||||||
 | 
							ctx, err := createHTTPStreams(w, req, &StreamOptions{
 | 
				
			||||||
 | 
								Stdin:  &stdin,
 | 
				
			||||||
 | 
								Stdout: &stdout,
 | 
				
			||||||
 | 
							})
 | 
				
			||||||
 | 
							if err != nil {
 | 
				
			||||||
 | 
								w.WriteHeader(http.StatusForbidden)
 | 
				
			||||||
 | 
								return
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							defer ctx.conn.Close() //nolint:errcheck
 | 
				
			||||||
 | 
							_, err = io.Copy(ctx.stdoutStream, ctx.stdinStream)
 | 
				
			||||||
 | 
							if err != nil {
 | 
				
			||||||
 | 
								t.Fatalf("error copying STDIN to STDOUT: %v", err)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}))
 | 
				
			||||||
 | 
						defer spdyServer.Close() //nolint:errcheck
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						backendLocation, err := url.Parse(spdyServer.URL)
 | 
				
			||||||
 | 
						require.NoError(t, err)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						clientConfig := &rest.Config{
 | 
				
			||||||
 | 
							Host:            spdyServer.URL,
 | 
				
			||||||
 | 
							TLSClientConfig: rest.TLSClientConfig{CAData: localhostCert},
 | 
				
			||||||
 | 
							Proxy: func(req *http.Request) (*url.URL, error) {
 | 
				
			||||||
 | 
								return proxyLocation, nil
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// Websocket with https proxy will fail in dialing (falling back to SPDY).
 | 
				
			||||||
 | 
						websocketExecutor, err := NewWebSocketExecutor(clientConfig, "GET", backendLocation.String())
 | 
				
			||||||
 | 
						require.NoError(t, err)
 | 
				
			||||||
 | 
						spdyExecutor, err := NewSPDYExecutor(clientConfig, "POST", backendLocation)
 | 
				
			||||||
 | 
						require.NoError(t, err)
 | 
				
			||||||
 | 
						// Fallback to spdyExecutor with websocket https proxy error; spdyExecutor succeeds against fake spdy server.
 | 
				
			||||||
 | 
						sawHTTPSProxyError := false
 | 
				
			||||||
 | 
						exec, err := NewFallbackExecutor(websocketExecutor, spdyExecutor, func(err error) bool {
 | 
				
			||||||
 | 
							if httpstream.IsUpgradeFailure(err) {
 | 
				
			||||||
 | 
								t.Errorf("saw upgrade failure: %v", err)
 | 
				
			||||||
 | 
								return true
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							if httpstream.IsHTTPSProxyError(err) {
 | 
				
			||||||
 | 
								sawHTTPSProxyError = true
 | 
				
			||||||
 | 
								t.Logf("saw https proxy error: %v", err)
 | 
				
			||||||
 | 
								return true
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							return false
 | 
				
			||||||
 | 
						})
 | 
				
			||||||
 | 
						require.NoError(t, err)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// Generate random data, and set it up to stream on STDIN. The data will be
 | 
				
			||||||
 | 
						// returned on the STDOUT buffer.
 | 
				
			||||||
 | 
						randomSize := 1024 * 1024
 | 
				
			||||||
 | 
						randomData := make([]byte, randomSize)
 | 
				
			||||||
 | 
						if _, err := rand.Read(randomData); err != nil {
 | 
				
			||||||
 | 
							t.Errorf("unexpected error reading random data: %v", err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						var stdout bytes.Buffer
 | 
				
			||||||
 | 
						options := &StreamOptions{
 | 
				
			||||||
 | 
							Stdin:  bytes.NewReader(randomData),
 | 
				
			||||||
 | 
							Stdout: &stdout,
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						errorChan := make(chan error)
 | 
				
			||||||
 | 
						go func() {
 | 
				
			||||||
 | 
							errorChan <- exec.StreamWithContext(context.Background(), *options)
 | 
				
			||||||
 | 
						}()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						select {
 | 
				
			||||||
 | 
						case <-time.After(wait.ForeverTestTimeout):
 | 
				
			||||||
 | 
							t.Fatalf("expect stream to be closed after connection is closed.")
 | 
				
			||||||
 | 
						case err := <-errorChan:
 | 
				
			||||||
 | 
							if err != nil {
 | 
				
			||||||
 | 
								t.Errorf("unexpected error")
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						data, err := io.ReadAll(bytes.NewReader(stdout.Bytes()))
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							t.Errorf("error reading the stream: %v", err)
 | 
				
			||||||
 | 
							return
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						// Check the random data sent on STDIN was the same returned on STDOUT.
 | 
				
			||||||
 | 
						if !bytes.Equal(randomData, data) {
 | 
				
			||||||
 | 
							t.Errorf("unexpected data received: %d sent: %d", len(data), len(randomData))
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// Ensure the https proxy error was observed
 | 
				
			||||||
 | 
						if !sawHTTPSProxyError {
 | 
				
			||||||
 | 
							t.Errorf("expected to see https proxy error")
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						// Ensure the proxy was called once
 | 
				
			||||||
 | 
						if e, a := int64(1), proxyCalled.Load(); e != a {
 | 
				
			||||||
 | 
							t.Errorf("expected %d proxy call, got %d", e, a)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -39,6 +39,7 @@ import (
 | 
				
			|||||||
	v1 "k8s.io/api/core/v1"
 | 
						v1 "k8s.io/api/core/v1"
 | 
				
			||||||
	apierrors "k8s.io/apimachinery/pkg/api/errors"
 | 
						apierrors "k8s.io/apimachinery/pkg/api/errors"
 | 
				
			||||||
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | 
						metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | 
				
			||||||
 | 
						"k8s.io/apimachinery/pkg/util/httpstream"
 | 
				
			||||||
	"k8s.io/apimachinery/pkg/util/httpstream/wsstream"
 | 
						"k8s.io/apimachinery/pkg/util/httpstream/wsstream"
 | 
				
			||||||
	"k8s.io/apimachinery/pkg/util/remotecommand"
 | 
						"k8s.io/apimachinery/pkg/util/remotecommand"
 | 
				
			||||||
	"k8s.io/apimachinery/pkg/util/wait"
 | 
						"k8s.io/apimachinery/pkg/util/wait"
 | 
				
			||||||
@@ -1340,3 +1341,39 @@ func createWebSocketStreams(req *http.Request, w http.ResponseWriter, opts *opti
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	return wsStreams, nil
 | 
						return wsStreams, nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// See (https://github.com/kubernetes/kubernetes/issues/126134).
 | 
				
			||||||
 | 
					func TestWebSocketClient_HTTPSProxyErrorExpected(t *testing.T) {
 | 
				
			||||||
 | 
						urlStr := "http://127.0.0.1/never-used" + "?" + "stdin=true" + "&" + "stdout=true"
 | 
				
			||||||
 | 
						websocketLocation, err := url.Parse(urlStr)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							t.Fatalf("Unable to parse WebSocket server URL: %s", urlStr)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						// proxy url with https scheme will trigger websocket dialing error.
 | 
				
			||||||
 | 
						httpsProxyFunc := func(req *http.Request) (*url.URL, error) { return url.Parse("https://127.0.0.1") }
 | 
				
			||||||
 | 
						exec, err := NewWebSocketExecutor(&rest.Config{Host: websocketLocation.Host, Proxy: httpsProxyFunc}, "GET", urlStr)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							t.Errorf("unexpected error creating websocket executor: %v", err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						var stdout bytes.Buffer
 | 
				
			||||||
 | 
						options := &StreamOptions{
 | 
				
			||||||
 | 
							Stdout: &stdout,
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						errorChan := make(chan error)
 | 
				
			||||||
 | 
						go func() {
 | 
				
			||||||
 | 
							// Start the streaming on the WebSocket "exec" client.
 | 
				
			||||||
 | 
							errorChan <- exec.StreamWithContext(context.Background(), *options)
 | 
				
			||||||
 | 
						}()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						select {
 | 
				
			||||||
 | 
						case <-time.After(wait.ForeverTestTimeout):
 | 
				
			||||||
 | 
							t.Fatalf("expect stream to be closed after connection is closed.")
 | 
				
			||||||
 | 
						case err := <-errorChan:
 | 
				
			||||||
 | 
							if err == nil {
 | 
				
			||||||
 | 
								t.Errorf("expected error but received none")
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							if !httpstream.IsHTTPSProxyError(err) {
 | 
				
			||||||
 | 
								t.Errorf("expected https proxy error, got (%s)", err)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -184,7 +184,9 @@ func createExecutor(url *url.URL, config *restclient.Config) (remotecommand.Exec
 | 
				
			|||||||
		if err != nil {
 | 
							if err != nil {
 | 
				
			||||||
			return nil, err
 | 
								return nil, err
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		exec, err = remotecommand.NewFallbackExecutor(websocketExec, exec, httpstream.IsUpgradeFailure)
 | 
							exec, err = remotecommand.NewFallbackExecutor(websocketExec, exec, func(err error) bool {
 | 
				
			||||||
 | 
								return httpstream.IsUpgradeFailure(err) || httpstream.IsHTTPSProxyError(err)
 | 
				
			||||||
 | 
							})
 | 
				
			||||||
		if err != nil {
 | 
							if err != nil {
 | 
				
			||||||
			return nil, err
 | 
								return nil, err
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -147,7 +147,9 @@ func createExecutor(url *url.URL, config *restclient.Config) (remotecommand.Exec
 | 
				
			|||||||
		if err != nil {
 | 
							if err != nil {
 | 
				
			||||||
			return nil, err
 | 
								return nil, err
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		exec, err = remotecommand.NewFallbackExecutor(websocketExec, exec, httpstream.IsUpgradeFailure)
 | 
							exec, err = remotecommand.NewFallbackExecutor(websocketExec, exec, func(err error) bool {
 | 
				
			||||||
 | 
								return httpstream.IsUpgradeFailure(err) || httpstream.IsHTTPSProxyError(err)
 | 
				
			||||||
 | 
							})
 | 
				
			||||||
		if err != nil {
 | 
							if err != nil {
 | 
				
			||||||
			return nil, err
 | 
								return nil, err
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -148,7 +148,9 @@ func createDialer(method string, url *url.URL, opts PortForwardOptions) (httpstr
 | 
				
			|||||||
			return nil, err
 | 
								return nil, err
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		// First attempt tunneling (websocket) dialer, then fallback to spdy dialer.
 | 
							// First attempt tunneling (websocket) dialer, then fallback to spdy dialer.
 | 
				
			||||||
		dialer = portforward.NewFallbackDialer(tunnelingDialer, dialer, httpstream.IsUpgradeFailure)
 | 
							dialer = portforward.NewFallbackDialer(tunnelingDialer, dialer, func(err error) bool {
 | 
				
			||||||
 | 
								return httpstream.IsUpgradeFailure(err) || httpstream.IsHTTPSProxyError(err)
 | 
				
			||||||
 | 
							})
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	return dialer, nil
 | 
						return dialer, nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user