Merge pull request #121958 from neolit123/1.30-remove-kubelet-and-func
kubeadm: drop concurrency when waiting for kubelet /healthz
This commit is contained in:
		| @@ -26,9 +26,9 @@ import ( | ||||
| 	"github.com/pkg/errors" | ||||
|  | ||||
| 	clientset "k8s.io/client-go/kubernetes" | ||||
| 	"k8s.io/klog/v2" | ||||
|  | ||||
| 	"k8s.io/kubernetes/cmd/kubeadm/app/cmd/phases/workflow" | ||||
| 	kubeadmconstants "k8s.io/kubernetes/cmd/kubeadm/app/constants" | ||||
| 	"k8s.io/kubernetes/cmd/kubeadm/app/util/apiclient" | ||||
| 	dryrunutil "k8s.io/kubernetes/cmd/kubeadm/app/util/dryrun" | ||||
| ) | ||||
| @@ -79,24 +79,23 @@ func runWaitControlPlanePhase(c workflow.RunData) error { | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	// waiter holds the apiclient.Waiter implementation of choice, responsible for querying the API server in various ways and waiting for conditions to be fulfilled | ||||
| 	klog.V(1).Infoln("[wait-control-plane] Waiting for the API server to be healthy") | ||||
|  | ||||
| 	// WaitForAPI uses the /healthz endpoint, thus a client without permissions works fine | ||||
| 	// Both Wait* calls below use a /healthz endpoint, thus a client without permissions works fine | ||||
| 	client, err := data.ClientWithoutBootstrap() | ||||
| 	if err != nil { | ||||
| 		return errors.Wrap(err, "cannot obtain client without bootstrap") | ||||
| 	} | ||||
|  | ||||
| 	timeout := data.Cfg().ClusterConfiguration.APIServer.TimeoutForControlPlane.Duration | ||||
| 	waiter, err := newControlPlaneWaiter(data.DryRun(), timeout, client, data.OutputWriter()) | ||||
| 	waiter, err := newControlPlaneWaiter(data.DryRun(), 0, client, data.OutputWriter()) | ||||
| 	if err != nil { | ||||
| 		return errors.Wrap(err, "error creating waiter") | ||||
| 	} | ||||
|  | ||||
| 	fmt.Printf("[wait-control-plane] Waiting for the kubelet to boot up the control plane as static Pods from directory %q. This can take up to %v\n", data.ManifestDir(), timeout) | ||||
| 	controlPlaneTimeout := data.Cfg().ClusterConfiguration.APIServer.TimeoutForControlPlane.Duration | ||||
| 	fmt.Printf("[wait-control-plane] Waiting for the kubelet to boot up the control plane as static Pods"+ | ||||
| 		" from directory %q\n", | ||||
| 		data.ManifestDir()) | ||||
|  | ||||
| 	if err := waiter.WaitForKubeletAndFunc(waiter.WaitForAPI); err != nil { | ||||
| 	handleError := func(err error) error { | ||||
| 		context := struct { | ||||
| 			Error  string | ||||
| 			Socket string | ||||
| @@ -109,6 +108,16 @@ func runWaitControlPlanePhase(c workflow.RunData) error { | ||||
| 		return errors.New("couldn't initialize a Kubernetes cluster") | ||||
| 	} | ||||
|  | ||||
| 	waiter.SetTimeout(kubeadmconstants.DefaultKubeletTimeout) | ||||
| 	if err := waiter.WaitForKubelet(); err != nil { | ||||
| 		return handleError(err) | ||||
| 	} | ||||
|  | ||||
| 	waiter.SetTimeout(controlPlaneTimeout) | ||||
| 	if err := waiter.WaitForAPI(); err != nil { | ||||
| 		return handleError(err) | ||||
| 	} | ||||
|  | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
|   | ||||
| @@ -205,8 +205,14 @@ func runKubeletStartJoinPhase(c workflow.RunData) (returnErr error) { | ||||
| 	// Now the kubelet will perform the TLS Bootstrap, transforming /etc/kubernetes/bootstrap-kubelet.conf to /etc/kubernetes/kubelet.conf | ||||
| 	// Wait for the kubelet to create the /etc/kubernetes/kubelet.conf kubeconfig file. If this process | ||||
| 	// times out, display a somewhat user-friendly message. | ||||
| 	waiter := apiclient.NewKubeWaiter(nil, kubeadmconstants.TLSBootstrapTimeout, os.Stdout) | ||||
| 	if err := waiter.WaitForKubeletAndFunc(waitForTLSBootstrappedClient); err != nil { | ||||
| 	waiter := apiclient.NewKubeWaiter(nil, 0, os.Stdout) | ||||
| 	waiter.SetTimeout(kubeadmconstants.DefaultKubeletTimeout) | ||||
| 	if err := waiter.WaitForKubelet(); err != nil { | ||||
| 		fmt.Printf(kubeadmJoinFailMsg, err) | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	if err := waitForTLSBootstrappedClient(); err != nil { | ||||
| 		fmt.Printf(kubeadmJoinFailMsg, err) | ||||
| 		return err | ||||
| 	} | ||||
| @@ -227,7 +233,7 @@ func runKubeletStartJoinPhase(c workflow.RunData) (returnErr error) { | ||||
|  | ||||
| // waitForTLSBootstrappedClient waits for the /etc/kubernetes/kubelet.conf file to be available | ||||
| func waitForTLSBootstrappedClient() error { | ||||
| 	fmt.Println("[kubelet-start] Waiting for the kubelet to perform the TLS Bootstrap...") | ||||
| 	fmt.Println("[kubelet-start] Waiting for the kubelet to perform the TLS Bootstrap") | ||||
|  | ||||
| 	// Loop on every falsy return. Return with an error if raised. Exit successfully if true is returned. | ||||
| 	return wait.PollImmediate(kubeadmconstants.TLSBootstrapRetryInterval, kubeadmconstants.TLSBootstrapTimeout, func() (bool, error) { | ||||
|   | ||||
| @@ -230,6 +230,8 @@ const ( | ||||
|  | ||||
| 	// DefaultControlPlaneTimeout specifies the default control plane (actually API Server) timeout for use by kubeadm | ||||
| 	DefaultControlPlaneTimeout = 4 * time.Minute | ||||
| 	// DefaultKubeletTimeout specifies the default kubelet timeout | ||||
| 	DefaultKubeletTimeout = 4 * time.Minute | ||||
|  | ||||
| 	// MinimumAddressesInServiceSubnet defines the minimum number of addresses the Service subnet should allow. | ||||
| 	// We need at least ten, because the DNS service is always at the tenth clusterIP | ||||
|   | ||||
| @@ -22,10 +22,12 @@ import ( | ||||
| 	"github.com/pkg/errors" | ||||
|  | ||||
| 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||||
| 	"k8s.io/apimachinery/pkg/util/wait" | ||||
| 	clientset "k8s.io/client-go/kubernetes" | ||||
| 	bootstraputil "k8s.io/cluster-bootstrap/token/util" | ||||
|  | ||||
| 	bootstraptokenv1 "k8s.io/kubernetes/cmd/kubeadm/app/apis/bootstraptoken/v1" | ||||
| 	kubeadmconstants "k8s.io/kubernetes/cmd/kubeadm/app/constants" | ||||
| 	"k8s.io/kubernetes/cmd/kubeadm/app/util/apiclient" | ||||
| ) | ||||
|  | ||||
| @@ -46,15 +48,21 @@ func UpdateOrCreateTokens(client clientset.Interface, failIfExists bool, tokens | ||||
| 		} | ||||
|  | ||||
| 		updatedOrNewSecret := bootstraptokenv1.BootstrapTokenToSecret(&token) | ||||
| 		// Try to create or update the token with an exponential backoff | ||||
| 		err = apiclient.TryRunCommand(func() error { | ||||
|  | ||||
| 		var lastError error | ||||
| 		err = wait.PollUntilContextTimeout( | ||||
| 			context.Background(), | ||||
| 			kubeadmconstants.APICallRetryInterval, | ||||
| 			kubeadmconstants.APICallWithWriteTimeout, | ||||
| 			true, func(_ context.Context) (bool, error) { | ||||
| 				if err := apiclient.CreateOrUpdateSecret(client, updatedOrNewSecret); err != nil { | ||||
| 				return errors.Wrapf(err, "failed to create or update bootstrap token with name %s", secretName) | ||||
| 					lastError = errors.Wrapf(err, "failed to create or update bootstrap token with name %s", secretName) | ||||
| 					return false, nil | ||||
| 				} | ||||
| 			return nil | ||||
| 		}, 5) | ||||
| 				return true, nil | ||||
| 			}) | ||||
| 		if err != nil { | ||||
| 			return err | ||||
| 			return lastError | ||||
| 		} | ||||
| 	} | ||||
| 	return nil | ||||
|   | ||||
| @@ -131,13 +131,8 @@ func (w *fakeWaiter) WaitForStaticPodHashChange(_, _, _ string) error { | ||||
| 	return w.errsToReturn[waitForHashChange] | ||||
| } | ||||
|  | ||||
| // WaitForHealthyKubelet returns a dummy nil just to implement the interface | ||||
| func (w *fakeWaiter) WaitForHealthyKubelet(_ time.Duration, _ string) error { | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // WaitForKubeletAndFunc is a wrapper for WaitForHealthyKubelet that also blocks for a function | ||||
| func (w *fakeWaiter) WaitForKubeletAndFunc(f func() error) error { | ||||
| // WaitForKubelet returns a dummy nil just to implement the interface | ||||
| func (w *fakeWaiter) WaitForKubelet() error { | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
|   | ||||
| @@ -50,10 +50,8 @@ type Waiter interface { | ||||
| 	WaitForStaticPodHashChange(nodeName, component, previousHash string) error | ||||
| 	// WaitForStaticPodControlPlaneHashes fetches sha256 hashes for the control plane static pods | ||||
| 	WaitForStaticPodControlPlaneHashes(nodeName string) (map[string]string, error) | ||||
| 	// WaitForHealthyKubelet blocks until the kubelet /healthz endpoint returns 'ok' | ||||
| 	WaitForHealthyKubelet(initialTimeout time.Duration, healthzEndpoint string) error | ||||
| 	// WaitForKubeletAndFunc is a wrapper for WaitForHealthyKubelet that also blocks for a function | ||||
| 	WaitForKubeletAndFunc(f func() error) error | ||||
| 	// WaitForKubelet blocks until the kubelet /healthz endpoint returns 'ok' | ||||
| 	WaitForKubelet() error | ||||
| 	// SetTimeout adjusts the timeout to the specified duration | ||||
| 	SetTimeout(timeout time.Duration) | ||||
| } | ||||
| @@ -76,17 +74,28 @@ func NewKubeWaiter(client clientset.Interface, timeout time.Duration, writer io. | ||||
|  | ||||
| // WaitForAPI waits for the API Server's /healthz endpoint to report "ok" | ||||
| func (w *KubeWaiter) WaitForAPI() error { | ||||
| 	fmt.Printf("[api-check] Waiting for a healthy API server. This can take up to %v\n", w.timeout) | ||||
|  | ||||
| 	start := time.Now() | ||||
| 	return wait.PollImmediate(kubeadmconstants.APICallRetryInterval, w.timeout, func() (bool, error) { | ||||
| 	err := wait.PollUntilContextTimeout( | ||||
| 		context.Background(), | ||||
| 		kubeadmconstants.APICallRetryInterval, | ||||
| 		w.timeout, | ||||
| 		true, func(ctx context.Context) (bool, error) { | ||||
| 			healthStatus := 0 | ||||
| 		w.client.Discovery().RESTClient().Get().AbsPath("/healthz").Do(context.TODO()).StatusCode(&healthStatus) | ||||
| 			w.client.Discovery().RESTClient().Get().AbsPath("/healthz").Do(ctx).StatusCode(&healthStatus) | ||||
| 			if healthStatus != http.StatusOK { | ||||
| 				return false, nil | ||||
| 			} | ||||
|  | ||||
| 		fmt.Printf("[apiclient] All control plane components are healthy after %f seconds\n", time.Since(start).Seconds()) | ||||
| 			return true, nil | ||||
| 		}) | ||||
| 	if err != nil { | ||||
| 		fmt.Printf("[api-check] The API server is not healthy after %v\n", time.Since(start)) | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	fmt.Printf("[api-check] The API server is healthy after %v\n", time.Since(start)) | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // WaitForPodsWithLabel will lookup pods with the given label and wait until they are all | ||||
| @@ -133,47 +142,54 @@ func (w *KubeWaiter) WaitForPodToDisappear(podName string) error { | ||||
| 	}) | ||||
| } | ||||
|  | ||||
| // WaitForHealthyKubelet blocks until the kubelet /healthz endpoint returns 'ok' | ||||
| func (w *KubeWaiter) WaitForHealthyKubelet(initialTimeout time.Duration, healthzEndpoint string) error { | ||||
| 	time.Sleep(initialTimeout) | ||||
| 	fmt.Printf("[kubelet-check] Initial timeout of %v passed.\n", initialTimeout) | ||||
| 	return TryRunCommand(func() error { | ||||
| // WaitForKubelet blocks until the kubelet /healthz endpoint returns 'ok'. | ||||
| func (w *KubeWaiter) WaitForKubelet() error { | ||||
| 	var ( | ||||
| 		lastError       error | ||||
| 		start           = time.Now() | ||||
| 		healthzEndpoint = fmt.Sprintf("http://localhost:%d/healthz", kubeadmconstants.KubeletHealthzPort) | ||||
| 	) | ||||
|  | ||||
| 	fmt.Printf("[kubelet-check] Waiting for a healthy kubelet. This can take up to %v\n", w.timeout) | ||||
|  | ||||
| 	formatError := func(cause string) error { | ||||
| 		return errors.Errorf("The HTTP call equal to 'curl -sSL %s' returned %s\n", | ||||
| 			healthzEndpoint, cause) | ||||
| 	} | ||||
|  | ||||
| 	err := wait.PollUntilContextTimeout( | ||||
| 		context.Background(), | ||||
| 		kubeadmconstants.APICallRetryInterval, | ||||
| 		w.timeout, | ||||
| 		true, func(ctx context.Context) (bool, error) { | ||||
| 			client := &http.Client{Transport: netutil.SetOldTransportDefaults(&http.Transport{})} | ||||
| 		resp, err := client.Get(healthzEndpoint) | ||||
| 			req, err := http.NewRequestWithContext(ctx, http.MethodGet, healthzEndpoint, nil) | ||||
| 			if err != nil { | ||||
| 			fmt.Println("[kubelet-check] It seems like the kubelet isn't running or healthy.") | ||||
| 			fmt.Printf("[kubelet-check] The HTTP call equal to 'curl -sSL %s' failed with error: %v.\n", healthzEndpoint, err) | ||||
| 			return err | ||||
| 				lastError = formatError(fmt.Sprintf("error: %v", err)) | ||||
| 				return false, err | ||||
| 			} | ||||
| 		defer resp.Body.Close() | ||||
| 			resp, err := client.Do(req) | ||||
| 			if err != nil { | ||||
| 				lastError = formatError(fmt.Sprintf("error: %v", err)) | ||||
| 				return false, nil | ||||
| 			} | ||||
| 			defer func() { | ||||
| 				_ = resp.Body.Close() | ||||
| 			}() | ||||
| 			if resp.StatusCode != http.StatusOK { | ||||
| 			fmt.Println("[kubelet-check] It seems like the kubelet isn't running or healthy.") | ||||
| 			fmt.Printf("[kubelet-check] The HTTP call equal to 'curl -sSL %s' returned HTTP code %d\n", healthzEndpoint, resp.StatusCode) | ||||
| 			return errors.New("the kubelet healthz endpoint is unhealthy") | ||||
| 				lastError = formatError(fmt.Sprintf("status code: %d", resp.StatusCode)) | ||||
| 				return false, nil | ||||
| 			} | ||||
|  | ||||
| 			return true, nil | ||||
| 		}) | ||||
| 	if err != nil { | ||||
| 		fmt.Printf("[kubelet-check] The kubelet is not healthy after %v\n", time.Since(start)) | ||||
| 		return lastError | ||||
| 	} | ||||
|  | ||||
| 	fmt.Printf("[kubelet-check] The kubelet is healthy after %v\n", time.Since(start)) | ||||
| 	return nil | ||||
| 	}, 5) // a failureThreshold of five means waiting for a total of 155 seconds | ||||
| } | ||||
|  | ||||
| // WaitForKubeletAndFunc waits primarily for the function f to execute, even though it might take some time. If that takes a long time and the kubelet's | ||||
| // /healthz endpoint is continuously unhealthy, kubeadm will error out after a period of exponential backoff | ||||
| func (w *KubeWaiter) WaitForKubeletAndFunc(f func() error) error { | ||||
| 	errorChan := make(chan error, 1) | ||||
|  | ||||
| 	go func(errC chan error, waiter Waiter) { | ||||
| 		if err := waiter.WaitForHealthyKubelet(40*time.Second, fmt.Sprintf("http://localhost:%d/healthz", kubeadmconstants.KubeletHealthzPort)); err != nil { | ||||
| 			errC <- err | ||||
| 		} | ||||
| 	}(errorChan, w) | ||||
|  | ||||
| 	go func(errC chan error) { | ||||
| 		// This main goroutine sends whatever the f function returns (error or not) to the channel | ||||
| 		// This in order to continue on success (nil error), or just fail if the function returns an error | ||||
| 		errC <- f() | ||||
| 	}(errorChan) | ||||
|  | ||||
| 	// This call is blocking until one of the goroutines sends to errorChan | ||||
| 	return <-errorChan | ||||
| } | ||||
|  | ||||
| // SetTimeout adjusts the timeout to the specified duration | ||||
| @@ -264,21 +280,3 @@ func getStaticPodSingleHash(client clientset.Interface, nodeName string, compone | ||||
| 	staticPodHash := staticPod.Annotations["kubernetes.io/config.hash"] | ||||
| 	return staticPodHash, nil | ||||
| } | ||||
|  | ||||
| // TryRunCommand runs a function a maximum of failureThreshold times, and retries on error. If failureThreshold is hit, the error from wait.ExponentialBackoff is returned; the errors returned by f are not propagated | ||||
| func TryRunCommand(f func() error, failureThreshold int) error { | ||||
| 	backoff := wait.Backoff{ | ||||
| 		Duration: 5 * time.Second, | ||||
| 		Factor:   2, // double the timeout for every failure | ||||
| 		Steps:    failureThreshold, | ||||
| 	} | ||||
| 	return wait.ExponentialBackoff(backoff, func() (bool, error) { | ||||
| 		err := f() | ||||
| 		if err != nil { | ||||
| 			// Retry until the timeout | ||||
| 			return false, nil | ||||
| 		} | ||||
| 		// The last f() call was a success, return cleanly | ||||
| 		return true, nil | ||||
| 	}) | ||||
| } | ||||
|   | ||||
| @@ -106,14 +106,9 @@ func (w *Waiter) WaitForPodToDisappear(podName string) error { | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // WaitForHealthyKubelet blocks until the kubelet /healthz endpoint returns 'ok' | ||||
| func (w *Waiter) WaitForHealthyKubelet(_ time.Duration, healthzEndpoint string) error { | ||||
| 	fmt.Printf("[dryrun] Would make sure the kubelet %q endpoint is healthy\n", healthzEndpoint) | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // WaitForKubeletAndFunc is a wrapper for WaitForHealthyKubelet that also blocks for a function | ||||
| func (w *Waiter) WaitForKubeletAndFunc(f func() error) error { | ||||
| // WaitForKubelet blocks until the kubelet /healthz endpoint returns 'ok' | ||||
| func (w *Waiter) WaitForKubelet() error { | ||||
| 	fmt.Println("[dryrun] Would make sure the kubelet's /healthz endpoint is healthy") | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 Kubernetes Prow Robot
					Kubernetes Prow Robot