Support TTL in genericetcd#Update

This commit is contained in:
Clayton Coleman 2015-03-04 18:22:48 -05:00
parent 9df2ea4aef
commit 143015025a
5 changed files with 49 additions and 34 deletions

View File

@ -322,9 +322,9 @@ func (r *Registry) UpdateEndpoints(ctx api.Context, endpoints *api.Endpoints) er
} }
// TODO: this is a really bad misuse of AtomicUpdate, need to compute a diff inside the loop. // TODO: this is a really bad misuse of AtomicUpdate, need to compute a diff inside the loop.
err = r.AtomicUpdate(key, &api.Endpoints{}, true, err = r.AtomicUpdate(key, &api.Endpoints{}, true,
func(input runtime.Object) (runtime.Object, error) { func(input runtime.Object) (runtime.Object, uint64, error) {
// TODO: racy - label query is returning different results for two simultaneous updaters // TODO: racy - label query is returning different results for two simultaneous updaters
return endpoints, nil return endpoints, 0, nil
}) })
return etcderr.InterpretUpdateError(err, "endpoints", endpoints.Name) return etcderr.InterpretUpdateError(err, "endpoints", endpoints.Name)
} }

View File

@ -251,35 +251,49 @@ func (e *Etcd) Update(ctx api.Context, obj runtime.Object) (runtime.Object, bool
// TODO: expose TTL // TODO: expose TTL
creating := false creating := false
out := e.NewFunc() out := e.NewFunc()
err = e.Helper.AtomicUpdate(key, out, true, func(existing runtime.Object) (runtime.Object, error) { err = e.Helper.AtomicUpdate(key, out, true, func(existing runtime.Object) (runtime.Object, uint64, error) {
version, err := e.Helper.ResourceVersioner.ResourceVersion(existing) version, err := e.Helper.ResourceVersioner.ResourceVersion(existing)
if err != nil { if err != nil {
return nil, err return nil, 0, err
} }
if version == 0 { if version == 0 {
if !e.UpdateStrategy.AllowCreateOnUpdate() { if !e.UpdateStrategy.AllowCreateOnUpdate() {
return nil, kubeerr.NewNotFound(e.EndpointName, name) return nil, 0, kubeerr.NewNotFound(e.EndpointName, name)
} }
creating = true creating = true
if err := rest.BeforeCreate(e.CreateStrategy, ctx, obj); err != nil { if err := rest.BeforeCreate(e.CreateStrategy, ctx, obj); err != nil {
return nil, err return nil, 0, err
} }
return obj, nil ttl := uint64(0)
if e.TTLFunc != nil {
ttl, err = e.TTLFunc(obj, true)
if err != nil {
return nil, 0, err
}
}
return obj, ttl, nil
} }
creating = false creating = false
newVersion, err := e.Helper.ResourceVersioner.ResourceVersion(obj) newVersion, err := e.Helper.ResourceVersioner.ResourceVersion(obj)
if err != nil { if err != nil {
return nil, err return nil, 0, err
} }
if newVersion != version { if newVersion != version {
// TODO: return the most recent version to a client? // TODO: return the most recent version to a client?
return nil, kubeerr.NewConflict(e.EndpointName, name, fmt.Errorf("the resource was updated to %d", version)) return nil, 0, kubeerr.NewConflict(e.EndpointName, name, fmt.Errorf("the resource was updated to %d", version))
} }
if err := rest.BeforeUpdate(e.UpdateStrategy, ctx, obj, existing); err != nil { if err := rest.BeforeUpdate(e.UpdateStrategy, ctx, obj, existing); err != nil {
return nil, err return nil, 0, err
} }
return obj, nil ttl := uint64(0)
if e.TTLFunc != nil {
ttl, err = e.TTLFunc(obj, false)
if err != nil {
return nil, 0, err
}
}
return obj, ttl, nil
}) })
if err != nil { if err != nil {

View File

@ -160,18 +160,18 @@ func (r *BindingREST) setPodHostTo(ctx api.Context, podID, oldMachine, machine s
if err != nil { if err != nil {
return nil, err return nil, err
} }
err = r.store.Helper.AtomicUpdate(podKey, &api.Pod{}, false, func(obj runtime.Object) (runtime.Object, error) { err = r.store.Helper.AtomicUpdate(podKey, &api.Pod{}, false, func(obj runtime.Object) (runtime.Object, uint64, error) {
pod, ok := obj.(*api.Pod) pod, ok := obj.(*api.Pod)
if !ok { if !ok {
return nil, fmt.Errorf("unexpected object: %#v", obj) return nil, 0, fmt.Errorf("unexpected object: %#v", obj)
} }
if pod.Spec.Host != oldMachine || pod.Status.Host != oldMachine { if pod.Spec.Host != oldMachine || pod.Status.Host != oldMachine {
return nil, fmt.Errorf("pod %v is already assigned to host %q or %q", pod.Name, pod.Spec.Host, pod.Status.Host) return nil, 0, fmt.Errorf("pod %v is already assigned to host %q or %q", pod.Name, pod.Spec.Host, pod.Status.Host)
} }
pod.Spec.Host = machine pod.Spec.Host = machine
pod.Status.Host = machine pod.Status.Host = machine
finalPod = pod finalPod = pod
return pod, nil return pod, 0, nil
}) })
return finalPod, err return finalPod, err
} }

View File

@ -357,7 +357,7 @@ func (h *EtcdHelper) SetObj(key string, obj, out runtime.Object, ttl uint64) err
// Pass an EtcdUpdateFunc to EtcdHelper.AtomicUpdate to make an atomic etcd update. // Pass an EtcdUpdateFunc to EtcdHelper.AtomicUpdate to make an atomic etcd update.
// See the comment for AtomicUpdate for more detail. // See the comment for AtomicUpdate for more detail.
type EtcdUpdateFunc func(input runtime.Object) (output runtime.Object, err error) type EtcdUpdateFunc func(input runtime.Object) (output runtime.Object, ttl uint64, err error)
// AtomicUpdate generalizes the pattern that allows for making atomic updates to etcd objects. // AtomicUpdate generalizes the pattern that allows for making atomic updates to etcd objects.
// Note, tryUpdate may be called more than once. // Note, tryUpdate may be called more than once.
@ -365,7 +365,7 @@ type EtcdUpdateFunc func(input runtime.Object) (output runtime.Object, err error
// Example: // Example:
// //
// h := &util.EtcdHelper{client, encoding, versioning} // h := &util.EtcdHelper{client, encoding, versioning}
// err := h.AtomicUpdate("myKey", &MyType{}, true, func(input runtime.Object) (runtime.Object, error) { // err := h.AtomicUpdate("myKey", &MyType{}, true, func(input runtime.Object) (runtime.Object, uint64, error) {
// // Before this function is called, currentObj has been reset to etcd's current // // Before this function is called, currentObj has been reset to etcd's current
// // contents for "myKey". // // contents for "myKey".
// //
@ -374,8 +374,9 @@ type EtcdUpdateFunc func(input runtime.Object) (output runtime.Object, err error
// // Make a *modification*. // // Make a *modification*.
// cur.Counter++ // cur.Counter++
// //
// // Return the modified object. Return an error to stop iterating. // // Return the modified object. Return an error to stop iterating. Return a non-zero uint64 to set
// return cur, nil // // the TTL on the object.
// return cur, 0, nil
// }) // })
// //
func (h *EtcdHelper) AtomicUpdate(key string, ptrToType runtime.Object, ignoreNotFound bool, tryUpdate EtcdUpdateFunc) error { func (h *EtcdHelper) AtomicUpdate(key string, ptrToType runtime.Object, ignoreNotFound bool, tryUpdate EtcdUpdateFunc) error {
@ -391,7 +392,7 @@ func (h *EtcdHelper) AtomicUpdate(key string, ptrToType runtime.Object, ignoreNo
return err return err
} }
ret, err := tryUpdate(obj) ret, ttl, err := tryUpdate(obj)
if err != nil { if err != nil {
return err return err
} }
@ -403,7 +404,7 @@ func (h *EtcdHelper) AtomicUpdate(key string, ptrToType runtime.Object, ignoreNo
// First time this key has been used, try creating new value. // First time this key has been used, try creating new value.
if index == 0 { if index == 0 {
response, err := h.Client.Create(key, string(data), 0) response, err := h.Client.Create(key, string(data), ttl)
if IsEtcdNodeExist(err) { if IsEtcdNodeExist(err) {
continue continue
} }
@ -415,7 +416,7 @@ func (h *EtcdHelper) AtomicUpdate(key string, ptrToType runtime.Object, ignoreNo
return nil return nil
} }
response, err := h.Client.CompareAndSwap(key, string(data), 0, origBody, index) response, err := h.Client.CompareAndSwap(key, string(data), ttl, origBody, index)
if IsEtcdTestFailed(err) { if IsEtcdTestFailed(err) {
continue continue
} }

View File

@ -505,8 +505,8 @@ func TestAtomicUpdate(t *testing.T) {
// Create a new node. // Create a new node.
fakeClient.ExpectNotFoundGet("/some/key") fakeClient.ExpectNotFoundGet("/some/key")
obj := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 1} obj := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 1}
err := helper.AtomicUpdate("/some/key", &TestResource{}, true, func(in runtime.Object) (runtime.Object, error) { err := helper.AtomicUpdate("/some/key", &TestResource{}, true, func(in runtime.Object) (runtime.Object, uint64, error) {
return obj, nil return obj, 0, nil
}) })
if err != nil { if err != nil {
t.Errorf("Unexpected error %#v", err) t.Errorf("Unexpected error %#v", err)
@ -524,14 +524,14 @@ func TestAtomicUpdate(t *testing.T) {
// Update an existing node. // Update an existing node.
callbackCalled := false callbackCalled := false
objUpdate := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 2} objUpdate := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 2}
err = helper.AtomicUpdate("/some/key", &TestResource{}, true, func(in runtime.Object) (runtime.Object, error) { err = helper.AtomicUpdate("/some/key", &TestResource{}, true, func(in runtime.Object) (runtime.Object, uint64, error) {
callbackCalled = true callbackCalled = true
if in.(*TestResource).Value != 1 { if in.(*TestResource).Value != 1 {
t.Errorf("Callback input was not current set value") t.Errorf("Callback input was not current set value")
} }
return objUpdate, nil return objUpdate, 0, nil
}) })
if err != nil { if err != nil {
t.Errorf("Unexpected error %#v", err) t.Errorf("Unexpected error %#v", err)
@ -559,8 +559,8 @@ func TestAtomicUpdateNoChange(t *testing.T) {
// Create a new node. // Create a new node.
fakeClient.ExpectNotFoundGet("/some/key") fakeClient.ExpectNotFoundGet("/some/key")
obj := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 1} obj := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 1}
err := helper.AtomicUpdate("/some/key", &TestResource{}, true, func(in runtime.Object) (runtime.Object, error) { err := helper.AtomicUpdate("/some/key", &TestResource{}, true, func(in runtime.Object) (runtime.Object, uint64, error) {
return obj, nil return obj, 0, nil
}) })
if err != nil { if err != nil {
t.Errorf("Unexpected error %#v", err) t.Errorf("Unexpected error %#v", err)
@ -569,10 +569,10 @@ func TestAtomicUpdateNoChange(t *testing.T) {
// Update an existing node with the same data // Update an existing node with the same data
callbackCalled := false callbackCalled := false
objUpdate := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 1} objUpdate := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 1}
err = helper.AtomicUpdate("/some/key", &TestResource{}, true, func(in runtime.Object) (runtime.Object, error) { err = helper.AtomicUpdate("/some/key", &TestResource{}, true, func(in runtime.Object) (runtime.Object, uint64, error) {
fakeClient.Err = errors.New("should not be called") fakeClient.Err = errors.New("should not be called")
callbackCalled = true callbackCalled = true
return objUpdate, nil return objUpdate, 0, nil
}) })
if err != nil { if err != nil {
t.Fatalf("Unexpected error %#v", err) t.Fatalf("Unexpected error %#v", err)
@ -591,8 +591,8 @@ func TestAtomicUpdateKeyNotFound(t *testing.T) {
fakeClient.ExpectNotFoundGet("/some/key") fakeClient.ExpectNotFoundGet("/some/key")
obj := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 1} obj := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 1}
f := func(in runtime.Object) (runtime.Object, error) { f := func(in runtime.Object) (runtime.Object, uint64, error) {
return obj, nil return obj, 0, nil
} }
ignoreNotFound := false ignoreNotFound := false
@ -627,7 +627,7 @@ func TestAtomicUpdate_CreateCollision(t *testing.T) {
defer wgDone.Done() defer wgDone.Done()
firstCall := true firstCall := true
err := helper.AtomicUpdate("/some/key", &TestResource{}, true, func(in runtime.Object) (runtime.Object, error) { err := helper.AtomicUpdate("/some/key", &TestResource{}, true, func(in runtime.Object) (runtime.Object, uint64, error) {
defer func() { firstCall = false }() defer func() { firstCall = false }()
if firstCall { if firstCall {
@ -638,7 +638,7 @@ func TestAtomicUpdate_CreateCollision(t *testing.T) {
currValue := in.(*TestResource).Value currValue := in.(*TestResource).Value
obj := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: currValue + 1} obj := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: currValue + 1}
return obj, nil return obj, 0, nil
}) })
if err != nil { if err != nil {
t.Errorf("Unexpected error %#v", err) t.Errorf("Unexpected error %#v", err)