Make volume binder resilient to races: unit tests
This commit is contained in:
@@ -29,11 +29,13 @@ import (
|
||||
|
||||
"k8s.io/klog"
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
storage "k8s.io/api/storage/v1"
|
||||
apierrs "k8s.io/apimachinery/pkg/api/errors"
|
||||
"k8s.io/apimachinery/pkg/api/resource"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/diff"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
@@ -136,6 +138,7 @@ type volumeReactor struct {
|
||||
fakeClaimWatch *watch.FakeWatcher
|
||||
lock sync.Mutex
|
||||
errors []reactorError
|
||||
watchers map[schema.GroupVersionResource]map[string][]*watch.RaceFreeFakeWatcher
|
||||
}
|
||||
|
||||
// reactorError is an error that is returned by test reactor (=simulated
|
||||
@@ -189,11 +192,34 @@ func (r *volumeReactor) React(action core.Action) (handled bool, ret runtime.Obj
|
||||
|
||||
// Store the updated object to appropriate places.
|
||||
r.volumes[volume.Name] = volume
|
||||
for _, w := range r.getWatches(action.GetResource(), action.GetNamespace()) {
|
||||
w.Add(volume)
|
||||
}
|
||||
r.changedObjects = append(r.changedObjects, volume)
|
||||
r.changedSinceLastSync++
|
||||
klog.V(4).Infof("created volume %s", volume.Name)
|
||||
return true, volume, nil
|
||||
|
||||
case action.Matches("create", "persistentvolumeclaims"):
|
||||
obj := action.(core.UpdateAction).GetObject()
|
||||
claim := obj.(*v1.PersistentVolumeClaim)
|
||||
|
||||
// check the claim does not exist
|
||||
_, found := r.claims[claim.Name]
|
||||
if found {
|
||||
return true, nil, fmt.Errorf("Cannot create claim %s: claim already exists", claim.Name)
|
||||
}
|
||||
|
||||
// Store the updated object to appropriate places.
|
||||
r.claims[claim.Name] = claim
|
||||
for _, w := range r.getWatches(action.GetResource(), action.GetNamespace()) {
|
||||
w.Add(claim)
|
||||
}
|
||||
r.changedObjects = append(r.changedObjects, claim)
|
||||
r.changedSinceLastSync++
|
||||
klog.V(4).Infof("created claim %s", claim.Name)
|
||||
return true, claim, nil
|
||||
|
||||
case action.Matches("update", "persistentvolumes"):
|
||||
obj := action.(core.UpdateAction).GetObject()
|
||||
volume := obj.(*v1.PersistentVolume)
|
||||
@@ -206,6 +232,10 @@ func (r *volumeReactor) React(action core.Action) (handled bool, ret runtime.Obj
|
||||
if storedVer != requestedVer {
|
||||
return true, obj, versionConflictError
|
||||
}
|
||||
if reflect.DeepEqual(storedVolume, volume) {
|
||||
klog.V(4).Infof("nothing updated volume %s", volume.Name)
|
||||
return true, volume, nil
|
||||
}
|
||||
// Don't modify the existing object
|
||||
volume = volume.DeepCopy()
|
||||
volume.ResourceVersion = strconv.Itoa(storedVer + 1)
|
||||
@@ -214,6 +244,9 @@ func (r *volumeReactor) React(action core.Action) (handled bool, ret runtime.Obj
|
||||
}
|
||||
|
||||
// Store the updated object to appropriate places.
|
||||
for _, w := range r.getWatches(action.GetResource(), action.GetNamespace()) {
|
||||
w.Modify(volume)
|
||||
}
|
||||
r.volumes[volume.Name] = volume
|
||||
r.changedObjects = append(r.changedObjects, volume)
|
||||
r.changedSinceLastSync++
|
||||
@@ -232,6 +265,10 @@ func (r *volumeReactor) React(action core.Action) (handled bool, ret runtime.Obj
|
||||
if storedVer != requestedVer {
|
||||
return true, obj, versionConflictError
|
||||
}
|
||||
if reflect.DeepEqual(storedClaim, claim) {
|
||||
klog.V(4).Infof("nothing updated claim %s", claim.Name)
|
||||
return true, claim, nil
|
||||
}
|
||||
// Don't modify the existing object
|
||||
claim = claim.DeepCopy()
|
||||
claim.ResourceVersion = strconv.Itoa(storedVer + 1)
|
||||
@@ -240,6 +277,9 @@ func (r *volumeReactor) React(action core.Action) (handled bool, ret runtime.Obj
|
||||
}
|
||||
|
||||
// Store the updated object to appropriate places.
|
||||
for _, w := range r.getWatches(action.GetResource(), action.GetNamespace()) {
|
||||
w.Modify(claim)
|
||||
}
|
||||
r.claims[claim.Name] = claim
|
||||
r.changedObjects = append(r.changedObjects, claim)
|
||||
r.changedSinceLastSync++
|
||||
@@ -251,18 +291,32 @@ func (r *volumeReactor) React(action core.Action) (handled bool, ret runtime.Obj
|
||||
volume, found := r.volumes[name]
|
||||
if found {
|
||||
klog.V(4).Infof("GetVolume: found %s", volume.Name)
|
||||
return true, volume, nil
|
||||
return true, volume.DeepCopy(), nil
|
||||
} else {
|
||||
klog.V(4).Infof("GetVolume: volume %s not found", name)
|
||||
return true, nil, fmt.Errorf("Cannot find volume %s", name)
|
||||
}
|
||||
|
||||
case action.Matches("get", "persistentvolumeclaims"):
|
||||
name := action.(core.GetAction).GetName()
|
||||
claim, found := r.claims[name]
|
||||
if found {
|
||||
klog.V(4).Infof("GetClaim: found %s", claim.Name)
|
||||
return true, claim.DeepCopy(), nil
|
||||
} else {
|
||||
klog.V(4).Infof("GetClaim: claim %s not found", name)
|
||||
return true, nil, apierrs.NewNotFound(action.GetResource().GroupResource(), name)
|
||||
}
|
||||
|
||||
case action.Matches("delete", "persistentvolumes"):
|
||||
name := action.(core.DeleteAction).GetName()
|
||||
klog.V(4).Infof("deleted volume %s", name)
|
||||
_, found := r.volumes[name]
|
||||
obj, found := r.volumes[name]
|
||||
if found {
|
||||
delete(r.volumes, name)
|
||||
for _, w := range r.getWatches(action.GetResource(), action.GetNamespace()) {
|
||||
w.Delete(obj)
|
||||
}
|
||||
r.changedSinceLastSync++
|
||||
return true, nil, nil
|
||||
} else {
|
||||
@@ -272,9 +326,12 @@ func (r *volumeReactor) React(action core.Action) (handled bool, ret runtime.Obj
|
||||
case action.Matches("delete", "persistentvolumeclaims"):
|
||||
name := action.(core.DeleteAction).GetName()
|
||||
klog.V(4).Infof("deleted claim %s", name)
|
||||
_, found := r.volumes[name]
|
||||
obj, found := r.claims[name]
|
||||
if found {
|
||||
delete(r.claims, name)
|
||||
for _, w := range r.getWatches(action.GetResource(), action.GetNamespace()) {
|
||||
w.Delete(obj)
|
||||
}
|
||||
r.changedSinceLastSync++
|
||||
return true, nil, nil
|
||||
} else {
|
||||
@@ -285,6 +342,36 @@ func (r *volumeReactor) React(action core.Action) (handled bool, ret runtime.Obj
|
||||
return false, nil, nil
|
||||
}
|
||||
|
||||
// Watch watches objects from the volumeReactor. Watch returns a channel which
|
||||
// will push added / modified / deleted object.
|
||||
func (r *volumeReactor) Watch(gvr schema.GroupVersionResource, ns string) (watch.Interface, error) {
|
||||
r.lock.Lock()
|
||||
defer r.lock.Unlock()
|
||||
|
||||
fakewatcher := watch.NewRaceFreeFake()
|
||||
|
||||
if _, exists := r.watchers[gvr]; !exists {
|
||||
r.watchers[gvr] = make(map[string][]*watch.RaceFreeFakeWatcher)
|
||||
}
|
||||
r.watchers[gvr][ns] = append(r.watchers[gvr][ns], fakewatcher)
|
||||
return fakewatcher, nil
|
||||
}
|
||||
|
||||
func (r *volumeReactor) getWatches(gvr schema.GroupVersionResource, ns string) []*watch.RaceFreeFakeWatcher {
|
||||
watches := []*watch.RaceFreeFakeWatcher{}
|
||||
if r.watchers[gvr] != nil {
|
||||
if w := r.watchers[gvr][ns]; w != nil {
|
||||
watches = append(watches, w...)
|
||||
}
|
||||
if ns != metav1.NamespaceAll {
|
||||
if w := r.watchers[gvr][metav1.NamespaceAll]; w != nil {
|
||||
watches = append(watches, w...)
|
||||
}
|
||||
}
|
||||
}
|
||||
return watches
|
||||
}
|
||||
|
||||
// injectReactError returns an error when the test requested given action to
|
||||
// fail. nil is returned otherwise.
|
||||
func (r *volumeReactor) injectReactError(action core.Action) error {
|
||||
@@ -596,11 +683,14 @@ func newVolumeReactor(client *fake.Clientset, ctrl *PersistentVolumeController,
|
||||
fakeVolumeWatch: fakeVolumeWatch,
|
||||
fakeClaimWatch: fakeClaimWatch,
|
||||
errors: errors,
|
||||
watchers: make(map[schema.GroupVersionResource]map[string][]*watch.RaceFreeFakeWatcher),
|
||||
}
|
||||
client.AddReactor("create", "persistentvolumes", reactor.React)
|
||||
client.AddReactor("create", "persistentvolumeclaims", reactor.React)
|
||||
client.AddReactor("update", "persistentvolumes", reactor.React)
|
||||
client.AddReactor("update", "persistentvolumeclaims", reactor.React)
|
||||
client.AddReactor("get", "persistentvolumes", reactor.React)
|
||||
client.AddReactor("get", "persistentvolumeclaims", reactor.React)
|
||||
client.AddReactor("delete", "persistentvolumes", reactor.React)
|
||||
client.AddReactor("delete", "persistentvolumeclaims", reactor.React)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user