Allow ValueTransformer to indicate resource is stale

Allows a transformer (such as an encrypter) to force an update if a new
key is in use, thus allowing simple writes to the REST layer to
trivially migrate keys.
This commit is contained in:
Clayton Coleman 2017-02-05 22:37:10 -05:00
parent 4313bc6df3
commit bc4b50640b
No known key found for this signature in database
GPG Key ID: 3D16906B4F1C5CB3
6 changed files with 111 additions and 60 deletions

View File

@ -35,23 +35,27 @@ import (
"k8s.io/apimachinery/pkg/watch" "k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/storage" "k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/etcd/metrics" "k8s.io/apiserver/pkg/storage/etcd/metrics"
etcdutil "k8s.io/apiserver/pkg/storage/etcd/util"
utilcache "k8s.io/apiserver/pkg/util/cache" utilcache "k8s.io/apiserver/pkg/util/cache"
utiltrace "k8s.io/apiserver/pkg/util/trace" utiltrace "k8s.io/apiserver/pkg/util/trace"
etcdutil "k8s.io/apiserver/pkg/storage/etcd/util"
) )
// ValueTransformer allows a string value to be transformed before being read from or written to the underlying store. The methods // ValueTransformer allows a string value to be transformed before being read from or written to the underlying store. The methods
// must be able to undo the transformation caused by the other. // must be able to undo the transformation caused by the other.
type ValueTransformer interface { type ValueTransformer interface {
// TransformStringFromStorage may transform the provided string from its underlying storage representation or return an error. // TransformStringFromStorage may transform the provided string from its underlying storage representation or return an error.
TransformStringFromStorage(string) (string, error) // Stale is true if the object on disk is stale and a write to etcd should be issued, even if the contents of the object
// have not changed.
TransformStringFromStorage(string) (value string, stale bool, err error)
// TransformStringToStorage may transform the provided string into the appropriate form in storage or return an error. // TransformStringToStorage may transform the provided string into the appropriate form in storage or return an error.
TransformStringToStorage(string) (string, error) TransformStringToStorage(string) (value string, err error)
} }
type identityTransformer struct{} type identityTransformer struct{}
func (identityTransformer) TransformStringFromStorage(s string) (string, error) { return s, nil } func (identityTransformer) TransformStringFromStorage(s string) (string, bool, error) {
return s, false, nil
}
func (identityTransformer) TransformStringToStorage(s string) (string, error) { return s, nil } func (identityTransformer) TransformStringToStorage(s string) (string, error) { return s, nil }
// IdentityTransformer performs no transformation on the provided values. // IdentityTransformer performs no transformation on the provided values.
@ -148,7 +152,7 @@ func (h *etcdHelper) Create(ctx context.Context, key string, obj, out runtime.Ob
if _, err := conversion.EnforcePtr(out); err != nil { if _, err := conversion.EnforcePtr(out); err != nil {
panic("unable to convert output object to pointer") panic("unable to convert output object to pointer")
} }
_, _, err = h.extractObj(response, err, out, false, false) _, _, _, err = h.extractObj(response, err, out, false, false)
} }
return err return err
} }
@ -186,7 +190,7 @@ func (h *etcdHelper) Delete(ctx context.Context, key string, out runtime.Object,
if !etcdutil.IsEtcdNotFound(err) { if !etcdutil.IsEtcdNotFound(err) {
// if the object that existed prior to the delete is returned by etcd, update the out object. // if the object that existed prior to the delete is returned by etcd, update the out object.
if err != nil || response.PrevNode != nil { if err != nil || response.PrevNode != nil {
_, _, err = h.extractObj(response, err, out, false, true) _, _, _, err = h.extractObj(response, err, out, false, true)
} }
} }
return toStorageErr(err, key, 0) return toStorageErr(err, key, 0)
@ -195,7 +199,7 @@ func (h *etcdHelper) Delete(ctx context.Context, key string, out runtime.Object,
// Check the preconditions match. // Check the preconditions match.
obj := reflect.New(v.Type()).Interface().(runtime.Object) obj := reflect.New(v.Type()).Interface().(runtime.Object)
for { for {
_, node, res, err := h.bodyAndExtractObj(ctx, key, obj, false) _, node, res, _, err := h.bodyAndExtractObj(ctx, key, obj, false)
if err != nil { if err != nil {
return toStorageErr(err, key, 0) return toStorageErr(err, key, 0)
} }
@ -216,7 +220,7 @@ func (h *etcdHelper) Delete(ctx context.Context, key string, out runtime.Object,
if !etcdutil.IsEtcdNotFound(err) { if !etcdutil.IsEtcdNotFound(err) {
// if the object that existed prior to the delete is returned by etcd, update the out object. // if the object that existed prior to the delete is returned by etcd, update the out object.
if err != nil || response.PrevNode != nil { if err != nil || response.PrevNode != nil {
_, _, err = h.extractObj(response, err, out, false, true) _, _, _, err = h.extractObj(response, err, out, false, true)
} }
} }
return toStorageErr(err, key, 0) return toStorageErr(err, key, 0)
@ -262,13 +266,13 @@ func (h *etcdHelper) Get(ctx context.Context, key string, resourceVersion string
glog.Errorf("Context is nil") glog.Errorf("Context is nil")
} }
key = path.Join(h.pathPrefix, key) key = path.Join(h.pathPrefix, key)
_, _, _, err := h.bodyAndExtractObj(ctx, key, objPtr, ignoreNotFound) _, _, _, _, err := h.bodyAndExtractObj(ctx, key, objPtr, ignoreNotFound)
return err return err
} }
// bodyAndExtractObj performs the normal Get path to etcd, returning the parsed node and response for additional information // bodyAndExtractObj performs the normal Get path to etcd, returning the parsed node and response for additional information
// about the response, like the current etcd index and the ttl. // about the response, like the current etcd index and the ttl.
func (h *etcdHelper) bodyAndExtractObj(ctx context.Context, key string, objPtr runtime.Object, ignoreNotFound bool) (body string, node *etcd.Node, res *etcd.Response, err error) { func (h *etcdHelper) bodyAndExtractObj(ctx context.Context, key string, objPtr runtime.Object, ignoreNotFound bool) (body string, node *etcd.Node, res *etcd.Response, stale bool, err error) {
if ctx == nil { if ctx == nil {
glog.Errorf("Context is nil") glog.Errorf("Context is nil")
} }
@ -281,13 +285,13 @@ func (h *etcdHelper) bodyAndExtractObj(ctx context.Context, key string, objPtr r
response, err := h.etcdKeysAPI.Get(ctx, key, opts) response, err := h.etcdKeysAPI.Get(ctx, key, opts)
metrics.RecordEtcdRequestLatency("get", getTypeName(objPtr), startTime) metrics.RecordEtcdRequestLatency("get", getTypeName(objPtr), startTime)
if err != nil && !etcdutil.IsEtcdNotFound(err) { if err != nil && !etcdutil.IsEtcdNotFound(err) {
return "", nil, nil, toStorageErr(err, key, 0) return "", nil, nil, false, toStorageErr(err, key, 0)
} }
body, node, err = h.extractObj(response, err, objPtr, ignoreNotFound, false) body, node, stale, err = h.extractObj(response, err, objPtr, ignoreNotFound, false)
return body, node, response, toStorageErr(err, key, 0) return body, node, response, stale, toStorageErr(err, key, 0)
} }
func (h *etcdHelper) extractObj(response *etcd.Response, inErr error, objPtr runtime.Object, ignoreNotFound, prevNode bool) (body string, node *etcd.Node, err error) { func (h *etcdHelper) extractObj(response *etcd.Response, inErr error, objPtr runtime.Object, ignoreNotFound, prevNode bool) (body string, node *etcd.Node, stale bool, err error) {
if response != nil { if response != nil {
if prevNode { if prevNode {
node = response.PrevNode node = response.PrevNode
@ -299,30 +303,30 @@ func (h *etcdHelper) extractObj(response *etcd.Response, inErr error, objPtr run
if ignoreNotFound { if ignoreNotFound {
v, err := conversion.EnforcePtr(objPtr) v, err := conversion.EnforcePtr(objPtr)
if err != nil { if err != nil {
return "", nil, err return "", nil, false, err
} }
v.Set(reflect.Zero(v.Type())) v.Set(reflect.Zero(v.Type()))
return "", nil, nil return "", nil, false, nil
} else if inErr != nil { } else if inErr != nil {
return "", nil, inErr return "", nil, false, inErr
} }
return "", nil, fmt.Errorf("unable to locate a value on the response: %#v", response) return "", nil, false, fmt.Errorf("unable to locate a value on the response: %#v", response)
} }
body, err = h.transformer.TransformStringFromStorage(node.Value) body, stale, err = h.transformer.TransformStringFromStorage(node.Value)
if err != nil { if err != nil {
return body, nil, storage.NewInternalError(err.Error()) return body, nil, stale, storage.NewInternalError(err.Error())
} }
out, gvk, err := h.codec.Decode([]byte(body), nil, objPtr) out, gvk, err := h.codec.Decode([]byte(body), nil, objPtr)
if err != nil { if err != nil {
return body, nil, err return body, nil, stale, err
} }
if out != objPtr { if out != objPtr {
return body, nil, fmt.Errorf("unable to decode object %s into %v", gvk.String(), reflect.TypeOf(objPtr)) return body, nil, stale, fmt.Errorf("unable to decode object %s into %v", gvk.String(), reflect.TypeOf(objPtr))
} }
// being unable to set the version does not prevent the object from being extracted // being unable to set the version does not prevent the object from being extracted
_ = h.versioner.UpdateObject(objPtr, node.ModifiedIndex) _ = h.versioner.UpdateObject(objPtr, node.ModifiedIndex)
return body, node, err return body, node, stale, err
} }
// Implements storage.Interface. // Implements storage.Interface.
@ -389,7 +393,7 @@ func (h *etcdHelper) decodeNodeList(nodes []*etcd.Node, filter storage.FilterFun
v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem())) v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem()))
} }
} else { } else {
body, err := h.transformer.TransformStringFromStorage(node.Value) body, _, err := h.transformer.TransformStringFromStorage(node.Value)
if err != nil { if err != nil {
// omit items from lists and watches that cannot be transformed, but log the error // omit items from lists and watches that cannot be transformed, but log the error
utilruntime.HandleError(fmt.Errorf("unable to transform key %q: %v", node.Key, err)) utilruntime.HandleError(fmt.Errorf("unable to transform key %q: %v", node.Key, err))
@ -485,7 +489,7 @@ func (h *etcdHelper) GuaranteedUpdate(
key = path.Join(h.pathPrefix, key) key = path.Join(h.pathPrefix, key)
for { for {
obj := reflect.New(v.Type()).Interface().(runtime.Object) obj := reflect.New(v.Type()).Interface().(runtime.Object)
origBody, node, res, err := h.bodyAndExtractObj(ctx, key, obj, ignoreNotFound) origBody, node, res, stale, err := h.bodyAndExtractObj(ctx, key, obj, ignoreNotFound)
if err != nil { if err != nil {
return toStorageErr(err, key, 0) return toStorageErr(err, key, 0)
} }
@ -552,14 +556,15 @@ func (h *etcdHelper) GuaranteedUpdate(
if etcdutil.IsEtcdNodeExist(err) { if etcdutil.IsEtcdNodeExist(err) {
continue continue
} }
_, _, err = h.extractObj(response, err, ptrToType, false, false) _, _, _, err = h.extractObj(response, err, ptrToType, false, false)
return toStorageErr(err, key, 0) return toStorageErr(err, key, 0)
} }
if newBody == origBody {
// If we don't send an update, we simply return the currently existing // If we don't send an update, we simply return the currently existing
// version of the object. // version of the object. However, the value transformer may indicate that
_, _, err := h.extractObj(res, nil, ptrToType, ignoreNotFound, false) // the on disk representation has changed and that we must commit an update.
if newBody == origBody && !stale {
_, _, _, err := h.extractObj(res, nil, ptrToType, ignoreNotFound, false)
return err return err
} }
@ -575,7 +580,7 @@ func (h *etcdHelper) GuaranteedUpdate(
// Try again. // Try again.
continue continue
} }
_, _, err = h.extractObj(response, err, ptrToType, false, false) _, _, _, err = h.extractObj(response, err, ptrToType, false, false)
return toStorageErr(err, key, int64(index)) return toStorageErr(err, key, int64(index))
} }
} }

View File

@ -48,14 +48,15 @@ import (
// prefixTransformer adds and verifies that all data has the correct prefix on its way in and out. // prefixTransformer adds and verifies that all data has the correct prefix on its way in and out.
type prefixTransformer struct { type prefixTransformer struct {
prefix string prefix string
stale bool
err error err error
} }
func (p prefixTransformer) TransformStringFromStorage(s string) (string, error) { func (p prefixTransformer) TransformStringFromStorage(s string) (string, bool, error) {
if !strings.HasPrefix(s, p.prefix) { if !strings.HasPrefix(s, p.prefix) {
return "", fmt.Errorf("value does not have expected prefix: %s", s) return "", false, fmt.Errorf("value does not have expected prefix: %s", s)
} }
return strings.TrimPrefix(s, p.prefix), p.err return strings.TrimPrefix(s, p.prefix), p.stale, p.err
} }
func (p prefixTransformer) TransformStringToStorage(s string) (string, error) { func (p prefixTransformer) TransformStringToStorage(s string) (string, error) {
if len(s) > 0 { if len(s) > 0 {
@ -449,7 +450,8 @@ func TestGuaranteedUpdateNoChange(t *testing.T) {
helper := newEtcdHelper(server.Client, scheme, codec, etcdtest.PathPrefix()) helper := newEtcdHelper(server.Client, scheme, codec, etcdtest.PathPrefix())
obj := &storagetesting.TestResource{ObjectMeta: metav1.ObjectMeta{Name: "foo"}, Value: 1} obj := &storagetesting.TestResource{ObjectMeta: metav1.ObjectMeta{Name: "foo"}, Value: 1}
err := helper.GuaranteedUpdate(context.TODO(), key, &storagetesting.TestResource{}, true, nil, storage.SimpleUpdate(func(in runtime.Object) (runtime.Object, error) { original := &storagetesting.TestResource{}
err := helper.GuaranteedUpdate(context.TODO(), key, original, true, nil, storage.SimpleUpdate(func(in runtime.Object) (runtime.Object, error) {
return obj, nil return obj, nil
})) }))
if err != nil { if err != nil {
@ -458,7 +460,27 @@ func TestGuaranteedUpdateNoChange(t *testing.T) {
// Update an existing node with the same data // Update an existing node with the same data
callbackCalled := false callbackCalled := false
objUpdate := &storagetesting.TestResource{ObjectMeta: metav1.ObjectMeta{Name: "foo"}, Value: 1} objUpdate := &storagetesting.TestResource{ObjectMeta: metav1.ObjectMeta{Name: "foo", ResourceVersion: original.ResourceVersion}, Value: 1}
result := &storagetesting.TestResource{}
err = helper.GuaranteedUpdate(context.TODO(), key, result, true, nil, storage.SimpleUpdate(func(in runtime.Object) (runtime.Object, error) {
callbackCalled = true
return objUpdate, nil
}))
if err != nil {
t.Fatalf("Unexpected error %#v", err)
}
if !callbackCalled {
t.Errorf("tryUpdate callback should have been called.")
}
if result.ResourceVersion != original.ResourceVersion {
t.Fatalf("updated the object resource version")
}
// Update an existing node with the same data but return stale
helper.transformer = prefixTransformer{prefix: "test!", stale: true}
callbackCalled = false
result = &storagetesting.TestResource{}
objUpdate = &storagetesting.TestResource{ObjectMeta: metav1.ObjectMeta{Name: "foo"}, Value: 1}
err = helper.GuaranteedUpdate(context.TODO(), key, &storagetesting.TestResource{}, true, nil, storage.SimpleUpdate(func(in runtime.Object) (runtime.Object, error) { err = helper.GuaranteedUpdate(context.TODO(), key, &storagetesting.TestResource{}, true, nil, storage.SimpleUpdate(func(in runtime.Object) (runtime.Object, error) {
callbackCalled = true callbackCalled = true
return objUpdate, nil return objUpdate, nil
@ -469,6 +491,9 @@ func TestGuaranteedUpdateNoChange(t *testing.T) {
if !callbackCalled { if !callbackCalled {
t.Errorf("tryUpdate callback should have been called.") t.Errorf("tryUpdate callback should have been called.")
} }
if result.ResourceVersion == original.ResourceVersion {
t.Errorf("did not update the object resource version")
}
} }
func TestGuaranteedUpdateKeyNotFound(t *testing.T) { func TestGuaranteedUpdateKeyNotFound(t *testing.T) {

View File

@ -319,7 +319,7 @@ func (w *etcdWatcher) decodeObject(node *etcd.Node) (runtime.Object, error) {
return obj, nil return obj, nil
} }
body, err := w.valueTransformer.TransformStringFromStorage(node.Value) body, _, err := w.valueTransformer.TransformStringFromStorage(node.Value)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -36,22 +36,24 @@ import (
utilruntime "k8s.io/apimachinery/pkg/util/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/watch" "k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/storage" "k8s.io/apiserver/pkg/storage"
utiltrace "k8s.io/apiserver/pkg/util/trace"
"k8s.io/apiserver/pkg/storage/etcd" "k8s.io/apiserver/pkg/storage/etcd"
utiltrace "k8s.io/apiserver/pkg/util/trace"
) )
// ValueTransformer allows a string value to be transformed before being read from or written to the underlying store. The methods // ValueTransformer allows a string value to be transformed before being read from or written to the underlying store. The methods
// must be able to undo the transformation caused by the other. // must be able to undo the transformation caused by the other.
type ValueTransformer interface { type ValueTransformer interface {
// TransformStringFromStorage may transform the provided string from its underlying storage representation or return an error. // TransformFromStorage may transform the provided data from its underlying storage representation or return an error.
TransformFromStorage([]byte) ([]byte, error) // Stale is true if the object on disk is stale and a write to etcd should be issued, even if the contents of the object
// TransformStringToStorage may transform the provided string into the appropriate form in storage or return an error. // have not changed.
TransformToStorage([]byte) ([]byte, error) TransformFromStorage([]byte) (data []byte, stale bool, err error)
// TransformToStorage may transform the provided data into the appropriate form in storage or return an error.
TransformToStorage([]byte) (data []byte, err error)
} }
type identityTransformer struct{} type identityTransformer struct{}
func (identityTransformer) TransformFromStorage(b []byte) ([]byte, error) { return b, nil } func (identityTransformer) TransformFromStorage(b []byte) ([]byte, bool, error) { return b, false, nil }
func (identityTransformer) TransformToStorage(b []byte) ([]byte, error) { return b, nil } func (identityTransformer) TransformToStorage(b []byte) ([]byte, error) { return b, nil }
// IdentityTransformer performs no transformation on the provided values. // IdentityTransformer performs no transformation on the provided values.
@ -79,6 +81,7 @@ type objState struct {
meta *storage.ResponseMeta meta *storage.ResponseMeta
rev int64 rev int64
data []byte data []byte
stale bool
} }
// New returns an etcd3 implementation of storage.Interface. // New returns an etcd3 implementation of storage.Interface.
@ -131,7 +134,7 @@ func (s *store) Get(ctx context.Context, key string, resourceVersion string, out
} }
kv := getResp.Kvs[0] kv := getResp.Kvs[0]
data, err := s.transformer.TransformFromStorage(kv.Value) data, _, err := s.transformer.TransformFromStorage(kv.Value)
if err != nil { if err != nil {
return storage.NewInternalError(err.Error()) return storage.NewInternalError(err.Error())
} }
@ -208,7 +211,7 @@ func (s *store) unconditionalDelete(ctx context.Context, key string, out runtime
} }
kv := getResp.Kvs[0] kv := getResp.Kvs[0]
data, err := s.transformer.TransformFromStorage(kv.Value) data, _, err := s.transformer.TransformFromStorage(kv.Value)
if err != nil { if err != nil {
return storage.NewInternalError(err.Error()) return storage.NewInternalError(err.Error())
} }
@ -292,7 +295,7 @@ func (s *store) GuaranteedUpdate(
if err != nil { if err != nil {
return err return err
} }
if bytes.Equal(data, origState.data) { if !origState.stale && bytes.Equal(data, origState.data) {
return decode(s.codec, s.versioner, origState.data, out, origState.rev) return decode(s.codec, s.versioner, origState.data, out, origState.rev)
} }
@ -349,7 +352,7 @@ func (s *store) GetToList(ctx context.Context, key string, resourceVersion strin
if len(getResp.Kvs) == 0 { if len(getResp.Kvs) == 0 {
return nil return nil
} }
data, err := s.transformer.TransformFromStorage(getResp.Kvs[0].Value) data, _, err := s.transformer.TransformFromStorage(getResp.Kvs[0].Value)
if err != nil { if err != nil {
return storage.NewInternalError(err.Error()) return storage.NewInternalError(err.Error())
} }
@ -384,7 +387,7 @@ func (s *store) List(ctx context.Context, key, resourceVersion string, pred stor
elems := make([]*elemForDecode, 0, len(getResp.Kvs)) elems := make([]*elemForDecode, 0, len(getResp.Kvs))
for _, kv := range getResp.Kvs { for _, kv := range getResp.Kvs {
data, err := s.transformer.TransformFromStorage(kv.Value) data, _, err := s.transformer.TransformFromStorage(kv.Value)
if err != nil { if err != nil {
utilruntime.HandleError(fmt.Errorf("unable to transform key %q: %v", key, err)) utilruntime.HandleError(fmt.Errorf("unable to transform key %q: %v", key, err))
continue continue
@ -434,13 +437,14 @@ func (s *store) getState(getResp *clientv3.GetResponse, key string, v reflect.Va
return nil, err return nil, err
} }
} else { } else {
data, err := s.transformer.TransformFromStorage(getResp.Kvs[0].Value) data, stale, err := s.transformer.TransformFromStorage(getResp.Kvs[0].Value)
if err != nil { if err != nil {
return nil, storage.NewInternalError(err.Error()) return nil, storage.NewInternalError(err.Error())
} }
state.rev = getResp.Kvs[0].ModRevision state.rev = getResp.Kvs[0].ModRevision
state.meta.ResourceVersion = uint64(state.rev) state.meta.ResourceVersion = uint64(state.rev)
state.data = data state.data = data
state.stale = stale
if err := decode(s.codec, s.versioner, state.data, state.obj, state.rev); err != nil { if err := decode(s.codec, s.versioner, state.data, state.obj, state.rev); err != nil {
return nil, err return nil, err
} }

View File

@ -52,14 +52,15 @@ func init() {
// prefixTransformer adds and verifies that all data has the correct prefix on its way in and out. // prefixTransformer adds and verifies that all data has the correct prefix on its way in and out.
type prefixTransformer struct { type prefixTransformer struct {
prefix []byte prefix []byte
stale bool
err error err error
} }
func (p prefixTransformer) TransformFromStorage(b []byte) ([]byte, error) { func (p prefixTransformer) TransformFromStorage(b []byte) ([]byte, bool, error) {
if !bytes.HasPrefix(b, p.prefix) { if !bytes.HasPrefix(b, p.prefix) {
return nil, fmt.Errorf("value does not have expected prefix: %s", string(b)) return nil, false, fmt.Errorf("value does not have expected prefix: %s", string(b))
} }
return bytes.TrimPrefix(b, p.prefix), p.err return bytes.TrimPrefix(b, p.prefix), p.stale, p.err
} }
func (p prefixTransformer) TransformToStorage(b []byte) ([]byte, error) { func (p prefixTransformer) TransformToStorage(b []byte) ([]byte, error) {
if len(b) > 0 { if len(b) > 0 {
@ -316,6 +317,7 @@ func TestGuaranteedUpdate(t *testing.T) {
expectNotFoundErr bool expectNotFoundErr bool
expectInvalidObjErr bool expectInvalidObjErr bool
expectNoUpdate bool expectNoUpdate bool
transformStale bool
}{{ // GuaranteedUpdate on non-existing key with ignoreNotFound=false }{{ // GuaranteedUpdate on non-existing key with ignoreNotFound=false
key: "/non-existing", key: "/non-existing",
ignoreNotFound: false, ignoreNotFound: false,
@ -344,6 +346,14 @@ func TestGuaranteedUpdate(t *testing.T) {
expectNotFoundErr: false, expectNotFoundErr: false,
expectInvalidObjErr: false, expectInvalidObjErr: false,
expectNoUpdate: true, expectNoUpdate: true,
}, { // GuaranteedUpdate with same data but stale
key: key,
ignoreNotFound: false,
precondition: nil,
expectNotFoundErr: false,
expectInvalidObjErr: false,
expectNoUpdate: false,
transformStale: true,
}, { // GuaranteedUpdate with UID match }, { // GuaranteedUpdate with UID match
key: key, key: key,
ignoreNotFound: false, ignoreNotFound: false,
@ -366,6 +376,12 @@ func TestGuaranteedUpdate(t *testing.T) {
if tt.expectNoUpdate { if tt.expectNoUpdate {
name = storeObj.Name name = storeObj.Name
} }
originalTransformer := store.transformer.(prefixTransformer)
if tt.transformStale {
transformer := originalTransformer
transformer.stale = true
store.transformer = transformer
}
version := storeObj.ResourceVersion version := storeObj.ResourceVersion
err := store.GuaranteedUpdate(ctx, tt.key, out, tt.ignoreNotFound, tt.precondition, err := store.GuaranteedUpdate(ctx, tt.key, out, tt.ignoreNotFound, tt.precondition,
storage.SimpleUpdate(func(obj runtime.Object) (runtime.Object, error) { storage.SimpleUpdate(func(obj runtime.Object) (runtime.Object, error) {
@ -378,6 +394,7 @@ func TestGuaranteedUpdate(t *testing.T) {
pod.Name = name pod.Name = name
return &pod, nil return &pod, nil
})) }))
store.transformer = originalTransformer
if tt.expectNotFoundErr { if tt.expectNotFoundErr {
if err == nil || !storage.IsNotFound(err) { if err == nil || !storage.IsNotFound(err) {

View File

@ -343,7 +343,7 @@ func (wc *watchChan) sendEvent(e *event) {
func (wc *watchChan) prepareObjs(e *event) (curObj runtime.Object, oldObj runtime.Object, err error) { func (wc *watchChan) prepareObjs(e *event) (curObj runtime.Object, oldObj runtime.Object, err error) {
if !e.isDeleted { if !e.isDeleted {
data, err := wc.watcher.transformer.TransformFromStorage(e.value) data, _, err := wc.watcher.transformer.TransformFromStorage(e.value)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
@ -358,7 +358,7 @@ func (wc *watchChan) prepareObjs(e *event) (curObj runtime.Object, oldObj runtim
// we need the object only to compute whether it was filtered out // we need the object only to compute whether it was filtered out
// before). // before).
if len(e.prevValue) > 0 && (e.isDeleted || !wc.acceptAll()) { if len(e.prevValue) > 0 && (e.isDeleted || !wc.acceptAll()) {
data, err := wc.watcher.transformer.TransformFromStorage(e.prevValue) data, _, err := wc.watcher.transformer.TransformFromStorage(e.prevValue)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }