Merge pull request #117053 from dims/refactor-remote-runner-to-be-pluggable
Refactor remote runners to allow pluggable cloud specific extensions
This commit is contained in:
		@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
 | 
			
		||||
limitations under the License.
 | 
			
		||||
*/
 | 
			
		||||
 | 
			
		||||
package remote
 | 
			
		||||
package gce
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
@@ -30,6 +30,8 @@ import (
 | 
			
		||||
	"strings"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"k8s.io/kubernetes/test/e2e_node/remote"
 | 
			
		||||
 | 
			
		||||
	"github.com/google/uuid"
 | 
			
		||||
	"golang.org/x/oauth2/google"
 | 
			
		||||
	"google.golang.org/api/compute/v1"
 | 
			
		||||
@@ -39,7 +41,11 @@ import (
 | 
			
		||||
	"sigs.k8s.io/yaml"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
var _ Runner = (*GCERunner)(nil)
 | 
			
		||||
var _ remote.Runner = (*GCERunner)(nil)
 | 
			
		||||
 | 
			
		||||
func init() {
 | 
			
		||||
	remote.RegisterRunner("gce", NewGCERunner)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// envs is the type used to collect all node envs. The key is the env name,
 | 
			
		||||
// and the value is the env value
 | 
			
		||||
@@ -80,12 +86,12 @@ const (
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type GCERunner struct {
 | 
			
		||||
	cfg               Config
 | 
			
		||||
	cfg               remote.Config
 | 
			
		||||
	gceComputeService *compute.Service
 | 
			
		||||
	gceImages         *internalGCEImageConfig
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func NewGCERunner(cfg Config) *GCERunner {
 | 
			
		||||
func NewGCERunner(cfg remote.Config) remote.Runner {
 | 
			
		||||
	if cfg.InstanceNamePrefix == "" {
 | 
			
		||||
		cfg.InstanceNamePrefix = "tmp-node-e2e-" + uuid.New().String()[:8]
 | 
			
		||||
	}
 | 
			
		||||
@@ -108,7 +114,7 @@ func (g *GCERunner) Validate() error {
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (g *GCERunner) StartTests(suite TestSuite, archivePath string, results chan *TestResult) (numTests int) {
 | 
			
		||||
func (g *GCERunner) StartTests(suite remote.TestSuite, archivePath string, results chan *remote.TestResult) (numTests int) {
 | 
			
		||||
	for shortName := range g.gceImages.images {
 | 
			
		||||
		imageConfig := g.gceImages.images[shortName]
 | 
			
		||||
		numTests++
 | 
			
		||||
@@ -443,7 +449,7 @@ func ignitionInjectGCEPublicKey(path string, content string) string {
 | 
			
		||||
 | 
			
		||||
// Provision a gce instance using image and run the tests in archive against the instance.
 | 
			
		||||
// Delete the instance afterward.
 | 
			
		||||
func (g *GCERunner) testGCEImage(suite TestSuite, archivePath string, imageConfig *internalGCEImage, junitFileName string) *TestResult {
 | 
			
		||||
func (g *GCERunner) testGCEImage(suite remote.TestSuite, archivePath string, imageConfig *internalGCEImage, junitFileName string) *remote.TestResult {
 | 
			
		||||
	ginkgoFlagsStr := g.cfg.GinkgoFlags
 | 
			
		||||
 | 
			
		||||
	host, err := g.createGCEInstance(imageConfig)
 | 
			
		||||
@@ -451,7 +457,7 @@ func (g *GCERunner) testGCEImage(suite TestSuite, archivePath string, imageConfi
 | 
			
		||||
		defer g.deleteGCEInstance(host)
 | 
			
		||||
	}
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return &TestResult{
 | 
			
		||||
		return &remote.TestResult{
 | 
			
		||||
			Err: fmt.Errorf("unable to create gce instance with running docker daemon for image %s.  %v", imageConfig.image, err),
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
@@ -461,27 +467,27 @@ func (g *GCERunner) testGCEImage(suite TestSuite, archivePath string, imageConfi
 | 
			
		||||
	deleteFiles := !g.cfg.DeleteInstances && g.cfg.Cleanup
 | 
			
		||||
 | 
			
		||||
	if err = g.registerGceHostIP(host); err != nil {
 | 
			
		||||
		return &TestResult{
 | 
			
		||||
		return &remote.TestResult{
 | 
			
		||||
			Err:    err,
 | 
			
		||||
			Host:   host,
 | 
			
		||||
			ExitOK: false,
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	output, exitOk, err := RunRemote(RunRemoteConfig{
 | 
			
		||||
		suite:          suite,
 | 
			
		||||
		archive:        archivePath,
 | 
			
		||||
		host:           host,
 | 
			
		||||
		cleanup:        deleteFiles,
 | 
			
		||||
		imageDesc:      imageConfig.imageDesc,
 | 
			
		||||
		junitFileName:  junitFileName,
 | 
			
		||||
		testArgs:       g.cfg.TestArgs,
 | 
			
		||||
		ginkgoArgs:     ginkgoFlagsStr,
 | 
			
		||||
		systemSpecName: g.cfg.SystemSpecName,
 | 
			
		||||
		extraEnvs:      g.cfg.ExtraEnvs,
 | 
			
		||||
		runtimeConfig:  g.cfg.RuntimeConfig,
 | 
			
		||||
	output, exitOk, err := remote.RunRemote(remote.RunRemoteConfig{
 | 
			
		||||
		Suite:          suite,
 | 
			
		||||
		Archive:        archivePath,
 | 
			
		||||
		Host:           host,
 | 
			
		||||
		Cleanup:        deleteFiles,
 | 
			
		||||
		ImageDesc:      imageConfig.imageDesc,
 | 
			
		||||
		JunitFileName:  junitFileName,
 | 
			
		||||
		TestArgs:       g.cfg.TestArgs,
 | 
			
		||||
		GinkgoArgs:     ginkgoFlagsStr,
 | 
			
		||||
		SystemSpecName: g.cfg.SystemSpecName,
 | 
			
		||||
		ExtraEnvs:      g.cfg.ExtraEnvs,
 | 
			
		||||
		RuntimeConfig:  g.cfg.RuntimeConfig,
 | 
			
		||||
	})
 | 
			
		||||
	result := TestResult{
 | 
			
		||||
	result := remote.TestResult{
 | 
			
		||||
		Output: output,
 | 
			
		||||
		Err:    err,
 | 
			
		||||
		Host:   host,
 | 
			
		||||
@@ -495,7 +501,7 @@ func (g *GCERunner) testGCEImage(suite TestSuite, archivePath string, imageConfi
 | 
			
		||||
		klog.Errorf("Failed to collect serial Output from node %q: %v", host, err)
 | 
			
		||||
	} else {
 | 
			
		||||
		logFilename := "serial-1.log"
 | 
			
		||||
		err := WriteLog(host, logFilename, serialPortOutput.Contents)
 | 
			
		||||
		err := remote.WriteLog(host, logFilename, serialPortOutput.Contents)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			klog.Errorf("Failed to write serial Output from node %q to %q: %v", host, logFilename, err)
 | 
			
		||||
		}
 | 
			
		||||
@@ -618,11 +624,11 @@ func (g *GCERunner) createGCEInstance(imageConfig *internalGCEImage) (string, er
 | 
			
		||||
		}
 | 
			
		||||
		externalIP := g.getExternalIP(instance)
 | 
			
		||||
		if len(externalIP) > 0 {
 | 
			
		||||
			AddHostnameIP(name, externalIP)
 | 
			
		||||
			remote.AddHostnameIP(name, externalIP)
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		var output string
 | 
			
		||||
		output, err = SSH(name, "sh", "-c",
 | 
			
		||||
		output, err = remote.SSH(name, "sh", "-c",
 | 
			
		||||
			"'systemctl list-units  --type=service  --state=running | grep -e containerd -e crio'")
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			err = fmt.Errorf("instance %s not running containerd/crio daemon - Command failed: %s", name, output)
 | 
			
		||||
@@ -647,7 +653,7 @@ func (g *GCERunner) createGCEInstance(imageConfig *internalGCEImage) (string, er
 | 
			
		||||
				time.Sleep(time.Second * 20)
 | 
			
		||||
			}
 | 
			
		||||
			var finished string
 | 
			
		||||
			finished, err = SSH(name, "ls", "/var/lib/cloud/instance/boot-finished")
 | 
			
		||||
			finished, err = remote.SSH(name, "ls", "/var/lib/cloud/instance/boot-finished")
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				err = fmt.Errorf("instance %s has not finished cloud-init script: %s", name, finished)
 | 
			
		||||
				continue
 | 
			
		||||
@@ -702,7 +708,7 @@ func (g *GCERunner) registerGceHostIP(host string) error {
 | 
			
		||||
	}
 | 
			
		||||
	externalIP := g.getExternalIP(instance)
 | 
			
		||||
	if len(externalIP) > 0 {
 | 
			
		||||
		AddHostnameIP(host, externalIP)
 | 
			
		||||
		remote.AddHostnameIP(host, externalIP)
 | 
			
		||||
	}
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
@@ -744,7 +750,7 @@ func (g *GCERunner) updateKernelArguments(instance *compute.Instance, image stri
 | 
			
		||||
		return nil
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	out, err := SSH(instance.Name, "sh", "-c", fmt.Sprintf("'%s'", strings.Join(cmd, "&&")))
 | 
			
		||||
	out, err := remote.SSH(instance.Name, "sh", "-c", fmt.Sprintf("'%s'", strings.Join(cmd, "&&")))
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		klog.Errorf("failed to run command %s: out: %s, Err: %v", cmd, out, err)
 | 
			
		||||
		return err
 | 
			
		||||
@@ -767,7 +773,7 @@ func (g *GCERunner) rebootInstance(instance *compute.Instance) error {
 | 
			
		||||
	// wait until the instance will not response to SSH
 | 
			
		||||
	klog.Info("Reboot the node and wait for instance not to be available via SSH")
 | 
			
		||||
	if waitErr := wait.PollImmediate(5*time.Second, 5*time.Minute, func() (bool, error) {
 | 
			
		||||
		if _, err := SSH(instance.Name, "reboot"); err != nil {
 | 
			
		||||
		if _, err := remote.SSH(instance.Name, "reboot"); err != nil {
 | 
			
		||||
			return true, nil
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
@@ -779,7 +785,7 @@ func (g *GCERunner) rebootInstance(instance *compute.Instance) error {
 | 
			
		||||
	// wait until the instance will response again to SSH
 | 
			
		||||
	klog.Info("Wait for instance to be available via SSH")
 | 
			
		||||
	if waitErr := wait.PollImmediate(30*time.Second, 5*time.Minute, func() (bool, error) {
 | 
			
		||||
		if _, err := SSH(instance.Name, "sh", "-c", "date"); err != nil {
 | 
			
		||||
		if _, err := remote.SSH(instance.Name, "sh", "-c", "date"); err != nil {
 | 
			
		||||
			return false, nil
 | 
			
		||||
		}
 | 
			
		||||
		return true, nil
 | 
			
		||||
@@ -102,33 +102,33 @@ func CreateTestArchive(suite TestSuite, systemSpecName, kubeletConfigFile string
 | 
			
		||||
 | 
			
		||||
// RunRemote returns the command Output, whether the exit was ok, and any errors
 | 
			
		||||
type RunRemoteConfig struct {
 | 
			
		||||
	suite                                                                                    TestSuite
 | 
			
		||||
	archive                                                                                  string
 | 
			
		||||
	host                                                                                     string
 | 
			
		||||
	cleanup                                                                                  bool
 | 
			
		||||
	imageDesc, junitFileName, testArgs, ginkgoArgs, systemSpecName, extraEnvs, runtimeConfig string
 | 
			
		||||
	Suite                                                                                    TestSuite
 | 
			
		||||
	Archive                                                                                  string
 | 
			
		||||
	Host                                                                                     string
 | 
			
		||||
	Cleanup                                                                                  bool
 | 
			
		||||
	ImageDesc, JunitFileName, TestArgs, GinkgoArgs, SystemSpecName, ExtraEnvs, RuntimeConfig string
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func RunRemote(cfg RunRemoteConfig) (string, bool, error) {
 | 
			
		||||
	// Create the temp staging directory
 | 
			
		||||
	klog.V(2).Infof("Staging test binaries on %q", cfg.host)
 | 
			
		||||
	klog.V(2).Infof("Staging test binaries on %q", cfg.Host)
 | 
			
		||||
	workspace := newWorkspaceDir()
 | 
			
		||||
	// Do not sudo here, so that we can use scp to copy test archive to the directory.
 | 
			
		||||
	if output, err := SSHNoSudo(cfg.host, "mkdir", workspace); err != nil {
 | 
			
		||||
	if output, err := SSHNoSudo(cfg.Host, "mkdir", workspace); err != nil {
 | 
			
		||||
		// Exit failure with the error
 | 
			
		||||
		return "", false, fmt.Errorf("failed to create workspace directory %q on Host %q: %v Output: %q", workspace, cfg.host, err, output)
 | 
			
		||||
		return "", false, fmt.Errorf("failed to create workspace directory %q on Host %q: %v Output: %q", workspace, cfg.Host, err, output)
 | 
			
		||||
	}
 | 
			
		||||
	if cfg.cleanup {
 | 
			
		||||
	if cfg.Cleanup {
 | 
			
		||||
		defer func() {
 | 
			
		||||
			output, err := SSH(cfg.host, "rm", "-rf", workspace)
 | 
			
		||||
			output, err := SSH(cfg.Host, "rm", "-rf", workspace)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				klog.Errorf("failed to cleanup workspace %q on Host %q: %v.  Output:\n%s", workspace, cfg.host, err, output)
 | 
			
		||||
				klog.Errorf("failed to cleanup workspace %q on Host %q: %v.  Output:\n%s", workspace, cfg.Host, err, output)
 | 
			
		||||
			}
 | 
			
		||||
		}()
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Copy the archive to the staging directory
 | 
			
		||||
	if output, err := runSSHCommand(cfg.host, "scp", cfg.archive, fmt.Sprintf("%s:%s/", GetHostnameOrIP(cfg.host), workspace)); err != nil {
 | 
			
		||||
	if output, err := runSSHCommand(cfg.Host, "scp", cfg.Archive, fmt.Sprintf("%s:%s/", GetHostnameOrIP(cfg.Host), workspace)); err != nil {
 | 
			
		||||
		// Exit failure with the error
 | 
			
		||||
		return "", false, fmt.Errorf("failed to copy test archive: %v, Output: %q", err, output)
 | 
			
		||||
	}
 | 
			
		||||
@@ -138,34 +138,34 @@ func RunRemote(cfg RunRemoteConfig) (string, bool, error) {
 | 
			
		||||
		fmt.Sprintf("cd %s", workspace),
 | 
			
		||||
		fmt.Sprintf("tar -xzvf ./%s", archiveName),
 | 
			
		||||
	)
 | 
			
		||||
	klog.V(2).Infof("Extracting tar on %q", cfg.host)
 | 
			
		||||
	klog.V(2).Infof("Extracting tar on %q", cfg.Host)
 | 
			
		||||
	// Do not use sudo here, because `sudo tar -x` will recover the file ownership inside the tar ball, but
 | 
			
		||||
	// we want the extracted files to be owned by the current user.
 | 
			
		||||
	if output, err := SSHNoSudo(cfg.host, "sh", "-c", cmd); err != nil {
 | 
			
		||||
	if output, err := SSHNoSudo(cfg.Host, "sh", "-c", cmd); err != nil {
 | 
			
		||||
		// Exit failure with the error
 | 
			
		||||
		return "", false, fmt.Errorf("failed to extract test archive: %v, Output: %q", err, output)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Create the test result directory.
 | 
			
		||||
	resultDir := filepath.Join(workspace, "results")
 | 
			
		||||
	if output, err := SSHNoSudo(cfg.host, "mkdir", resultDir); err != nil {
 | 
			
		||||
	if output, err := SSHNoSudo(cfg.Host, "mkdir", resultDir); err != nil {
 | 
			
		||||
		// Exit failure with the error
 | 
			
		||||
		return "", false, fmt.Errorf("failed to create test result directory %q on Host %q: %v Output: %q", resultDir, cfg.host, err, output)
 | 
			
		||||
		return "", false, fmt.Errorf("failed to create test result directory %q on Host %q: %v Output: %q", resultDir, cfg.Host, err, output)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	klog.V(2).Infof("Running test on %q", cfg.host)
 | 
			
		||||
	output, err := cfg.suite.RunTest(cfg.host, workspace, resultDir, cfg.imageDesc, cfg.junitFileName, cfg.testArgs,
 | 
			
		||||
		cfg.ginkgoArgs, cfg.systemSpecName, cfg.extraEnvs, cfg.runtimeConfig, *testTimeout)
 | 
			
		||||
	klog.V(2).Infof("Running test on %q", cfg.Host)
 | 
			
		||||
	output, err := cfg.Suite.RunTest(cfg.Host, workspace, resultDir, cfg.ImageDesc, cfg.JunitFileName, cfg.TestArgs,
 | 
			
		||||
		cfg.GinkgoArgs, cfg.SystemSpecName, cfg.ExtraEnvs, cfg.RuntimeConfig, *testTimeout)
 | 
			
		||||
 | 
			
		||||
	var aggErrs []error
 | 
			
		||||
	// Do not log the Output here, let the caller deal with the test Output.
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		aggErrs = append(aggErrs, err)
 | 
			
		||||
		collectSystemLog(cfg.host)
 | 
			
		||||
		collectSystemLog(cfg.Host)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	klog.V(2).Infof("Copying test artifacts from %q", cfg.host)
 | 
			
		||||
	scpErr := getTestArtifacts(cfg.host, workspace)
 | 
			
		||||
	klog.V(2).Infof("Copying test artifacts from %q", cfg.Host)
 | 
			
		||||
	scpErr := getTestArtifacts(cfg.Host, workspace)
 | 
			
		||||
	if scpErr != nil {
 | 
			
		||||
		aggErrs = append(aggErrs, scpErr)
 | 
			
		||||
	}
 | 
			
		||||
 
 | 
			
		||||
@@ -107,12 +107,16 @@ func RunRemoteTestSuite(testSuite TestSuite) {
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	var sshRunner Runner
 | 
			
		||||
	switch *mode {
 | 
			
		||||
	case "gce":
 | 
			
		||||
		runner = NewGCERunner(cfg)
 | 
			
		||||
		sshRunner = NewSSHRunner(cfg)
 | 
			
		||||
	case "ssh":
 | 
			
		||||
 | 
			
		||||
	if *mode == "ssh" {
 | 
			
		||||
		runner = NewSSHRunner(cfg)
 | 
			
		||||
	} else {
 | 
			
		||||
		getRunner, err := GetRunner(*mode)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			klog.Fatalf("getting runner mode %q : %v", *mode, err)
 | 
			
		||||
		}
 | 
			
		||||
		runner = getRunner(cfg)
 | 
			
		||||
		sshRunner = NewSSHRunner(cfg)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if err := runner.Validate(); err != nil {
 | 
			
		||||
 
 | 
			
		||||
@@ -32,17 +32,17 @@ func (s *SSHRunner) StartTests(suite TestSuite, archivePath string, results chan
 | 
			
		||||
		numTests++
 | 
			
		||||
		go func(host string, junitFileName string) {
 | 
			
		||||
			output, exitOk, err := RunRemote(RunRemoteConfig{
 | 
			
		||||
				suite:          suite,
 | 
			
		||||
				archive:        archivePath,
 | 
			
		||||
				host:           host,
 | 
			
		||||
				cleanup:        s.cfg.Cleanup,
 | 
			
		||||
				imageDesc:      "",
 | 
			
		||||
				junitFileName:  junitFileName,
 | 
			
		||||
				testArgs:       s.cfg.TestArgs,
 | 
			
		||||
				ginkgoArgs:     s.cfg.GinkgoFlags,
 | 
			
		||||
				systemSpecName: s.cfg.SystemSpecName,
 | 
			
		||||
				extraEnvs:      s.cfg.ExtraEnvs,
 | 
			
		||||
				runtimeConfig:  s.cfg.RuntimeConfig,
 | 
			
		||||
				Suite:          suite,
 | 
			
		||||
				Archive:        archivePath,
 | 
			
		||||
				Host:           host,
 | 
			
		||||
				Cleanup:        s.cfg.Cleanup,
 | 
			
		||||
				ImageDesc:      "",
 | 
			
		||||
				JunitFileName:  junitFileName,
 | 
			
		||||
				TestArgs:       s.cfg.TestArgs,
 | 
			
		||||
				GinkgoArgs:     s.cfg.GinkgoFlags,
 | 
			
		||||
				SystemSpecName: s.cfg.SystemSpecName,
 | 
			
		||||
				ExtraEnvs:      s.cfg.ExtraEnvs,
 | 
			
		||||
				RuntimeConfig:  s.cfg.RuntimeConfig,
 | 
			
		||||
			})
 | 
			
		||||
			results <- &TestResult{
 | 
			
		||||
				Output: output,
 | 
			
		||||
 
 | 
			
		||||
@@ -74,3 +74,19 @@ func GetTestSuite(name string) (TestSuite, error) {
 | 
			
		||||
	}
 | 
			
		||||
	return nil, fmt.Errorf("unable to find testsuite for %s", name)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type NewRunner func(Config) Runner
 | 
			
		||||
 | 
			
		||||
var runners = make(map[string]NewRunner)
 | 
			
		||||
 | 
			
		||||
func RegisterRunner(name string, runner NewRunner) {
 | 
			
		||||
	runners[name] = runner
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func GetRunner(name string) (NewRunner, error) {
 | 
			
		||||
	runner, ok := runners[name]
 | 
			
		||||
	if ok {
 | 
			
		||||
		return runner, nil
 | 
			
		||||
	}
 | 
			
		||||
	return nil, fmt.Errorf("unable to runner for %s", name)
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -27,6 +27,7 @@ import (
 | 
			
		||||
	"k8s.io/klog/v2"
 | 
			
		||||
 | 
			
		||||
	"k8s.io/kubernetes/test/e2e_node/remote"
 | 
			
		||||
	_ "k8s.io/kubernetes/test/e2e_node/remote/gce"
 | 
			
		||||
	"k8s.io/kubernetes/test/e2e_node/system"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user