Merge pull request #69527 from liggitt/remove-quorum
Remove deprecated --etcd-quorum-read flag
This commit is contained in:
@@ -98,7 +98,6 @@ func TestAddFlags(t *testing.T) {
|
||||
"--enable-logs-handler=false",
|
||||
"--enable-swagger-ui=true",
|
||||
"--endpoint-reconciler-type=" + string(reconcilers.LeaseEndpointReconcilerType),
|
||||
"--etcd-quorum-read=false",
|
||||
"--etcd-keyfile=/var/run/kubernetes/etcd.key",
|
||||
"--etcd-certfile=/var/run/kubernetes/etcdce.crt",
|
||||
"--etcd-cafile=/var/run/kubernetes/etcdca.crt",
|
||||
@@ -146,7 +145,6 @@ func TestAddFlags(t *testing.T) {
|
||||
ServerList: nil,
|
||||
Prefix: "/registry",
|
||||
DeserializationCacheSize: 0,
|
||||
Quorum: false,
|
||||
KeyFile: "/var/run/kubernetes/etcd.key",
|
||||
CAFile: "/var/run/kubernetes/etcdca.crt",
|
||||
CertFile: "/var/run/kubernetes/etcdce.crt",
|
||||
|
@@ -162,10 +162,6 @@ func (s *EtcdOptions) AddFlags(fs *pflag.FlagSet) {
|
||||
fs.StringVar(&s.StorageConfig.CAFile, "etcd-cafile", s.StorageConfig.CAFile,
|
||||
"SSL Certificate Authority file used to secure etcd communication.")
|
||||
|
||||
fs.BoolVar(&s.StorageConfig.Quorum, "etcd-quorum-read", s.StorageConfig.Quorum,
|
||||
"If true, enable quorum read. It defaults to true and is strongly recommended not setting to false.")
|
||||
fs.MarkDeprecated("etcd-quorum-read", "This flag is deprecated and the ability to switch off quorum read will be removed in a future release.")
|
||||
|
||||
fs.StringVar(&s.EncryptionProviderConfigFilepath, "experimental-encryption-provider-config", s.EncryptionProviderConfigFilepath,
|
||||
"The file containing configuration for encryption providers to be used for storing secrets in etcd")
|
||||
|
||||
|
@@ -40,7 +40,6 @@ func TestEtcdOptionsValidate(t *testing.T) {
|
||||
ServerList: nil,
|
||||
Prefix: "/registry",
|
||||
DeserializationCacheSize: 0,
|
||||
Quorum: false,
|
||||
KeyFile: "/var/run/kubernetes/etcd.key",
|
||||
CAFile: "/var/run/kubernetes/etcdca.crt",
|
||||
CertFile: "/var/run/kubernetes/etcdce.crt",
|
||||
@@ -64,7 +63,6 @@ func TestEtcdOptionsValidate(t *testing.T) {
|
||||
ServerList: []string{"http://127.0.0.1"},
|
||||
Prefix: "/registry",
|
||||
DeserializationCacheSize: 0,
|
||||
Quorum: false,
|
||||
KeyFile: "/var/run/kubernetes/etcd.key",
|
||||
CAFile: "/var/run/kubernetes/etcdca.crt",
|
||||
CertFile: "/var/run/kubernetes/etcdce.crt",
|
||||
@@ -88,7 +86,6 @@ func TestEtcdOptionsValidate(t *testing.T) {
|
||||
ServerList: []string{"http://127.0.0.1"},
|
||||
Prefix: "/registry",
|
||||
DeserializationCacheSize: 0,
|
||||
Quorum: false,
|
||||
KeyFile: "/var/run/kubernetes/etcd.key",
|
||||
CAFile: "/var/run/kubernetes/etcdca.crt",
|
||||
CertFile: "/var/run/kubernetes/etcdce.crt",
|
||||
@@ -112,7 +109,6 @@ func TestEtcdOptionsValidate(t *testing.T) {
|
||||
ServerList: []string{"http://127.0.0.1"},
|
||||
Prefix: "/registry",
|
||||
DeserializationCacheSize: 0,
|
||||
Quorum: false,
|
||||
KeyFile: "/var/run/kubernetes/etcd.key",
|
||||
CAFile: "/var/run/kubernetes/etcdca.crt",
|
||||
CertFile: "/var/run/kubernetes/etcdce.crt",
|
||||
|
@@ -82,16 +82,10 @@ type objState struct {
|
||||
|
||||
// New returns an etcd3 implementation of storage.Interface.
|
||||
func New(c *clientv3.Client, codec runtime.Codec, prefix string, transformer value.Transformer, pagingEnabled bool) storage.Interface {
|
||||
return newStore(c, true, pagingEnabled, codec, prefix, transformer)
|
||||
return newStore(c, pagingEnabled, codec, prefix, transformer)
|
||||
}
|
||||
|
||||
// NewWithNoQuorumRead returns etcd3 implementation of storage.Interface
|
||||
// where Get operations don't require quorum read.
|
||||
func NewWithNoQuorumRead(c *clientv3.Client, codec runtime.Codec, prefix string, transformer value.Transformer, pagingEnabled bool) storage.Interface {
|
||||
return newStore(c, false, pagingEnabled, codec, prefix, transformer)
|
||||
}
|
||||
|
||||
func newStore(c *clientv3.Client, quorumRead, pagingEnabled bool, codec runtime.Codec, prefix string, transformer value.Transformer) *store {
|
||||
func newStore(c *clientv3.Client, pagingEnabled bool, codec runtime.Codec, prefix string, transformer value.Transformer) *store {
|
||||
versioner := etcd.APIObjectVersioner{}
|
||||
result := &store{
|
||||
client: c,
|
||||
@@ -106,11 +100,6 @@ func newStore(c *clientv3.Client, quorumRead, pagingEnabled bool, codec runtime.
|
||||
watcher: newWatcher(c, codec, versioner, transformer),
|
||||
leaseManager: newDefaultLeaseManager(c),
|
||||
}
|
||||
if !quorumRead {
|
||||
// In case of non-quorum reads, we can set WithSerializable()
|
||||
// options for all Get operations.
|
||||
result.getOps = append(result.getOps, clientv3.WithSerializable())
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
|
@@ -677,7 +677,7 @@ func TestTransformationFailure(t *testing.T) {
|
||||
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
|
||||
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
|
||||
defer cluster.Terminate(t)
|
||||
store := newStore(cluster.RandClient(), false, false, codec, "", prefixTransformer{prefix: []byte(defaultTestPrefix)})
|
||||
store := newStore(cluster.RandClient(), false, codec, "", prefixTransformer{prefix: []byte(defaultTestPrefix)})
|
||||
ctx := context.Background()
|
||||
|
||||
preset := []struct {
|
||||
@@ -754,8 +754,8 @@ func TestList(t *testing.T) {
|
||||
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
|
||||
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
|
||||
defer cluster.Terminate(t)
|
||||
store := newStore(cluster.RandClient(), false, true, codec, "", prefixTransformer{prefix: []byte(defaultTestPrefix)})
|
||||
disablePagingStore := newStore(cluster.RandClient(), false, false, codec, "", prefixTransformer{prefix: []byte(defaultTestPrefix)})
|
||||
store := newStore(cluster.RandClient(), true, codec, "", prefixTransformer{prefix: []byte(defaultTestPrefix)})
|
||||
disablePagingStore := newStore(cluster.RandClient(), false, codec, "", prefixTransformer{prefix: []byte(defaultTestPrefix)})
|
||||
ctx := context.Background()
|
||||
|
||||
// Setup storage with the following structure:
|
||||
@@ -1074,7 +1074,7 @@ func TestListContinuation(t *testing.T) {
|
||||
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
|
||||
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
|
||||
defer cluster.Terminate(t)
|
||||
store := newStore(cluster.RandClient(), false, true, codec, "", prefixTransformer{prefix: []byte(defaultTestPrefix)})
|
||||
store := newStore(cluster.RandClient(), true, codec, "", prefixTransformer{prefix: []byte(defaultTestPrefix)})
|
||||
ctx := context.Background()
|
||||
|
||||
// Setup storage with the following structure:
|
||||
@@ -1185,7 +1185,7 @@ func TestListInconsistentContinuation(t *testing.T) {
|
||||
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
|
||||
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
|
||||
defer cluster.Terminate(t)
|
||||
store := newStore(cluster.RandClient(), false, true, codec, "", prefixTransformer{prefix: []byte(defaultTestPrefix)})
|
||||
store := newStore(cluster.RandClient(), true, codec, "", prefixTransformer{prefix: []byte(defaultTestPrefix)})
|
||||
ctx := context.Background()
|
||||
|
||||
// Setup storage with the following structure:
|
||||
@@ -1330,7 +1330,7 @@ func TestListInconsistentContinuation(t *testing.T) {
|
||||
func testSetup(t *testing.T) (context.Context, *store, *integration.ClusterV3) {
|
||||
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
|
||||
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
|
||||
store := newStore(cluster.RandClient(), false, true, codec, "", prefixTransformer{prefix: []byte(defaultTestPrefix)})
|
||||
store := newStore(cluster.RandClient(), true, codec, "", prefixTransformer{prefix: []byte(defaultTestPrefix)})
|
||||
ctx := context.Background()
|
||||
// As 30s is the default timeout for testing in glboal configuration,
|
||||
// we cannot wait longer than that in a single time: change it to 10
|
||||
@@ -1366,7 +1366,7 @@ func TestPrefix(t *testing.T) {
|
||||
"/registry": "/registry",
|
||||
}
|
||||
for configuredPrefix, effectivePrefix := range testcases {
|
||||
store := newStore(cluster.RandClient(), false, true, codec, configuredPrefix, transformer)
|
||||
store := newStore(cluster.RandClient(), true, codec, configuredPrefix, transformer)
|
||||
if store.pathPrefix != effectivePrefix {
|
||||
t.Errorf("configured prefix of %s, expected effective prefix of %s, got %s", configuredPrefix, effectivePrefix, store.pathPrefix)
|
||||
}
|
||||
|
@@ -225,13 +225,13 @@ func TestWatchError(t *testing.T) {
|
||||
codec := &testCodec{apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)}
|
||||
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
|
||||
defer cluster.Terminate(t)
|
||||
invalidStore := newStore(cluster.RandClient(), false, true, codec, "", prefixTransformer{prefix: []byte("test!")})
|
||||
invalidStore := newStore(cluster.RandClient(), true, codec, "", prefixTransformer{prefix: []byte("test!")})
|
||||
ctx := context.Background()
|
||||
w, err := invalidStore.Watch(ctx, "/abc", "0", storage.Everything)
|
||||
if err != nil {
|
||||
t.Fatalf("Watch failed: %v", err)
|
||||
}
|
||||
validStore := newStore(cluster.RandClient(), false, true, codec, "", prefixTransformer{prefix: []byte("test!")})
|
||||
validStore := newStore(cluster.RandClient(), true, codec, "", prefixTransformer{prefix: []byte("test!")})
|
||||
validStore.GuaranteedUpdate(ctx, "/abc", &example.Pod{}, true, nil, storage.SimpleUpdate(
|
||||
func(runtime.Object) (runtime.Object, error) {
|
||||
return &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, nil
|
||||
|
@@ -42,8 +42,6 @@ type Config struct {
|
||||
KeyFile string
|
||||
CertFile string
|
||||
CAFile string
|
||||
// Quorum indicates that whether read operations should be quorum-level consistent.
|
||||
Quorum bool
|
||||
// Paging indicates whether the server implementation should allow paging (if it is
|
||||
// supported). This is generally configured by feature gating, or by a specific
|
||||
// resource type not wishing to allow paging, and is not intended for end users to
|
||||
@@ -74,6 +72,5 @@ func NewDefaultConfig(prefix string, codec runtime.Codec) *Config {
|
||||
DeserializationCacheSize: 0,
|
||||
Codec: codec,
|
||||
CompactionInterval: DefaultCompactInterval,
|
||||
Quorum: true,
|
||||
}
|
||||
}
|
||||
|
@@ -123,8 +123,5 @@ func newETCD3Storage(c storagebackend.Config) (storage.Interface, DestroyFunc, e
|
||||
if transformer == nil {
|
||||
transformer = value.IdentityTransformer
|
||||
}
|
||||
if c.Quorum {
|
||||
return etcd3.New(client, c.Codec, c.Prefix, transformer, c.Paging), destroyFunc, nil
|
||||
}
|
||||
return etcd3.NewWithNoQuorumRead(client, c.Codec, c.Prefix, transformer, c.Paging), destroyFunc, nil
|
||||
return etcd3.New(client, c.Codec, c.Prefix, transformer, c.Paging), destroyFunc, nil
|
||||
}
|
||||
|
@@ -32,10 +32,6 @@ func Create(c storagebackend.Config) (storage.Interface, DestroyFunc, error) {
|
||||
case "etcd2":
|
||||
return nil, nil, fmt.Errorf("%v is no longer a supported storage backend", c.Type)
|
||||
case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD3:
|
||||
// TODO: We have the following features to implement:
|
||||
// - Support secure connection by using key, cert, and CA files.
|
||||
// - Honor "https" scheme to support secure connection in gRPC.
|
||||
// - Support non-quorum read.
|
||||
return newETCD3Storage(c)
|
||||
default:
|
||||
return nil, nil, fmt.Errorf("unknown storage type: %s", c.Type)
|
||||
|
Reference in New Issue
Block a user