integration: init release upgrade test

The TestUpgrade downloads the latest of previous release's binary and
use them to setup pods and then use current release to recover the
existing pods.

Signed-off-by: Wei Fu <fuweid89@gmail.com>
This commit is contained in:
Wei Fu 2023-10-18 22:17:55 +08:00
parent bd2db42464
commit 2fab240f21
3 changed files with 511 additions and 1 deletions

2
go.mod
View File

@ -64,6 +64,7 @@ require (
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.14.0
go.opentelemetry.io/otel/sdk v1.14.0
go.opentelemetry.io/otel/trace v1.14.0
golang.org/x/mod v0.12.0
golang.org/x/sync v0.3.0
golang.org/x/sys v0.13.0
google.golang.org/genproto/googleapis/rpc v0.0.0-20230726155614-23370e0ffb3e
@ -114,7 +115,6 @@ require (
go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.14.0 // indirect
go.opentelemetry.io/otel/metric v0.37.0 // indirect
go.opentelemetry.io/proto/otlp v0.19.0 // indirect
golang.org/x/mod v0.12.0 // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/oauth2 v0.10.0 // indirect
golang.org/x/term v0.13.0 // indirect

View File

@ -0,0 +1,392 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package integration
import (
"bufio"
"context"
"fmt"
"io"
"os"
"os/exec"
"path/filepath"
"runtime"
"strings"
"syscall"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
criruntime "k8s.io/cri-api/pkg/apis/runtime/v1"
cri "github.com/containerd/containerd/v2/integration/cri-api/pkg/apis"
"github.com/containerd/containerd/v2/integration/images"
"github.com/containerd/containerd/v2/integration/remote"
)
// upgradeVerifyCaseFunc is used to verify the behavior after upgrade.
type upgradeVerifyCaseFunc func(t *testing.T, criRuntimeService cri.RuntimeService)
// TODO: Support Windows
func TestUpgrade(t *testing.T) {
previousReleaseBinDir := t.TempDir()
downloadPreviousReleaseBinary(t, previousReleaseBinDir)
t.Run("recover", runUpgradeTestCase(previousReleaseBinDir, shouldRecoverAllThePodsAfterUpgrade))
// TODO:
// Add exec/stats/stop-existing-running-pods/...
}
func runUpgradeTestCase(
previousReleaseBinDir string,
setupUpgradeVerifyCase func(t *testing.T, criRuntimeService cri.RuntimeService, criImageService cri.ImageManagerService) upgradeVerifyCaseFunc,
) func(t *testing.T) {
return func(t *testing.T) {
workDir := t.TempDir()
t.Log("Install config for previous release")
previousReleaseCtrdConfig(t, previousReleaseBinDir, workDir)
t.Log("Starting the previous release's containerd")
previousCtrdBinPath := filepath.Join(previousReleaseBinDir, "bin", "containerd")
previousProc := newCtrdProc(t, previousCtrdBinPath, workDir)
ctrdLogPath := previousProc.logPath()
t.Cleanup(func() {
dumpFileContent(t, ctrdLogPath)
})
require.NoError(t, previousProc.isReady())
needToCleanup := true
t.Cleanup(func() {
if t.Failed() && needToCleanup {
t.Logf("Try to cleanup leaky pods")
cleanupPods(t, previousProc.criRuntimeService(t))
}
})
t.Log("Prepare pods for current release")
upgradeCaseFunc := setupUpgradeVerifyCase(t, previousProc.criRuntimeService(t), previousProc.criImageService(t))
needToCleanup = false
t.Log("Gracefully stop previous release's containerd process")
require.NoError(t, previousProc.kill(syscall.SIGTERM))
require.NoError(t, previousProc.wait(5*time.Minute))
t.Log("Install default config for current release")
currentReleaseCtrdDefaultConfig(t, workDir)
t.Log("Starting the current release's containerd")
currentProc := newCtrdProc(t, "containerd", workDir)
require.NoError(t, currentProc.isReady())
t.Cleanup(func() {
t.Log("Cleanup all the pods")
cleanupPods(t, currentProc.criRuntimeService(t))
t.Log("Stopping current release's containerd process")
require.NoError(t, currentProc.kill(syscall.SIGTERM))
require.NoError(t, currentProc.wait(5*time.Minute))
})
t.Log("Verifing")
upgradeCaseFunc(t, currentProc.criRuntimeService(t))
}
}
func shouldRecoverAllThePodsAfterUpgrade(t *testing.T, criRuntimeService cri.RuntimeService, criImageService cri.ImageManagerService) upgradeVerifyCaseFunc {
var busyboxImage = images.Get(images.BusyBox)
t.Logf("Pulling image %q", busyboxImage)
_, err := criImageService.PullImage(&criruntime.ImageSpec{Image: busyboxImage}, nil, nil)
require.NoError(t, err)
t.Log("Create first sandbox")
firstSBConfig := PodSandboxConfig("sandbox", "running-pod")
firstSB, err := criRuntimeService.RunPodSandbox(firstSBConfig, "")
require.NoError(t, err)
t.Logf("Create a container config and run container in first pod")
containerConfig := ContainerConfig("running", busyboxImage, WithCommand("sleep", "1d"))
cn1InFirstSB, err := criRuntimeService.CreateContainer(firstSB, containerConfig, firstSBConfig)
require.NoError(t, err)
require.NoError(t, criRuntimeService.StartContainer(cn1InFirstSB))
t.Logf("Just create a container in first pod")
containerConfig = ContainerConfig("created", busyboxImage)
cn2InFirstSB, err := criRuntimeService.CreateContainer(firstSB, containerConfig, firstSBConfig)
require.NoError(t, err)
t.Logf("Just create stopped container in first pod")
containerConfig = ContainerConfig("stopped", busyboxImage, WithCommand("sleep", "1d"))
cn3InFirstSB, err := criRuntimeService.CreateContainer(firstSB, containerConfig, firstSBConfig)
require.NoError(t, err)
require.NoError(t, criRuntimeService.StartContainer(cn3InFirstSB))
require.NoError(t, criRuntimeService.StopContainer(cn3InFirstSB, 0))
t.Log("Create second sandbox")
secondSBConfig := PodSandboxConfig("sandbox", "stopped-pod")
secondSB, err := criRuntimeService.RunPodSandbox(secondSBConfig, "")
require.NoError(t, err)
t.Log("Stop second sandbox")
require.NoError(t, criRuntimeService.StopPodSandbox(secondSB))
return func(t *testing.T, criRuntimeService cri.RuntimeService) {
t.Log("List Pods")
pods, err := criRuntimeService.ListPodSandbox(nil)
require.NoError(t, err)
require.Len(t, pods, 2)
for _, pod := range pods {
t.Logf("Checking pod %s", pod.Id)
switch pod.Id {
case firstSB:
assert.Equal(t, criruntime.PodSandboxState_SANDBOX_READY, pod.State)
cntrs, err := criRuntimeService.ListContainers(&criruntime.ContainerFilter{
PodSandboxId: pod.Id,
})
require.NoError(t, err)
require.Equal(t, 3, len(cntrs))
for _, cntr := range cntrs {
switch cntr.Id {
case cn1InFirstSB:
assert.Equal(t, criruntime.ContainerState_CONTAINER_RUNNING, cntr.State)
case cn2InFirstSB:
assert.Equal(t, criruntime.ContainerState_CONTAINER_CREATED, cntr.State)
case cn3InFirstSB:
assert.Equal(t, criruntime.ContainerState_CONTAINER_EXITED, cntr.State)
default:
t.Errorf("unexpected container %s in %s", cntr.Id, pod.Id)
}
}
case secondSB:
assert.Equal(t, criruntime.PodSandboxState_SANDBOX_NOTREADY, pod.State)
default:
t.Errorf("unexpected pod %s", pod.Id)
}
}
}
}
// cleanupPods deletes all the pods based on the cri.RuntimeService connection.
func cleanupPods(t *testing.T, criRuntimeService cri.RuntimeService) {
pods, err := criRuntimeService.ListPodSandbox(nil)
require.NoError(t, err)
for _, pod := range pods {
assert.NoError(t, criRuntimeService.StopPodSandbox(pod.Id))
assert.NoError(t, criRuntimeService.RemovePodSandbox(pod.Id))
}
}
// currentReleaseCtrdDefaultConfig generates empty(default) config for current release.
func currentReleaseCtrdDefaultConfig(t *testing.T, targetDir string) {
fileName := filepath.Join(targetDir, "config.toml")
err := os.WriteFile(fileName, []byte(""), 0600)
require.NoError(t, err, "failed to create config for current release")
}
// previousReleaseCtrdConfig generates containerd config with previous release
// shim binary.
func previousReleaseCtrdConfig(t *testing.T, previousReleaseBinDir, targetDir string) {
// TODO(fuweid):
//
// We should choose correct config version based on previous release.
// Currently, we're focusing on v1.x -> v2.0 so we use version = 2 here.
rawCfg := fmt.Sprintf(`
version = 2
[plugins."io.containerd.grpc.v1.cri".containerd.runtimes.runc]
runtime_type = "%s/bin/containerd-shim-runc-v2"
`,
previousReleaseBinDir)
fileName := filepath.Join(targetDir, "config.toml")
err := os.WriteFile(fileName, []byte(rawCfg), 0600)
require.NoError(t, err, "failed to create config for previous release")
}
// criRuntimeService returns cri.RuntimeService based on the grpc address.
func (p *ctrdProc) criRuntimeService(t *testing.T) cri.RuntimeService {
service, err := remote.NewRuntimeService(p.grpcAddress(), 1*time.Minute)
require.NoError(t, err)
return service
}
// criImageService returns cri.ImageManagerService based on the grpc address.
func (p *ctrdProc) criImageService(t *testing.T) cri.ImageManagerService {
service, err := remote.NewImageService(p.grpcAddress(), 1*time.Minute)
require.NoError(t, err)
return service
}
// newCtrdProc is to start containerd process.
func newCtrdProc(t *testing.T, ctrdBin string, ctrdWorkDir string) *ctrdProc {
p := &ctrdProc{workDir: ctrdWorkDir}
var args []string
args = append(args, "--root", p.rootPath())
args = append(args, "--state", p.statePath())
args = append(args, "--address", p.grpcAddress())
args = append(args, "--config", p.configPath())
args = append(args, "--log-level", "debug")
f, err := os.OpenFile(p.logPath(), os.O_RDWR|os.O_CREATE|os.O_APPEND, 0600)
require.NoError(t, err, "open log file %s", p.logPath())
t.Cleanup(func() { f.Close() })
cmd := exec.Command(ctrdBin, args...)
cmd.Stdout = f
cmd.Stderr = f
cmd.SysProcAttr = &syscall.SysProcAttr{Pdeathsig: syscall.SIGKILL}
p.cmd = cmd
p.waitBlock = make(chan struct{})
go func() {
// The PDeathSIG is based on the thread which forks the child
// process instead of the leader of thread group. Lock the
// thread just in case that the thread exits and causes unexpected
// SIGKILL to containerd.
runtime.LockOSThread()
defer runtime.UnlockOSThread()
defer close(p.waitBlock)
require.NoError(t, p.cmd.Start(), "start containerd(%s)", ctrdBin)
assert.NoError(t, p.cmd.Wait())
}()
return p
}
// ctrdProc is used to control the containerd process's lifecycle.
type ctrdProc struct {
// workDir has the following layout:
//
// - root (dir)
// - state (dir)
// - containerd.sock (sock file)
// - config.toml (toml file, required)
// - containerd.log (log file, always open with O_APPEND)
workDir string
cmd *exec.Cmd
waitBlock chan struct{}
}
// kill is to send the signal to containerd process.
func (p *ctrdProc) kill(sig syscall.Signal) error {
return p.cmd.Process.Signal(sig)
}
// wait is to wait for exit event of containerd process.
func (p *ctrdProc) wait(to time.Duration) error {
var ctx = context.Background()
var cancel context.CancelFunc
if to > 0 {
ctx, cancel = context.WithTimeout(ctx, to)
defer cancel()
}
select {
case <-ctx.Done():
return ctx.Err()
case <-p.waitBlock:
return nil
}
}
// grpcAddress is to return containerd's address.
func (p *ctrdProc) grpcAddress() string {
return filepath.Join(p.workDir, "containerd.sock")
}
// configPath is to return containerd's config file.
func (p *ctrdProc) configPath() string {
return filepath.Join(p.workDir, "config.toml")
}
// rootPath is to return containerd's root path.
func (p *ctrdProc) rootPath() string {
return filepath.Join(p.workDir, "root")
}
// statePath is to return containerd's state path.
func (p *ctrdProc) statePath() string {
return filepath.Join(p.workDir, "state")
}
// logPath is to return containerd's log path.
func (p *ctrdProc) logPath() string {
return filepath.Join(p.workDir, "containerd.log")
}
// isReady checks the containerd is ready or not.
func (p *ctrdProc) isReady() error {
var (
service cri.RuntimeService
err error
ticker = time.NewTicker(1 * time.Second)
ctx, cancel = context.WithTimeout(context.Background(), 2*time.Minute)
)
defer func() {
cancel()
ticker.Stop()
}()
for {
select {
case <-ticker.C:
service, err = remote.NewRuntimeService(p.grpcAddress(), 5*time.Second)
if err != nil {
continue
}
if _, err = service.Status(); err != nil {
continue
}
return nil
case <-ctx.Done():
return fmt.Errorf("context deadline exceeded: %w", err)
}
}
}
// dumpFileContent dumps file content into t.Log.
func dumpFileContent(t *testing.T, filePath string) {
f, err := os.Open(filePath)
require.NoError(t, err)
defer f.Close()
r := bufio.NewReader(f)
for {
line, err := r.ReadString('\n')
switch err {
case nil:
t.Log(strings.TrimSuffix(line, "\n"))
case io.EOF:
return
default:
require.NoError(t, err)
}
}
}

View File

@ -0,0 +1,118 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package integration
import (
"compress/gzip"
"context"
"fmt"
"net/http"
"runtime"
"strings"
"testing"
"github.com/stretchr/testify/require"
"golang.org/x/mod/semver"
exec "golang.org/x/sys/execabs"
"github.com/containerd/containerd/v2/archive"
"github.com/containerd/containerd/v2/version"
)
// downloadPreviousReleaseBinary downloads the latest version of previous release
// into the target dir.
func downloadPreviousReleaseBinary(t *testing.T, targetDir string) {
ver := previousReleaseVersion(t)
targetURL := fmt.Sprintf("https://github.com/containerd/containerd/releases/download/%s/containerd-%s-linux-%s.tar.gz",
ver, strings.TrimPrefix(ver, "v"), runtime.GOARCH,
)
resp, err := http.Get(targetURL) //nolint:gosec
require.NoError(t, err, "failed to http-get %s", targetURL)
defer resp.Body.Close()
require.Equal(t, http.StatusOK, resp.StatusCode)
tarReader, err := gzip.NewReader(resp.Body)
require.NoError(t, err, "%s should be gzip stream", targetURL)
// NOTE: Use native applier to take release tar.gzip stream as first image layer :)
_, err = archive.Apply(context.Background(), targetDir, tarReader)
require.NoError(t, err, "failed to unpack %s gzip stream into %s", targetURL, targetDir)
}
// previousReleaseVersion returns the latest version of previous release.
func previousReleaseVersion(t *testing.T) string {
majorMinor := ctrdPreviousMajorMinor(t)
tags := gitLsRemoteCtrdTags(t, fmt.Sprintf("refs/tags/%s.*", majorMinor))
require.True(t, len(tags) >= 1)
// sort them and get the latest version
semver.Sort(tags)
return tags[len(tags)-1]
}
// ctrdPreviousMajorMinor gets the current version of running containerd.
//
// TODO(fuweid): We should parse containerd --version to get the result.
func ctrdPreviousMajorMinor(t *testing.T) string {
currentVer := "v" + version.Version
version := semver.MajorMinor(currentVer)
switch version {
case "v2.0":
return "v1.7"
case "v1.7":
return "v1.6"
default:
t.Fatalf("unexpected containerd version: %s", currentVer)
panic("unreachable")
}
}
// gitLsRemoteTags lists containerd tags based on pattern.
func gitLsRemoteCtrdTags(t *testing.T, pattern string) (_tags []string) {
cmd := exec.Command("git", "ls-remote", "--tags", "--exit-code",
"https://github.com/containerd/containerd.git", pattern)
out, err := cmd.Output()
require.NoError(t, err, "failed to list tags by pattern %s", pattern)
// output is like
//
// 137288ad010d39ae6ef578fa53bf9b93d1356c3a refs/tags/v1.6.8
// 9cd3357b7fd7218e4aec3eae239db1f68a5a6ec6 refs/tags/v1.6.8^{}
// cec2382030533cf5797d63a4cdb2b255a9c3c7b6 refs/tags/v1.6.9
// 1c90a442489720eec95342e1789ee8a5e1b9536f refs/tags/v1.6.9^{}
refTags := strings.Fields(string(out))
require.True(t, len(refTags)%2 == 0)
tags := make([]string, 0, len(refTags)/2)
for i := 1; i < len(refTags); i += 2 {
rawTag := refTags[i]
require.True(t, strings.HasPrefix(rawTag, "refs/tags/"))
if strings.HasSuffix(rawTag, "^{}") {
continue
}
rawTag = strings.TrimPrefix(rawTag, "refs/tags/")
tags = append(tags, rawTag)
}
return tags
}