Pipe newFunc to etcd3 storage layer

This commit is contained in:
wojtekt 2020-08-31 11:58:45 +02:00
parent 37cf1d6aaa
commit fbd65a265a
17 changed files with 48 additions and 35 deletions

View File

@ -258,7 +258,7 @@ func (c *Config) createLeaseReconciler() reconcilers.EndpointReconciler {
if err != nil { if err != nil {
klog.Fatalf("Error determining service IP ranges: %v", err) klog.Fatalf("Error determining service IP ranges: %v", err)
} }
leaseStorage, _, err := storagefactory.Create(*config) leaseStorage, _, err := storagefactory.Create(*config, nil)
if err != nil { if err != nil {
klog.Fatalf("Error creating storage factory: %v", err) klog.Fatalf("Error creating storage factory: %v", err)
} }

View File

@ -30,7 +30,7 @@ import (
func TestPodLogValidates(t *testing.T) { func TestPodLogValidates(t *testing.T) {
config, server := registrytest.NewEtcdStorage(t, "") config, server := registrytest.NewEtcdStorage(t, "")
defer server.Terminate(t) defer server.Terminate(t)
s, destroyFunc, err := generic.NewRawStorage(config) s, destroyFunc, err := generic.NewRawStorage(config, nil)
if err != nil { if err != nil {
t.Fatalf("Unexpected error: %v", err) t.Fatalf("Unexpected error: %v", err)
} }

View File

@ -62,7 +62,7 @@ var _ rangeallocation.RangeRegistry = &Etcd{}
// NewEtcd returns an allocator that is backed by Etcd and can manage // NewEtcd returns an allocator that is backed by Etcd and can manage
// persisting the snapshot state of allocation after each allocation is made. // persisting the snapshot state of allocation after each allocation is made.
func NewEtcd(alloc allocator.Snapshottable, baseKey string, resource schema.GroupResource, config *storagebackend.Config) (*Etcd, error) { func NewEtcd(alloc allocator.Snapshottable, baseKey string, resource schema.GroupResource, config *storagebackend.Config) (*Etcd, error) {
storage, d, err := generic.NewRawStorage(config) storage, d, err := generic.NewRawStorage(config, nil)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -54,7 +54,7 @@ func newStorage(t *testing.T) (*etcd3testing.EtcdTestServer, ipallocator.Interfa
if err != nil { if err != nil {
t.Fatalf("unexpected error creating etcd: %v", err) t.Fatalf("unexpected error creating etcd: %v", err)
} }
s, d, err := generic.NewRawStorage(etcdStorage) s, d, err := generic.NewRawStorage(etcdStorage, nil)
if err != nil { if err != nil {
t.Fatalf("Couldn't create storage: %v", err) t.Fatalf("Couldn't create storage: %v", err)
} }

View File

@ -57,7 +57,7 @@ func newStorage(t *testing.T) (*etcd3testing.EtcdTestServer, portallocator.Inter
if err != nil { if err != nil {
t.Fatalf("unexpected error creating etcd: %v", err) t.Fatalf("unexpected error creating etcd: %v", err)
} }
s, d, err := generic.NewRawStorage(etcdStorage) s, d, err := generic.NewRawStorage(etcdStorage, nil)
if err != nil { if err != nil {
t.Fatalf("Couldn't create storage: %v", err) t.Fatalf("Couldn't create storage: %v", err)
} }

View File

@ -38,7 +38,7 @@ import (
func NewDryRunnableTestStorage(t *testing.T) (DryRunnableStorage, func()) { func NewDryRunnableTestStorage(t *testing.T) (DryRunnableStorage, func()) {
server, sc := etcd3testing.NewUnsecuredEtcd3TestClientServer(t) server, sc := etcd3testing.NewUnsecuredEtcd3TestClientServer(t)
sc.Codec = apitesting.TestStorageCodec(codecs, examplev1.SchemeGroupVersion) sc.Codec = apitesting.TestStorageCodec(codecs, examplev1.SchemeGroupVersion)
s, destroy, err := factory.Create(*sc) s, destroy, err := factory.Create(*sc, nil)
if err != nil { if err != nil {
t.Fatalf("Error creating storage: %v", err) t.Fatalf("Error creating storage: %v", err)
} }

View File

@ -45,7 +45,7 @@ func StorageWithCacher() generic.StorageDecorator {
triggerFuncs storage.IndexerFuncs, triggerFuncs storage.IndexerFuncs,
indexers *cache.Indexers) (storage.Interface, factory.DestroyFunc, error) { indexers *cache.Indexers) (storage.Interface, factory.DestroyFunc, error) {
s, d, err := generic.NewRawStorage(storageConfig) s, d, err := generic.NewRawStorage(storageConfig, newFunc)
if err != nil { if err != nil {
return s, d, err return s, d, err
} }

View File

@ -1601,8 +1601,11 @@ func newTestGenericStoreRegistry(t *testing.T, scheme *runtime.Scheme, hasCacheE
server, sc := etcd3testing.NewUnsecuredEtcd3TestClientServer(t) server, sc := etcd3testing.NewUnsecuredEtcd3TestClientServer(t)
strategy := &testRESTStrategy{scheme, names.SimpleNameGenerator, true, false, true} strategy := &testRESTStrategy{scheme, names.SimpleNameGenerator, true, false, true}
newFunc := func() runtime.Object { return &example.Pod{} }
newListFunc := func() runtime.Object { return &example.PodList{} }
sc.Codec = apitesting.TestStorageCodec(codecs, examplev1.SchemeGroupVersion) sc.Codec = apitesting.TestStorageCodec(codecs, examplev1.SchemeGroupVersion)
s, dFunc, err := factory.Create(*sc) s, dFunc, err := factory.Create(*sc, newFunc)
if err != nil { if err != nil {
t.Fatalf("Error creating storage: %v", err) t.Fatalf("Error creating storage: %v", err)
} }
@ -1617,8 +1620,8 @@ func newTestGenericStoreRegistry(t *testing.T, scheme *runtime.Scheme, hasCacheE
ResourcePrefix: podPrefix, ResourcePrefix: podPrefix,
KeyFunc: func(obj runtime.Object) (string, error) { return storage.NoNamespaceKeyFunc(podPrefix, obj) }, KeyFunc: func(obj runtime.Object) (string, error) { return storage.NoNamespaceKeyFunc(podPrefix, obj) },
GetAttrsFunc: getPodAttrs, GetAttrsFunc: getPodAttrs,
NewFunc: func() runtime.Object { return &example.Pod{} }, NewFunc: newFunc,
NewListFunc: func() runtime.Object { return &example.PodList{} }, NewListFunc: newListFunc,
Codec: sc.Codec, Codec: sc.Codec,
} }
cacher, err := cacherstorage.NewCacherFromConfig(config) cacher, err := cacherstorage.NewCacherFromConfig(config)

View File

@ -47,12 +47,12 @@ func UndecoratedStorage(
getAttrsFunc storage.AttrFunc, getAttrsFunc storage.AttrFunc,
trigger storage.IndexerFuncs, trigger storage.IndexerFuncs,
indexers *cache.Indexers) (storage.Interface, factory.DestroyFunc, error) { indexers *cache.Indexers) (storage.Interface, factory.DestroyFunc, error) {
return NewRawStorage(config) return NewRawStorage(config, newFunc)
} }
// NewRawStorage creates the low level kv storage. This is a work-around for current // NewRawStorage creates the low level kv storage. This is a work-around for current
// two layer of same storage interface. // two layer of same storage interface.
// TODO: Once cacher is enabled on all registries (event registry is special), we will remove this method. // TODO: Once cacher is enabled on all registries (event registry is special), we will remove this method.
func NewRawStorage(config *storagebackend.Config) (storage.Interface, factory.DestroyFunc, error) { func NewRawStorage(config *storagebackend.Config, newFunc func() runtime.Object) (storage.Interface, factory.DestroyFunc, error) {
return factory.Create(*config) return factory.Create(*config, newFunc)
} }

View File

@ -83,11 +83,11 @@ type objState struct {
} }
// New returns an etcd3 implementation of storage.Interface. // New returns an etcd3 implementation of storage.Interface.
func New(c *clientv3.Client, codec runtime.Codec, prefix string, transformer value.Transformer, pagingEnabled bool) storage.Interface { func New(c *clientv3.Client, codec runtime.Codec, newFunc func() runtime.Object, prefix string, transformer value.Transformer, pagingEnabled bool) storage.Interface {
return newStore(c, pagingEnabled, codec, prefix, transformer) return newStore(c, newFunc, pagingEnabled, codec, prefix, transformer)
} }
func newStore(c *clientv3.Client, pagingEnabled bool, codec runtime.Codec, prefix string, transformer value.Transformer) *store { func newStore(c *clientv3.Client, _ func() runtime.Object, pagingEnabled bool, codec runtime.Codec, prefix string, transformer value.Transformer) *store {
versioner := APIObjectVersioner{} versioner := APIObjectVersioner{}
result := &store{ result := &store{
client: c, client: c,

View File

@ -99,6 +99,10 @@ func (p *prefixTransformer) resetReads() {
p.reads = 0 p.reads = 0
} }
func newPod() runtime.Object {
return &example.Pod{}
}
func TestCreate(t *testing.T) { func TestCreate(t *testing.T) {
ctx, store, cluster := testSetup(t) ctx, store, cluster := testSetup(t)
defer cluster.Terminate(t) defer cluster.Terminate(t)
@ -818,7 +822,7 @@ func TestTransformationFailure(t *testing.T) {
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer cluster.Terminate(t) defer cluster.Terminate(t)
store := newStore(cluster.RandClient(), false, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}) store := newStore(cluster.RandClient(), newPod, false, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)})
ctx := context.Background() ctx := context.Background()
preset := []struct { preset := []struct {
@ -895,8 +899,8 @@ func TestList(t *testing.T) {
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer cluster.Terminate(t) defer cluster.Terminate(t)
store := newStore(cluster.RandClient(), true, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}) store := newStore(cluster.RandClient(), newPod, true, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)})
disablePagingStore := newStore(cluster.RandClient(), false, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}) disablePagingStore := newStore(cluster.RandClient(), newPod, false, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)})
ctx := context.Background() ctx := context.Background()
// Setup storage with the following structure: // Setup storage with the following structure:
@ -1394,7 +1398,7 @@ func TestListContinuation(t *testing.T) {
etcdClient := cluster.RandClient() etcdClient := cluster.RandClient()
recorder := &clientRecorder{KV: etcdClient.KV} recorder := &clientRecorder{KV: etcdClient.KV}
etcdClient.KV = recorder etcdClient.KV = recorder
store := newStore(etcdClient, true, codec, "", transformer) store := newStore(etcdClient, newPod, true, codec, "", transformer)
ctx := context.Background() ctx := context.Background()
// Setup storage with the following structure: // Setup storage with the following structure:
@ -1556,7 +1560,7 @@ func TestListContinuationWithFilter(t *testing.T) {
etcdClient := cluster.RandClient() etcdClient := cluster.RandClient()
recorder := &clientRecorder{KV: etcdClient.KV} recorder := &clientRecorder{KV: etcdClient.KV}
etcdClient.KV = recorder etcdClient.KV = recorder
store := newStore(etcdClient, true, codec, "", transformer) store := newStore(etcdClient, newPod, true, codec, "", transformer)
ctx := context.Background() ctx := context.Background()
preset := []struct { preset := []struct {
@ -1659,7 +1663,7 @@ func TestListInconsistentContinuation(t *testing.T) {
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer cluster.Terminate(t) defer cluster.Terminate(t)
store := newStore(cluster.RandClient(), true, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}) store := newStore(cluster.RandClient(), newPod, true, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)})
ctx := context.Background() ctx := context.Background()
// Setup storage with the following structure: // Setup storage with the following structure:
@ -1804,7 +1808,7 @@ func TestListInconsistentContinuation(t *testing.T) {
func testSetup(t *testing.T) (context.Context, *store, *integration.ClusterV3) { func testSetup(t *testing.T) (context.Context, *store, *integration.ClusterV3) {
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
store := newStore(cluster.RandClient(), true, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}) store := newStore(cluster.RandClient(), newPod, true, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)})
ctx := context.Background() ctx := context.Background()
// As 30s is the default timeout for testing in glboal configuration, // As 30s is the default timeout for testing in glboal configuration,
// we cannot wait longer than that in a single time: change it to 10 // we cannot wait longer than that in a single time: change it to 10
@ -1844,7 +1848,7 @@ func TestPrefix(t *testing.T) {
"/registry": "/registry", "/registry": "/registry",
} }
for configuredPrefix, effectivePrefix := range testcases { for configuredPrefix, effectivePrefix := range testcases {
store := newStore(cluster.RandClient(), true, codec, configuredPrefix, transformer) store := newStore(cluster.RandClient(), nil, true, codec, configuredPrefix, transformer)
if store.pathPrefix != effectivePrefix { if store.pathPrefix != effectivePrefix {
t.Errorf("configured prefix of %s, expected effective prefix of %s, got %s", configuredPrefix, effectivePrefix, store.pathPrefix) t.Errorf("configured prefix of %s, expected effective prefix of %s, got %s", configuredPrefix, effectivePrefix, store.pathPrefix)
} }
@ -2011,7 +2015,7 @@ func TestConsistentList(t *testing.T) {
transformer := &fancyTransformer{ transformer := &fancyTransformer{
transformer: &prefixTransformer{prefix: []byte(defaultTestPrefix)}, transformer: &prefixTransformer{prefix: []byte(defaultTestPrefix)},
} }
store := newStore(cluster.RandClient(), true, codec, "", transformer) store := newStore(cluster.RandClient(), newPod, true, codec, "", transformer)
transformer.store = store transformer.store = store
for i := 0; i < 5; i++ { for i := 0; i < 5; i++ {

View File

@ -225,13 +225,13 @@ func TestWatchError(t *testing.T) {
codec := &testCodec{apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)} codec := &testCodec{apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)}
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer cluster.Terminate(t) defer cluster.Terminate(t)
invalidStore := newStore(cluster.RandClient(), true, codec, "", &prefixTransformer{prefix: []byte("test!")}) invalidStore := newStore(cluster.RandClient(), newPod, true, codec, "", &prefixTransformer{prefix: []byte("test!")})
ctx := context.Background() ctx := context.Background()
w, err := invalidStore.Watch(ctx, "/abc", storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything}) w, err := invalidStore.Watch(ctx, "/abc", storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything})
if err != nil { if err != nil {
t.Fatalf("Watch failed: %v", err) t.Fatalf("Watch failed: %v", err)
} }
validStore := newStore(cluster.RandClient(), true, codec, "", &prefixTransformer{prefix: []byte("test!")}) validStore := newStore(cluster.RandClient(), newPod, true, codec, "", &prefixTransformer{prefix: []byte("test!")})
validStore.GuaranteedUpdate(ctx, "/abc", &example.Pod{}, true, nil, storage.SimpleUpdate( validStore.GuaranteedUpdate(ctx, "/abc", &example.Pod{}, true, nil, storage.SimpleUpdate(
func(runtime.Object) (runtime.Object, error) { func(runtime.Object) (runtime.Object, error) {
return &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, nil return &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, nil

View File

@ -34,6 +34,7 @@ go_library(
importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/storage/storagebackend/factory", importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/storage/storagebackend/factory",
importpath = "k8s.io/apiserver/pkg/storage/storagebackend/factory", importpath = "k8s.io/apiserver/pkg/storage/storagebackend/factory",
deps = [ deps = [
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/server/egressselector:go_default_library", "//staging/src/k8s.io/apiserver/pkg/server/egressselector:go_default_library",

View File

@ -31,6 +31,7 @@ import (
"go.etcd.io/etcd/pkg/transport" "go.etcd.io/etcd/pkg/transport"
"google.golang.org/grpc" "google.golang.org/grpc"
"k8s.io/apimachinery/pkg/runtime"
utilnet "k8s.io/apimachinery/pkg/util/net" utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/server/egressselector" "k8s.io/apiserver/pkg/server/egressselector"
@ -217,7 +218,7 @@ func startCompactorOnce(c storagebackend.TransportConfig, interval time.Duration
}, nil }, nil
} }
func newETCD3Storage(c storagebackend.Config) (storage.Interface, DestroyFunc, error) { func newETCD3Storage(c storagebackend.Config, newFunc func() runtime.Object) (storage.Interface, DestroyFunc, error) {
stopCompactor, err := startCompactorOnce(c.Transport, c.CompactionInterval) stopCompactor, err := startCompactorOnce(c.Transport, c.CompactionInterval)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
@ -249,7 +250,7 @@ func newETCD3Storage(c storagebackend.Config) (storage.Interface, DestroyFunc, e
if transformer == nil { if transformer == nil {
transformer = value.IdentityTransformer transformer = value.IdentityTransformer
} }
return etcd3.New(client, c.Codec, c.Prefix, transformer, c.Paging), destroyFunc, nil return etcd3.New(client, c.Codec, newFunc, c.Prefix, transformer, c.Paging), destroyFunc, nil
} }
// startDBSizeMonitorPerEndpoint starts a loop to monitor etcd database size and update the // startDBSizeMonitorPerEndpoint starts a loop to monitor etcd database size and update the

View File

@ -19,6 +19,7 @@ package factory
import ( import (
"fmt" "fmt"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apiserver/pkg/storage" "k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/storagebackend" "k8s.io/apiserver/pkg/storage/storagebackend"
) )
@ -27,12 +28,12 @@ import (
type DestroyFunc func() type DestroyFunc func()
// Create creates a storage backend based on given config. // Create creates a storage backend based on given config.
func Create(c storagebackend.Config) (storage.Interface, DestroyFunc, error) { func Create(c storagebackend.Config, newFunc func() runtime.Object) (storage.Interface, DestroyFunc, error) {
switch c.Type { switch c.Type {
case "etcd2": case "etcd2":
return nil, nil, fmt.Errorf("%v is no longer a supported storage backend", c.Type) return nil, nil, fmt.Errorf("%v is no longer a supported storage backend", c.Type)
case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD3: case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD3:
return newETCD3Storage(c) return newETCD3Storage(c, newFunc)
default: default:
return nil, nil, fmt.Errorf("unknown storage type: %s", c.Type) return nil, nil, fmt.Errorf("unknown storage type: %s", c.Type)
} }

View File

@ -75,7 +75,7 @@ func TestTLSConnection(t *testing.T) {
}, },
Codec: codec, Codec: codec,
} }
storage, destroyFunc, err := newETCD3Storage(cfg) storage, destroyFunc, err := newETCD3Storage(cfg, nil)
defer destroyFunc() defer destroyFunc()
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)

View File

@ -99,9 +99,12 @@ func AddObjectMetaFieldsSet(source fields.Set, objectMeta *metav1.ObjectMeta, ha
return source return source
} }
func newPod() runtime.Object { return &example.Pod{} }
func newPodList() runtime.Object { return &example.PodList{} }
func newEtcdTestStorage(t *testing.T, prefix string) (*etcd3testing.EtcdTestServer, storage.Interface) { func newEtcdTestStorage(t *testing.T, prefix string) (*etcd3testing.EtcdTestServer, storage.Interface) {
server, _ := etcd3testing.NewUnsecuredEtcd3TestClientServer(t) server, _ := etcd3testing.NewUnsecuredEtcd3TestClientServer(t)
storage := etcd3.New(server.V3Client, apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion), prefix, value.IdentityTransformer, true) storage := etcd3.New(server.V3Client, apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion), newPod, prefix, value.IdentityTransformer, true)
return server, storage return server, storage
} }
@ -118,8 +121,8 @@ func newTestCacherWithClock(s storage.Interface, clock clock.Clock) (*cacherstor
ResourcePrefix: prefix, ResourcePrefix: prefix,
KeyFunc: func(obj runtime.Object) (string, error) { return storage.NamespaceKeyFunc(prefix, obj) }, KeyFunc: func(obj runtime.Object) (string, error) { return storage.NamespaceKeyFunc(prefix, obj) },
GetAttrsFunc: GetAttrs, GetAttrsFunc: GetAttrs,
NewFunc: func() runtime.Object { return &example.Pod{} }, NewFunc: newPod,
NewListFunc: func() runtime.Object { return &example.PodList{} }, NewListFunc: newPodList,
Codec: codecs.LegacyCodec(examplev1.SchemeGroupVersion), Codec: codecs.LegacyCodec(examplev1.SchemeGroupVersion),
Clock: clock, Clock: clock,
} }