Merge pull request #10472 from fuweid/migrate-sandboxes-bucket

core/metadata: migrate sandboxes bucket into v1
Akihiro Suda 2024-07-31 01:01:05 +00:00 committed by GitHub
commit babfebf0a4
8 changed files with 321 additions and 11 deletions
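
Before this change the per-namespace sandboxes bucket sat at the root of the metadata boltdb (for example k8s.io/sandboxes/<id>) instead of under the versioned v1 bucket used for every other object. This commit moves it to v1/<namespace>/sandboxes/<id>, bumps the schema version to 4, adds a migration that copies the old records and deletes the leftover root-level namespace buckets, and covers the move with unit and upgrade tests. As a rough, standalone sketch (not part of the diff), the migrated layout can be inspected with bbolt as below; the database path and the k8s.io namespace are assumptions for illustration:

package main

import (
	"fmt"
	"log"

	bolt "go.etcd.io/bbolt"
)

func main() {
	// Typical metadata DB location; adjust for the installation under test.
	db, err := bolt.Open("/var/lib/containerd/io.containerd.metadata.v1.bolt/meta.db",
		0600, &bolt.Options{ReadOnly: true})
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	if err := db.View(func(tx *bolt.Tx) error {
		// Old layout: <namespace>/sandboxes/<id>   (namespace bucket at the DB root)
		// New layout: v1/<namespace>/sandboxes/<id>
		v1 := tx.Bucket([]byte("v1"))
		if v1 == nil {
			return fmt.Errorf("no v1 bucket")
		}
		ns := v1.Bucket([]byte("k8s.io"))
		if ns == nil {
			return fmt.Errorf("no k8s.io namespace under v1")
		}
		sandboxes := ns.Bucket([]byte("sandboxes"))
		if sandboxes == nil {
			return fmt.Errorf("no sandboxes bucket under v1/k8s.io")
		}
		return sandboxes.ForEachBucket(func(id []byte) error {
			fmt.Printf("sandbox %s\n", id)
			return nil
		})
	}); err != nil {
		log.Fatal(err)
	}
}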

View File

@@ -301,6 +301,7 @@ func getIngestBucket(tx *bolt.Tx, namespace, ref string) *bolt.Bucket {
func createSandboxBucket(tx *bolt.Tx, namespace string) (*bolt.Bucket, error) {
return createBucketIfNotExists(
tx,
+bucketKeyVersion,
[]byte(namespace),
bucketKeyObjectSandboxes,
)
@@ -309,6 +310,7 @@ func createSandboxBucket(tx *bolt.Tx, namespace string) (*bolt.Bucket, error) {
func getSandboxBucket(tx *bolt.Tx, namespace string) *bolt.Bucket {
return getBucket(
tx,
+bucketKeyVersion,
[]byte(namespace),
bucketKeyObjectSandboxes,
)
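
createBucketIfNotExists and getBucket walk their variadic key arguments as nested bbolt buckets, so prepending bucketKeyVersion relocates the sandboxes bucket from <namespace>/sandboxes to v1/<namespace>/sandboxes. A minimal sketch of what getSandboxBucket now resolves to in raw bbolt calls; the literal "v1" and "sandboxes" strings stand in for bucketKeyVersion and bucketKeyObjectSandboxes, and the real helpers live elsewhere in this file:

package sketch

import bolt "go.etcd.io/bbolt"

// getSandboxBucketSketch illustrates the nested-bucket walk performed after
// bucketKeyVersion was added as the first key; it returns nil if any level is missing.
func getSandboxBucketSketch(tx *bolt.Tx, namespace string) *bolt.Bucket {
	bkt := tx.Bucket([]byte("v1")) // bucketKeyVersion
	if bkt == nil {
		return nil
	}
	if bkt = bkt.Bucket([]byte(namespace)); bkt == nil {
		return nil
	}
	return bkt.Bucket([]byte("sandboxes")) // bucketKeyObjectSandboxes
}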

View File

@@ -50,7 +50,7 @@ const (
// dbVersion represents updates to the schema
// version which are additions and compatible with
// prior version of the same schema.
-dbVersion = 3
+dbVersion = 4
)

// DBOpt configures how we set up the DB

View File

@@ -140,6 +140,42 @@ func TestMigrations(t *testing.T) {
bref: "",
},
}
testSandboxes := []struct {
id string
keyValues [][3]string // {bucket, key, value}
}{
{
id: "sb1",
keyValues: [][3]string{
{
"", // is not sub bucket
"created", "2dayago",
},
{
"", // is not sub bucket
"updated", "1dayago",
},
{
"extension",
"labels", strings.Repeat("whoknows", 10),
},
},
},
{
id: "sb2",
keyValues: [][3]string{
{
"", // is not sub bucket
"sandboxer", "default",
},
{
"labels", "hello", "panic",
},
},
},
}
migrationTests := []struct {
name string
init func(*bolt.Tx) error
@ -282,7 +318,6 @@ func TestMigrations(t *testing.T) {
return nil
},
},
{
name: "NoOp",
init: func(tx *bolt.Tx) error {
@ -292,6 +327,65 @@ func TestMigrations(t *testing.T) {
return nil
},
},
{
name: "MigrateSandboxes",
init: func(tx *bolt.Tx) error {
allsbbkt, err := createBucketIfNotExists(tx, []byte("kubernetes"), bucketKeyObjectSandboxes)
if err != nil {
return err
}
for _, sbDef := range testSandboxes {
sbbkt, err := allsbbkt.CreateBucket([]byte(sbDef.id))
if err != nil {
return err
}
for _, keyValues := range sbDef.keyValues {
bkt := sbbkt
if keyValues[0] != "" {
bkt, err = sbbkt.CreateBucketIfNotExists([]byte(keyValues[0]))
if err != nil {
return err
}
}
if err = bkt.Put([]byte(keyValues[1]), []byte(keyValues[2])); err != nil {
return err
}
}
}
return nil
},
check: func(tx *bolt.Tx) error {
allsbbkt := getSandboxBucket(tx, "kubernetes")
for _, sbDef := range testSandboxes {
sbbkt := allsbbkt.Bucket([]byte(sbDef.id))
for _, keyValues := range sbDef.keyValues {
bkt := sbbkt
if keyValues[0] != "" {
bkt = sbbkt.Bucket([]byte(keyValues[0]))
}
key := []byte(keyValues[1])
expected := keyValues[2]
value := string(bkt.Get(key))
if value != expected {
return fmt.Errorf("expected %s, but got %s in sandbox %s", expected, value, sbDef.id)
}
}
}
allsbbkt = getBucket(tx, []byte("kubernetes"), bucketKeyObjectSandboxes)
if allsbbkt != nil {
return errors.New("old sandboxes bucket still exists")
}
return nil
},
},
} }
if len(migrationTests) != len(migrations) { if len(migrationTests) != len(migrations) {

View File

@@ -16,7 +16,12 @@
package metadata

-import bolt "go.etcd.io/bbolt"
+import (
+"bytes"
+"fmt"
+
+bolt "go.etcd.io/bbolt"
+)

type migration struct {
schema string
@@ -50,6 +55,11 @@ var migrations = []migration{
version: 3,
migrate: noOpMigration,
},
{
schema: "v1",
version: 4,
migrate: migrateSandboxes,
},
}
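
Only entries whose version exceeds the version already recorded in the database get executed, which is why dbVersion had to be bumped to 4 above. The actual bookkeeping lives elsewhere in this package; a minimal sketch of the assumed pattern, with a local migrationSketch type mirroring the fields used here:

package sketch

import (
	"fmt"

	bolt "go.etcd.io/bbolt"
)

// migrationSketch mirrors the fields the migrations slice above relies on.
type migrationSketch struct {
	schema  string
	version int
	migrate func(*bolt.Tx) error
}

// applyPending runs every migration whose version is newer than the one the
// database last recorded. This illustrates the version gating only; it is not
// a copy of containerd's actual code.
func applyPending(tx *bolt.Tx, storedVersion int, migrations []migrationSketch) error {
	for _, m := range migrations {
		if m.version <= storedVersion {
			continue // already applied during an earlier startup
		}
		if err := m.migrate(tx); err != nil {
			return fmt.Errorf("%s migration to version %d: %w", m.schema, m.version, err)
		}
	}
	return nil
}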
// addChildLinks Adds children key to the snapshotters to enforce snapshot
@@ -160,6 +170,87 @@ func migrateIngests(tx *bolt.Tx) error {
return nil
}
// migrateSandboxes moves sandboxes from root bucket into v1 bucket.
func migrateSandboxes(tx *bolt.Tx) error {
v1bkt, err := tx.CreateBucketIfNotExists(bucketKeyVersion)
if err != nil {
return err
}
deletingBuckets := [][]byte{}
if merr := tx.ForEach(func(ns []byte, nsbkt *bolt.Bucket) error {
// Skip v1 bucket, even if users created sandboxes in v1 namespace.
if bytes.Equal(bucketKeyVersion, ns) {
return nil
}
deletingBuckets = append(deletingBuckets, ns)
allsbbkt := nsbkt.Bucket(bucketKeyObjectSandboxes)
if allsbbkt == nil {
return nil
}
tnsbkt, err := v1bkt.CreateBucketIfNotExists(ns)
if err != nil {
return fmt.Errorf("failed to create namespace %s in bucket %s: %w",
ns, bucketKeyVersion, err)
}
tallsbbkt, err := tnsbkt.CreateBucketIfNotExists(bucketKeyObjectSandboxes)
if err != nil {
return fmt.Errorf("failed to create bucket sandboxes in namespace %s: %w", ns, err)
}
return allsbbkt.ForEachBucket(func(sb []byte) error {
sbbkt := allsbbkt.Bucket(sb) // single sandbox bucket
tsbbkt, err := tallsbbkt.CreateBucketIfNotExists(sb)
if err != nil {
return fmt.Errorf("failed to create sandbox object %s in namespace %s: %w",
sb, ns, err)
}
// copy the sandbox's plain key/value pairs
if cerr := sbbkt.ForEach(func(key, value []byte) error {
if value == nil {
return nil
}
return tsbbkt.Put(key, value)
}); cerr != nil {
return cerr
}
return sbbkt.ForEachBucket(func(subbkt []byte) error {
tsubbkt, err := tsbbkt.CreateBucketIfNotExists(subbkt)
if err != nil {
return fmt.Errorf("failed to create subbucket %s in sandbox %s (namespace %s): %w",
subbkt, sb, ns, err)
}
return sbbkt.Bucket(subbkt).ForEach(func(key, value []byte) error {
if value == nil {
return fmt.Errorf("unexpected bucket %s", key)
}
return tsubbkt.Put(key, value)
})
})
})
}); merr != nil {
return fmt.Errorf("failed to copy sandboxes into v1 bucket: %w", err)
}
for _, ns := range deletingBuckets {
derr := tx.DeleteBucket(ns)
if derr != nil {
return fmt.Errorf("failed to cleanup bucket %s in root: %w", ns, err)
}
}
return nil
}
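
The value == nil checks above rely on a bbolt convention: Bucket.ForEach visits nested buckets with a nil value, while regular keys carry their value, and Bucket.ForEachBucket visits only the nested bucket names. A small standalone sketch of that convention (not part of the diff):

package sketch

import bolt "go.etcd.io/bbolt"

// copyFlatKeys copies only the plain key/value pairs from src to dst, skipping
// entries whose nil value marks them as nested buckets; those are the ones a
// caller would walk separately with ForEachBucket, as the migration above does.
func copyFlatKeys(src, dst *bolt.Bucket) error {
	return src.ForEach(func(k, v []byte) error {
		if v == nil {
			return nil // k names a sub-bucket, not a key/value pair
		}
		return dst.Put(k, v)
	})
}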
// noOpMigration was for a database change from boltdb/bolt which is no // noOpMigration was for a database change from boltdb/bolt which is no
// longer being supported, to go.etcd.io/bbolt which is the currently // longer being supported, to go.etcd.io/bbolt which is the currently
// maintained repo for boltdb. // maintained repo for boltdb.

View File

@@ -56,7 +56,7 @@ version = 3
require.NoError(t, err)
t.Logf("Starting containerd")
-currentProc := newCtrdProc(t, "containerd", workDir)
+currentProc := newCtrdProc(t, "containerd", workDir, nil)
require.NoError(t, currentProc.isReady())
t.Cleanup(func() {
t.Log("Cleanup all the pods")

View File

@@ -0,0 +1,117 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package integration
import (
"fmt"
"path/filepath"
"syscall"
"testing"
"time"
"github.com/containerd/continuity/fs"
"github.com/stretchr/testify/require"
"go.etcd.io/bbolt"
)
func TestIssue10467(t *testing.T) {
latestVersion := "v1.7.20"
releaseBinDir := t.TempDir()
downloadReleaseBinary(t, releaseBinDir, latestVersion)
t.Logf("Install config for release %s", latestVersion)
workDir := t.TempDir()
previousReleaseCtrdConfig(t, releaseBinDir, workDir)
t.Log("Starting the previous release's containerd")
previousCtrdBinPath := filepath.Join(releaseBinDir, "bin", "containerd")
previousProc := newCtrdProc(t, previousCtrdBinPath, workDir, []string{"ENABLE_CRI_SANDBOXES=yes"})
boltdbPath := filepath.Join(workDir, "root", "io.containerd.metadata.v1.bolt", "meta.db")
ctrdLogPath := previousProc.logPath()
t.Cleanup(func() {
if t.Failed() {
dumpFileContent(t, ctrdLogPath)
}
})
require.NoError(t, previousProc.isReady())
needToCleanup := true
t.Cleanup(func() {
if t.Failed() && needToCleanup {
t.Logf("Try to cleanup leaky pods")
cleanupPods(t, previousProc.criRuntimeService(t))
}
})
t.Log("Prepare pods for current release")
upgradeCaseFunc, hookFunc := shouldManipulateContainersInPodAfterUpgrade(t, previousProc.criRuntimeService(t), previousProc.criImageService(t))
needToCleanup = false
require.Nil(t, hookFunc)
t.Log("Gracefully stop previous release's containerd process")
require.NoError(t, previousProc.kill(syscall.SIGTERM))
require.NoError(t, previousProc.wait(5*time.Minute))
t.Logf("%s should have bucket k8s.io in root", boltdbPath)
db, err := bbolt.Open(boltdbPath, 0600, &bbolt.Options{ReadOnly: true})
require.NoError(t, err)
require.NoError(t, db.View(func(tx *bbolt.Tx) error {
if tx.Bucket([]byte("k8s.io")) == nil {
return fmt.Errorf("expected k8s.io bucket")
}
return nil
}))
require.NoError(t, db.Close())
t.Log("Install default config for current release")
currentReleaseCtrdDefaultConfig(t, workDir)
t.Log("Starting the current release's containerd")
currentProc := newCtrdProc(t, "containerd", workDir, nil)
require.NoError(t, currentProc.isReady())
t.Cleanup(func() {
t.Log("Cleanup all the pods")
cleanupPods(t, currentProc.criRuntimeService(t))
t.Log("Stopping current release's containerd process")
require.NoError(t, currentProc.kill(syscall.SIGTERM))
require.NoError(t, currentProc.wait(5*time.Minute))
})
t.Logf("%s should not have bucket k8s.io in root after restart", boltdbPath)
copiedBoltdbPath := filepath.Join(t.TempDir(), "meta.db.new")
require.NoError(t, fs.CopyFile(copiedBoltdbPath, boltdbPath))
db, err = bbolt.Open(copiedBoltdbPath, 0600, &bbolt.Options{ReadOnly: true})
require.NoError(t, err)
require.NoError(t, db.View(func(tx *bbolt.Tx) error {
if tx.Bucket([]byte("k8s.io")) != nil {
return fmt.Errorf("unexpected k8s.io bucket")
}
return nil
}))
require.NoError(t, db.Close())
t.Log("Verifing")
upgradeCaseFunc(t, currentProc.criRuntimeService(t), currentProc.criImageService(t))
}

View File

@@ -50,7 +50,7 @@ type beforeUpgradeHookFunc func(*testing.T)
// TODO: Support Windows
func TestUpgrade(t *testing.T) {
previousReleaseBinDir := t.TempDir()
-downloadPreviousReleaseBinary(t, previousReleaseBinDir)
+downloadPreviousLatestReleaseBinary(t, previousReleaseBinDir)
t.Run("recover", runUpgradeTestCase(previousReleaseBinDir, shouldRecoverAllThePodsAfterUpgrade))
t.Run("exec", runUpgradeTestCase(previousReleaseBinDir, execToExistingContainer))
@@ -73,7 +73,7 @@ func runUpgradeTestCase(
t.Log("Starting the previous release's containerd")
previousCtrdBinPath := filepath.Join(previousReleaseBinDir, "bin", "containerd")
-previousProc := newCtrdProc(t, previousCtrdBinPath, workDir)
+previousProc := newCtrdProc(t, previousCtrdBinPath, workDir, nil)
ctrdLogPath := previousProc.logPath()
t.Cleanup(func() {
@@ -107,7 +107,7 @@ func runUpgradeTestCase(
currentReleaseCtrdDefaultConfig(t, workDir)
t.Log("Starting the current release's containerd")
-currentProc := newCtrdProc(t, "containerd", workDir)
+currentProc := newCtrdProc(t, "containerd", workDir, nil)
require.NoError(t, currentProc.isReady())
t.Cleanup(func() {
t.Log("Cleanup all the pods")
@@ -658,7 +658,7 @@ func (p *ctrdProc) criImageService(t *testing.T) cri.ImageManagerService {
}
// newCtrdProc is to start containerd process.
-func newCtrdProc(t *testing.T, ctrdBin string, ctrdWorkDir string) *ctrdProc {
+func newCtrdProc(t *testing.T, ctrdBin string, ctrdWorkDir string, envs []string) *ctrdProc {
p := &ctrdProc{workDir: ctrdWorkDir}
var args []string
@@ -673,6 +673,7 @@ func newCtrdProc(t *testing.T, ctrdBin string, ctrdWorkDir string) *ctrdProc {
t.Cleanup(func() { f.Close() })
cmd := exec.Command(ctrdBin, args...)
+cmd.Env = append(os.Environ(), envs...)
cmd.Stdout = f
cmd.Stderr = f
cmd.SysProcAttr = &syscall.SysProcAttr{Pdeathsig: syscall.SIGKILL}

View File

@@ -33,11 +33,16 @@ import (
"github.com/containerd/containerd/v2/version"
)

-// downloadPreviousReleaseBinary downloads the latest version of previous release
-// into the target dir.
-func downloadPreviousReleaseBinary(t *testing.T, targetDir string) {
+// downloadPreviousLatestReleaseBinary downloads the latest version of previous
+// release into the target dir.
+func downloadPreviousLatestReleaseBinary(t *testing.T, targetDir string) {
ver := previousReleaseVersion(t)
+downloadReleaseBinary(t, targetDir, ver)
+}
+
+// downloadReleaseBinary downloads containerd binary with a given release.
+func downloadReleaseBinary(t *testing.T, targetDir string, ver string) {
targetURL := fmt.Sprintf("https://github.com/containerd/containerd/releases/download/%s/containerd-%s-linux-%s.tar.gz",
ver, strings.TrimPrefix(ver, "v"), runtime.GOARCH,
)