Add run-services-mode option, and start e2e services in a separate process.
This commit is contained in:
Random-Liu
2016-08-04 16:51:11 -07:00
parent 89651077b1
commit 3910a66bb5
6 changed files with 310 additions and 172 deletions

View File

@@ -24,6 +24,7 @@ import (
"net/http"
"os"
"os/exec"
"os/signal"
"path"
"path/filepath"
"reflect"
@@ -33,22 +34,122 @@ import (
"time"
"github.com/golang/glog"
"github.com/kardianos/osext"
"k8s.io/kubernetes/test/e2e/framework"
)
// TODO(random-liu): Move this file to a separate package.
var serverStartTimeout = flag.Duration("server-start-timeout", time.Second*120, "Time to wait for each server to become healthy.")
type e2eService struct {
killCmds []*killCmd
rmDirs []string
// E2EServices starts and stops the e2e services in a separate process. The test
// uses it to start and stop all e2e services.
type E2EServices struct {
// services is the handle for the child process that runs the e2e services.
// It is assigned by Start and killed by Stop.
services *server
}
context *SharedContext
etcdDataDir string
nodeName string
logFiles map[string]logFileData
cgroupsPerQOS bool
evictionHard string
// NewE2EServices returns an empty E2EServices. Nothing is launched until Start
// is called on the returned value.
func NewE2EServices() *E2EServices {
	return new(E2EServices)
}
// servicesLogFile is the name of the combined log file of all services. It is
// the output file of the child process started by E2EServices.Start.
const servicesLogFile = "services.log"
// Start launches the e2e services in a child process and returns once every
// service passes its health check (or an error if that does not happen).
// The services are meant to be statically linked into the test binary, but
// their glog output must not pollute the test result, so the current binary is
// re-executed in run-services-mode instead of running the services in-process.
func (e *E2EServices) Start() error {
	var err error
	// Kubelet reads static pod manifests from this directory.
	// TODO(random-liu): Remove related logic when we move kubelet starting logic out of the test.
	framework.TestContext.ManifestPath, err = ioutil.TempDir("", "node-e2e-pod")
	if err != nil {
		return fmt.Errorf("failed to create static pod manifest directory: %v", err)
	}

	selfPath, err := osext.Executable()
	if err != nil {
		return fmt.Errorf("can't get current binary: %v", err)
	}

	childArgs := []string{
		"--run-services-mode",
		"--server-start-timeout", serverStartTimeout.String(),
		"--report-dir", framework.TestContext.ReportDir,
		// TODO(random-liu): Remove the following flags after we move kubelet starting logic
		// out of the test.
		"--node-name", framework.TestContext.NodeName,
		"--disable-kubenet=" + strconv.FormatBool(framework.TestContext.DisableKubenet),
		"--cgroups-per-qos=" + strconv.FormatBool(framework.TestContext.CgroupsPerQOS),
		"--manifest-path", framework.TestContext.ManifestPath,
		"--eviction-hard", framework.TestContext.EvictionHard,
	}
	// TODO(random-liu): Add sudo after we statically link apiserver and etcd, because apiserver needs
	// sudo. We can't add sudo now, because etcd may not be in PATH of root.
	childCmd := exec.Command(selfPath, childArgs...)

	e.services = newServer("services", childCmd, nil, getHealthCheckURLs(), servicesLogFile)
	return e.services.start()
}
// Stop stops the e2e services by killing the child process started by Start.
// The static pod manifest directory recorded in framework.TestContext is
// removed on every call, whether or not stopping the services succeeds.
//
// It returns an error if the services were never started (services is nil) or
// if killing the child process fails.
func (e *E2EServices) Stop() error {
	defer func() {
		// Cleanup the manifest path for kubelet.
		manifestPath := framework.TestContext.ManifestPath
		if manifestPath != "" {
			err := os.RemoveAll(manifestPath)
			if err != nil {
				glog.Errorf("Failed to delete static pod manifest directory %s.\n%v", manifestPath, err)
			}
		}
	}()
	if e.services == nil {
		// Return here: calling kill on a nil *server would dereference it and panic.
		return fmt.Errorf("can't stop e2e services, because `services` is nil")
	}
	return e.services.kill()
}
// RunE2EServices runs the e2e services inside the current process and aborts
// the process with a fatal log if they fail to run. It is only meant to be
// called when the binary is in run-services-mode.
func RunE2EServices() {
	err := newE2EService().run()
	if err == nil {
		return
	}
	glog.Fatalf("Failed to run e2e services: %v", err)
}
// Ports of different e2e services.
const (
// etcdPort is the etcd client port; the apiserver is pointed at it via --etcd-servers.
etcdPort = "4001"
// apiserverPort is the apiserver port that kubelet is pointed at via --api-servers.
apiserverPort = "8080"
// kubeletPort is passed to kubelet as --port and to the apiserver as --kubelet-port.
kubeletPort = "10250"
// kubeletReadOnlyPort is passed to kubelet as --read-only-port; the kubelet
// health check URL is built from it.
kubeletReadOnlyPort = "10255"
)
// Health check URLs of the different e2e services. Each one is polled until it
// answers with HTTP 200 to decide whether the service is ready.
var (
etcdHealthCheckURL = getEndpoint(etcdPort) + "/v2/keys/" // Trailing slash is required.
apiserverHealthCheckURL = getEndpoint(apiserverPort) + "/healthz"
kubeletHealthCheckURL = getEndpoint(kubeletReadOnlyPort) + "/healthz"
)
// getEndpoint builds the plain-HTTP loopback URL for a service listening on
// the given port, e.g. "8080" yields "http://127.0.0.1:8080".
func getEndpoint(port string) string {
	const loopbackPrefix = "http://127.0.0.1:"
	return loopbackPrefix + port
}
// getHealthCheckURLs returns the health check URLs of etcd, the apiserver and
// kubelet, in that order. A fresh slice is returned on every call.
func getHealthCheckURLs() []string {
	urls := make([]string, 0, 3)
	urls = append(urls, etcdHealthCheckURL, apiserverHealthCheckURL, kubeletHealthCheckURL)
	return urls
}
// e2eService is used internally in this file to start e2e services in current process.
type e2eService struct {
// services holds the servers that have been started, so they can be killed on stop.
services []*server
// rmDirs lists directories marked for removal when the services are stopped.
rmDirs []string
// logFiles describes special log files that need to be collected for
// additional debugging, keyed by log file name.
logFiles map[string]logFileData
}
type logFileData struct {
@@ -63,7 +164,7 @@ const (
defaultEtcdPath = "/tmp/etcd"
)
func newE2eService(nodeName string, cgroupsPerQOS bool, evictionHard string, context *SharedContext) *e2eService {
func newE2EService() *e2eService {
// Special log files that need to be collected for additional debugging.
var logFiles = map[string]logFileData{
"kern.log": {[]string{"/var/log/kern.log"}, []string{"-k"}},
@@ -71,13 +172,25 @@ func newE2eService(nodeName string, cgroupsPerQOS bool, evictionHard string, con
"cloud-init.log": {[]string{"/var/log/cloud-init.log"}, []string{"-u", "cloud*"}},
}
return &e2eService{
context: context,
nodeName: nodeName,
logFiles: logFiles,
cgroupsPerQOS: cgroupsPerQOS,
evictionHard: evictionHard,
return &e2eService{logFiles: logFiles}
}
// terminationSignals are signals that cause the program to exit in the
// supported platforms (linux, darwin, windows). run waits for one of these
// before it stops the e2e services.
var terminationSignals = []os.Signal{syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT}
// run starts all e2e services and then blocks until a termination signal is
// received. The services are stopped on the way out, both after a signal and
// when starting them fails part-way.
func (es *e2eService) run() error {
	defer es.stop()

	err := es.start()
	if err != nil {
		return err
	}

	// Park here until one of the termination signals is delivered.
	termCh := make(chan os.Signal, 1)
	signal.Notify(termCh, terminationSignals...)
	<-termCh
	return nil
}
func (es *e2eService) start() error {
@@ -88,25 +201,23 @@ func (es *e2eService) start() error {
return err
}
cmd, err := es.startEtcd()
s, err := es.startEtcd()
if err != nil {
return err
}
es.killCmds = append(es.killCmds, cmd)
es.rmDirs = append(es.rmDirs, es.etcdDataDir)
es.services = append(es.services, s)
cmd, err = es.startApiServer()
s, err = es.startApiServer()
if err != nil {
return err
}
es.killCmds = append(es.killCmds, cmd)
es.services = append(es.services, s)
cmd, err = es.startKubeletServer()
s, err = es.startKubeletServer()
if err != nil {
return err
}
es.killCmds = append(es.killCmds, cmd)
es.rmDirs = append(es.rmDirs, es.context.PodConfigPath)
es.services = append(es.services, s)
return nil
}
@@ -167,9 +278,10 @@ func isJournaldAvailable() bool {
}
func (es *e2eService) stop() {
for _, k := range es.killCmds {
if err := k.Kill(); err != nil {
glog.Errorf("Failed to stop %v: %v", k.name, err)
es.getLogFiles()
for _, s := range es.services {
if err := s.kill(); err != nil {
glog.Errorf("Failed to stop %v: %v", s.name, err)
}
}
for _, d := range es.rmDirs {
@@ -180,12 +292,13 @@ func (es *e2eService) stop() {
}
}
func (es *e2eService) startEtcd() (*killCmd, error) {
func (es *e2eService) startEtcd() (*server, error) {
dataDir, err := ioutil.TempDir("", "node-e2e")
if err != nil {
return nil, err
}
es.etcdDataDir = dataDir
// Mark the dataDir as directories to remove.
es.rmDirs = append(es.rmDirs, dataDir)
var etcdPath string
// CoreOS ships a binary named 'etcd' which is really old, so prefer 'etcd2' if it exists
etcdPath, err = exec.LookPath("etcd2")
@@ -205,37 +318,36 @@ func (es *e2eService) startEtcd() (*killCmd, error) {
"--advertise-client-urls=http://0.0.0.0:2379,http://0.0.0.0:4001")
// Execute etcd in the data directory instead of using --data-dir because the flag sometimes requires additional
// configuration (e.g. --name in version 0.4.9)
cmd.Dir = es.etcdDataDir
hcc := newHealthCheckCommand(
"http://127.0.0.1:4001/v2/keys/", // Trailing slash is required,
cmd.Dir = dataDir
server := newServer(
"etcd",
cmd,
nil,
[]string{etcdHealthCheckURL},
"etcd.log")
return &killCmd{name: "etcd", cmd: cmd}, es.startServer(hcc)
return server, server.start()
}
func (es *e2eService) startApiServer() (*killCmd, error) {
func (es *e2eService) startApiServer() (*server, error) {
cmd := exec.Command("sudo", getApiServerBin(),
"--etcd-servers", "http://127.0.0.1:4001",
"--etcd-servers", getEndpoint(etcdPort),
"--insecure-bind-address", "0.0.0.0",
"--service-cluster-ip-range", "10.0.0.1/24",
"--kubelet-port", "10250",
"--kubelet-port", kubeletPort,
"--allow-privileged", "true",
"--v", LOG_VERBOSITY_LEVEL, "--logtostderr",
)
hcc := newHealthCheckCommand(
"http://127.0.0.1:8080/healthz",
server := newServer(
"apiserver",
cmd,
nil,
[]string{apiserverHealthCheckURL},
"kube-apiserver.log")
return &killCmd{name: "kube-apiserver", cmd: cmd}, es.startServer(hcc)
return server, server.start()
}
func (es *e2eService) startKubeletServer() (*killCmd, error) {
dataDir, err := ioutil.TempDir("", "node-e2e-pod")
if err != nil {
return nil, err
}
es.context.PodConfigPath = dataDir
var killOverride *exec.Cmd
func (es *e2eService) startKubeletServer() (*server, error) {
var killCommand *exec.Cmd
cmdArgs := []string{}
if systemdRun, err := exec.LookPath("systemd-run"); err == nil {
// On systemd services, detection of a service / unit works reliably while
@@ -244,7 +356,7 @@ func (es *e2eService) startKubeletServer() (*killCmd, error) {
// sense to test it that way
unitName := fmt.Sprintf("kubelet-%d.service", rand.Int31())
cmdArgs = append(cmdArgs, systemdRun, "--unit="+unitName, getKubeletServerBin())
killOverride = exec.Command("sudo", "systemctl", "kill", unitName)
killCommand = exec.Command("sudo", "systemctl", "kill", unitName)
es.logFiles["kubelet.log"] = logFileData{
journalctlCommand: []string{"-u", unitName},
}
@@ -258,26 +370,27 @@ func (es *e2eService) startKubeletServer() (*killCmd, error) {
)
}
cmdArgs = append(cmdArgs,
"--api-servers", "http://127.0.0.1:8080",
"--api-servers", getEndpoint(apiserverPort),
"--address", "0.0.0.0",
"--port", "10250",
"--hostname-override", es.nodeName, // Required because hostname is inconsistent across hosts
"--port", kubeletPort,
"--read-only-port", kubeletReadOnlyPort,
"--hostname-override", framework.TestContext.NodeName, // Required because hostname is inconsistent across hosts
"--volume-stats-agg-period", "10s", // Aggregate volumes frequently so tests don't need to wait as long
"--allow-privileged", "true",
"--serialize-image-pulls", "false",
"--config", es.context.PodConfigPath,
"--config", framework.TestContext.ManifestPath,
"--file-check-frequency", "10s", // Check file frequently so tests won't wait too long
"--v", LOG_VERBOSITY_LEVEL, "--logtostderr",
"--pod-cidr=10.180.0.0/24", // Assign a fixed CIDR to the node because there is no node controller.
"--eviction-hard", es.evictionHard,
"--eviction-hard", framework.TestContext.EvictionHard,
"--eviction-pressure-transition-period", "30s",
)
if es.cgroupsPerQOS {
if framework.TestContext.CgroupsPerQOS {
cmdArgs = append(cmdArgs,
"--cgroups-per-qos", "true",
)
}
if !*disableKubenet {
if !framework.TestContext.DisableKubenet {
cwd, err := os.Getwd()
if err != nil {
return nil, err
@@ -288,31 +401,99 @@ func (es *e2eService) startKubeletServer() (*killCmd, error) {
}
cmd := exec.Command("sudo", cmdArgs...)
hcc := newHealthCheckCommand(
"http://127.0.0.1:10255/healthz",
server := newServer(
"kubelet",
cmd,
killCommand,
[]string{kubeletHealthCheckURL},
"kubelet.log")
return &killCmd{name: "kubelet", cmd: cmd, override: killOverride}, es.startServer(hcc)
return server, server.start()
}
func (es *e2eService) startServer(cmd *healthCheckCommand) error {
cmdErrorChan := make(chan error)
// server manages a server that is started and killed with commands, and whose
// readiness is determined by polling health check URLs.
type server struct {
// name is the name of the server, it is only used for logging.
name string
// startCommand is the command used to start the server.
startCommand *exec.Cmd
// killCommand is the command used to stop the server. It is not required. If it
// is not specified, `sudo kill` will be used to stop the server.
killCommand *exec.Cmd
// healthCheckUrls are the URLs used to check whether the server is ready.
healthCheckUrls []string
// outFilename is the name of the log file. The stdout and stderr of the server
// will be redirected to this file.
outFilename string
}
// newServer bundles the given name, start command, optional kill command,
// health check URLs and output file name into a server. Nothing is started;
// kill may be nil.
func newServer(name string, start, kill *exec.Cmd, urls []string, filename string) *server {
	s := new(server)
	s.name = name
	s.startCommand = start
	s.killCommand = kill
	s.healthCheckUrls = urls
	s.outFilename = filename
	return s
}
// commandToString formats a command as a single space-separated string: the
// command's Path followed by its arguments (Args without the leading program
// name). It returns "" for a nil command.
//
// A Cmd whose Args is empty or nil is valid (only Path is used when it runs),
// so that case is formatted as just the Path instead of panicking on Args[1:].
func commandToString(c *exec.Cmd) string {
	if c == nil {
		return ""
	}
	parts := []string{c.Path}
	if len(c.Args) > 1 {
		parts = append(parts, c.Args[1:]...)
	}
	return strings.Join(parts, " ")
}
// String describes the server for log and error messages: its name, start and
// kill commands, health check URLs and output file.
func (s *server) String() string {
	startStr := commandToString(s.startCommand)
	killStr := commandToString(s.killCommand)
	return fmt.Sprintf(
		"server %q start-command: `%s`, kill-command: `%s`, health-check: %v, output-file: %q",
		s.name, startStr, killStr, s.healthCheckUrls, s.outFilename,
	)
}
// readinessCheck waits until every URL in urls answers an HTTP GET with status
// 200, polling once per second for up to *serverStartTimeout.
//
// It returns nil as soon as one polling round finds all URLs healthy. If a
// value is received from errCh first, waiting stops and that value is
// returned. If the timeout elapses, a timeout error is returned.
// TODO(random-liu): Move this to util
func readinessCheck(urls []string, errCh <-chan error) error {
	endTime := time.Now().Add(*serverStartTimeout)
	for endTime.After(time.Now()) {
		select {
		case err := <-errCh:
			return err
		case <-time.After(time.Second):
			ready := true
			for _, url := range urls {
				resp, err := http.Get(url)
				if err != nil {
					ready = false
					break
				}
				// Only the status code matters; close the body right away so the
				// connection is not leaked on every polling round.
				resp.Body.Close()
				if resp.StatusCode != http.StatusOK {
					ready = false
					break
				}
			}
			if ready {
				return nil
			}
		}
	}
	return fmt.Errorf("e2e service readiness check timeout %v", *serverStartTimeout)
}
func (s *server) start() error {
errCh := make(chan error)
go func() {
defer close(cmdErrorChan)
defer close(errCh)
// Create the output filename
outPath := path.Join(framework.TestContext.ReportDir, cmd.outputFilename)
outPath := path.Join(framework.TestContext.ReportDir, s.outFilename)
outfile, err := os.Create(outPath)
if err != nil {
cmdErrorChan <- fmt.Errorf("Failed to create file %s for `%s` %v.", outPath, cmd, err)
errCh <- fmt.Errorf("failed to create file %q for `%s` %v.", outPath, s, err)
return
}
defer outfile.Close()
defer outfile.Sync()
cmd := s.startCommand
// Set the command to write the output file
cmd.Cmd.Stdout = outfile
cmd.Cmd.Stderr = outfile
cmd.Stdout = outfile
cmd.Stderr = outfile
// Death of this test process should kill the server as well.
attrs := &syscall.SysProcAttr{}
@@ -321,63 +502,41 @@ func (es *e2eService) startServer(cmd *healthCheckCommand) error {
if deathSigField.IsValid() {
deathSigField.Set(reflect.ValueOf(syscall.SIGTERM))
} else {
cmdErrorChan <- fmt.Errorf("Failed to set Pdeathsig field (non-linux build)")
errCh <- fmt.Errorf("failed to set Pdeathsig field (non-linux build)")
return
}
cmd.Cmd.SysProcAttr = attrs
cmd.SysProcAttr = attrs
// Run the command
err = cmd.Run()
if err != nil {
cmdErrorChan <- fmt.Errorf("%s Failed with error \"%v\". Output written to: %s", cmd, err, outPath)
errCh <- fmt.Errorf("failed to run server start command %q: %v", commandToString(cmd), err)
return
}
}()
endTime := time.Now().Add(*serverStartTimeout)
for endTime.After(time.Now()) {
select {
case err := <-cmdErrorChan:
return err
case <-time.After(time.Second):
resp, err := http.Get(cmd.HealthCheckUrl)
if err == nil && resp.StatusCode == http.StatusOK {
return nil
}
}
}
return fmt.Errorf("Timeout waiting for service %s", cmd)
return readinessCheck(s.healthCheckUrls, errCh)
}
// killCmd is a struct to kill a given cmd. The cmd member specifies a command
// to find the pid of and attempt to kill.
// If the override field is set, that will be used instead to kill the command.
// name is only used for logging
type killCmd struct {
name string
cmd *exec.Cmd
override *exec.Cmd
}
func (s *server) kill() error {
name := s.name
cmd := s.startCommand
func (k *killCmd) Kill() error {
name := k.name
cmd := k.cmd
if k.override != nil {
return k.override.Run()
if s.killCommand != nil {
return s.killCommand.Run()
}
if cmd == nil {
return fmt.Errorf("Could not kill %s because both `override` and `cmd` are nil", name)
return fmt.Errorf("could not kill %q because both `killCommand` and `startCommand` are nil", name)
}
if cmd.Process == nil {
glog.V(2).Infof("%s not running", name)
glog.V(2).Infof("%q not running", name)
return nil
}
pid := cmd.Process.Pid
if pid <= 1 {
return fmt.Errorf("invalid PID %d for %s", pid, name)
return fmt.Errorf("invalid PID %d for %q", pid, name)
}
// Attempt to shut down the process in a friendly manner before forcing it.
@@ -413,7 +572,7 @@ func (k *killCmd) Kill() error {
select {
case err := <-waitChan:
if err != nil {
return fmt.Errorf("error stopping %s: %v", name, err)
return fmt.Errorf("error stopping %q: %v", name, err)
}
// Success!
return nil
@@ -422,23 +581,5 @@ func (k *killCmd) Kill() error {
}
}
return fmt.Errorf("unable to stop %s", name)
}
type healthCheckCommand struct {
*exec.Cmd
HealthCheckUrl string
outputFilename string
}
func newHealthCheckCommand(healthCheckUrl string, cmd *exec.Cmd, filename string) *healthCheckCommand {
return &healthCheckCommand{
HealthCheckUrl: healthCheckUrl,
Cmd: cmd,
outputFilename: filename,
}
}
func (hcc *healthCheckCommand) String() string {
return fmt.Sprintf("`%s` health-check: %s", strings.Join(append([]string{hcc.Path}, hcc.Args[1:]...), " "), hcc.HealthCheckUrl)
return fmt.Errorf("unable to stop %q", name)
}