e2e: seperate wait for termination notice and graceful termination
This commit is contained in:
		@@ -46,7 +46,7 @@ type resourceTest struct {
 | 
			
		||||
 | 
			
		||||
func logPodsOnNodes(c *client.Client, nodeNames []string) {
 | 
			
		||||
	for _, n := range nodeNames {
 | 
			
		||||
		podList, err := GetKubeletPods(c, n)
 | 
			
		||||
		podList, err := GetKubeletRunningPods(c, n)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			Logf("Unable to retrieve kubelet pods for node %v", n)
 | 
			
		||||
			continue
 | 
			
		||||
 
 | 
			
		||||
@@ -32,7 +32,6 @@ import (
 | 
			
		||||
	cadvisorapi "github.com/google/cadvisor/info/v1"
 | 
			
		||||
	"github.com/prometheus/common/model"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/api"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/client/restclient"
 | 
			
		||||
	client "k8s.io/kubernetes/pkg/client/unversioned"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/kubelet/metrics"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/kubelet/server/stats"
 | 
			
		||||
@@ -42,11 +41,6 @@ import (
 | 
			
		||||
	"k8s.io/kubernetes/pkg/util/wait"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
const (
 | 
			
		||||
	// timeout for proxy requests.
 | 
			
		||||
	proxyTimeout = 2 * time.Minute
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// KubeletMetric stores metrics scraped from the kubelet server's /metric endpoint.
 | 
			
		||||
// TODO: Get some more structure around the metrics and this type
 | 
			
		||||
type KubeletMetric struct {
 | 
			
		||||
@@ -342,46 +336,9 @@ type usageDataPerContainer struct {
 | 
			
		||||
	memWorkSetData []uint64
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Performs a get on a node proxy endpoint given the nodename and rest client.
 | 
			
		||||
func nodeProxyRequest(c *client.Client, node, endpoint string) (restclient.Result, error) {
 | 
			
		||||
	// proxy tends to hang in some cases when Node is not ready. Add an artificial timeout for this call.
 | 
			
		||||
	// This will leak a goroutine if proxy hangs. #22165
 | 
			
		||||
	subResourceProxyAvailable, err := serverVersionGTE(subResourceServiceAndNodeProxyVersion, c)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return restclient.Result{}, err
 | 
			
		||||
	}
 | 
			
		||||
	var result restclient.Result
 | 
			
		||||
	finished := make(chan struct{})
 | 
			
		||||
	go func() {
 | 
			
		||||
		if subResourceProxyAvailable {
 | 
			
		||||
			result = c.Get().
 | 
			
		||||
				Resource("nodes").
 | 
			
		||||
				SubResource("proxy").
 | 
			
		||||
				Name(fmt.Sprintf("%v:%v", node, ports.KubeletPort)).
 | 
			
		||||
				Suffix(endpoint).
 | 
			
		||||
				Do()
 | 
			
		||||
 | 
			
		||||
		} else {
 | 
			
		||||
			result = c.Get().
 | 
			
		||||
				Prefix("proxy").
 | 
			
		||||
				Resource("nodes").
 | 
			
		||||
				Name(fmt.Sprintf("%v:%v", node, ports.KubeletPort)).
 | 
			
		||||
				Suffix(endpoint).
 | 
			
		||||
				Do()
 | 
			
		||||
		}
 | 
			
		||||
		finished <- struct{}{}
 | 
			
		||||
	}()
 | 
			
		||||
	select {
 | 
			
		||||
	case <-finished:
 | 
			
		||||
		return result, nil
 | 
			
		||||
	case <-time.After(proxyTimeout):
 | 
			
		||||
		return restclient.Result{}, nil
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Retrieve metrics from the kubelet server of the given node.
 | 
			
		||||
func getKubeletMetricsThroughProxy(c *client.Client, node string) (string, error) {
 | 
			
		||||
	client, err := nodeProxyRequest(c, node, "metrics")
 | 
			
		||||
	client, err := NodeProxyRequest(c, node, "metrics")
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return "", err
 | 
			
		||||
	}
 | 
			
		||||
@@ -408,7 +365,7 @@ func getKubeletMetricsThroughNode(nodeName string) (string, error) {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func getKubeletHeapStats(c *client.Client, nodeName string) (string, error) {
 | 
			
		||||
	client, err := nodeProxyRequest(c, nodeName, "debug/pprof/heap")
 | 
			
		||||
	client, err := NodeProxyRequest(c, nodeName, "debug/pprof/heap")
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return "", err
 | 
			
		||||
	}
 | 
			
		||||
@@ -423,31 +380,10 @@ func getKubeletHeapStats(c *client.Client, nodeName string) (string, error) {
 | 
			
		||||
	return strings.Join(lines[len(lines)-numLines:], "\n"), nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// GetKubeletPods retrieves the list of running pods on the kubelet. The pods
 | 
			
		||||
// includes necessary information (e.g., UID, name, namespace for
 | 
			
		||||
// pods/containers), but do not contain the full spec.
 | 
			
		||||
func GetKubeletPods(c *client.Client, node string) (*api.PodList, error) {
 | 
			
		||||
	result := &api.PodList{}
 | 
			
		||||
	client, err := nodeProxyRequest(c, node, "runningpods")
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return &api.PodList{}, err
 | 
			
		||||
	}
 | 
			
		||||
	if err = client.Into(result); err != nil {
 | 
			
		||||
		return &api.PodList{}, err
 | 
			
		||||
	}
 | 
			
		||||
	return result, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func PrintAllKubeletPods(c *client.Client, nodeName string) {
 | 
			
		||||
	result, err := nodeProxyRequest(c, nodeName, "pods")
 | 
			
		||||
	podList, err := GetKubeletPods(c, nodeName)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		Logf("Unable to retrieve kubelet pods for node %v", nodeName)
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	podList := &api.PodList{}
 | 
			
		||||
	err = result.Into(podList)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		Logf("Unable to cast result to pods for node %v", nodeName)
 | 
			
		||||
		Logf("Unable to retrieve kubelet pods for node %v: %v", nodeName, err)
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	for _, p := range podList.Items {
 | 
			
		||||
 
 | 
			
		||||
@@ -351,6 +351,28 @@ var _ = Describe("Pods", func() {
 | 
			
		||||
			Failf("Failed to delete pod: %v", err)
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		By("verifying the kubelet observed the termination notice")
 | 
			
		||||
		pod, err = podClient.Get(pod.Name)
 | 
			
		||||
		Expect(wait.Poll(time.Second*5, time.Second*30, func() (bool, error) {
 | 
			
		||||
			podList, err := GetKubeletPods(framework.Client, pod.Spec.NodeName)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				Logf("Unable to retrieve kubelet pods for node %v: %v", pod.Spec.NodeName, err)
 | 
			
		||||
				return false, nil
 | 
			
		||||
			}
 | 
			
		||||
			for _, kubeletPod := range podList.Items {
 | 
			
		||||
				if pod.Name != kubeletPod.Name {
 | 
			
		||||
					continue
 | 
			
		||||
				}
 | 
			
		||||
				if kubeletPod.ObjectMeta.DeletionTimestamp == nil {
 | 
			
		||||
					Logf("deletion has not yet been observed")
 | 
			
		||||
					return false, nil
 | 
			
		||||
				}
 | 
			
		||||
				return true, nil
 | 
			
		||||
			}
 | 
			
		||||
			Logf("no pod exists with the name we were looking for, assuming the termination request was observed and completed")
 | 
			
		||||
			return true, nil
 | 
			
		||||
		})).NotTo(HaveOccurred(), "kubelet never observed the termination notice")
 | 
			
		||||
 | 
			
		||||
		By("verifying pod deletion was observed")
 | 
			
		||||
		deleted := false
 | 
			
		||||
		timeout := false
 | 
			
		||||
 
 | 
			
		||||
@@ -56,6 +56,7 @@ import (
 | 
			
		||||
	"k8s.io/kubernetes/pkg/kubectl"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/kubelet/util/format"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/labels"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/master/ports"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/runtime"
 | 
			
		||||
	sshutil "k8s.io/kubernetes/pkg/ssh"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/types"
 | 
			
		||||
@@ -3497,6 +3498,70 @@ func GetReadyNodes(f *Framework) (nodes *api.NodeList, err error) {
 | 
			
		||||
	return nodes, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// timeout for proxy requests.
 | 
			
		||||
const proxyTimeout = 2 * time.Minute
 | 
			
		||||
 | 
			
		||||
// NodeProxyRequest performs a get on a node proxy endpoint given the nodename and rest client.
 | 
			
		||||
func NodeProxyRequest(c *client.Client, node, endpoint string) (restclient.Result, error) {
 | 
			
		||||
	// proxy tends to hang in some cases when Node is not ready. Add an artificial timeout for this call.
 | 
			
		||||
	// This will leak a goroutine if proxy hangs. #22165
 | 
			
		||||
	subResourceProxyAvailable, err := serverVersionGTE(subResourceServiceAndNodeProxyVersion, c)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return restclient.Result{}, err
 | 
			
		||||
	}
 | 
			
		||||
	var result restclient.Result
 | 
			
		||||
	finished := make(chan struct{})
 | 
			
		||||
	go func() {
 | 
			
		||||
		if subResourceProxyAvailable {
 | 
			
		||||
			result = c.Get().
 | 
			
		||||
				Resource("nodes").
 | 
			
		||||
				SubResource("proxy").
 | 
			
		||||
				Name(fmt.Sprintf("%v:%v", node, ports.KubeletPort)).
 | 
			
		||||
				Suffix(endpoint).
 | 
			
		||||
				Do()
 | 
			
		||||
 | 
			
		||||
		} else {
 | 
			
		||||
			result = c.Get().
 | 
			
		||||
				Prefix("proxy").
 | 
			
		||||
				Resource("nodes").
 | 
			
		||||
				Name(fmt.Sprintf("%v:%v", node, ports.KubeletPort)).
 | 
			
		||||
				Suffix(endpoint).
 | 
			
		||||
				Do()
 | 
			
		||||
		}
 | 
			
		||||
		finished <- struct{}{}
 | 
			
		||||
	}()
 | 
			
		||||
	select {
 | 
			
		||||
	case <-finished:
 | 
			
		||||
		return result, nil
 | 
			
		||||
	case <-time.After(proxyTimeout):
 | 
			
		||||
		return restclient.Result{}, nil
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// GetKubeletPods retrieves the list of pods on the kubelet
 | 
			
		||||
func GetKubeletPods(c *client.Client, node string) (*api.PodList, error) {
 | 
			
		||||
	return getKubeletPods(c, node, "pods")
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// GetKubeletRunningPods retrieves the list of running pods on the kubelet. The pods
 | 
			
		||||
// includes necessary information (e.g., UID, name, namespace for
 | 
			
		||||
// pods/containers), but do not contain the full spec.
 | 
			
		||||
func GetKubeletRunningPods(c *client.Client, node string) (*api.PodList, error) {
 | 
			
		||||
	return getKubeletPods(c, node, "runningpods")
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func getKubeletPods(c *client.Client, node, resource string) (*api.PodList, error) {
 | 
			
		||||
	result := &api.PodList{}
 | 
			
		||||
	client, err := NodeProxyRequest(c, node, resource)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return &api.PodList{}, err
 | 
			
		||||
	}
 | 
			
		||||
	if err = client.Into(result); err != nil {
 | 
			
		||||
		return &api.PodList{}, err
 | 
			
		||||
	}
 | 
			
		||||
	return result, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// LaunchWebserverPod launches a pod serving http on port 8080 to act
 | 
			
		||||
// as the target for networking connectivity checks.  The ip address
 | 
			
		||||
// of the created pod will be returned if the pod is launched
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user