Merge pull request #102050 from pohly/fix/deflake-metrics-proxy

remove metrics proxy
Authored by Kubernetes Prow Robot on 2021-07-02 12:30:10 -07:00, committed by GitHub
23 changed files with 378 additions and 300 deletions

File: test/e2e/framework/framework.go

@@ -296,7 +296,7 @@ func (f *Framework) BeforeEach() {
gatherMetricsAfterTest := TestContext.GatherMetricsAfterTest == "true" || TestContext.GatherMetricsAfterTest == "master"
if gatherMetricsAfterTest && TestContext.IncludeClusterAutoscalerMetrics {
-grabber, err := e2emetrics.NewMetricsGrabber(f.ClientSet, f.KubemarkExternalClusterClientSet, !ProviderIs("kubemark"), false, false, false, TestContext.IncludeClusterAutoscalerMetrics, false)
+grabber, err := e2emetrics.NewMetricsGrabber(f.ClientSet, f.KubemarkExternalClusterClientSet, f.ClientConfig(), !ProviderIs("kubemark"), false, false, false, TestContext.IncludeClusterAutoscalerMetrics, false)
if err != nil {
Logf("Failed to create MetricsGrabber (skipping ClusterAutoscaler metrics gathering before test): %v", err)
} else {
@@ -449,7 +449,7 @@ func (f *Framework) AfterEach() {
ginkgo.By("Gathering metrics")
// Grab apiserver, scheduler, controller-manager metrics and (optionally) nodes' kubelet metrics.
grabMetricsFromKubelets := TestContext.GatherMetricsAfterTest != "master" && !ProviderIs("kubemark")
-grabber, err := e2emetrics.NewMetricsGrabber(f.ClientSet, f.KubemarkExternalClusterClientSet, grabMetricsFromKubelets, true, true, true, TestContext.IncludeClusterAutoscalerMetrics, false)
+grabber, err := e2emetrics.NewMetricsGrabber(f.ClientSet, f.KubemarkExternalClusterClientSet, f.ClientConfig(), grabMetricsFromKubelets, true, true, true, TestContext.IncludeClusterAutoscalerMetrics, false)
if err != nil {
Logf("Failed to create MetricsGrabber (skipping metrics gathering): %v", err)
} else {

File: test/e2e/framework/metrics/kubelet_metrics.go

@@ -139,7 +139,7 @@ func getKubeletMetricsFromNode(c clientset.Interface, nodeName string) (KubeletM
if c == nil {
return GrabKubeletMetricsWithoutProxy(nodeName, "/metrics")
}
-grabber, err := NewMetricsGrabber(c, nil, true, false, false, false, false, false)
+grabber, err := NewMetricsGrabber(c, nil, nil, true, false, false, false, false, false)
if err != nil {
return KubeletMetrics{}, err
}

File: test/e2e/framework/metrics/metrics_grabber.go

@@ -18,15 +18,19 @@ package metrics
import (
"context"
+"errors"
"fmt"
+"net"
"regexp"
"sync"
"time"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
+"k8s.io/client-go/rest"
"k8s.io/klog/v2"
+e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
@@ -37,11 +41,16 @@ const (
kubeSchedulerPort = 10259
// kubeControllerManagerPort is the default port for the controller manager status server.
kubeControllerManagerPort = 10257
-metricsProxyPod = "metrics-proxy"
// snapshotControllerPort is the port for the snapshot controller
snapshotControllerPort = 9102
)
+// MetricsGrabbingDisabledError is an error that is wrapped by the
+// different MetricsGrabber.Grab functions when metrics grabbing is
+// not supported. Tests that check metrics data should then skip
+// the check.
+var MetricsGrabbingDisabledError = errors.New("metrics grabbing disabled")
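Because the Grab functions wrap this sentinel with component-specific context (via %w), callers should detect it with errors.Is rather than string matching. A minimal consumer-side sketch; the grabber variable and the surrounding e2e test function are assumed, not part of this change:

    metrics, err := grabber.GrabFromScheduler()
    if errors.Is(err, MetricsGrabbingDisabledError) {
        // This cluster does not support grabbing scheduler metrics; skip the check.
        return
    }
    if err != nil {
        framework.Failf("unexpected error grabbing scheduler metrics: %v", err)
    }
    _ = metrics // inspect SchedulerMetrics here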
// Collection is metrics collection of components
type Collection struct {
APIServerMetrics APIServerMetrics
@@ -56,6 +65,7 @@ type Collection struct {
type Grabber struct {
client clientset.Interface
externalClient clientset.Interface
+config *rest.Config
grabFromAPIServer bool
grabFromControllerManager bool
grabFromKubelets bool
@@ -70,8 +80,15 @@ type Grabber struct {
waitForSnapshotControllerReadyOnce sync.Once
}
-// NewMetricsGrabber returns new metrics which are initialized.
-func NewMetricsGrabber(c clientset.Interface, ec clientset.Interface, kubelets bool, scheduler bool, controllers bool, apiServer bool, clusterAutoscaler bool, snapshotController bool) (*Grabber, error) {
+// NewMetricsGrabber prepares for grabbing metrics data from several different
+// components. It should be called when those components are running because
+// it needs to communicate with them to determine for which components
+// metrics data can be retrieved.
+//
+// Collecting metrics data is an optional debug feature. Not all clusters will
+// support it. If disabled for a component, the corresponding Grab function
+// will immediately return an error derived from MetricsGrabbingDisabledError.
+func NewMetricsGrabber(c clientset.Interface, ec clientset.Interface, config *rest.Config, kubelets bool, scheduler bool, controllers bool, apiServer bool, clusterAutoscaler bool, snapshotController bool) (*Grabber, error) {
kubeScheduler := ""
kubeControllerManager := ""
@@ -81,6 +98,10 @@ func NewMetricsGrabber(c clientset.Interface, ec clientset.Interface, kubelets b
regKubeControllerManager := regexp.MustCompile("kube-controller-manager-.*")
regSnapshotController := regexp.MustCompile("volume-snapshot-controller.*")
+if (scheduler || controllers) && config == nil {
+return nil, errors.New("a rest config is required for grabbing kube-scheduler and kube-controller-manager metrics")
+}
podList, err := c.CoreV1().Pods(metav1.NamespaceSystem).List(context.TODO(), metav1.ListOptions{})
if err != nil {
return nil, err
@@ -102,37 +123,48 @@ func NewMetricsGrabber(c clientset.Interface, ec clientset.Interface, kubelets b
break
}
}
-if kubeScheduler == "" {
-scheduler = false
-klog.Warningf("Can't find kube-scheduler pod. Grabbing metrics from kube-scheduler is disabled.")
-}
-if kubeControllerManager == "" {
-controllers = false
-klog.Warningf("Can't find kube-controller-manager pod. Grabbing metrics from kube-controller-manager is disabled.")
-}
-if snapshotControllerManager == "" {
-snapshotController = false
-klog.Warningf("Can't find snapshot-controller pod. Grabbing metrics from snapshot-controller is disabled.")
-}
-if ec == nil {
+if clusterAutoscaler && ec == nil {
klog.Warningf("Did not receive an external client interface. Grabbing metrics from ClusterAutoscaler is disabled.")
}
return &Grabber{
client: c,
externalClient: ec,
+config: config,
grabFromAPIServer: apiServer,
-grabFromControllerManager: controllers,
+grabFromControllerManager: checkPodDebugHandlers(c, controllers, "kube-controller-manager", kubeControllerManager),
grabFromKubelets: kubelets,
-grabFromScheduler: scheduler,
+grabFromScheduler: checkPodDebugHandlers(c, scheduler, "kube-scheduler", kubeScheduler),
grabFromClusterAutoscaler: clusterAutoscaler,
-grabFromSnapshotController: snapshotController,
+grabFromSnapshotController: checkPodDebugHandlers(c, snapshotController, "snapshot-controller", snapshotControllerManager),
kubeScheduler: kubeScheduler,
kubeControllerManager: kubeControllerManager,
snapshotController: snapshotControllerManager,
}, nil
}
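For illustration, a sketch of constructing the grabber with the new signature; clientSet and restConfig are placeholder names, and passing nil for the external client merely disables ClusterAutoscaler metrics:

    grabber, err := NewMetricsGrabber(clientSet, nil, restConfig,
        true,  // kubelets
        true,  // scheduler
        true,  // controllers
        true,  // apiServer
        false, // clusterAutoscaler
        false, // snapshotController
    )
    if err != nil {
        klog.Fatalf("creating metrics grabber: %v", err)
    }
    collection, err := grabber.Grab()
    if err != nil {
        klog.Warningf("grabbing metrics: %v", err)
    }
    _ = collection // per-component metrics of the Collection struct above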
+func checkPodDebugHandlers(c clientset.Interface, requested bool, component, podName string) bool {
+if !requested {
+return false
+}
+if podName == "" {
+klog.Warningf("Can't find %s pod. Grabbing metrics from %s is disabled.", component, component)
+return false
+}
+// The debug handlers on the host where the pod runs might be disabled.
+// We can check that indirectly by trying to retrieve log output.
+limit := int64(1)
+if _, err := c.CoreV1().Pods(metav1.NamespaceSystem).GetLogs(podName, &v1.PodLogOptions{LimitBytes: &limit}).DoRaw(context.TODO()); err != nil {
+klog.Warningf("Can't retrieve log output of %s (%q). Debug handlers might be disabled in kubelet. Grabbing metrics from %s is disabled.",
+podName, err, component)
+return false
+}
+// Metrics gathering enabled.
+return true
+}
// HasControlPlanePods returns true if metrics grabber was able to find control-plane pods
func (g *Grabber) HasControlPlanePods() bool {
return g.kubeScheduler != "" && g.kubeControllerManager != ""
@@ -164,8 +196,8 @@ func (g *Grabber) grabFromKubeletInternal(nodeName string, kubeletPort int) (Kub
// GrabFromScheduler returns metrics from scheduler
func (g *Grabber) GrabFromScheduler() (SchedulerMetrics, error) {
-if g.kubeScheduler == "" {
-return SchedulerMetrics{}, fmt.Errorf("kube-scheduler pod is not registered. Skipping Scheduler's metrics gathering")
+if !g.grabFromScheduler {
+return SchedulerMetrics{}, fmt.Errorf("kube-scheduler: %w", MetricsGrabbingDisabledError)
}
var err error
@@ -173,7 +205,7 @@ func (g *Grabber) GrabFromScheduler() (SchedulerMetrics, error) {
g.waitForSchedulerReadyOnce.Do(func() {
var lastMetricsFetchErr error
if metricsWaitErr := wait.PollImmediate(time.Second, time.Minute, func() (bool, error) {
-output, lastMetricsFetchErr = g.getMetricsFromPod(g.client, metricsProxyPod, metav1.NamespaceSystem, kubeSchedulerPort)
+output, lastMetricsFetchErr = g.getSecureMetricsFromPod(g.kubeScheduler, metav1.NamespaceSystem, kubeSchedulerPort)
return lastMetricsFetchErr == nil, nil
}); metricsWaitErr != nil {
err = fmt.Errorf("error waiting for scheduler pod to expose metrics: %v; %v", metricsWaitErr, lastMetricsFetchErr)
@@ -189,7 +221,7 @@ func (g *Grabber) GrabFromScheduler() (SchedulerMetrics, error) {
// GrabFromClusterAutoscaler returns metrics from cluster autoscaler
func (g *Grabber) GrabFromClusterAutoscaler() (ClusterAutoscalerMetrics, error) {
if !g.HasControlPlanePods() && g.externalClient == nil {
-return ClusterAutoscalerMetrics{}, fmt.Errorf("Did not find control-plane pods. Skipping ClusterAutoscaler's metrics gathering")
+return ClusterAutoscalerMetrics{}, fmt.Errorf("ClusterAutoscaler: %w", MetricsGrabbingDisabledError)
}
var client clientset.Interface
var namespace string
@@ -209,8 +241,8 @@ func (g *Grabber) GrabFromClusterAutoscaler() (ClusterAutoscalerMetrics, error)
// GrabFromControllerManager returns metrics from controller manager
func (g *Grabber) GrabFromControllerManager() (ControllerManagerMetrics, error) {
-if g.kubeControllerManager == "" {
-return ControllerManagerMetrics{}, fmt.Errorf("kube-controller-manager pod is not registered. Skipping ControllerManager's metrics gathering")
+if !g.grabFromControllerManager {
+return ControllerManagerMetrics{}, fmt.Errorf("kube-controller-manager: %w", MetricsGrabbingDisabledError)
}
var err error
@@ -224,7 +256,7 @@ func (g *Grabber) GrabFromControllerManager() (ControllerManagerMetrics, error)
var lastMetricsFetchErr error
if metricsWaitErr := wait.PollImmediate(time.Second, time.Minute, func() (bool, error) {
-output, lastMetricsFetchErr = g.getMetricsFromPod(g.client, metricsProxyPod, metav1.NamespaceSystem, kubeControllerManagerPort)
+output, lastMetricsFetchErr = g.getSecureMetricsFromPod(g.kubeControllerManager, metav1.NamespaceSystem, kubeControllerManagerPort)
return lastMetricsFetchErr == nil, nil
}); metricsWaitErr != nil {
err = fmt.Errorf("error waiting for controller manager pod to expose metrics: %v; %v", metricsWaitErr, lastMetricsFetchErr)
@@ -239,8 +271,8 @@ func (g *Grabber) GrabFromControllerManager() (ControllerManagerMetrics, error)
// GrabFromSnapshotController returns metrics from the snapshot controller
func (g *Grabber) GrabFromSnapshotController(podName string, port int) (SnapshotControllerMetrics, error) {
-if g.snapshotController == "" {
-return SnapshotControllerMetrics{}, fmt.Errorf("SnapshotController pod is not registered. Skipping SnapshotController's metrics gathering")
+if !g.grabFromSnapshotController {
+return SnapshotControllerMetrics{}, fmt.Errorf("snapshot controller: %w", MetricsGrabbingDisabledError)
}
// Use overrides if provided via test config flags.
@@ -354,6 +386,7 @@ func (g *Grabber) Grab() (Collection, error) {
return result, nil
}
+// getMetricsFromPod retrieves metrics data from an insecure port.
func (g *Grabber) getMetricsFromPod(client clientset.Interface, podName string, namespace string, port int) (string, error) {
rawOutput, err := client.CoreV1().RESTClient().Get().
Namespace(namespace).
@@ -367,3 +400,50 @@ func (g *Grabber) getMetricsFromPod(client clientset.Interface, podName string,
}
return string(rawOutput), nil
}
+// getSecureMetricsFromPod retrieves metrics from a pod that uses TLS
+// and checks client credentials. Conceptually this function is
+// similar to "kubectl port-forward" + "kubectl get --raw
+// https://localhost:<port>/metrics". It uses the same credentials
+// as kubelet.
+func (g *Grabber) getSecureMetricsFromPod(podName string, namespace string, port int) (string, error) {
+dialer := e2epod.NewDialer(g.client, g.config)
+metricConfig := rest.CopyConfig(g.config)
+addr := e2epod.Addr{
+Namespace: namespace,
+PodName: podName,
+Port: port,
+}
+metricConfig.Dial = func(ctx context.Context, network, address string) (net.Conn, error) {
+return dialer.DialContainerPort(ctx, addr)
+}
+// This should make it possible to verify the server, but while it
+// got past the server name check, certificate validation
+// still failed.
+metricConfig.Host = addr.String()
+metricConfig.ServerName = "localhost"
+// Verifying the pod certificate with the same root CA
+// as for the API server led to an error about "unknown root
+// certificate". Disabling certificate checking on the client
+// side gets around that and should be good enough for
+// E2E testing.
+metricConfig.Insecure = true
+metricConfig.CAFile = ""
+metricConfig.CAData = nil
+// clientset.NewForConfig is used because
+// metricClient.RESTClient() is directly usable, in contrast
+// to the client constructed by rest.RESTClientFor().
+metricClient, err := clientset.NewForConfig(metricConfig)
+if err != nil {
+return "", err
+}
+rawOutput, err := metricClient.RESTClient().Get().
+AbsPath("metrics").
+Do(context.TODO()).Raw()
+if err != nil {
+return "", err
+}
+return string(rawOutput), nil
+}

File: test/e2e/framework/metrics/metrics_proxy.go (deleted)

@@ -1,214 +0,0 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metrics
import (
"context"
"fmt"
"strings"
"time"
v1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"
e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
imageutils "k8s.io/kubernetes/test/utils/image"
)
type componentInfo struct {
Name string
Port int
IP string
}
// SetupMetricsProxy creates an nginx Pod to expose metrics from the secure port of kube-scheduler and kube-controller-manager in tests.
func SetupMetricsProxy(c clientset.Interface) error {
var infos []componentInfo
// The component pods might take some time to show up.
err := wait.PollImmediate(time.Second*5, time.Minute*5, func() (bool, error) {
podList, err := c.CoreV1().Pods(metav1.NamespaceSystem).List(context.TODO(), metav1.ListOptions{})
if err != nil {
return false, fmt.Errorf("list pods in ns %s: %w", metav1.NamespaceSystem, err)
}
var foundComponents []componentInfo
for _, pod := range podList.Items {
if len(pod.Status.PodIP) == 0 {
continue
}
switch {
case strings.HasPrefix(pod.Name, "kube-scheduler-"):
foundComponents = append(foundComponents, componentInfo{
Name: pod.Name,
Port: kubeSchedulerPort,
IP: pod.Status.PodIP,
})
case strings.HasPrefix(pod.Name, "kube-controller-manager-"):
foundComponents = append(foundComponents, componentInfo{
Name: pod.Name,
Port: kubeControllerManagerPort,
IP: pod.Status.PodIP,
})
}
}
if len(foundComponents) != 2 {
klog.Infof("Only %d components found. Will retry.", len(foundComponents))
klog.Infof("Found components: %v", foundComponents)
return false, nil
}
infos = foundComponents
return true, nil
})
if err != nil {
return fmt.Errorf("missing component pods: %w", err)
}
klog.Infof("Found components: %v", infos)
const name = metricsProxyPod
_, err = c.CoreV1().ServiceAccounts(metav1.NamespaceSystem).Create(context.TODO(), &v1.ServiceAccount{
ObjectMeta: metav1.ObjectMeta{Name: name},
}, metav1.CreateOptions{})
if err != nil {
return fmt.Errorf("create serviceAccount: %w", err)
}
_, err = c.RbacV1().ClusterRoles().Create(context.TODO(), &rbacv1.ClusterRole{
ObjectMeta: metav1.ObjectMeta{Name: name},
Rules: []rbacv1.PolicyRule{
{
NonResourceURLs: []string{"/metrics"},
Verbs: []string{"get"},
},
},
}, metav1.CreateOptions{})
if err != nil {
return fmt.Errorf("create clusterRole: %w", err)
}
_, err = c.RbacV1().ClusterRoleBindings().Create(context.TODO(), &rbacv1.ClusterRoleBinding{
ObjectMeta: metav1.ObjectMeta{Name: name},
Subjects: []rbacv1.Subject{
{
Kind: rbacv1.ServiceAccountKind,
Name: name,
Namespace: metav1.NamespaceSystem,
},
},
RoleRef: rbacv1.RoleRef{
APIGroup: "rbac.authorization.k8s.io",
Kind: "ClusterRole",
Name: name,
},
}, metav1.CreateOptions{})
if err != nil {
return fmt.Errorf("create clusterRoleBinding: %w", err)
}
var token string
err = wait.PollImmediate(time.Second*5, time.Minute*5, func() (done bool, err error) {
sa, err := c.CoreV1().ServiceAccounts(metav1.NamespaceSystem).Get(context.TODO(), name, metav1.GetOptions{})
if err != nil {
klog.Warningf("Fail to get serviceAccount %s: %v", name, err)
return false, nil
}
if len(sa.Secrets) < 1 {
klog.Warningf("No secret found in serviceAccount %s", name)
return false, nil
}
secretRef := sa.Secrets[0]
secret, err := c.CoreV1().Secrets(metav1.NamespaceSystem).Get(context.TODO(), secretRef.Name, metav1.GetOptions{})
if err != nil {
klog.Warningf("Fail to get secret %s", secretRef.Name)
return false, nil
}
token = string(secret.Data["token"])
if len(token) == 0 {
klog.Warningf("Token in secret %s is empty", secretRef.Name)
return false, nil
}
return true, nil
})
if err != nil {
return err
}
var nginxConfig string
for _, info := range infos {
nginxConfig += fmt.Sprintf(`
server {
listen %d;
server_name _;
proxy_set_header Authorization "Bearer %s";
proxy_ssl_verify off;
location /metrics {
proxy_pass https://%s:%d;
}
}
`, info.Port, token, info.IP, info.Port)
}
_, err = c.CoreV1().ConfigMaps(metav1.NamespaceSystem).Create(context.TODO(), &v1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: metav1.NamespaceSystem,
},
Data: map[string]string{
"metrics.conf": nginxConfig,
},
}, metav1.CreateOptions{})
if err != nil {
return fmt.Errorf("create nginx configmap: %w", err)
}
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: metav1.NamespaceSystem,
},
Spec: v1.PodSpec{
Containers: []v1.Container{{
Name: "nginx",
Image: imageutils.GetE2EImage(imageutils.Nginx),
VolumeMounts: []v1.VolumeMount{{
Name: "config",
MountPath: "/etc/nginx/conf.d",
ReadOnly: true,
}},
}},
Volumes: []v1.Volume{{
Name: "config",
VolumeSource: v1.VolumeSource{
ConfigMap: &v1.ConfigMapVolumeSource{
LocalObjectReference: v1.LocalObjectReference{
Name: name,
},
},
},
}},
},
}
_, err = c.CoreV1().Pods(metav1.NamespaceSystem).Create(context.TODO(), pod, metav1.CreateOptions{})
if err != nil {
return err
}
err = e2epod.WaitForPodNameRunningInNamespace(c, name, metav1.NamespaceSystem)
if err != nil {
return err
}
klog.Info("Successfully setup metrics-proxy")
return nil
}

File: test/e2e/framework/pod/dial.go (new)

@@ -0,0 +1,215 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package pod
import (
"context"
"errors"
"fmt"
"io/ioutil"
"net"
"net/http"
"regexp"
"strconv"
"time"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/httpstream"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/portforward"
"k8s.io/client-go/transport/spdy"
"k8s.io/klog/v2"
)
// NewTransport creates a transport which uses the port forward dialer.
// URLs must use <namespace>.<pod>:<port> as host.
func NewTransport(client kubernetes.Interface, restConfig *rest.Config) *http.Transport {
return &http.Transport{
DialContext: func(ctx context.Context, _, addr string) (net.Conn, error) {
dialer := NewDialer(client, restConfig)
a, err := ParseAddr(addr)
if err != nil {
return nil, err
}
return dialer.DialContainerPort(ctx, *a)
},
}
}
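A hypothetical use of this transport; cs and cfg stand for a matching kubernetes.Interface and *rest.Config, and the pod coordinates are made up:

    httpClient := &http.Client{Transport: NewTransport(cs, cfg)}
    // The URL host must be <namespace>.<pod>:<port>, the format produced by Addr.String.
    resp, err := httpClient.Get("http://mynamespace.mypod:8080/healthz")
    if err != nil {
        klog.Fatalf("GET through the port-forward transport: %v", err)
    }
    defer resp.Body.Close()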
// NewDialer creates a dialer that supports connecting to container ports.
func NewDialer(client kubernetes.Interface, restConfig *rest.Config) *Dialer {
return &Dialer{
client: client,
restConfig: restConfig,
}
}
// Dialer holds the relevant parameters that are independent of a particular connection.
type Dialer struct {
client kubernetes.Interface
restConfig *rest.Config
}
// DialContainerPort connects to a certain container port in a pod.
func (d *Dialer) DialContainerPort(ctx context.Context, addr Addr) (conn net.Conn, finalErr error) {
restClient := d.client.CoreV1().RESTClient()
restConfig := d.restConfig
if restConfig.GroupVersion == nil {
restConfig.GroupVersion = &schema.GroupVersion{}
}
if restConfig.NegotiatedSerializer == nil {
restConfig.NegotiatedSerializer = scheme.Codecs
}
// The setup code around the actual portforward is from
// https://github.com/kubernetes/kubernetes/blob/c652ffbe4a29143623a1aaec39f745575f7e43ad/staging/src/k8s.io/kubectl/pkg/cmd/portforward/portforward.go
req := restClient.Post().
Resource("pods").
Namespace(addr.Namespace).
Name(addr.PodName).
SubResource("portforward")
transport, upgrader, err := spdy.RoundTripperFor(restConfig)
if err != nil {
return nil, fmt.Errorf("create round tripper: %v", err)
}
dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, "POST", req.URL())
streamConn, _, err := dialer.Dial(portforward.PortForwardProtocolV1Name)
if err != nil {
return nil, fmt.Errorf("dialer failed: %v", err)
}
requestID := "1"
defer func() {
if finalErr != nil {
streamConn.Close()
}
}()
// create error stream
headers := http.Header{}
headers.Set(v1.StreamType, v1.StreamTypeError)
headers.Set(v1.PortHeader, fmt.Sprintf("%d", addr.Port))
headers.Set(v1.PortForwardRequestIDHeader, requestID)
// We're not writing to this stream, just reading an error message from it.
// This happens asynchronously.
errorStream, err := streamConn.CreateStream(headers)
if err != nil {
return nil, fmt.Errorf("error creating error stream: %v", err)
}
errorStream.Close()
go func() {
message, err := ioutil.ReadAll(errorStream)
switch {
case err != nil:
klog.ErrorS(err, "error reading from error stream")
case len(message) > 0:
klog.ErrorS(errors.New(string(message)), "an error occurred connecting to the remote port")
}
}()
// create data stream
headers.Set(v1.StreamType, v1.StreamTypeData)
dataStream, err := streamConn.CreateStream(headers)
if err != nil {
return nil, fmt.Errorf("error creating data stream: %v", err)
}
return &stream{
addr: addr, // remember the target so RemoteAddr can report it
Stream: dataStream,
streamConn: streamConn,
}, nil
}
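The dialer can also be used directly; a sketch with placeholder pod coordinates (the target container must already be running, as noted on Addr below):

    dialer := NewDialer(cs, cfg) // cs and cfg as in the transport example above
    conn, err := dialer.DialContainerPort(context.TODO(), Addr{
        Namespace: "kube-system",
        PodName:   "some-component-pod", // hypothetical name
        Port:      10259,
    })
    if err != nil {
        klog.Fatalf("dial container port: %v", err)
    }
    defer conn.Close()
    // conn behaves like a net.Conn; layer TLS or HTTP on top as the target port requires.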
// Addr contains all relevant parameters for a certain port in a pod.
// The container should be running before connections are attempted,
// otherwise the connection will fail.
type Addr struct {
Namespace, PodName string
Port int
}
var _ net.Addr = Addr{}
func (a Addr) Network() string {
return "port-forwarding"
}
func (a Addr) String() string {
return fmt.Sprintf("%s.%s:%d", a.Namespace, a.PodName, a.Port)
}
// ParseAddr expects a <namespace>.<pod>:<port number> as produced
// by Addr.String.
func ParseAddr(addr string) (*Addr, error) {
parts := addrRegex.FindStringSubmatch(addr)
if parts == nil {
return nil, fmt.Errorf("%q: must match the format <namespace>.<pod>:<port number>", addr)
}
port, _ := strconv.Atoi(parts[3])
return &Addr{
Namespace: parts[1],
PodName: parts[2],
Port: port,
}, nil
}
var addrRegex = regexp.MustCompile(`^([^\.]+)\.([^:]+):(\d+)$`)
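For example, with illustrative values:

    a, err := ParseAddr("kube-system.kube-dns-12345:9153")
    if err != nil {
        klog.Fatalf("parse: %v", err)
    }
    // a.Namespace == "kube-system", a.PodName == "kube-dns-12345", a.Port == 9153
    _ = a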
type stream struct {
addr Addr
httpstream.Stream
streamConn httpstream.Connection
}
var _ net.Conn = &stream{}
func (s *stream) Close() error {
s.Stream.Close()
s.streamConn.Close()
return nil
}
func (s *stream) LocalAddr() net.Addr {
return LocalAddr{}
}
func (s *stream) RemoteAddr() net.Addr {
return s.addr
}
func (s *stream) SetDeadline(t time.Time) error {
return nil
}
func (s *stream) SetReadDeadline(t time.Time) error {
return nil
}
func (s *stream) SetWriteDeadline(t time.Time) error {
return nil
}
type LocalAddr struct{}
var _ net.Addr = LocalAddr{}
func (l LocalAddr) Network() string { return "port-forwarding" }
func (l LocalAddr) String() string { return "apiserver" }