storage: Move cacher tests to reside with the cacher code

Doing this allows us to implement some more nuanced cacher manipulations
to be used in testing. For ex: implementing a test-only compaction method
for the watch cache.

Signed-off-by: Madhav Jivrajani <madhav.jiv@gmail.com>
This commit is contained in:
Madhav Jivrajani 2023-06-06 14:11:03 +05:30
parent 70978e4af6
commit 6d66fbc6b6

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
package tests package cacher
import ( import (
"context" "context"
@ -24,7 +24,6 @@ import (
"testing" "testing"
"time" "time"
"k8s.io/apimachinery/pkg/api/apitesting"
apiequality "k8s.io/apimachinery/pkg/api/equality" apiequality "k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -32,27 +31,18 @@ import (
"k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
utilruntime "k8s.io/apimachinery/pkg/util/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch" "k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/apis/example" "k8s.io/apiserver/pkg/apis/example"
examplev1 "k8s.io/apiserver/pkg/apis/example/v1" examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
"k8s.io/apiserver/pkg/storage" "k8s.io/apiserver/pkg/storage"
cacherstorage "k8s.io/apiserver/pkg/storage/cacher"
"k8s.io/apiserver/pkg/storage/etcd3"
etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing" etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing"
storagetesting "k8s.io/apiserver/pkg/storage/testing" storagetesting "k8s.io/apiserver/pkg/storage/testing"
"k8s.io/apiserver/pkg/storage/value/encrypt/identity"
"k8s.io/utils/clock" "k8s.io/utils/clock"
testingclock "k8s.io/utils/clock/testing" testingclock "k8s.io/utils/clock/testing"
) )
var (
scheme = runtime.NewScheme()
codecs = serializer.NewCodecFactory(scheme)
)
const ( const (
// watchCacheDefaultCapacity syncs watch cache defaultLowerBoundCapacity. // watchCacheDefaultCapacity syncs watch cache defaultLowerBoundCapacity.
watchCacheDefaultCapacity = 100 watchCacheDefaultCapacity = 100
@ -95,27 +85,10 @@ func AddObjectMetaFieldsSet(source fields.Set, objectMeta *metav1.ObjectMeta, ha
return source return source
} }
func newPod() runtime.Object { return &example.Pod{} } func newTestCacherWithClock(s storage.Interface, clock clock.Clock) (*Cacher, storage.Versioner, error) {
func newPodList() runtime.Object { return &example.PodList{} }
func newEtcdTestStorage(t *testing.T, prefix string, pagingEnabled bool) (*etcd3testing.EtcdTestServer, storage.Interface) {
server, _ := etcd3testing.NewUnsecuredEtcd3TestClientServer(t)
storage := etcd3.New(
server.V3Client,
apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion),
newPod,
prefix,
schema.GroupResource{Resource: "pods"},
identity.NewEncryptCheckTransformer(),
pagingEnabled,
etcd3.NewDefaultLeaseManagerConfig())
return server, storage
}
func newTestCacherWithClock(s storage.Interface, clock clock.Clock) (*cacherstorage.Cacher, storage.Versioner, error) {
prefix := "pods" prefix := "pods"
v := storage.APIObjectVersioner{} v := storage.APIObjectVersioner{}
config := cacherstorage.Config{ config := Config{
Storage: s, Storage: s,
Versioner: v, Versioner: v,
GroupResource: schema.GroupResource{Resource: "pods"}, GroupResource: schema.GroupResource{Resource: "pods"},
@ -127,17 +100,10 @@ func newTestCacherWithClock(s storage.Interface, clock clock.Clock) (*cacherstor
Codec: codecs.LegacyCodec(examplev1.SchemeGroupVersion), Codec: codecs.LegacyCodec(examplev1.SchemeGroupVersion),
Clock: clock, Clock: clock,
} }
cacher, err := cacherstorage.NewCacherFromConfig(config) cacher, err := NewCacherFromConfig(config)
return cacher, v, err return cacher, v, err
} }
func makeTestPod(name string) *example.Pod {
return &example.Pod{
ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: name},
Spec: storagetesting.DeepEqualSafePodSpec(),
}
}
func updatePod(t *testing.T, s storage.Interface, obj, old *example.Pod) *example.Pod { func updatePod(t *testing.T, s storage.Interface, obj, old *example.Pod) *example.Pod {
updateFn := func(input runtime.Object, res storage.ResponseMeta) (runtime.Object, *uint64, error) { updateFn := func(input runtime.Object, res storage.ResponseMeta) (runtime.Object, *uint64, error) {
return obj.DeepCopyObject(), nil, nil return obj.DeepCopyObject(), nil, nil
@ -410,16 +376,16 @@ func TestWatchDeprecated(t *testing.T) {
} }
defer cacher.Stop() defer cacher.Stop()
podFoo := makeTestPod("foo") podFoo := makeTestPodWithName("foo")
podBar := makeTestPod("bar") podBar := makeTestPodWithName("bar")
podFooPrime := makeTestPod("foo") podFooPrime := makeTestPodWithName("foo")
podFooPrime.Spec.NodeName = "fakeNode" podFooPrime.Spec.NodeName = "fakeNode"
podFooBis := makeTestPod("foo") podFooBis := makeTestPodWithName("foo")
podFooBis.Spec.NodeName = "anotherFakeNode" podFooBis.Spec.NodeName = "anotherFakeNode"
podFooNS2 := makeTestPod("foo") podFooNS2 := makeTestPodWithName("foo")
podFooNS2.Namespace += "2" podFooNS2.Namespace += "2"
// initialVersion is used to initate the watcher at the beginning of the world, // initialVersion is used to initate the watcher at the beginning of the world,
@ -472,7 +438,7 @@ func TestWatchDeprecated(t *testing.T) {
// Make start and last event duration exceed eventFreshDuration(current 75s) to ensure watch cache won't expand. // Make start and last event duration exceed eventFreshDuration(current 75s) to ensure watch cache won't expand.
for i := 0; i < watchCacheDefaultCapacity; i++ { for i := 0; i < watchCacheDefaultCapacity; i++ {
fakeClock.SetTime(time.Now().Add(time.Duration(i) * time.Minute)) fakeClock.SetTime(time.Now().Add(time.Duration(i) * time.Minute))
podFoo := makeTestPod(fmt.Sprintf("foo-%d", i)) podFoo := makeTestPodWithName(fmt.Sprintf("foo-%d", i))
updatePod(t, etcdStorage, podFoo, nil) updatePod(t, etcdStorage, podFoo, nil)
} }
@ -553,7 +519,7 @@ func withoutPaging(options *setupOptions) {
options.pagingEnabled = false options.pagingEnabled = false
} }
func testSetup(t *testing.T, opts ...setupOption) (context.Context, *cacherstorage.Cacher, tearDownFunc) { func testSetup(t *testing.T, opts ...setupOption) (context.Context, *Cacher, tearDownFunc) {
setupOpts := setupOptions{} setupOpts := setupOptions{}
opts = append([]setupOption{withDefaults}, opts...) opts = append([]setupOption{withDefaults}, opts...)
for _, opt := range opts { for _, opt := range opts {
@ -567,7 +533,7 @@ func testSetup(t *testing.T, opts ...setupOption) (context.Context, *cacherstora
Errors: 1, Errors: 1,
} }
config := cacherstorage.Config{ config := Config{
Storage: wrappedStorage, Storage: wrappedStorage,
Versioner: storage.APIObjectVersioner{}, Versioner: storage.APIObjectVersioner{},
GroupResource: schema.GroupResource{Resource: "pods"}, GroupResource: schema.GroupResource{Resource: "pods"},
@ -580,7 +546,7 @@ func testSetup(t *testing.T, opts ...setupOption) (context.Context, *cacherstora
Codec: codecs.LegacyCodec(examplev1.SchemeGroupVersion), Codec: codecs.LegacyCodec(examplev1.SchemeGroupVersion),
Clock: setupOpts.clock, Clock: setupOpts.clock,
} }
cacher, err := cacherstorage.NewCacherFromConfig(config) cacher, err := NewCacherFromConfig(config)
if err != nil { if err != nil {
t.Fatalf("Failed to initialize cacher: %v", err) t.Fatalf("Failed to initialize cacher: %v", err)
} }