Generalize watch storage tests
This commit is contained in:
parent
9d50c0a025
commit
8266c4d934
@ -19,13 +19,9 @@ package cacher
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
goruntime "runtime"
|
|
||||||
"strconv"
|
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
apiequality "k8s.io/apimachinery/pkg/api/equality"
|
|
||||||
"k8s.io/apimachinery/pkg/api/errors"
|
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/fields"
|
"k8s.io/apimachinery/pkg/fields"
|
||||||
"k8s.io/apimachinery/pkg/labels"
|
"k8s.io/apimachinery/pkg/labels"
|
||||||
@ -33,19 +29,12 @@ import (
|
|||||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||||
"k8s.io/apimachinery/pkg/util/wait"
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
"k8s.io/apimachinery/pkg/watch"
|
|
||||||
"k8s.io/apiserver/pkg/apis/example"
|
"k8s.io/apiserver/pkg/apis/example"
|
||||||
examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
|
examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
|
||||||
"k8s.io/apiserver/pkg/storage"
|
"k8s.io/apiserver/pkg/storage"
|
||||||
etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing"
|
etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing"
|
||||||
storagetesting "k8s.io/apiserver/pkg/storage/testing"
|
storagetesting "k8s.io/apiserver/pkg/storage/testing"
|
||||||
"k8s.io/utils/clock"
|
"k8s.io/utils/clock"
|
||||||
testingclock "k8s.io/utils/clock/testing"
|
|
||||||
)
|
|
||||||
|
|
||||||
const (
|
|
||||||
// watchCacheDefaultCapacity syncs watch cache defaultLowerBoundCapacity.
|
|
||||||
watchCacheDefaultCapacity = 100
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
@ -85,41 +74,6 @@ func AddObjectMetaFieldsSet(source fields.Set, objectMeta *metav1.ObjectMeta, ha
|
|||||||
return source
|
return source
|
||||||
}
|
}
|
||||||
|
|
||||||
func newTestCacherWithClock(s storage.Interface, clock clock.Clock) (*Cacher, storage.Versioner, error) {
|
|
||||||
prefix := "pods"
|
|
||||||
v := storage.APIObjectVersioner{}
|
|
||||||
config := Config{
|
|
||||||
Storage: s,
|
|
||||||
Versioner: v,
|
|
||||||
GroupResource: schema.GroupResource{Resource: "pods"},
|
|
||||||
ResourcePrefix: prefix,
|
|
||||||
KeyFunc: func(obj runtime.Object) (string, error) { return storage.NamespaceKeyFunc(prefix, obj) },
|
|
||||||
GetAttrsFunc: GetPodAttrs,
|
|
||||||
NewFunc: newPod,
|
|
||||||
NewListFunc: newPodList,
|
|
||||||
Codec: codecs.LegacyCodec(examplev1.SchemeGroupVersion),
|
|
||||||
Clock: clock,
|
|
||||||
}
|
|
||||||
cacher, err := NewCacherFromConfig(config)
|
|
||||||
return cacher, v, err
|
|
||||||
}
|
|
||||||
|
|
||||||
func updatePod(t *testing.T, s storage.Interface, obj, old *example.Pod) *example.Pod {
|
|
||||||
updateFn := func(input runtime.Object, res storage.ResponseMeta) (runtime.Object, *uint64, error) {
|
|
||||||
return obj.DeepCopyObject(), nil, nil
|
|
||||||
}
|
|
||||||
key := "pods/" + obj.Namespace + "/" + obj.Name
|
|
||||||
if err := s.GuaranteedUpdate(context.TODO(), key, &example.Pod{}, old == nil, nil, updateFn, nil); err != nil {
|
|
||||||
t.Errorf("unexpected error: %v", err)
|
|
||||||
}
|
|
||||||
obj.ResourceVersion = ""
|
|
||||||
result := &example.Pod{}
|
|
||||||
if err := s.Get(context.TODO(), key, storage.GetOptions{}, result); err != nil {
|
|
||||||
t.Errorf("unexpected error: %v", err)
|
|
||||||
}
|
|
||||||
return result
|
|
||||||
}
|
|
||||||
|
|
||||||
func checkStorageInvariants(ctx context.Context, t *testing.T, key string) {
|
func checkStorageInvariants(ctx context.Context, t *testing.T, key string) {
|
||||||
// No-op function since cacher simply passes object creation to the underlying storage.
|
// No-op function since cacher simply passes object creation to the underlying storage.
|
||||||
}
|
}
|
||||||
@ -231,7 +185,10 @@ func TestListContinuationWithFilter(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestListInconsistentContinuation(t *testing.T) {
|
func TestListInconsistentContinuation(t *testing.T) {
|
||||||
// TODO(#109831): Enable use of this test and run it.
|
ctx, cacher, terminate := testSetup(t)
|
||||||
|
t.Cleanup(terminate)
|
||||||
|
// TODO(#109831): Enable use of this by setting compaction.
|
||||||
|
storagetesting.RunTestListInconsistentContinuation(ctx, t, cacher, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestConsistentList(t *testing.T) {
|
func TestConsistentList(t *testing.T) {
|
||||||
@ -274,28 +231,6 @@ func TestCount(t *testing.T) {
|
|||||||
storagetesting.RunTestCount(ctx, t, cacher)
|
storagetesting.RunTestCount(ctx, t, cacher)
|
||||||
}
|
}
|
||||||
|
|
||||||
func verifyWatchEvent(t *testing.T, w watch.Interface, eventType watch.EventType, eventObject runtime.Object) {
|
|
||||||
_, _, line, _ := goruntime.Caller(1)
|
|
||||||
select {
|
|
||||||
case event := <-w.ResultChan():
|
|
||||||
if e, a := eventType, event.Type; e != a {
|
|
||||||
t.Logf("(called from line %d)", line)
|
|
||||||
t.Errorf("Expected: %s, got: %s", eventType, event.Type)
|
|
||||||
}
|
|
||||||
object := event.Object
|
|
||||||
if co, ok := object.(runtime.CacheableObject); ok {
|
|
||||||
object = co.GetObject()
|
|
||||||
}
|
|
||||||
if e, a := eventObject, object; !apiequality.Semantic.DeepDerivative(e, a) {
|
|
||||||
t.Logf("(called from line %d)", line)
|
|
||||||
t.Errorf("Expected (%s): %#v, got: %#v", eventType, e, a)
|
|
||||||
}
|
|
||||||
case <-time.After(wait.ForeverTestTimeout):
|
|
||||||
t.Logf("(called from line %d)", line)
|
|
||||||
t.Errorf("Timed out waiting for an event")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestWatch(t *testing.T) {
|
func TestWatch(t *testing.T) {
|
||||||
ctx, cacher, terminate := testSetup(t)
|
ctx, cacher, terminate := testSetup(t)
|
||||||
t.Cleanup(terminate)
|
t.Cleanup(terminate)
|
||||||
@ -305,7 +240,7 @@ func TestWatch(t *testing.T) {
|
|||||||
func TestWatchFromZero(t *testing.T) {
|
func TestWatchFromZero(t *testing.T) {
|
||||||
ctx, cacher, terminate := testSetup(t)
|
ctx, cacher, terminate := testSetup(t)
|
||||||
t.Cleanup(terminate)
|
t.Cleanup(terminate)
|
||||||
storagetesting.RunTestWatchFromZero(ctx, t, cacher, nil)
|
storagetesting.RunTestWatchFromZero(ctx, t, cacher, compactStorage(cacher))
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestDeleteTriggerWatch(t *testing.T) {
|
func TestDeleteTriggerWatch(t *testing.T) {
|
||||||
@ -364,96 +299,6 @@ func TestNamespaceScopedWatch(t *testing.T) {
|
|||||||
storagetesting.RunTestNamespaceScopedWatch(ctx, t, cacher)
|
storagetesting.RunTestNamespaceScopedWatch(ctx, t, cacher)
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO(wojtek-t): We should extend the generic RunTestWatch test to cover the
|
|
||||||
// scenarios that are not yet covered by it and get rid of this test.
|
|
||||||
func TestWatchDeprecated(t *testing.T) {
|
|
||||||
server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix(), true)
|
|
||||||
defer server.Terminate(t)
|
|
||||||
fakeClock := testingclock.NewFakeClock(time.Now())
|
|
||||||
cacher, _, err := newTestCacherWithClock(etcdStorage, fakeClock)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Couldn't create cacher: %v", err)
|
|
||||||
}
|
|
||||||
defer cacher.Stop()
|
|
||||||
|
|
||||||
podFoo := makeTestPodWithName("foo")
|
|
||||||
podBar := makeTestPodWithName("bar")
|
|
||||||
|
|
||||||
podFooPrime := makeTestPodWithName("foo")
|
|
||||||
podFooPrime.Spec.NodeName = "fakeNode"
|
|
||||||
|
|
||||||
podFooBis := makeTestPodWithName("foo")
|
|
||||||
podFooBis.Spec.NodeName = "anotherFakeNode"
|
|
||||||
|
|
||||||
podFooNS2 := makeTestPodWithName("foo")
|
|
||||||
podFooNS2.Namespace += "2"
|
|
||||||
|
|
||||||
// initialVersion is used to initate the watcher at the beginning of the world,
|
|
||||||
// which is not defined precisely in etcd.
|
|
||||||
initialVersion, err := cacher.LastSyncResourceVersion()
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Unexpected error: %v", err)
|
|
||||||
}
|
|
||||||
startVersion := strconv.Itoa(int(initialVersion))
|
|
||||||
|
|
||||||
// Set up Watch for object "podFoo".
|
|
||||||
watcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", storage.ListOptions{ResourceVersion: startVersion, Predicate: storage.Everything})
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Unexpected error: %v", err)
|
|
||||||
}
|
|
||||||
defer watcher.Stop()
|
|
||||||
|
|
||||||
// Create in another namespace first to make sure events from other namespaces don't get delivered
|
|
||||||
updatePod(t, etcdStorage, podFooNS2, nil)
|
|
||||||
|
|
||||||
fooCreated := updatePod(t, etcdStorage, podFoo, nil)
|
|
||||||
_ = updatePod(t, etcdStorage, podBar, nil)
|
|
||||||
fooUpdated := updatePod(t, etcdStorage, podFooPrime, fooCreated)
|
|
||||||
|
|
||||||
verifyWatchEvent(t, watcher, watch.Added, podFoo)
|
|
||||||
verifyWatchEvent(t, watcher, watch.Modified, podFooPrime)
|
|
||||||
|
|
||||||
initialWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", storage.ListOptions{ResourceVersion: fooCreated.ResourceVersion, Predicate: storage.Everything})
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Unexpected error: %v", err)
|
|
||||||
}
|
|
||||||
defer initialWatcher.Stop()
|
|
||||||
|
|
||||||
verifyWatchEvent(t, initialWatcher, watch.Modified, podFooPrime)
|
|
||||||
|
|
||||||
// Now test watch from "now".
|
|
||||||
nowWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything})
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Unexpected error: %v", err)
|
|
||||||
}
|
|
||||||
defer nowWatcher.Stop()
|
|
||||||
|
|
||||||
verifyWatchEvent(t, nowWatcher, watch.Added, podFooPrime)
|
|
||||||
|
|
||||||
_ = updatePod(t, etcdStorage, podFooBis, fooUpdated)
|
|
||||||
|
|
||||||
verifyWatchEvent(t, nowWatcher, watch.Modified, podFooBis)
|
|
||||||
|
|
||||||
// Add watchCacheDefaultCapacity events to make current watch cache full.
|
|
||||||
// Make start and last event duration exceed eventFreshDuration(current 75s) to ensure watch cache won't expand.
|
|
||||||
for i := 0; i < watchCacheDefaultCapacity; i++ {
|
|
||||||
fakeClock.SetTime(time.Now().Add(time.Duration(i) * time.Minute))
|
|
||||||
podFoo := makeTestPodWithName(fmt.Sprintf("foo-%d", i))
|
|
||||||
updatePod(t, etcdStorage, podFoo, nil)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check whether we get too-old error via the watch channel
|
|
||||||
tooOldWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", storage.ListOptions{ResourceVersion: "1", Predicate: storage.Everything})
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Expected no direct error, got %v", err)
|
|
||||||
}
|
|
||||||
defer tooOldWatcher.Stop()
|
|
||||||
|
|
||||||
// Ensure we get a "Gone" error.
|
|
||||||
expectedResourceExpiredError := errors.NewResourceExpired("").ErrStatus
|
|
||||||
verifyWatchEvent(t, tooOldWatcher, watch.Error, &expectedResourceExpiredError)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestWatchDispatchBookmarkEvents(t *testing.T) {
|
func TestWatchDispatchBookmarkEvents(t *testing.T) {
|
||||||
ctx, cacher, terminate := testSetup(t)
|
ctx, cacher, terminate := testSetup(t)
|
||||||
t.Cleanup(terminate)
|
t.Cleanup(terminate)
|
||||||
|
@ -17,6 +17,7 @@ limitations under the License.
|
|||||||
package cacher
|
package cacher
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
@ -66,13 +67,53 @@ func newEtcdTestStorage(t *testing.T, prefix string, pagingEnabled bool) (*etcd3
|
|||||||
return server, storage
|
return server, storage
|
||||||
}
|
}
|
||||||
|
|
||||||
func makeTestPodWithName(name string) *example.Pod {
|
|
||||||
return &example.Pod{
|
|
||||||
ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: name},
|
|
||||||
Spec: storagetesting.DeepEqualSafePodSpec(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func computePodKey(obj *example.Pod) string {
|
func computePodKey(obj *example.Pod) string {
|
||||||
return fmt.Sprintf("/pods/%s/%s", obj.Namespace, obj.Name)
|
return fmt.Sprintf("/pods/%s/%s", obj.Namespace, obj.Name)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func compactStorage(c *Cacher) storagetesting.Compaction {
|
||||||
|
return func(ctx context.Context, t *testing.T, resourceVersion string) {
|
||||||
|
versioner := storage.APIObjectVersioner{}
|
||||||
|
rv, err := versioner.ParseResourceVersion(resourceVersion)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = c.watchCache.waitUntilFreshAndBlock(context.TODO(), rv)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("WatchCache didn't caught up to RV: %v", rv)
|
||||||
|
}
|
||||||
|
c.watchCache.RUnlock()
|
||||||
|
|
||||||
|
c.watchCache.Lock()
|
||||||
|
defer c.watchCache.Unlock()
|
||||||
|
c.Lock()
|
||||||
|
defer c.Unlock()
|
||||||
|
|
||||||
|
if c.watchCache.resourceVersion < rv {
|
||||||
|
t.Fatalf("Can't compact into a future version: %v", resourceVersion)
|
||||||
|
}
|
||||||
|
if rv < c.watchCache.listResourceVersion {
|
||||||
|
t.Fatalf("Can't compact into a past version: %v", resourceVersion)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(c.watchers.allWatchers) > 0 || len(c.watchers.valueWatchers) > 0 {
|
||||||
|
// We could consider terminating those watchers, but given
|
||||||
|
// watchcache doesn't really support compaction and we don't
|
||||||
|
// exercise it in tests, we just throw an error here.
|
||||||
|
t.Error("Open watchers are not supported during compaction")
|
||||||
|
}
|
||||||
|
|
||||||
|
for c.watchCache.startIndex < c.watchCache.endIndex {
|
||||||
|
index := c.watchCache.startIndex % c.watchCache.capacity
|
||||||
|
if c.watchCache.cache[index].ResourceVersion > rv {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
c.watchCache.startIndex++
|
||||||
|
}
|
||||||
|
c.watchCache.listResourceVersion = rv
|
||||||
|
|
||||||
|
// TODO(wojtek-t): We should also compact the underlying etcd storage.
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -305,7 +305,7 @@ func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, upd
|
|||||||
|
|
||||||
if err := func() error {
|
if err := func() error {
|
||||||
// TODO: We should consider moving this lock below after the watchCacheEvent
|
// TODO: We should consider moving this lock below after the watchCacheEvent
|
||||||
// is created. In such situation, the only problematic scenario is Replace(
|
// is created. In such situation, the only problematic scenario is Replace()
|
||||||
// happening after getting object from store and before acquiring a lock.
|
// happening after getting object from store and before acquiring a lock.
|
||||||
// Maybe introduce another lock for this purpose.
|
// Maybe introduce another lock for this purpose.
|
||||||
w.Lock()
|
w.Lock()
|
||||||
|
@ -150,7 +150,7 @@ func testCheckEventType(t *testing.T, expectEventType watch.EventType, w watch.I
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func testCheckResult(t *testing.T, expectEventType watch.EventType, w watch.Interface, expectObj *example.Pod) {
|
func testCheckResult(t *testing.T, expectEventType watch.EventType, w watch.Interface, expectObj runtime.Object) {
|
||||||
testCheckResultFunc(t, expectEventType, w, func(object runtime.Object) error {
|
testCheckResultFunc(t, expectEventType, w, func(object runtime.Object) error {
|
||||||
ExpectNoDiff(t, "incorrect object", expectObj, object)
|
ExpectNoDiff(t, "incorrect object", expectObj, object)
|
||||||
return nil
|
return nil
|
||||||
|
@ -19,12 +19,15 @@ package testing
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"net/http"
|
||||||
"sync"
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
|
apiequality "k8s.io/apimachinery/pkg/api/equality"
|
||||||
|
"k8s.io/apimachinery/pkg/api/errors"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/fields"
|
"k8s.io/apimachinery/pkg/fields"
|
||||||
"k8s.io/apimachinery/pkg/labels"
|
"k8s.io/apimachinery/pkg/labels"
|
||||||
@ -238,8 +241,8 @@ func RunTestWatchFromZero(ctx context.Context, t *testing.T, store storage.Inter
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Update again
|
// Update again
|
||||||
out = &example.Pod{}
|
newOut := &example.Pod{}
|
||||||
err = store.GuaranteedUpdate(ctx, key, out, true, nil, storage.SimpleUpdate(
|
err = store.GuaranteedUpdate(ctx, key, newOut, true, nil, storage.SimpleUpdate(
|
||||||
func(runtime.Object) (runtime.Object, error) {
|
func(runtime.Object) (runtime.Object, error) {
|
||||||
return &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "test-ns"}}, nil
|
return &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "test-ns"}}, nil
|
||||||
}), nil)
|
}), nil)
|
||||||
@ -248,14 +251,37 @@ func RunTestWatchFromZero(ctx context.Context, t *testing.T, store storage.Inter
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Compact previous versions
|
// Compact previous versions
|
||||||
compaction(ctx, t, out.ResourceVersion)
|
compaction(ctx, t, newOut.ResourceVersion)
|
||||||
|
|
||||||
// Make sure we can still watch from 0 and receive an ADDED event
|
// Make sure we can still watch from 0 and receive an ADDED event
|
||||||
w, err = store.Watch(ctx, key, storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything})
|
w, err = store.Watch(ctx, key, storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything})
|
||||||
|
defer w.Stop()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Watch failed: %v", err)
|
t.Fatalf("Watch failed: %v", err)
|
||||||
}
|
}
|
||||||
testCheckResult(t, watch.Added, w, out)
|
testCheckResult(t, watch.Added, w, newOut)
|
||||||
|
|
||||||
|
// Make sure we can't watch from older resource versions anymoer and get a "Gone" error.
|
||||||
|
tooOldWatcher, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: out.ResourceVersion, Predicate: storage.Everything})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Watch failed: %v", err)
|
||||||
|
}
|
||||||
|
defer tooOldWatcher.Stop()
|
||||||
|
expiredError := errors.NewResourceExpired("").ErrStatus
|
||||||
|
// TODO(wojtek-t): It seems that etcd is currently returning a different error,
|
||||||
|
// being an Internal error of "etcd event received with PrevKv=nil".
|
||||||
|
// We temporary allow both but we should unify here.
|
||||||
|
internalError := metav1.Status{
|
||||||
|
Status: metav1.StatusFailure,
|
||||||
|
Code: http.StatusInternalServerError,
|
||||||
|
Reason: metav1.StatusReasonInternalError,
|
||||||
|
}
|
||||||
|
testCheckResultFunc(t, watch.Error, tooOldWatcher, func(obj runtime.Object) error {
|
||||||
|
if !apiequality.Semantic.DeepDerivative(&expiredError, obj) && !apiequality.Semantic.DeepDerivative(&internalError, obj) {
|
||||||
|
t.Errorf("expected: %#v; got %#v", &expiredError, obj)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func RunTestDeleteTriggerWatch(ctx context.Context, t *testing.T, store storage.Interface) {
|
func RunTestDeleteTriggerWatch(ctx context.Context, t *testing.T, store storage.Interface) {
|
||||||
|
Loading…
Reference in New Issue
Block a user