Merge pull request #80037 from dims/remove-support-for-etcd2-from-images-etcd
Remove support for etcd2 from cluster/images/etcd image
This commit is contained in:
@@ -15,7 +15,7 @@
|
||||
# Build the etcd image
|
||||
#
|
||||
# Usage:
|
||||
# [BUNDLED_ETCD_VERSIONS=2.2.1 2.3.7 3.0.17 3.1.12 3.2.24 3.3.10] [REGISTRY=k8s.gcr.io] [ARCH=amd64] [BASEIMAGE=busybox] make (build|push)
|
||||
# [BUNDLED_ETCD_VERSIONS=3.0.17 3.1.12 3.2.24 3.3.10] [REGISTRY=k8s.gcr.io] [ARCH=amd64] [BASEIMAGE=busybox] make (build|push)
|
||||
#
|
||||
# The image contains different etcd versions to simplify
|
||||
# upgrades. Thus be careful when removing any versions from here.
|
||||
@@ -26,7 +26,7 @@
|
||||
# Except from etcd-$(version) and etcdctl-$(version) binaries, we also
|
||||
# need etcd and etcdctl binaries for backward compatibility reasons.
|
||||
# That binary will be set to the last version from $(BUNDLED_ETCD_VERSIONS).
|
||||
BUNDLED_ETCD_VERSIONS?=2.2.1 2.3.7 3.0.17 3.1.12 3.2.24 3.3.10
|
||||
BUNDLED_ETCD_VERSIONS?=3.0.17 3.1.12 3.2.24 3.3.10
|
||||
|
||||
# LATEST_ETCD_VERSION identifies the most recent etcd version available.
|
||||
LATEST_ETCD_VERSION?=3.3.10
|
||||
@@ -34,7 +34,7 @@ LATEST_ETCD_VERSION?=3.3.10
|
||||
# REVISION provides a version number fo this image and all it's bundled
|
||||
# artifacts. It should start at zero for each LATEST_ETCD_VERSION and increment
|
||||
# for each revision of this image at that etcd version.
|
||||
REVISION?=1
|
||||
REVISION?=2
|
||||
|
||||
# IMAGE_TAG Uniquely identifies k8s.gcr.io/etcd docker image with a tag of the form "<etcd-version>-<revision>".
|
||||
IMAGE_TAG=$(LATEST_ETCD_VERSION)-$(REVISION)
|
||||
|
@@ -34,7 +34,7 @@ version.
|
||||
Upgrades to any target version are supported. The data will be automatically upgraded
|
||||
in steps to each minor version until the target version is reached.
|
||||
|
||||
Downgrades to the previous minor version of the 3.x series and from 3.0 to 2.3.7 are supported.
|
||||
Downgrades to the previous minor version of the 3.x series is supported.
|
||||
|
||||
#### Permissions
|
||||
|
||||
|
@@ -17,17 +17,15 @@
|
||||
# NOTES
|
||||
# This script performs etcd upgrade based on the following environmental
|
||||
# variables:
|
||||
# TARGET_STORAGE - API of etcd to be used (supported: 'etcd2', 'etcd3')
|
||||
# TARGET_VERSION - etcd release to be used (supported: '2.2.1', '2.3.7', '3.0.17', '3.1.12', '3.2.24', "3.3.10")
|
||||
# TARGET_STORAGE - API of etcd to be used (supported: 'etcd3')
|
||||
# TARGET_VERSION - etcd release to be used (supported: '3.0.17', '3.1.12', '3.2.24', "3.3.10")
|
||||
# DATA_DIRECTORY - directory with etcd data
|
||||
#
|
||||
# The current etcd version and storage format is detected based on the
|
||||
# contents of "${DATA_DIRECTORY}/version.txt" file (if the file doesn't
|
||||
# exist, we default it to "2.2.1/etcd2".
|
||||
# exist, we default it to "3.0.17/etcd2".
|
||||
#
|
||||
# The update workflow support the following upgrade steps:
|
||||
# - 2.2.1/etcd2 -> 2.3.7/etcd2
|
||||
# - 2.3.7/etcd2 -> 3.0.17/etcd2
|
||||
# - 3.0.17/etcd3 -> 3.1.12/etcd3
|
||||
# - 3.1.12/etcd3 -> 3.2.24/etcd3
|
||||
# - 3.2.24/etcd3 -> 3.3.10/etcd3
|
||||
@@ -43,7 +41,7 @@ set -o nounset
|
||||
|
||||
# NOTE: BUNDLED_VERSION has to match release binaries present in the
|
||||
# etcd image (to make this script work correctly).
|
||||
BUNDLED_VERSIONS="2.2.1, 2.3.7, 3.0.17, 3.1.12, 3.2.24, 3.3.10"
|
||||
BUNDLED_VERSIONS="3.0.17, 3.1.12, 3.2.24, 3.3.10"
|
||||
|
||||
ETCD_NAME="${ETCD_NAME:-etcd-$(hostname)}"
|
||||
if [ -z "${DATA_DIRECTORY:-}" ]; then
|
||||
|
@@ -20,28 +20,13 @@ go_library(
|
||||
"migrate_client.go",
|
||||
"migrate_server.go",
|
||||
"migrator.go",
|
||||
"rollback_v2.go",
|
||||
"versions.go",
|
||||
],
|
||||
importpath = "k8s.io/kubernetes/cluster/images/etcd/migrate",
|
||||
deps = [
|
||||
"//third_party/forked/etcd221/wal:go_default_library",
|
||||
"//vendor/github.com/blang/semver:go_default_library",
|
||||
"//vendor/github.com/coreos/etcd/client:go_default_library",
|
||||
"//vendor/github.com/coreos/etcd/clientv3:go_default_library",
|
||||
"//vendor/github.com/coreos/etcd/etcdserver:go_default_library",
|
||||
"//vendor/github.com/coreos/etcd/etcdserver/etcdserverpb:go_default_library",
|
||||
"//vendor/github.com/coreos/etcd/etcdserver/membership:go_default_library",
|
||||
"//vendor/github.com/coreos/etcd/mvcc/backend:go_default_library",
|
||||
"//vendor/github.com/coreos/etcd/mvcc/mvccpb:go_default_library",
|
||||
"//vendor/github.com/coreos/etcd/pkg/pbutil:go_default_library",
|
||||
"//vendor/github.com/coreos/etcd/pkg/types:go_default_library",
|
||||
"//vendor/github.com/coreos/etcd/raft/raftpb:go_default_library",
|
||||
"//vendor/github.com/coreos/etcd/snap:go_default_library",
|
||||
"//vendor/github.com/coreos/etcd/store:go_default_library",
|
||||
"//vendor/github.com/coreos/etcd/wal:go_default_library",
|
||||
"//vendor/github.com/coreos/etcd/wal/walpb:go_default_library",
|
||||
"//vendor/github.com/coreos/go-semver/semver:go_default_library",
|
||||
"//vendor/github.com/spf13/cobra:go_default_library",
|
||||
"//vendor/k8s.io/klog:go_default_library",
|
||||
],
|
||||
|
@@ -40,8 +40,7 @@ import (
|
||||
)
|
||||
|
||||
var (
|
||||
testSupportedVersions = MustParseSupportedVersions("2.2.1, 2.3.7, 3.0.17, 3.1.12")
|
||||
testVersionOldest = &EtcdVersion{semver.MustParse("2.2.1")}
|
||||
testSupportedVersions = MustParseSupportedVersions("3.0.17, 3.1.12")
|
||||
testVersionPrevious = &EtcdVersion{semver.MustParse("3.0.17")}
|
||||
testVersionLatest = &EtcdVersion{semver.MustParse("3.1.12")}
|
||||
)
|
||||
@@ -55,15 +54,13 @@ func TestMigrate(t *testing.T) {
|
||||
protocol string
|
||||
}{
|
||||
// upgrades
|
||||
{"v2-v3-up", 1, "2.2.1/etcd2", "3.0.17/etcd3", "https"},
|
||||
{"v3-v3-up", 1, "3.0.17/etcd3", "3.1.12/etcd3", "https"},
|
||||
{"oldest-newest-up", 1, "2.2.1/etcd2", "3.1.12/etcd3", "https"},
|
||||
{"oldest-newest-up", 1, "3.0.17/etcd3", "3.1.12/etcd3", "https"},
|
||||
|
||||
// warning: v2->v3 ha upgrades not currently supported.
|
||||
{"ha-v3-v3-up", 3, "3.0.17/etcd3", "3.1.12/etcd3", "https"},
|
||||
|
||||
// downgrades
|
||||
{"v3-v2-down", 1, "3.0.17/etcd3", "2.2.1/etcd2", "https"},
|
||||
{"v3-v3-down", 1, "3.1.12/etcd3", "3.0.17/etcd3", "https"},
|
||||
|
||||
// warning: ha downgrades not yet supported.
|
||||
|
@@ -95,11 +95,9 @@ func (m *Migrator) MigrateIfNeeded(target *EtcdVersionPair) error {
|
||||
}
|
||||
return nil
|
||||
case current.storageVersion == storageEtcd2 && target.storageVersion == storageEtcd3:
|
||||
klog.Info("upgrading from etcd2 storage to etcd3 storage")
|
||||
current, err = m.etcd2ToEtcd3Upgrade(current, target)
|
||||
return fmt.Errorf("upgrading from etcd2 storage to etcd3 storage is not supported")
|
||||
case current.version.Major == 3 && target.version.Major == 2:
|
||||
klog.Info("downgrading from etcd 3.x to 2.x")
|
||||
current, err = m.rollbackToEtcd2(current, target)
|
||||
return fmt.Errorf("downgrading from etcd 3.x to 2.x is not supported")
|
||||
case current.version.Major == target.version.Major && current.version.Minor < target.version.Minor:
|
||||
stepVersion := m.cfg.supportedVersions.NextVersionPair(current)
|
||||
klog.Infof("upgrading etcd from %s to %s", current, stepVersion)
|
||||
@@ -114,18 +112,6 @@ func (m *Migrator) MigrateIfNeeded(target *EtcdVersionPair) error {
|
||||
}
|
||||
}
|
||||
|
||||
func (m *Migrator) backupEtcd2(current *EtcdVersion) error {
|
||||
backupDir := fmt.Sprintf("%s/%s", m.dataDirectory, "migration-backup")
|
||||
klog.Info("Backup etcd before starting migration")
|
||||
err := os.Mkdir(backupDir, 0666)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create backup directory before starting migration: %v", err)
|
||||
}
|
||||
m.client.Backup(current, backupDir)
|
||||
klog.Infof("Backup done in %s", backupDir)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *Migrator) rollbackEtcd3MinorVersion(current *EtcdVersionPair, target *EtcdVersionPair) (*EtcdVersionPair, error) {
|
||||
if target.version.Minor != current.version.Minor-1 {
|
||||
return nil, fmt.Errorf("rollback from %s to %s not supported, only rollbacks to the previous minor version are supported", current.version, target.version)
|
||||
@@ -191,56 +177,6 @@ func (m *Migrator) rollbackEtcd3MinorVersion(current *EtcdVersionPair, target *E
|
||||
return target, nil
|
||||
}
|
||||
|
||||
func (m *Migrator) rollbackToEtcd2(current *EtcdVersionPair, target *EtcdVersionPair) (*EtcdVersionPair, error) {
|
||||
if !(current.version.Major == 3 && current.version.Minor == 0 && target.version.Major == 2 && target.version.Minor == 2) {
|
||||
return nil, fmt.Errorf("etcd3 -> etcd2 downgrade is supported only between 3.0.x and 2.2.x, got current %s target %s", current, target)
|
||||
}
|
||||
klog.Info("Backup and remove all existing v2 data")
|
||||
err := m.dataDirectory.Backup()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
err = RollbackV3ToV2(m.dataDirectory.path, time.Hour)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("rollback to etcd 2.x failed: %v", err)
|
||||
}
|
||||
return target, nil
|
||||
|
||||
}
|
||||
|
||||
func (m *Migrator) etcd2ToEtcd3Upgrade(current *EtcdVersionPair, target *EtcdVersionPair) (*EtcdVersionPair, error) {
|
||||
if current.storageVersion != storageEtcd2 || target.version.Major != 3 || target.storageVersion != storageEtcd3 {
|
||||
return nil, fmt.Errorf("etcd2 to etcd3 upgrade is supported only for x.x.x/etcd2 to 3.0.x/etcd3, got current %s target %s", current, target)
|
||||
}
|
||||
runner := m.newServer()
|
||||
|
||||
klog.Info("Performing etcd2 -> etcd3 migration")
|
||||
err := m.client.Migrate(target.version)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
klog.Info("Attaching leases to TTL entries")
|
||||
|
||||
// Now attach lease to all keys.
|
||||
// To do it, we temporarily start etcd on a random port (so that
|
||||
// apiserver actually cannot access it).
|
||||
err = runner.Start(target.version)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer func() {
|
||||
err = runner.Stop()
|
||||
}()
|
||||
|
||||
// Create a lease and attach all keys to it.
|
||||
err = m.client.AttachLease(1 * time.Hour)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return target, err
|
||||
}
|
||||
|
||||
func (m *Migrator) minorVersionUpgrade(current *EtcdVersionPair, target *EtcdVersionPair) (*EtcdVersionPair, error) {
|
||||
runner := m.newServer()
|
||||
|
||||
|
@@ -1,347 +0,0 @@
|
||||
/*
|
||||
Copyright 2016 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"os"
|
||||
"path"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
// Uncomment when you want to rollback to 2.2.1 version.
|
||||
oldwal "k8s.io/kubernetes/third_party/forked/etcd221/wal"
|
||||
// Uncomment when you want to rollback to 2.3.7 version.
|
||||
// oldwal "k8s.io/kubernetes/third_party/forked/etcd237/wal"
|
||||
|
||||
"github.com/coreos/etcd/etcdserver"
|
||||
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||
"github.com/coreos/etcd/etcdserver/membership"
|
||||
"github.com/coreos/etcd/mvcc/backend"
|
||||
"github.com/coreos/etcd/mvcc/mvccpb"
|
||||
"github.com/coreos/etcd/pkg/pbutil"
|
||||
"github.com/coreos/etcd/pkg/types"
|
||||
"github.com/coreos/etcd/raft/raftpb"
|
||||
"github.com/coreos/etcd/snap"
|
||||
"github.com/coreos/etcd/store"
|
||||
"github.com/coreos/etcd/wal"
|
||||
"github.com/coreos/etcd/wal/walpb"
|
||||
"github.com/coreos/go-semver/semver"
|
||||
"k8s.io/klog"
|
||||
)
|
||||
|
||||
const rollbackVersion = "2.2.0"
|
||||
|
||||
// RollbackV3ToV2 rolls back an etcd 3.0.x data directory to the 2.x.x version specified by rollbackVersion.
|
||||
func RollbackV3ToV2(migrateDatadir string, ttl time.Duration) error {
|
||||
dbpath := path.Join(migrateDatadir, "member", "snap", "db")
|
||||
klog.Infof("Rolling db file %s back to etcd 2.x", dbpath)
|
||||
|
||||
// etcd3 store backend. We will use it to parse v3 data files and extract information.
|
||||
be := backend.NewDefaultBackend(dbpath)
|
||||
tx := be.BatchTx()
|
||||
|
||||
// etcd2 store backend. We will use v3 data to update this and then save snapshot to disk.
|
||||
st := store.New(etcdserver.StoreClusterPrefix, etcdserver.StoreKeysPrefix)
|
||||
expireTime := time.Now().Add(ttl)
|
||||
|
||||
tx.Lock()
|
||||
err := tx.UnsafeForEach([]byte("key"), func(k, v []byte) error {
|
||||
kv := &mvccpb.KeyValue{}
|
||||
kv.Unmarshal(v)
|
||||
|
||||
// This is compact key.
|
||||
if !strings.HasPrefix(string(kv.Key), "/") {
|
||||
return nil
|
||||
}
|
||||
|
||||
ttlOpt := store.TTLOptionSet{}
|
||||
if kv.Lease != 0 {
|
||||
ttlOpt = store.TTLOptionSet{ExpireTime: expireTime}
|
||||
}
|
||||
|
||||
if !isTombstone(k) {
|
||||
sk := path.Join(strings.Trim(etcdserver.StoreKeysPrefix, "/"), string(kv.Key))
|
||||
_, err := st.Set(sk, false, string(kv.Value), ttlOpt)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
st.Delete(string(kv.Key), false, false)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
tx.Unlock()
|
||||
|
||||
if err := traverseAndDeleteEmptyDir(st, "/"); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// rebuild cluster state.
|
||||
metadata, hardstate, oldSt, err := rebuild(migrateDatadir)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// In the following, it's low level logic that saves metadata and data into v2 snapshot.
|
||||
backupPath := migrateDatadir + ".rollback.backup"
|
||||
if err := os.Rename(migrateDatadir, backupPath); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := os.MkdirAll(path.Join(migrateDatadir, "member", "snap"), 0777); err != nil {
|
||||
return err
|
||||
}
|
||||
walDir := path.Join(migrateDatadir, "member", "wal")
|
||||
|
||||
w, err := oldwal.Create(walDir, metadata)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = w.SaveSnapshot(walpb.Snapshot{Index: hardstate.Commit, Term: hardstate.Term})
|
||||
w.Close()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
event, err := oldSt.Get(etcdserver.StoreClusterPrefix, true, false)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// nodes (members info) for ConfState
|
||||
nodes := []uint64{}
|
||||
traverseMetadata(event.Node, func(n *store.NodeExtern) {
|
||||
if n.Key != etcdserver.StoreClusterPrefix {
|
||||
// update store metadata
|
||||
v := ""
|
||||
if !n.Dir {
|
||||
v = *n.Value
|
||||
}
|
||||
if n.Key == path.Join(etcdserver.StoreClusterPrefix, "version") {
|
||||
v = rollbackVersion
|
||||
}
|
||||
if _, err := st.Set(n.Key, n.Dir, v, store.TTLOptionSet{}); err != nil {
|
||||
klog.Error(err)
|
||||
}
|
||||
|
||||
// update nodes
|
||||
fields := strings.Split(n.Key, "/")
|
||||
if len(fields) == 4 && fields[2] == "members" {
|
||||
nodeID, err := strconv.ParseUint(fields[3], 16, 64)
|
||||
if err != nil {
|
||||
klog.Fatalf("failed to parse member ID (%s): %v", fields[3], err)
|
||||
}
|
||||
nodes = append(nodes, nodeID)
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
data, err := st.Save()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
raftSnap := raftpb.Snapshot{
|
||||
Data: data,
|
||||
Metadata: raftpb.SnapshotMetadata{
|
||||
Index: hardstate.Commit,
|
||||
Term: hardstate.Term,
|
||||
ConfState: raftpb.ConfState{
|
||||
Nodes: nodes,
|
||||
},
|
||||
},
|
||||
}
|
||||
snapshotter := snap.New(path.Join(migrateDatadir, "member", "snap"))
|
||||
if err := snapshotter.SaveSnap(raftSnap); err != nil {
|
||||
return err
|
||||
}
|
||||
klog.Info("Finished successfully")
|
||||
return nil
|
||||
}
|
||||
|
||||
func traverseMetadata(head *store.NodeExtern, handleFunc func(*store.NodeExtern)) {
|
||||
q := []*store.NodeExtern{head}
|
||||
|
||||
for len(q) > 0 {
|
||||
n := q[0]
|
||||
q = q[1:]
|
||||
|
||||
handleFunc(n)
|
||||
|
||||
q = append(q, n.Nodes...)
|
||||
}
|
||||
}
|
||||
|
||||
const (
|
||||
revBytesLen = 8 + 1 + 8
|
||||
markedRevBytesLen = revBytesLen + 1
|
||||
markBytePosition = markedRevBytesLen - 1
|
||||
|
||||
markTombstone byte = 't'
|
||||
)
|
||||
|
||||
func isTombstone(b []byte) bool {
|
||||
return len(b) == markedRevBytesLen && b[markBytePosition] == markTombstone
|
||||
}
|
||||
|
||||
func traverseAndDeleteEmptyDir(st store.Store, dir string) error {
|
||||
e, err := st.Get(dir, true, false)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(e.Node.Nodes) == 0 {
|
||||
st.Delete(dir, true, true)
|
||||
return nil
|
||||
}
|
||||
for _, node := range e.Node.Nodes {
|
||||
if !node.Dir {
|
||||
klog.V(2).Infof("key: %s", node.Key[len(etcdserver.StoreKeysPrefix):])
|
||||
} else {
|
||||
err := traverseAndDeleteEmptyDir(st, node.Key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func rebuild(datadir string) ([]byte, *raftpb.HardState, store.Store, error) {
|
||||
waldir := path.Join(datadir, "member", "wal")
|
||||
snapdir := path.Join(datadir, "member", "snap")
|
||||
|
||||
ss := snap.New(snapdir)
|
||||
snapshot, err := ss.Load()
|
||||
if err != nil && err != snap.ErrNoSnapshot {
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
|
||||
var walsnap walpb.Snapshot
|
||||
if snapshot != nil {
|
||||
walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
|
||||
}
|
||||
|
||||
w, err := wal.OpenForRead(waldir, walsnap)
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
defer w.Close()
|
||||
|
||||
meta, hardstate, ents, err := w.ReadAll()
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
|
||||
st := store.New(etcdserver.StoreClusterPrefix, etcdserver.StoreKeysPrefix)
|
||||
if snapshot != nil {
|
||||
err := st.Recovery(snapshot.Data)
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
}
|
||||
|
||||
cluster := membership.NewCluster("")
|
||||
cluster.SetStore(st)
|
||||
cluster.Recover(func(*semver.Version) {})
|
||||
|
||||
applier := etcdserver.NewApplierV2(st, cluster)
|
||||
for _, ent := range ents {
|
||||
if ent.Type == raftpb.EntryConfChange {
|
||||
var cc raftpb.ConfChange
|
||||
pbutil.MustUnmarshal(&cc, ent.Data)
|
||||
switch cc.Type {
|
||||
case raftpb.ConfChangeAddNode:
|
||||
m := new(membership.Member)
|
||||
if err := json.Unmarshal(cc.Context, m); err != nil {
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
cluster.AddMember(m)
|
||||
case raftpb.ConfChangeRemoveNode:
|
||||
id := types.ID(cc.NodeID)
|
||||
cluster.RemoveMember(id)
|
||||
case raftpb.ConfChangeUpdateNode:
|
||||
m := new(membership.Member)
|
||||
if err := json.Unmarshal(cc.Context, m); err != nil {
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
cluster.UpdateRaftAttributes(m.ID, m.RaftAttributes)
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
var raftReq pb.InternalRaftRequest
|
||||
if !pbutil.MaybeUnmarshal(&raftReq, ent.Data) { // backward compatible
|
||||
var r pb.Request
|
||||
pbutil.MustUnmarshal(&r, ent.Data)
|
||||
applyRequest(&r, applier)
|
||||
} else {
|
||||
if raftReq.V2 != nil {
|
||||
req := raftReq.V2
|
||||
applyRequest(req, applier)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return meta, &hardstate, st, nil
|
||||
}
|
||||
|
||||
func toTTLOptions(r *pb.Request) store.TTLOptionSet {
|
||||
refresh, _ := pbutil.GetBool(r.Refresh)
|
||||
ttlOptions := store.TTLOptionSet{Refresh: refresh}
|
||||
if r.Expiration != 0 {
|
||||
ttlOptions.ExpireTime = time.Unix(0, r.Expiration)
|
||||
}
|
||||
return ttlOptions
|
||||
}
|
||||
|
||||
func applyRequest(r *pb.Request, applyV2 etcdserver.ApplierV2) {
|
||||
// TODO: find a sane way to perform this cast or avoid it in the first place
|
||||
reqV2 := &etcdserver.RequestV2{
|
||||
ID: r.ID,
|
||||
Method: r.Method,
|
||||
Path: r.Path,
|
||||
Val: r.Val,
|
||||
Dir: r.Dir,
|
||||
PrevValue: r.PrevValue,
|
||||
PrevIndex: r.PrevIndex,
|
||||
PrevExist: r.PrevExist,
|
||||
Expiration: r.Expiration,
|
||||
Wait: r.Wait,
|
||||
Since: r.Since,
|
||||
Recursive: r.Recursive,
|
||||
Sorted: r.Sorted,
|
||||
Quorum: r.Quorum,
|
||||
Time: r.Time,
|
||||
Stream: r.Stream,
|
||||
Refresh: r.Refresh,
|
||||
XXX_unrecognized: r.XXX_unrecognized,
|
||||
}
|
||||
toTTLOptions(r)
|
||||
switch r.Method {
|
||||
case "PUT":
|
||||
applyV2.Put(reqV2)
|
||||
case "DELETE":
|
||||
applyV2.Delete(reqV2)
|
||||
case "POST", "QGET", "SYNC":
|
||||
return
|
||||
default:
|
||||
klog.Fatal("unknown command")
|
||||
}
|
||||
}
|
@@ -29,12 +29,8 @@ func TestSerializeEtcdVersionPair(t *testing.T) {
|
||||
match bool
|
||||
}{
|
||||
{"3.1.2/etcd3", &EtcdVersionPair{&EtcdVersion{semver.MustParse("3.1.2")}, storageEtcd3}, true},
|
||||
{"2.2.1/etcd2", &EtcdVersionPair{&EtcdVersion{semver.MustParse("2.2.1")}, storageEtcd2}, true},
|
||||
{"1.1.1-rc.0/etcd3", &EtcdVersionPair{&EtcdVersion{semver.MustParse("1.1.1-rc.0")}, storageEtcd3}, true},
|
||||
{"10.100.1000/etcd3", &EtcdVersionPair{&EtcdVersion{semver.MustParse("10.100.1000")}, storageEtcd3}, true},
|
||||
|
||||
{"2.2.2/etcd2", &EtcdVersionPair{&EtcdVersion{semver.MustParse("2.2.1")}, storageEtcd2}, false},
|
||||
{"2.2.1/etcd3", &EtcdVersionPair{&EtcdVersion{semver.MustParse("2.2.1")}, storageEtcd2}, false},
|
||||
}
|
||||
|
||||
for _, c := range cases {
|
||||
|
@@ -24,7 +24,7 @@
|
||||
# $ etcdctl --endpoints=<address> snapshot save
|
||||
# produced .db file
|
||||
# - version.txt file is in the current directory (if it isn't it will be
|
||||
# defaulted to "2.2.1/etcd2"). Based on this file, the script will
|
||||
# defaulted to "3.0.17/etcd3"). Based on this file, the script will
|
||||
# decide to which version we are restoring (procedures are different
|
||||
# for etcd2 and etcd3).
|
||||
# - in case of etcd2 - *.snap and *.wal files are in current directory
|
||||
@@ -40,7 +40,7 @@ set -o pipefail
|
||||
# Version file contains information about current version in the format:
|
||||
# <etcd binary version>/<etcd api mode> (e.g. "3.0.12/etcd3").
|
||||
#
|
||||
# If the file doesn't exist we assume "2.2.1/etcd2" configuration is
|
||||
# If the file doesn't exist we assume "3.0.17/etcd3" configuration is
|
||||
# the current one and create a file with such configuration.
|
||||
# The restore procedure is chosen based on this information.
|
||||
VERSION_FILE="version.txt"
|
||||
@@ -51,7 +51,7 @@ if [ -n "${VERSION_CONTENTS:-}" ]; then
|
||||
echo "${VERSION_CONTENTS}" > "${VERSION_FILE}"
|
||||
fi
|
||||
if [ ! -f "${VERSION_FILE}" ]; then
|
||||
echo "2.2.1/etcd2" > "${VERSION_FILE}"
|
||||
echo "3.0.17/etcd3" > "${VERSION_FILE}"
|
||||
fi
|
||||
VERSION_CONTENTS="$(cat ${VERSION_FILE})"
|
||||
ETCD_VERSION="$(echo "$VERSION_CONTENTS" | cut -d '/' -f 1)"
|
||||
|
Reference in New Issue
Block a user