Revert "Merge pull request #30090 from mtaufen/dynamic-kubelet-restart"

This reverts commit fe808ec2a4, reversing
changes made to f297ea966e.
Mike Danese
2016-08-23 14:11:48 -07:00
parent 7cf1c73fef
commit 0a735b7886
4 changed files with 25 additions and 495 deletions
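
In short, judging from the diff below: the revert deletes the DynamicKubeletConfiguration e2e node test and removes the restart machinery from the e2e services harness — the server type loses its restartCommand and restartOnExit fields along with the restart-loop channels, newServer returns to its five-argument form, and server.start() goes back to a single cmd.Run() instead of Start()/Wait() wrapped in a health-check-driven restart loop.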


@@ -1,195 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package e2e_node

import (
    "encoding/json"
    "fmt"
    "io/ioutil"
    "net/http"
    "time"

    "github.com/golang/glog"

    "k8s.io/kubernetes/pkg/api"
    "k8s.io/kubernetes/pkg/apis/componentconfig"
    "k8s.io/kubernetes/pkg/apis/componentconfig/v1alpha1"
    "k8s.io/kubernetes/test/e2e/framework"

    . "github.com/onsi/ginkgo"
    . "github.com/onsi/gomega"
)

// This test is marked [Disruptive] because the Kubelet temporarily goes down as part of this test.
var _ = framework.KubeDescribe("DynamicKubeletConfiguration [Feature:dynamicKubeletConfig] [Disruptive]", func() {
    f := framework.NewDefaultFramework("dynamic-kubelet-configuration-test")

    Context("When a configmap called `kubelet-<node-name>` is added to the `kube-system` namespace", func() {
        It("The Kubelet on that node should restart to take up the new config", func() {
            const (
                restartGap = 40 * time.Second
            )

            // Get the current KubeletConfiguration (known to be valid) by
            // querying the configz endpoint for the current node.
            resp := pollConfigz(2*time.Minute, 5*time.Second)
            kubeCfg, err := decodeConfigz(resp)
            framework.ExpectNoError(err)
            glog.Infof("KubeletConfiguration - Initial values: %+v", *kubeCfg)

            // Change a safe value, e.g. the file check frequency.
            // Make sure we're providing a value distinct from the current one.
            oldFileCheckFrequency := kubeCfg.FileCheckFrequency.Duration
            newFileCheckFrequency := 11 * time.Second
            if kubeCfg.FileCheckFrequency.Duration == newFileCheckFrequency {
                newFileCheckFrequency = 10 * time.Second
            }
            kubeCfg.FileCheckFrequency.Duration = newFileCheckFrequency

            // Use the new config to create a new kubelet-<node-name> configmap in the `kube-system` namespace.
            _, err = createConfigMap(f, kubeCfg)
            framework.ExpectNoError(err)

            // Give the Kubelet time to see that there is new config and restart. If we don't do this,
            // the Kubelet will still have the old config when we poll, and the test will fail.
            time.Sleep(restartGap)

            // Use configz to get the new config.
            resp = pollConfigz(2*time.Minute, 5*time.Second)
            kubeCfg, err = decodeConfigz(resp)
            framework.ExpectNoError(err)
            glog.Infof("KubeletConfiguration - After modification of FileCheckFrequency: %+v", *kubeCfg)

            // We expect to see the new value in the new config.
            Expect(kubeCfg.FileCheckFrequency.Duration).To(Equal(newFileCheckFrequency))

            // Change the config back to what it originally was.
            kubeCfg.FileCheckFrequency.Duration = oldFileCheckFrequency
            _, err = updateConfigMap(f, kubeCfg)
            framework.ExpectNoError(err)

            // Give the Kubelet time to see that there is new config and restart. If we don't do this,
            // the Kubelet will still have the old config when we poll, and the test will fail.
            time.Sleep(restartGap)

            // Use configz to get the new config.
            resp = pollConfigz(2*time.Minute, 5*time.Second)
            kubeCfg, err = decodeConfigz(resp)
            framework.ExpectNoError(err)
            glog.Infof("KubeletConfiguration - After restoration of FileCheckFrequency: %+v", *kubeCfg)

            // We expect to see the original value restored in the new config.
            Expect(kubeCfg.FileCheckFrequency.Duration).To(Equal(oldFileCheckFrequency))
        })
    })
})

// pollConfigz polls the configz endpoint until it gets a status 200 response,
// and fails the test if that does not happen within the timeout.
func pollConfigz(timeout time.Duration, pollInterval time.Duration) *http.Response {
    // Reach the node's configz endpoint through the apiserver proxy.
    endpoint := fmt.Sprintf("http://127.0.0.1:8080/api/v1/proxy/nodes/%s/configz", framework.TestContext.NodeName)
    client := &http.Client{}
    req, err := http.NewRequest("GET", endpoint, nil)
    framework.ExpectNoError(err)
    req.Header.Add("Accept", "application/json")

    var resp *http.Response
    Eventually(func() bool {
        resp, err = client.Do(req)
        if err != nil {
            glog.Errorf("Failed to get /configz, retrying. Error: %v", err)
            return false
        }
        if resp.StatusCode != 200 {
            glog.Errorf("/configz response status not 200, retrying. Response was: %+v", resp)
            return false
        }
        return true
    }, timeout, pollInterval).Should(Equal(true))

    return resp
}

// decodeConfigz decodes the http response from /configz and returns a
// componentconfig.KubeletConfiguration (internal type).
func decodeConfigz(resp *http.Response) (*componentconfig.KubeletConfiguration, error) {
    // This hack is needed because /configz reports the following structure:
    // {"componentconfig": {the JSON representation of v1alpha1.KubeletConfiguration}}
    type configzWrapper struct {
        ComponentConfig v1alpha1.KubeletConfiguration `json:"componentconfig"`
    }

    configz := configzWrapper{}
    kubeCfg := componentconfig.KubeletConfiguration{}

    contentsBytes, err := ioutil.ReadAll(resp.Body)
    if err != nil {
        return nil, err
    }

    err = json.Unmarshal(contentsBytes, &configz)
    if err != nil {
        return nil, err
    }

    // Convert the versioned (v1alpha1) config to the internal type.
    err = api.Scheme.Convert(&configz.ComponentConfig, &kubeCfg, nil)
    if err != nil {
        return nil, err
    }

    return &kubeCfg, nil
}

// Uses KubeletConfiguration to create a `kubelet-<node-name>` ConfigMap in the "kube-system" namespace.
func createConfigMap(f *framework.Framework, kubeCfg *componentconfig.KubeletConfiguration) (*api.ConfigMap, error) {
    kubeCfgExt := v1alpha1.KubeletConfiguration{}
    api.Scheme.Convert(kubeCfg, &kubeCfgExt, nil)

    bytes, err := json.Marshal(kubeCfgExt)
    framework.ExpectNoError(err)

    cmap, err := f.Client.ConfigMaps("kube-system").Create(&api.ConfigMap{
        ObjectMeta: api.ObjectMeta{
            Name: fmt.Sprintf("kubelet-%s", framework.TestContext.NodeName),
        },
        Data: map[string]string{
            "kubelet.config": string(bytes),
        },
    })
    if err != nil {
        return nil, err
    }
    return cmap, nil
}

// Similar to createConfigMap, except this updates an existing ConfigMap.
func updateConfigMap(f *framework.Framework, kubeCfg *componentconfig.KubeletConfiguration) (*api.ConfigMap, error) {
    kubeCfgExt := v1alpha1.KubeletConfiguration{}
    api.Scheme.Convert(kubeCfg, &kubeCfgExt, nil)

    bytes, err := json.Marshal(kubeCfgExt)
    framework.ExpectNoError(err)

    cmap, err := f.Client.ConfigMaps("kube-system").Update(&api.ConfigMap{
        ObjectMeta: api.ObjectMeta{
            Name: fmt.Sprintf("kubelet-%s", framework.TestContext.NodeName),
        },
        Data: map[string]string{
            "kubelet.config": string(bytes),
        },
    })
    if err != nil {
        return nil, err
    }
    return cmap, nil
}
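
For reference, the wrapped payload that decodeConfigz unwraps would look roughly like this (field names from v1alpha1.KubeletConfiguration; values illustrative, not taken from the source):

    {"componentconfig": {"syncFrequency": "1m0s", "fileCheckFrequency": "20s", ...}}

The kubelet.config key of the kubelet-<node-name> ConfigMap carries the same v1alpha1 serialization directly, without the componentconfig wrapper.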


@@ -87,7 +87,7 @@ func (e *E2EServices) Start() error {
 		"--manifest-path", framework.TestContext.ManifestPath,
 		"--eviction-hard", framework.TestContext.EvictionHard,
 	)
-	e.services = newServer("services", startCmd, nil, nil, getHealthCheckURLs(), servicesLogFile, false)
+	e.services = newServer("services", startCmd, nil, getHealthCheckURLs(), servicesLogFile)
 	return e.services.start()
 }
 
@@ -335,7 +335,7 @@ func (es *e2eService) startNamespaceController() error {
 }
 
 func (es *e2eService) startKubeletServer() (*server, error) {
-	var killCommand, restartCommand *exec.Cmd
+	var killCommand *exec.Cmd
 	cmdArgs := []string{}
 	if systemdRun, err := exec.LookPath("systemd-run"); err == nil {
 		// On systemd services, detection of a service / unit works reliably while
@@ -343,9 +343,8 @@ func (es *e2eService) startKubeletServer() (*server, error) {
 		// Since kubelet will typically be run as a service it also makes more
 		// sense to test it that way
 		unitName := fmt.Sprintf("kubelet-%d.service", rand.Int31())
-		cmdArgs = append(cmdArgs, systemdRun, "--unit="+unitName, "--remain-after-exit", getKubeletServerBin())
+		cmdArgs = append(cmdArgs, systemdRun, "--unit="+unitName, getKubeletServerBin())
 		killCommand = exec.Command("sudo", "systemctl", "kill", unitName)
-		restartCommand = exec.Command("sudo", "systemctl", "restart", unitName)
 		es.logFiles["kubelet.log"] = logFileData{
 			journalctlCommand: []string{"-u", unitName},
 		}
@@ -373,7 +372,6 @@ func (es *e2eService) startKubeletServer() (*server, error) {
 		"--pod-cidr=10.180.0.0/24", // Assign a fixed CIDR to the node because there is no node controller.
 		"--eviction-hard", framework.TestContext.EvictionHard,
 		"--eviction-pressure-transition-period", "30s",
-		"--feature-gates", "DynamicKubeletConfig=true", // TODO(mtaufen): Eventually replace with a value from the framework.TestContext
 	)
 	if framework.TestContext.CgroupsPerQOS {
 		// TODO: enable this when the flag is stable and available in kubelet.
@@ -396,10 +394,8 @@ func (es *e2eService) startKubeletServer() (*server, error) {
 	server := newServer(
 		"kubelet",
 		cmd,
 		killCommand,
-		restartCommand,
 		[]string{kubeletHealthCheckURL},
-		"kubelet.log",
-		true)
+		"kubelet.log")
 	return server, server.start()
 }
@@ -412,32 +408,20 @@ type server struct {
 	// killCommand is the command used to stop the server. It is not required. If it
 	// is not specified, `sudo kill` will be used to stop the server.
 	killCommand *exec.Cmd
-	// restartCommand is the command used to restart the server. If provided, it will be used
-	// instead of startCommand when restarting the server.
-	restartCommand *exec.Cmd
 	// healthCheckUrls is the urls used to check whether the server is ready.
 	healthCheckUrls []string
 	// outFilename is the name of the log file. The stdout and stderr of the server
 	// will be redirected to this file.
 	outFilename string
-	// restartOnExit determines whether a restart loop is launched with the server
-	restartOnExit bool
-	// Writing to this channel, if it is not nil, stops the restart loop.
-	// When tearing down a server, you should check for this channel and write to it if it exists.
-	stopRestartingCh chan<- bool
-	// Read from this to confirm that the restart loop has stopped.
-	ackStopRestartingCh <-chan bool
 }
 
-func newServer(name string, start, kill, restart *exec.Cmd, urls []string, filename string, restartOnExit bool) *server {
+func newServer(name string, start, kill *exec.Cmd, urls []string, filename string) *server {
 	return &server{
 		name:            name,
 		startCommand:    start,
 		killCommand:     kill,
-		restartCommand:  restart,
 		healthCheckUrls: urls,
 		outFilename:     filename,
-		restartOnExit:   restartOnExit,
 	}
 }
 
@@ -450,8 +434,8 @@ func commandToString(c *exec.Cmd) string {
 }
 
 func (s *server) String() string {
-	return fmt.Sprintf("server %q start-command: `%s`, kill-command: `%s`, restart-command: `%s`, health-check: %v, output-file: %q", s.name,
-		commandToString(s.startCommand), commandToString(s.killCommand), commandToString(s.restartCommand), s.healthCheckUrls, s.outFilename)
+	return fmt.Sprintf("server %q start-command: `%s`, kill-command: `%s`, health-check: %v, output-file: %q", s.name,
+		commandToString(s.startCommand), commandToString(s.killCommand), s.healthCheckUrls, s.outFilename)
 }
 
 // readinessCheck checks whether services are ready via the health check urls. Once there is
@@ -497,23 +481,8 @@ func readinessCheck(urls []string, errCh <-chan error) error {
 	return fmt.Errorf("e2e service readiness check timeout %v", *serverStartTimeout)
 }
 
-// Note: restartOnExit == true requires len(s.healthCheckUrls) > 0 to work properly.
 func (s *server) start() error {
 	errCh := make(chan error)
-	var stopRestartingCh, ackStopRestartingCh chan bool
-	if s.restartOnExit {
-		if len(s.healthCheckUrls) == 0 {
-			return fmt.Errorf("Tried to start %s which has s.restartOnExit == true, but no health check urls provided.", s)
-		}
-		stopRestartingCh = make(chan bool)
-		ackStopRestartingCh = make(chan bool)
-		s.stopRestartingCh = stopRestartingCh
-		s.ackStopRestartingCh = ackStopRestartingCh
-	}
 
 	go func() {
 		defer close(errCh)
@@ -527,9 +496,10 @@ func (s *server) start() error {
 		defer outfile.Close()
 		defer outfile.Sync()
 
+		cmd := s.startCommand
 		// Set the command to write the output file
-		s.startCommand.Stdout = outfile
-		s.startCommand.Stderr = outfile
+		cmd.Stdout = outfile
+		cmd.Stderr = outfile
 
 		// Death of this test process should kill the server as well.
 		attrs := &syscall.SysProcAttr{}
@@ -541,96 +511,14 @@ func (s *server) start() error {
 			errCh <- fmt.Errorf("failed to set Pdeathsig field (non-linux build)")
 			return
 		}
-		s.startCommand.SysProcAttr = attrs
+		cmd.SysProcAttr = attrs
 
-		// Start the command
-		err = s.startCommand.Start()
+		// Run the command
+		err = cmd.Run()
 		if err != nil {
-			errCh <- fmt.Errorf("failed to run %s: %v", s, err)
+			errCh <- fmt.Errorf("failed to run server start command %q: %v", commandToString(cmd), err)
 			return
 		}
-
-		if !s.restartOnExit {
-			// If we aren't planning on restarting, ok to Wait() here to release resources.
-			// Otherwise, we Wait() in the restart loop.
-			err = s.startCommand.Wait()
-			if err != nil {
-				errCh <- fmt.Errorf("failed to run %s: %v", s, err)
-				return
-			}
-		} else {
-			// New stuff
-			usedStartCmd := true
-			for {
-				// Wait for an initial health check to pass, so that we are sure the server started.
-				err := readinessCheck(s.healthCheckUrls, nil)
-				if err != nil {
-					if usedStartCmd {
-						s.startCommand.Wait() // Release resources if necessary.
-					}
-					// This should not happen, immediately stop the e2eService process.
-					glog.Fatalf("restart loop readinessCheck failed for %s", s)
-				}
-				// Initial health check passed, wait until a health check fails again.
-			stillAlive:
-				for {
-					select {
-					case <-stopRestartingCh:
-						ackStopRestartingCh <- true
-						return
-					case <-time.After(time.Second):
-						for _, url := range s.healthCheckUrls {
-							resp, err := http.Get(url)
-							if err != nil || resp.StatusCode != http.StatusOK {
-								break stillAlive
-							}
-						}
-					}
-				}
-				if usedStartCmd {
-					s.startCommand.Wait() // Release resources from last cmd
-					usedStartCmd = false
-				}
-				if s.restartCommand != nil {
-					// Always make a fresh copy of restartCommand before running, we may have to restart multiple times
-					s.restartCommand = &exec.Cmd{
-						Path:        s.restartCommand.Path,
-						Args:        s.restartCommand.Args,
-						Env:         s.restartCommand.Env,
-						Dir:         s.restartCommand.Dir,
-						Stdin:       s.restartCommand.Stdin,
-						Stdout:      s.restartCommand.Stdout,
-						Stderr:      s.restartCommand.Stderr,
-						ExtraFiles:  s.restartCommand.ExtraFiles,
-						SysProcAttr: s.restartCommand.SysProcAttr,
-					}
-					err = s.restartCommand.Run() // Run and wait for exit. This command is assumed to have short duration, e.g. systemctl restart
-					if err != nil {
-						// This should not happen, immediately stop the e2eService process.
-						glog.Fatalf("restarting %s with restartCommand failed. Error: %v.", s, err)
-					}
-				} else {
-					s.startCommand = &exec.Cmd{
-						Path:        s.startCommand.Path,
-						Args:        s.startCommand.Args,
-						Env:         s.startCommand.Env,
-						Dir:         s.startCommand.Dir,
-						Stdin:       s.startCommand.Stdin,
-						Stdout:      s.startCommand.Stdout,
-						Stderr:      s.startCommand.Stderr,
-						ExtraFiles:  s.startCommand.ExtraFiles,
-						SysProcAttr: s.startCommand.SysProcAttr,
-					}
-					err = s.startCommand.Start()
-					usedStartCmd = true
-					if err != nil {
-						// This should not happen, immediately stop the e2eService process.
-						glog.Fatalf("restarting %s with startCommand failed. Error: %v.", s, err)
-					}
-				}
-			}
-		}
 	}()
 
 	return readinessCheck(s.healthCheckUrls, errCh)
@@ -640,12 +528,6 @@ func (s *server) kill() error {
 	name := s.name
 	cmd := s.startCommand
 
-	// If s has a restart loop, turn it off.
-	if s.restartOnExit {
-		s.stopRestartingCh <- true
-		<-s.ackStopRestartingCh
-	}
-
 	if s.killCommand != nil {
 		return s.killCommand.Run()
 	}