Allow update without resource version
This commit is contained in:
@@ -207,7 +207,7 @@ func (h *EtcdHelper) decodeNodeList(nodes []*etcd.Node, slicePtr interface{}) er
|
||||
}
|
||||
if h.Versioner != nil {
|
||||
// being unable to set the version does not prevent the object from being extracted
|
||||
_ = h.Versioner.UpdateObject(obj.Interface().(runtime.Object), node)
|
||||
_ = h.Versioner.UpdateObject(obj.Interface().(runtime.Object), node.Expiration, node.ModifiedIndex)
|
||||
}
|
||||
v.Set(reflect.Append(v, obj.Elem()))
|
||||
if node.ModifiedIndex != 0 {
|
||||
@@ -377,7 +377,7 @@ func (h *EtcdHelper) extractObj(response *etcd.Response, inErr error, objPtr run
|
||||
body = node.Value
|
||||
err = h.Codec.DecodeInto([]byte(body), objPtr)
|
||||
if h.Versioner != nil {
|
||||
_ = h.Versioner.UpdateObject(objPtr, node)
|
||||
_ = h.Versioner.UpdateObject(objPtr, node.Expiration, node.ModifiedIndex)
|
||||
// being unable to set the version does not prevent the object from being extracted
|
||||
}
|
||||
return body, node, err
|
||||
@@ -492,6 +492,11 @@ type ResponseMeta struct {
|
||||
// zero or negative in some cases (objects may be expired after the requested
|
||||
// expiration time due to server lag).
|
||||
TTL int64
|
||||
// Expiration is the time at which the node that contained the returned object will expire and be deleted.
|
||||
// This can be nil if there is no expiration time set for the node.
|
||||
Expiration *time.Time
|
||||
// The resource version of the node that contained the returned object.
|
||||
ResourceVersion uint64
|
||||
}
|
||||
|
||||
// Pass an EtcdUpdateFunc to EtcdHelper.GuaranteedUpdate to make an etcd update that is guaranteed to succeed.
|
||||
@@ -525,7 +530,7 @@ func SimpleUpdate(fn SimpleEtcdUpdateFunc) EtcdUpdateFunc {
|
||||
// cur.Counter++
|
||||
//
|
||||
// // Return the modified object. Return an error to stop iterating. Return a uint64 to alter
|
||||
// // the TTL on the object, or nil to keep it the same value.
|
||||
// // the TTL on the object, or nil to keep it the same value.
|
||||
// return cur, nil, nil
|
||||
// })
|
||||
//
|
||||
@@ -545,8 +550,12 @@ func (h *EtcdHelper) GuaranteedUpdate(key string, ptrToType runtime.Object, igno
|
||||
meta := ResponseMeta{}
|
||||
if node != nil {
|
||||
meta.TTL = node.TTL
|
||||
if node.Expiration != nil {
|
||||
meta.Expiration = node.Expiration
|
||||
}
|
||||
meta.ResourceVersion = node.ModifiedIndex
|
||||
}
|
||||
|
||||
// Get the object to be written by calling tryUpdate.
|
||||
ret, newTTL, err := tryUpdate(obj, meta)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -589,9 +598,11 @@ func (h *EtcdHelper) GuaranteedUpdate(key string, ptrToType runtime.Object, igno
|
||||
}
|
||||
|
||||
startTime := time.Now()
|
||||
// Swap origBody with data, if origBody is the latest etcd data.
|
||||
response, err := h.Client.CompareAndSwap(key, string(data), ttl, origBody, index)
|
||||
recordEtcdRequestLatency("compareAndSwap", getTypeName(ptrToType), startTime)
|
||||
if IsEtcdTestFailed(err) {
|
||||
// Try again.
|
||||
continue
|
||||
}
|
||||
_, _, err = h.extractObj(response, err, ptrToType, false, false)
|
||||
|
@@ -270,7 +270,7 @@ func (w *etcdWatcher) decodeObject(node *etcd.Node) (runtime.Object, error) {
|
||||
|
||||
// ensure resource version is set on the object we load from etcd
|
||||
if w.versioner != nil {
|
||||
if err := w.versioner.UpdateObject(obj, node); err != nil {
|
||||
if err := w.versioner.UpdateObject(obj, node.Expiration, node.ModifiedIndex); err != nil {
|
||||
glog.Errorf("failure to version api object (%d) %#v: %v", node.ModifiedIndex, obj, err)
|
||||
}
|
||||
}
|
||||
|
@@ -18,12 +18,11 @@ package tools
|
||||
|
||||
import (
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
||||
|
||||
"github.com/coreos/go-etcd/etcd"
|
||||
)
|
||||
|
||||
// APIObjectVersioner implements versioning and extracting etcd node information
|
||||
@@ -31,18 +30,17 @@ import (
|
||||
type APIObjectVersioner struct{}
|
||||
|
||||
// UpdateObject implements EtcdVersioner
|
||||
func (a APIObjectVersioner) UpdateObject(obj runtime.Object, node *etcd.Node) error {
|
||||
func (a APIObjectVersioner) UpdateObject(obj runtime.Object, expiration *time.Time, resourceVersion uint64) error {
|
||||
objectMeta, err := api.ObjectMetaFor(obj)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if ttl := node.Expiration; ttl != nil {
|
||||
objectMeta.DeletionTimestamp = &util.Time{*node.Expiration}
|
||||
if expiration != nil {
|
||||
objectMeta.DeletionTimestamp = &util.Time{*expiration}
|
||||
}
|
||||
version := node.ModifiedIndex
|
||||
versionString := ""
|
||||
if version != 0 {
|
||||
versionString = strconv.FormatUint(version, 10)
|
||||
if resourceVersion != 0 {
|
||||
versionString = strconv.FormatUint(resourceVersion, 10)
|
||||
}
|
||||
objectMeta.ResourceVersion = versionString
|
||||
return nil
|
||||
|
@@ -22,7 +22,6 @@ import (
|
||||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
||||
"github.com/coreos/go-etcd/etcd"
|
||||
)
|
||||
|
||||
func TestObjectVersioner(t *testing.T) {
|
||||
@@ -34,7 +33,7 @@ func TestObjectVersioner(t *testing.T) {
|
||||
t.Errorf("unexpected version: %d %v", ver, err)
|
||||
}
|
||||
obj := &TestResource{ObjectMeta: api.ObjectMeta{ResourceVersion: "a"}}
|
||||
if err := v.UpdateObject(obj, &etcd.Node{ModifiedIndex: 5}); err != nil {
|
||||
if err := v.UpdateObject(obj, nil, 5); err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
if obj.ResourceVersion != "5" || obj.DeletionTimestamp != nil {
|
||||
@@ -42,7 +41,7 @@ func TestObjectVersioner(t *testing.T) {
|
||||
}
|
||||
now := util.Time{time.Now()}
|
||||
obj = &TestResource{ObjectMeta: api.ObjectMeta{ResourceVersion: "a"}}
|
||||
if err := v.UpdateObject(obj, &etcd.Node{ModifiedIndex: 5, Expiration: &now.Time}); err != nil {
|
||||
if err := v.UpdateObject(obj, &now.Time, 5); err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
if obj.ResourceVersion != "5" || *obj.DeletionTimestamp != now {
|
||||
|
@@ -19,6 +19,7 @@ package tools
|
||||
import (
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
|
||||
"github.com/coreos/go-etcd/etcd"
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -66,7 +67,7 @@ type EtcdVersioner interface {
|
||||
// UpdateObject sets etcd storage metadata into an API object. Returns an error if the object
|
||||
// cannot be updated correctly. May return nil if the requested object does not need metadata
|
||||
// from etcd.
|
||||
UpdateObject(obj runtime.Object, node *etcd.Node) error
|
||||
UpdateObject(obj runtime.Object, expiration *time.Time, resourceVersion uint64) error
|
||||
// UpdateList sets the resource version into an API list object. Returns an error if the object
|
||||
// cannot be updated correctly. May return nil if the requested object does not need metadata
|
||||
// from etcd.
|
||||
|
Reference in New Issue
Block a user