core/metadata: migrate sandboxes bucket into v1

Signed-off-by: Wei Fu <fuweid89@gmail.com>
This commit is contained in:
Wei Fu
2024-07-16 23:10:46 +08:00
parent 1fb1882c7d
commit 4cfeb7b19e
8 changed files with 321 additions and 11 deletions

View File

@@ -301,6 +301,7 @@ func getIngestBucket(tx *bolt.Tx, namespace, ref string) *bolt.Bucket {
func createSandboxBucket(tx *bolt.Tx, namespace string) (*bolt.Bucket, error) {
return createBucketIfNotExists(
tx,
bucketKeyVersion,
[]byte(namespace),
bucketKeyObjectSandboxes,
)
@@ -309,6 +310,7 @@ func createSandboxBucket(tx *bolt.Tx, namespace string) (*bolt.Bucket, error) {
func getSandboxBucket(tx *bolt.Tx, namespace string) *bolt.Bucket {
return getBucket(
tx,
bucketKeyVersion,
[]byte(namespace),
bucketKeyObjectSandboxes,
)

View File

@@ -50,7 +50,7 @@ const (
// dbVersion represents updates to the schema
// version which are additions and compatible with
// prior version of the same schema.
dbVersion = 3
dbVersion = 4
)
// DBOpt configures how we set up the DB

View File

@@ -140,6 +140,42 @@ func TestMigrations(t *testing.T) {
bref: "",
},
}
testSandboxes := []struct {
id string
keyValues [][3]string // {bucket, key, value}
}{
{
id: "sb1",
keyValues: [][3]string{
{
"", // is not sub bucket
"created", "2dayago",
},
{
"", // is not sub bucket
"updated", "1dayago",
},
{
"extension",
"labels", strings.Repeat("whoknows", 10),
},
},
},
{
id: "sb2",
keyValues: [][3]string{
{
"", // is not sub bucket
"sandboxer", "default",
},
{
"labels", "hello", "panic",
},
},
},
}
migrationTests := []struct {
name string
init func(*bolt.Tx) error
@@ -282,7 +318,6 @@ func TestMigrations(t *testing.T) {
return nil
},
},
{
name: "NoOp",
init: func(tx *bolt.Tx) error {
@@ -292,6 +327,65 @@ func TestMigrations(t *testing.T) {
return nil
},
},
{
name: "MigrateSandboxes",
init: func(tx *bolt.Tx) error {
allsbbkt, err := createBucketIfNotExists(tx, []byte("kubernetes"), bucketKeyObjectSandboxes)
if err != nil {
return err
}
for _, sbDef := range testSandboxes {
sbbkt, err := allsbbkt.CreateBucket([]byte(sbDef.id))
if err != nil {
return err
}
for _, keyValues := range sbDef.keyValues {
bkt := sbbkt
if keyValues[0] != "" {
bkt, err = sbbkt.CreateBucketIfNotExists([]byte(keyValues[0]))
if err != nil {
return err
}
}
if err = bkt.Put([]byte(keyValues[1]), []byte(keyValues[2])); err != nil {
return err
}
}
}
return nil
},
check: func(tx *bolt.Tx) error {
allsbbkt := getSandboxBucket(tx, "kubernetes")
for _, sbDef := range testSandboxes {
sbbkt := allsbbkt.Bucket([]byte(sbDef.id))
for _, keyValues := range sbDef.keyValues {
bkt := sbbkt
if keyValues[0] != "" {
bkt = sbbkt.Bucket([]byte(keyValues[0]))
}
key := []byte(keyValues[1])
expected := keyValues[2]
value := string(bkt.Get(key))
if value != expected {
return fmt.Errorf("expected %s, but got %s in sandbox %s", expected, value, sbDef.id)
}
}
}
allsbbkt = getBucket(tx, []byte("kubernetes"), bucketKeyObjectSandboxes)
if allsbbkt != nil {
return errors.New("old sandboxes bucket still exists")
}
return nil
},
},
}
if len(migrationTests) != len(migrations) {

View File

@@ -16,7 +16,12 @@
package metadata
import bolt "go.etcd.io/bbolt"
import (
"bytes"
"fmt"
bolt "go.etcd.io/bbolt"
)
type migration struct {
schema string
@@ -50,6 +55,11 @@ var migrations = []migration{
version: 3,
migrate: noOpMigration,
},
{
schema: "v1",
version: 4,
migrate: migrateSandboxes,
},
}
// addChildLinks Adds children key to the snapshotters to enforce snapshot
@@ -160,6 +170,87 @@ func migrateIngests(tx *bolt.Tx) error {
return nil
}
// migrateSandboxes moves sandboxes from root bucket into v1 bucket.
func migrateSandboxes(tx *bolt.Tx) error {
v1bkt, err := tx.CreateBucketIfNotExists(bucketKeyVersion)
if err != nil {
return err
}
deletingBuckets := [][]byte{}
if merr := tx.ForEach(func(ns []byte, nsbkt *bolt.Bucket) error {
// Skip v1 bucket, even if users created sandboxes in v1 namespace.
if bytes.Equal(bucketKeyVersion, ns) {
return nil
}
deletingBuckets = append(deletingBuckets, ns)
allsbbkt := nsbkt.Bucket(bucketKeyObjectSandboxes)
if allsbbkt == nil {
return nil
}
tnsbkt, err := v1bkt.CreateBucketIfNotExists(ns)
if err != nil {
return fmt.Errorf("failed to create namespace %s in bucket %s: %w",
ns, bucketKeyVersion, err)
}
tallsbbkt, err := tnsbkt.CreateBucketIfNotExists(bucketKeyObjectSandboxes)
if err != nil {
return fmt.Errorf("failed to create bucket sandboxes in namespace %s: %w", ns, err)
}
return allsbbkt.ForEachBucket(func(sb []byte) error {
sbbkt := allsbbkt.Bucket(sb) // single sandbox bucket
tsbbkt, err := tallsbbkt.CreateBucketIfNotExists(sb)
if err != nil {
return fmt.Errorf("failed to create sandbox object %s in namespace %s: %w",
sb, ns, err)
}
// copy single
if cerr := sbbkt.ForEach(func(key, value []byte) error {
if value == nil {
return nil
}
return tsbbkt.Put(key, value)
}); cerr != nil {
return cerr
}
return sbbkt.ForEachBucket(func(subbkt []byte) error {
tsubbkt, err := tsbbkt.CreateBucketIfNotExists(subbkt)
if err != nil {
return fmt.Errorf("failed to create subbucket %s in sandbox %s (namespace %s): %w",
subbkt, sb, ns, err)
}
return sbbkt.Bucket(subbkt).ForEach(func(key, value []byte) error {
if value == nil {
return fmt.Errorf("unexpected bucket %s", key)
}
return tsubbkt.Put(key, value)
})
})
})
}); merr != nil {
return fmt.Errorf("failed to copy sandboxes into v1 bucket: %w", err)
}
for _, ns := range deletingBuckets {
derr := tx.DeleteBucket(ns)
if derr != nil {
return fmt.Errorf("failed to cleanup bucket %s in root: %w", ns, err)
}
}
return nil
}
// noOpMigration was for a database change from boltdb/bolt which is no
// longer being supported, to go.etcd.io/bbolt which is the currently
// maintained repo for boltdb.