Merge pull request #75434 from cofyc/fix56098

Refactor PV scheduling library into a separate package
Kubernetes Prow Robot
2019-05-03 22:17:38 -07:00
committed by GitHub
33 changed files with 1314 additions and 1049 deletions
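At the caller level, the refactor is an import-path change: helpers that were package-private in the PV controller become exported subpackages, and the scheduler binder/assume-cache files move to a package of their own. A minimal sketch under that reading, with the paths taken from the BUILD and golint hunks below (the caller itself is hypothetical):

    // Before: findMatchingVolume, checkAccessModes, the test reactor, etc.
    // were unexported inside
    // k8s.io/kubernetes/pkg/controller/volume/persistentvolume.

    // After: out-of-package callers import the extracted subpackages,
    import (
        pvtesting "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/testing"
        pvutil "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/util"
    )
    // and the scheduler-side binder code lives in the new
    // pkg/controller/volume/scheduling package.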

View File

@@ -12,7 +12,6 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//cmd/kube-scheduler/app/config:go_default_library",
"//pkg/api/legacyscheme:go_default_library",
"//pkg/client/leaderelectionconfig:go_default_library",
"//pkg/master/ports:go_default_library",
"//pkg/scheduler/apis/config:go_default_library",
@@ -29,6 +28,7 @@ go_library(
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/rest:go_default_library",
"//staging/src/k8s.io/client-go/tools/clientcmd:go_default_library",

View File

@@ -30,6 +30,7 @@ import (
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
@@ -42,7 +43,6 @@ import (
"k8s.io/klog"
kubeschedulerconfigv1alpha1 "k8s.io/kube-scheduler/config/v1alpha1"
schedulerappconfig "k8s.io/kubernetes/cmd/kube-scheduler/app/config"
"k8s.io/kubernetes/pkg/api/legacyscheme"
"k8s.io/kubernetes/pkg/client/leaderelectionconfig"
"k8s.io/kubernetes/pkg/master/ports"
kubeschedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config"
@@ -238,7 +238,7 @@ func (o *Options) Config() (*schedulerappconfig.Config, error) {
// Prepare event clients.
eventBroadcaster := record.NewBroadcaster()
recorder := eventBroadcaster.NewRecorder(legacyscheme.Scheme, corev1.EventSource{Component: c.ComponentConfig.SchedulerName})
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: c.ComponentConfig.SchedulerName})
// Set up leader election if enabled.
var leaderElectionConfig *leaderelection.LeaderElectionConfig
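The one functional change in this hunk is which runtime.Scheme backs the event recorder: the client-go scheme replaces the kube-internal legacyscheme, which is what lets the legacyscheme dependency drop out of the BUILD hunk above. A minimal, self-contained sketch of the resulting pattern (newRecorder is a hypothetical helper; only the recorder line comes from the hunk):

    import (
        corev1 "k8s.io/api/core/v1"
        "k8s.io/client-go/kubernetes/scheme"
        "k8s.io/client-go/tools/record"
    )

    // scheme.Scheme has all client-go API groups registered, so events
    // serialize without importing k8s.io/kubernetes/pkg/api/legacyscheme.
    func newRecorder(component string) record.EventRecorder {
        eventBroadcaster := record.NewBroadcaster()
        return eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: component})
    }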

View File

@@ -107,6 +107,8 @@ pkg/controller/volume/expand
pkg/controller/volume/persistentvolume
pkg/controller/volume/persistentvolume/config/v1alpha1
pkg/controller/volume/persistentvolume/options
pkg/controller/volume/persistentvolume/testing
pkg/controller/volume/scheduling
pkg/credentialprovider
pkg/credentialprovider/gcp
pkg/features

View File

@@ -146,6 +146,7 @@ filegroup(
"//pkg/controller/volume/protectionutil:all-srcs",
"//pkg/controller/volume/pvcprotection:all-srcs",
"//pkg/controller/volume/pvprotection:all-srcs",
"//pkg/controller/volume/scheduling:all-srcs",
],
tags = ["automanaged"],
)

View File

@@ -12,12 +12,6 @@ go_library(
"index.go",
"pv_controller.go",
"pv_controller_base.go",
"scheduler_assume_cache.go",
"scheduler_bind_cache_metrics.go",
"scheduler_binder.go",
"scheduler_binder_cache.go",
"scheduler_binder_fake.go",
"util.go",
"volume_host.go",
],
importpath = "k8s.io/kubernetes/pkg/controller/volume/persistentvolume",
@@ -26,6 +20,7 @@ go_library(
"//pkg/controller:go_default_library",
"//pkg/controller/volume/events:go_default_library",
"//pkg/controller/volume/persistentvolume/metrics:go_default_library",
"//pkg/controller/volume/persistentvolume/util:go_default_library",
"//pkg/features:go_default_library",
"//pkg/util/goroutinemap:go_default_library",
"//pkg/util/goroutinemap/exponentialbackoff:go_default_library",
@@ -39,14 +34,12 @@ go_library(
"//staging/src/k8s.io/api/storage/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/meta:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/etcd:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/informers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/informers/storage/v1:go_default_library",
@@ -62,7 +55,6 @@ go_library(
"//staging/src/k8s.io/cloud-provider:go_default_library",
"//staging/src/k8s.io/cloud-provider/volume/errors:go_default_library",
"//staging/src/k8s.io/csi-translation-lib:go_default_library",
"//vendor/github.com/prometheus/client_golang/prometheus:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],
)
@@ -77,15 +69,14 @@ go_test(
"provision_test.go",
"pv_controller_test.go",
"recycle_test.go",
"scheduler_assume_cache_test.go",
"scheduler_binder_cache_test.go",
"scheduler_binder_test.go",
],
embed = [":go_default_library"],
deps = [
"//pkg/api/testapi:go_default_library",
"//pkg/apis/core:go_default_library",
"//pkg/controller:go_default_library",
"//pkg/controller/volume/persistentvolume/testing:go_default_library",
"//pkg/controller/volume/persistentvolume/util:go_default_library",
"//pkg/features:go_default_library",
"//pkg/volume:go_default_library",
"//pkg/volume/util:go_default_library",
@@ -95,16 +86,12 @@ go_test(
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature/testing:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/informers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library",
@@ -132,6 +119,8 @@ filegroup(
"//pkg/controller/volume/persistentvolume/config:all-srcs",
"//pkg/controller/volume/persistentvolume/metrics:all-srcs",
"//pkg/controller/volume/persistentvolume/options:all-srcs",
"//pkg/controller/volume/persistentvolume/testing:all-srcs",
"//pkg/controller/volume/persistentvolume/util:all-srcs",
],
tags = ["automanaged"],
)

View File

@@ -22,6 +22,7 @@ import (
"k8s.io/api/core/v1"
storage "k8s.io/api/storage/v1"
pvtesting "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/testing"
)
// Test single call to syncVolume, expecting recycling to happen.
@@ -91,11 +92,9 @@ func TestDeleteSync(t *testing.T) {
noclaims,
noclaims,
noevents, noerrors,
wrapTestWithInjectedOperation(wrapTestWithReclaimCalls(operationDelete, []error{}, testSyncVolume), func(ctrl *PersistentVolumeController, reactor *volumeReactor) {
wrapTestWithInjectedOperation(wrapTestWithReclaimCalls(operationDelete, []error{}, testSyncVolume), func(ctrl *PersistentVolumeController, reactor *pvtesting.VolumeReactor) {
// Delete the volume before delete operation starts
reactor.lock.Lock()
delete(reactor.volumes, "volume8-6")
reactor.lock.Unlock()
reactor.DeleteVolume("volume8-6")
}),
},
{
@@ -108,16 +107,12 @@ func TestDeleteSync(t *testing.T) {
noclaims,
newClaimArray("claim8-7", "uid8-7", "10Gi", "volume8-7", v1.ClaimBound, nil),
noevents, noerrors,
wrapTestWithInjectedOperation(wrapTestWithReclaimCalls(operationDelete, []error{}, testSyncVolume), func(ctrl *PersistentVolumeController, reactor *volumeReactor) {
reactor.lock.Lock()
defer reactor.lock.Unlock()
wrapTestWithInjectedOperation(wrapTestWithReclaimCalls(operationDelete, []error{}, testSyncVolume), func(ctrl *PersistentVolumeController, reactor *pvtesting.VolumeReactor) {
// Bind the volume to resurrected claim (this should never
// happen)
claim := newClaim("claim8-7", "uid8-7", "10Gi", "volume8-7", v1.ClaimBound, nil)
reactor.claims[claim.Name] = claim
reactor.AddClaimBoundToVolume(claim)
ctrl.claims.Add(claim)
volume := reactor.volumes["volume8-7"]
volume.Status.Phase = v1.VolumeBound
}),
},
{
@@ -141,7 +136,7 @@ func TestDeleteSync(t *testing.T) {
noclaims,
noclaims,
noevents, noerrors,
func(ctrl *PersistentVolumeController, reactor *volumeReactor, test controllerTest) error {
func(ctrl *PersistentVolumeController, reactor *pvtesting.VolumeReactor, test controllerTest) error {
// Inject external deleter annotation
test.initialVolumes[0].Annotations[annDynamicallyProvisioned] = "external.io/test"
test.expectedVolumes[0].Annotations[annDynamicallyProvisioned] = "external.io/test"
@@ -184,7 +179,7 @@ func TestDeleteSync(t *testing.T) {
newClaimArray("claim8-12", "uid8-12", "10Gi", "volume8-12-2", v1.ClaimBound, nil),
newClaimArray("claim8-12", "uid8-12", "10Gi", "volume8-12-2", v1.ClaimBound, nil),
noevents, noerrors,
func(ctrl *PersistentVolumeController, reactor *volumeReactor, test controllerTest) error {
func(ctrl *PersistentVolumeController, reactor *pvtesting.VolumeReactor, test controllerTest) error {
// Inject external deleter annotation
test.initialVolumes[0].Annotations[annDynamicallyProvisioned] = "external.io/test"
test.expectedVolumes[0].Annotations[annDynamicallyProvisioned] = "external.io/test"
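These edits repeat across the suite: instead of taking reactor.lock and mutating the unexported volumes/claims maps, tests call exported, lock-safe helpers on pvtesting.VolumeReactor. A hedged sketch of the new style, shaped as the inject callback that wrapTestWithInjectedOperation takes (injectExample is a hypothetical name; newClaim is the file's existing fixture helper, and AddClaimBoundToVolume appears to cover both the claim write and the volume phase flip, judging by the lines it replaces):

    func injectExample(ctrl *PersistentVolumeController, reactor *pvtesting.VolumeReactor) {
        // Old style (removed): reactor.lock.Lock(); delete(reactor.volumes, "volume8-6"); reactor.lock.Unlock()
        reactor.DeleteVolume("volume8-6")

        // Old style (removed): write reactor.claims[claim.Name] and set the
        // bound volume's phase by hand under the lock.
        claim := newClaim("claim8-7", "uid8-7", "10Gi", "volume8-7", v1.ClaimBound, nil)
        reactor.AddClaimBoundToVolume(claim)
        ctrl.claims.Add(claim) // the controller's own cache is still seeded directly
    }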

View File

@@ -17,12 +17,9 @@ limitations under the License.
package persistentvolume
import (
"errors"
"fmt"
"reflect"
"strconv"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
@@ -31,27 +28,21 @@ import (
v1 "k8s.io/api/core/v1"
storage "k8s.io/api/storage/v1"
apierrs "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/diff"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
corelisters "k8s.io/client-go/listers/core/v1"
storagelisters "k8s.io/client-go/listers/storage/v1"
core "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/kubernetes/pkg/api/testapi"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/features"
pvtesting "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/testing"
vol "k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/util/recyclerclient"
)
@@ -95,366 +86,75 @@ type controllerTest struct {
// event message.
expectedEvents []string
// Errors to produce on matching action
errors []reactorError
errors []pvtesting.ReactorError
// Function to call as the test.
test testCall
}
type testCall func(ctrl *PersistentVolumeController, reactor *volumeReactor, test controllerTest) error
type testCall func(ctrl *PersistentVolumeController, reactor *pvtesting.VolumeReactor, test controllerTest) error
const testNamespace = "default"
const mockPluginName = "kubernetes.io/mock-volume"
var versionConflictError = errors.New("VersionError")
var novolumes []*v1.PersistentVolume
var noclaims []*v1.PersistentVolumeClaim
var noevents = []string{}
var noerrors = []reactorError{}
var noerrors = []pvtesting.ReactorError{}
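Injected API failures are now spelled with the exported pvtesting.ReactorError struct, so the old positional literals become keyed ones (the provisioning tests further down show the migration). A minimal sketch, assuming the harness keeps its fake-clientset and errors imports (newReactorWithOneFailure is a hypothetical name):

    func newReactorWithOneFailure(client *fake.Clientset, ctrl *PersistentVolumeController) *volumeReactor {
        // The reactor evaluates errors in order and returns each matching
        // one exactly once, so only the first PV update fails here.
        return newVolumeReactor(client, ctrl, nil, nil, []pvtesting.ReactorError{
            {Verb: "update", Resource: "persistentvolumes", Error: errors.New("mock update error")},
        })
    }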
// volumeReactor is a core.Reactor that simulates etcd and API server. It
// stores:
// - Latest version of claims and volumes saved by the controller.
// - Queue of all saves (to simulate "volume/claim updated" events). This queue
// contains all intermediate state of an object - e.g. a claim.VolumeName
// is updated first and claim.Phase second. This queue will then contain both
// updates as separate entries.
// - Number of changes since the last call to volumeReactor.syncAll().
// - Optionally, volume and claim fake watchers which should be the same ones
// used by the controller. Any time an event function like deleteVolumeEvent
// is called to simulate an event, the reactor's stores are updated and the
// controller is sent the event via the fake watcher.
// - Optionally, list of errors that should be returned by reactor, simulating
// etcd / API server failures. These errors are evaluated in order and every
// error is returned only once. I.e. when the reactor finds a matching
// reactorError, it returns the appropriate error and removes the reactorError from
// the list.
type volumeReactor struct {
volumes map[string]*v1.PersistentVolume
claims map[string]*v1.PersistentVolumeClaim
changedObjects []interface{}
changedSinceLastSync int
ctrl *PersistentVolumeController
fakeVolumeWatch *watch.FakeWatcher
fakeClaimWatch *watch.FakeWatcher
lock sync.Mutex
errors []reactorError
watchers map[schema.GroupVersionResource]map[string][]*watch.RaceFreeFakeWatcher
*pvtesting.VolumeReactor
ctrl *PersistentVolumeController
}
// reactorError is an error that is returned by test reactor (=simulated
// etcd+/API server) when an action performed by the reactor matches given verb
// ("get", "update", "create", "delete" or "*"") on given resource
// ("persistentvolumes", "persistentvolumeclaims" or "*").
type reactorError struct {
verb string
resource string
error error
func newVolumeReactor(client *fake.Clientset, ctrl *PersistentVolumeController, fakeVolumeWatch, fakeClaimWatch *watch.FakeWatcher, errors []pvtesting.ReactorError) *volumeReactor {
return &volumeReactor{
pvtesting.NewVolumeReactor(client, fakeVolumeWatch, fakeClaimWatch, errors),
ctrl,
}
}
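The wrapper works by struct embedding: volumeReactor keeps only the controller pointer (needed by waitForIdle and waitTest below) and promotes everything else (React, CheckVolumes, PopChange, SyncAll) from the embedded *pvtesting.VolumeReactor. A short in-package sketch of the two calling styles (runOne is a hypothetical name mirroring runSyncTests below):

    func runOne(client *fake.Clientset, ctrl *PersistentVolumeController, test controllerTest) error {
        reactor := newVolumeReactor(client, ctrl, nil, nil, test.errors)

        // Promoted methods are called directly on the wrapper:
        reactor.SyncAll()
        _ = reactor.PopChange()

        // APIs typed against the extracted package take the embedded field:
        return test.test(ctrl, reactor.VolumeReactor, test)
    }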
// React is a callback called by fake kubeClient from the controller.
// In other words, every claim/volume change performed by the controller ends
// here.
// This callback checks versions of the updated objects and refuses those that
// are too old (simulating real etcd).
// All updated objects are stored locally to keep track of object versions and
// to evaluate test results.
// All updated objects are also inserted into changedObjects queue and
// optionally sent back to the controller via its watchers.
func (r *volumeReactor) React(action core.Action) (handled bool, ret runtime.Object, err error) {
r.lock.Lock()
defer r.lock.Unlock()
klog.V(4).Infof("reactor got operation %q on %q", action.GetVerb(), action.GetResource())
// Inject error when requested
err = r.injectReactError(action)
if err != nil {
return true, nil, err
// waitForIdle waits until all tests, controllers and other goroutines do their
// job and no new actions are registered for 10 milliseconds.
func (r *volumeReactor) waitForIdle() {
r.ctrl.runningOperations.WaitForCompletion()
// Check every 10ms if the controller does something and stop if it's
// idle.
oldChanges := -1
for {
time.Sleep(10 * time.Millisecond)
changes := r.GetChangeCount()
if changes == oldChanges {
// No changes for last 10ms -> controller must be idle.
break
}
oldChanges = changes
}
// Test did not request to inject an error, continue simulating API server.
switch {
case action.Matches("create", "persistentvolumes"):
obj := action.(core.UpdateAction).GetObject()
volume := obj.(*v1.PersistentVolume)
// check the volume does not exist
_, found := r.volumes[volume.Name]
if found {
return true, nil, fmt.Errorf("Cannot create volume %s: volume already exists", volume.Name)
}
// mimic apiserver defaulting
if volume.Spec.VolumeMode == nil && utilfeature.DefaultFeatureGate.Enabled(features.BlockVolume) {
volume.Spec.VolumeMode = new(v1.PersistentVolumeMode)
*volume.Spec.VolumeMode = v1.PersistentVolumeFilesystem
}
// Store the updated object to appropriate places.
r.volumes[volume.Name] = volume
for _, w := range r.getWatches(action.GetResource(), action.GetNamespace()) {
w.Add(volume)
}
r.changedObjects = append(r.changedObjects, volume)
r.changedSinceLastSync++
klog.V(4).Infof("created volume %s", volume.Name)
return true, volume, nil
case action.Matches("create", "persistentvolumeclaims"):
obj := action.(core.UpdateAction).GetObject()
claim := obj.(*v1.PersistentVolumeClaim)
// check the claim does not exist
_, found := r.claims[claim.Name]
if found {
return true, nil, fmt.Errorf("Cannot create claim %s: claim already exists", claim.Name)
}
// Store the updated object to appropriate places.
r.claims[claim.Name] = claim
for _, w := range r.getWatches(action.GetResource(), action.GetNamespace()) {
w.Add(claim)
}
r.changedObjects = append(r.changedObjects, claim)
r.changedSinceLastSync++
klog.V(4).Infof("created claim %s", claim.Name)
return true, claim, nil
case action.Matches("update", "persistentvolumes"):
obj := action.(core.UpdateAction).GetObject()
volume := obj.(*v1.PersistentVolume)
// Check and bump object version
storedVolume, found := r.volumes[volume.Name]
if found {
storedVer, _ := strconv.Atoi(storedVolume.ResourceVersion)
requestedVer, _ := strconv.Atoi(volume.ResourceVersion)
if storedVer != requestedVer {
return true, obj, versionConflictError
}
if reflect.DeepEqual(storedVolume, volume) {
klog.V(4).Infof("nothing updated volume %s", volume.Name)
return true, volume, nil
}
// Don't modify the existing object
volume = volume.DeepCopy()
volume.ResourceVersion = strconv.Itoa(storedVer + 1)
} else {
return true, nil, fmt.Errorf("Cannot update volume %s: volume not found", volume.Name)
}
// Store the updated object to appropriate places.
for _, w := range r.getWatches(action.GetResource(), action.GetNamespace()) {
w.Modify(volume)
}
r.volumes[volume.Name] = volume
r.changedObjects = append(r.changedObjects, volume)
r.changedSinceLastSync++
klog.V(4).Infof("saved updated volume %s", volume.Name)
return true, volume, nil
case action.Matches("update", "persistentvolumeclaims"):
obj := action.(core.UpdateAction).GetObject()
claim := obj.(*v1.PersistentVolumeClaim)
// Check and bump object version
storedClaim, found := r.claims[claim.Name]
if found {
storedVer, _ := strconv.Atoi(storedClaim.ResourceVersion)
requestedVer, _ := strconv.Atoi(claim.ResourceVersion)
if storedVer != requestedVer {
return true, obj, versionConflictError
}
if reflect.DeepEqual(storedClaim, claim) {
klog.V(4).Infof("nothing updated claim %s", claim.Name)
return true, claim, nil
}
// Don't modify the existing object
claim = claim.DeepCopy()
claim.ResourceVersion = strconv.Itoa(storedVer + 1)
} else {
return true, nil, fmt.Errorf("Cannot update claim %s: claim not found", claim.Name)
}
// Store the updated object to appropriate places.
for _, w := range r.getWatches(action.GetResource(), action.GetNamespace()) {
w.Modify(claim)
}
r.claims[claim.Name] = claim
r.changedObjects = append(r.changedObjects, claim)
r.changedSinceLastSync++
klog.V(4).Infof("saved updated claim %s", claim.Name)
return true, claim, nil
case action.Matches("get", "persistentvolumes"):
name := action.(core.GetAction).GetName()
volume, found := r.volumes[name]
if found {
klog.V(4).Infof("GetVolume: found %s", volume.Name)
return true, volume.DeepCopy(), nil
} else {
klog.V(4).Infof("GetVolume: volume %s not found", name)
return true, nil, fmt.Errorf("Cannot find volume %s", name)
}
case action.Matches("get", "persistentvolumeclaims"):
name := action.(core.GetAction).GetName()
claim, found := r.claims[name]
if found {
klog.V(4).Infof("GetClaim: found %s", claim.Name)
return true, claim.DeepCopy(), nil
} else {
klog.V(4).Infof("GetClaim: claim %s not found", name)
return true, nil, apierrs.NewNotFound(action.GetResource().GroupResource(), name)
}
case action.Matches("delete", "persistentvolumes"):
name := action.(core.DeleteAction).GetName()
klog.V(4).Infof("deleted volume %s", name)
obj, found := r.volumes[name]
if found {
delete(r.volumes, name)
for _, w := range r.getWatches(action.GetResource(), action.GetNamespace()) {
w.Delete(obj)
}
r.changedSinceLastSync++
return true, nil, nil
} else {
return true, nil, fmt.Errorf("Cannot delete volume %s: not found", name)
}
case action.Matches("delete", "persistentvolumeclaims"):
name := action.(core.DeleteAction).GetName()
klog.V(4).Infof("deleted claim %s", name)
obj, found := r.claims[name]
if found {
delete(r.claims, name)
for _, w := range r.getWatches(action.GetResource(), action.GetNamespace()) {
w.Delete(obj)
}
r.changedSinceLastSync++
return true, nil, nil
} else {
return true, nil, fmt.Errorf("Cannot delete claim %s: not found", name)
}
}
return false, nil, nil
}
// Watch watches objects from the volumeReactor. Watch returns a channel which
// will push added / modified / deleted object.
func (r *volumeReactor) Watch(gvr schema.GroupVersionResource, ns string) (watch.Interface, error) {
r.lock.Lock()
defer r.lock.Unlock()
fakewatcher := watch.NewRaceFreeFake()
if _, exists := r.watchers[gvr]; !exists {
r.watchers[gvr] = make(map[string][]*watch.RaceFreeFakeWatcher)
// waitTest waits until all tests, controllers and other goroutines do their
// job and list of current volumes/claims is equal to list of expected
// volumes/claims (with ~10 second timeout).
func (r *volumeReactor) waitTest(test controllerTest) error {
// start with 10 ms, multiply by 2 each step, 10 steps = 10.23 seconds
backoff := wait.Backoff{
Duration: 10 * time.Millisecond,
Jitter: 0,
Factor: 2,
Steps: 10,
}
r.watchers[gvr][ns] = append(r.watchers[gvr][ns], fakewatcher)
return fakewatcher, nil
}
err := wait.ExponentialBackoff(backoff, func() (done bool, err error) {
// Finish all operations that are in progress
r.ctrl.runningOperations.WaitForCompletion()
func (r *volumeReactor) getWatches(gvr schema.GroupVersionResource, ns string) []*watch.RaceFreeFakeWatcher {
watches := []*watch.RaceFreeFakeWatcher{}
if r.watchers[gvr] != nil {
if w := r.watchers[gvr][ns]; w != nil {
watches = append(watches, w...)
// Return 'true' if the reactor reached the expected state
err1 := r.CheckClaims(test.expectedClaims)
err2 := r.CheckVolumes(test.expectedVolumes)
if err1 == nil && err2 == nil {
return true, nil
}
if ns != metav1.NamespaceAll {
if w := r.watchers[gvr][metav1.NamespaceAll]; w != nil {
watches = append(watches, w...)
}
}
}
return watches
}
// injectReactError returns an error when the test requested given action to
// fail. nil is returned otherwise.
func (r *volumeReactor) injectReactError(action core.Action) error {
if len(r.errors) == 0 {
// No more errors to inject, everything should succeed.
return nil
}
for i, expected := range r.errors {
klog.V(4).Infof("trying to match %q %q with %q %q", expected.verb, expected.resource, action.GetVerb(), action.GetResource())
if action.Matches(expected.verb, expected.resource) {
// That's the action we're waiting for, remove it from injectedErrors
r.errors = append(r.errors[:i], r.errors[i+1:]...)
klog.V(4).Infof("reactor found matching error at index %d: %q %q, returning %v", i, expected.verb, expected.resource, expected.error)
return expected.error
}
}
return nil
}
// checkVolumes compares all expectedVolumes with set of volumes at the end of
// the test and reports differences.
func (r *volumeReactor) checkVolumes(expectedVolumes []*v1.PersistentVolume) error {
r.lock.Lock()
defer r.lock.Unlock()
expectedMap := make(map[string]*v1.PersistentVolume)
gotMap := make(map[string]*v1.PersistentVolume)
// Clear any ResourceVersion from both sets
for _, v := range expectedVolumes {
// Don't modify the existing object
v := v.DeepCopy()
v.ResourceVersion = ""
if v.Spec.ClaimRef != nil {
v.Spec.ClaimRef.ResourceVersion = ""
}
expectedMap[v.Name] = v
}
for _, v := range r.volumes {
// We must clone the volume because of golang race check - it was
// written by the controller without any locks on it.
v := v.DeepCopy()
v.ResourceVersion = ""
if v.Spec.ClaimRef != nil {
v.Spec.ClaimRef.ResourceVersion = ""
}
gotMap[v.Name] = v
}
if !reflect.DeepEqual(expectedMap, gotMap) {
// Print ugly but useful diff of expected and received objects for
// easier debugging.
return fmt.Errorf("Volume check failed [A-expected, B-got]: %s", diff.ObjectDiff(expectedMap, gotMap))
}
return nil
}
// checkClaims compares all expectedClaims with set of claims at the end of the
// test and reports differences.
func (r *volumeReactor) checkClaims(expectedClaims []*v1.PersistentVolumeClaim) error {
r.lock.Lock()
defer r.lock.Unlock()
expectedMap := make(map[string]*v1.PersistentVolumeClaim)
gotMap := make(map[string]*v1.PersistentVolumeClaim)
for _, c := range expectedClaims {
// Don't modify the existing object
c = c.DeepCopy()
c.ResourceVersion = ""
expectedMap[c.Name] = c
}
for _, c := range r.claims {
// We must clone the claim because of golang race check - it was
// written by the controller without any locks on it.
c = c.DeepCopy()
c.ResourceVersion = ""
gotMap[c.Name] = c
}
if !reflect.DeepEqual(expectedMap, gotMap) {
// Print ugly but useful diff of expected and received objects for
// easier debugging.
return fmt.Errorf("Claim check failed [A-expected, B-got result]: %s", diff.ObjectDiff(expectedMap, gotMap))
}
return nil
return false, nil
})
return err
}
// checkEvents compares all expectedEvents with events generated during the test
@@ -506,196 +206,6 @@ func checkEvents(t *testing.T, expectedEvents []string, ctrl *PersistentVolumeCo
return err
}
// popChange returns one recorded updated object, either *v1.PersistentVolume
// or *v1.PersistentVolumeClaim. Returns nil when there are no changes.
func (r *volumeReactor) popChange() interface{} {
r.lock.Lock()
defer r.lock.Unlock()
if len(r.changedObjects) == 0 {
return nil
}
// For debugging purposes, print the queue
for _, obj := range r.changedObjects {
switch obj.(type) {
case *v1.PersistentVolume:
vol, _ := obj.(*v1.PersistentVolume)
klog.V(4).Infof("reactor queue: %s", vol.Name)
case *v1.PersistentVolumeClaim:
claim, _ := obj.(*v1.PersistentVolumeClaim)
klog.V(4).Infof("reactor queue: %s", claim.Name)
}
}
// Pop the first item from the queue and return it
obj := r.changedObjects[0]
r.changedObjects = r.changedObjects[1:]
return obj
}
// syncAll simulates the controller's periodic sync of volumes and claims. It
// simply adds all these objects to the internal queue of updates. This method
// should be used when the test manually calls syncClaim/syncVolume. Tests that
// use the real controller loop (ctrl.Run()) will get periodic sync automatically.
func (r *volumeReactor) syncAll() {
r.lock.Lock()
defer r.lock.Unlock()
for _, c := range r.claims {
r.changedObjects = append(r.changedObjects, c)
}
for _, v := range r.volumes {
r.changedObjects = append(r.changedObjects, v)
}
r.changedSinceLastSync = 0
}
func (r *volumeReactor) getChangeCount() int {
r.lock.Lock()
defer r.lock.Unlock()
return r.changedSinceLastSync
}
// waitForIdle waits until all tests, controllers and other goroutines do their
// job and no new actions are registered for 10 milliseconds.
func (r *volumeReactor) waitForIdle() {
r.ctrl.runningOperations.WaitForCompletion()
// Check every 10ms if the controller does something and stop if it's
// idle.
oldChanges := -1
for {
time.Sleep(10 * time.Millisecond)
changes := r.getChangeCount()
if changes == oldChanges {
// No changes for last 10ms -> controller must be idle.
break
}
oldChanges = changes
}
}
// waitTest waits until all tests, controllers and other goroutines do their
// job and list of current volumes/claims is equal to list of expected
// volumes/claims (with ~10 second timeout).
func (r *volumeReactor) waitTest(test controllerTest) error {
// start with 10 ms, multiply by 2 each step, 10 steps = 10.23 seconds
backoff := wait.Backoff{
Duration: 10 * time.Millisecond,
Jitter: 0,
Factor: 2,
Steps: 10,
}
err := wait.ExponentialBackoff(backoff, func() (done bool, err error) {
// Finish all operations that are in progress
r.ctrl.runningOperations.WaitForCompletion()
// Return 'true' if the reactor reached the expected state
err1 := r.checkClaims(test.expectedClaims)
err2 := r.checkVolumes(test.expectedVolumes)
if err1 == nil && err2 == nil {
return true, nil
}
return false, nil
})
return err
}
// deleteVolumeEvent simulates that a volume has been deleted in etcd and
// the controller receives 'volume deleted' event.
func (r *volumeReactor) deleteVolumeEvent(volume *v1.PersistentVolume) {
r.lock.Lock()
defer r.lock.Unlock()
// Remove the volume from list of resulting volumes.
delete(r.volumes, volume.Name)
// Generate deletion event. Cloned volume is needed to prevent races (and we
// would get a clone from etcd too).
if r.fakeVolumeWatch != nil {
r.fakeVolumeWatch.Delete(volume.DeepCopy())
}
}
// deleteClaimEvent simulates that a claim has been deleted in etcd and the
// controller receives 'claim deleted' event.
func (r *volumeReactor) deleteClaimEvent(claim *v1.PersistentVolumeClaim) {
r.lock.Lock()
defer r.lock.Unlock()
// Remove the claim from list of resulting claims.
delete(r.claims, claim.Name)
// Generate deletion event. Cloned claim is needed to prevent races (and we
// would get a clone from etcd too).
if r.fakeClaimWatch != nil {
r.fakeClaimWatch.Delete(claim.DeepCopy())
}
}
// addVolumeEvent simulates that a volume has been added in etcd and the
// controller receives 'volume added' event.
func (r *volumeReactor) addVolumeEvent(volume *v1.PersistentVolume) {
r.lock.Lock()
defer r.lock.Unlock()
r.volumes[volume.Name] = volume
// Generate event. No cloning is needed, this volume is not stored in the
// controller cache yet.
if r.fakeVolumeWatch != nil {
r.fakeVolumeWatch.Add(volume)
}
}
// modifyVolumeEvent simulates that a volume has been modified in etcd and the
// controller receives 'volume modified' event.
func (r *volumeReactor) modifyVolumeEvent(volume *v1.PersistentVolume) {
r.lock.Lock()
defer r.lock.Unlock()
r.volumes[volume.Name] = volume
// Generate modification event. Cloned volume is needed to prevent races (and
// we would get a clone from etcd too).
if r.fakeVolumeWatch != nil {
r.fakeVolumeWatch.Modify(volume.DeepCopy())
}
}
// addClaimEvent simulates that a claim has been added in etcd and the
// controller receives 'claim added' event.
func (r *volumeReactor) addClaimEvent(claim *v1.PersistentVolumeClaim) {
r.lock.Lock()
defer r.lock.Unlock()
r.claims[claim.Name] = claim
// Generate event. No cloning is needed, this claim is not stored in the
// controller cache yet.
if r.fakeClaimWatch != nil {
r.fakeClaimWatch.Add(claim)
}
}
func newVolumeReactor(client *fake.Clientset, ctrl *PersistentVolumeController, fakeVolumeWatch, fakeClaimWatch *watch.FakeWatcher, errors []reactorError) *volumeReactor {
reactor := &volumeReactor{
volumes: make(map[string]*v1.PersistentVolume),
claims: make(map[string]*v1.PersistentVolumeClaim),
ctrl: ctrl,
fakeVolumeWatch: fakeVolumeWatch,
fakeClaimWatch: fakeClaimWatch,
errors: errors,
watchers: make(map[schema.GroupVersionResource]map[string][]*watch.RaceFreeFakeWatcher),
}
client.AddReactor("create", "persistentvolumes", reactor.React)
client.AddReactor("create", "persistentvolumeclaims", reactor.React)
client.AddReactor("update", "persistentvolumes", reactor.React)
client.AddReactor("update", "persistentvolumeclaims", reactor.React)
client.AddReactor("get", "persistentvolumes", reactor.React)
client.AddReactor("get", "persistentvolumeclaims", reactor.React)
client.AddReactor("delete", "persistentvolumes", reactor.React)
client.AddReactor("delete", "persistentvolumeclaims", reactor.React)
return reactor
}
func alwaysReady() bool { return true }
func newTestController(kubeClient clientset.Interface, informerFactory informers.SharedInformerFactory, enableDynamicProvisioning bool) (*PersistentVolumeController, error) {
@@ -915,11 +425,11 @@ func claimWithAccessMode(modes []v1.PersistentVolumeAccessMode, claims []*v1.Per
return claims
}
func testSyncClaim(ctrl *PersistentVolumeController, reactor *volumeReactor, test controllerTest) error {
func testSyncClaim(ctrl *PersistentVolumeController, reactor *pvtesting.VolumeReactor, test controllerTest) error {
return ctrl.syncClaim(test.initialClaims[0])
}
func testSyncClaimError(ctrl *PersistentVolumeController, reactor *volumeReactor, test controllerTest) error {
func testSyncClaimError(ctrl *PersistentVolumeController, reactor *pvtesting.VolumeReactor, test controllerTest) error {
err := ctrl.syncClaim(test.initialClaims[0])
if err != nil {
@@ -928,7 +438,7 @@ func testSyncClaimError(ctrl *PersistentVolumeController, reactor *volumeReactor
return fmt.Errorf("syncClaim succeeded when failure was expected")
}
func testSyncVolume(ctrl *PersistentVolumeController, reactor *volumeReactor, test controllerTest) error {
func testSyncVolume(ctrl *PersistentVolumeController, reactor *pvtesting.VolumeReactor, test controllerTest) error {
return ctrl.syncVolume(test.initialVolumes[0])
}
@@ -957,7 +467,7 @@ var (
// is deleted, recycled or provisioned.
// - calls given testCall
func wrapTestWithPluginCalls(expectedRecycleCalls, expectedDeleteCalls []error, expectedProvisionCalls []provisionCall, toWrap testCall) testCall {
return func(ctrl *PersistentVolumeController, reactor *volumeReactor, test controllerTest) error {
return func(ctrl *PersistentVolumeController, reactor *pvtesting.VolumeReactor, test controllerTest) error {
plugin := &mockVolumePlugin{
recycleCalls: expectedRecycleCalls,
deleteCalls: expectedDeleteCalls,
@@ -992,7 +502,7 @@ func wrapTestWithProvisionCalls(expectedProvisionCalls []provisionCall, toWrap t
// - configures controller with a volume plugin that emulates CSI migration
// - calls given testCall
func wrapTestWithCSIMigrationProvisionCalls(toWrap testCall) testCall {
return func(ctrl *PersistentVolumeController, reactor *volumeReactor, test controllerTest) error {
return func(ctrl *PersistentVolumeController, reactor *pvtesting.VolumeReactor, test controllerTest) error {
plugin := &mockVolumePlugin{
isMigratedToCSI: true,
}
@@ -1010,9 +520,9 @@ func wrapTestWithCSIMigrationProvisionCalls(toWrap testCall) testCall {
// injected function to simulate that something is happening when the
// controller waits for the operation lock. Controller is then resumed and we
// check how it behaves.
func wrapTestWithInjectedOperation(toWrap testCall, injectBeforeOperation func(ctrl *PersistentVolumeController, reactor *volumeReactor)) testCall {
func wrapTestWithInjectedOperation(toWrap testCall, injectBeforeOperation func(ctrl *PersistentVolumeController, reactor *pvtesting.VolumeReactor)) testCall {
return func(ctrl *PersistentVolumeController, reactor *volumeReactor, test controllerTest) error {
return func(ctrl *PersistentVolumeController, reactor *pvtesting.VolumeReactor, test controllerTest) error {
// Inject a hook before async operation starts
ctrl.preOperationHook = func(operationName string) {
// Inside the hook, run the function to inject
@@ -1040,13 +550,13 @@ func wrapTestWithInjectedOperation(toWrap testCall, injectBeforeOperation func(c
}
}
func evaluateTestResults(ctrl *PersistentVolumeController, reactor *volumeReactor, test controllerTest, t *testing.T) {
func evaluateTestResults(ctrl *PersistentVolumeController, reactor *pvtesting.VolumeReactor, test controllerTest, t *testing.T) {
// Evaluate results
if err := reactor.checkClaims(test.expectedClaims); err != nil {
if err := reactor.CheckClaims(test.expectedClaims); err != nil {
t.Errorf("Test %q: %v", test.name, err)
}
if err := reactor.checkVolumes(test.expectedVolumes); err != nil {
if err := reactor.CheckVolumes(test.expectedVolumes); err != nil {
t.Errorf("Test %q: %v", test.name, err)
}
@@ -1074,12 +584,12 @@ func runSyncTests(t *testing.T, tests []controllerTest, storageClasses []*storag
reactor := newVolumeReactor(client, ctrl, nil, nil, test.errors)
for _, claim := range test.initialClaims {
ctrl.claims.Add(claim)
reactor.claims[claim.Name] = claim
}
for _, volume := range test.initialVolumes {
ctrl.volumes.store.Add(volume)
reactor.volumes[volume.Name] = volume
}
reactor.AddClaims(test.initialClaims)
reactor.AddVolumes(test.initialVolumes)
// Inject classes into controller via a custom lister.
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
@@ -1095,7 +605,7 @@ func runSyncTests(t *testing.T, tests []controllerTest, storageClasses []*storag
ctrl.podLister = corelisters.NewPodLister(podIndexer)
// Run the tested functions
err = test.test(ctrl, reactor, test)
err = test.test(ctrl, reactor.VolumeReactor, test)
if err != nil {
t.Errorf("Test %q failed: %v", test.name, err)
}
@@ -1106,7 +616,7 @@ func runSyncTests(t *testing.T, tests []controllerTest, storageClasses []*storag
t.Errorf("Test %q failed: %v", test.name, err)
}
evaluateTestResults(ctrl, reactor, test, t)
evaluateTestResults(ctrl, reactor.VolumeReactor, test, t)
}
}
@@ -1145,15 +655,15 @@ func runMultisyncTests(t *testing.T, tests []controllerTest, storageClasses []*s
reactor := newVolumeReactor(client, ctrl, nil, nil, test.errors)
for _, claim := range test.initialClaims {
ctrl.claims.Add(claim)
reactor.claims[claim.Name] = claim
}
for _, volume := range test.initialVolumes {
ctrl.volumes.store.Add(volume)
reactor.volumes[volume.Name] = volume
}
reactor.AddClaims(test.initialClaims)
reactor.AddVolumes(test.initialVolumes)
// Run the tested function
err = test.test(ctrl, reactor, test)
err = test.test(ctrl, reactor.VolumeReactor, test)
if err != nil {
t.Errorf("Test %q failed: %v", test.name, err)
}
@@ -1174,16 +684,16 @@ func runMultisyncTests(t *testing.T, tests []controllerTest, storageClasses []*s
// Wait for all goroutines to finish
reactor.waitForIdle()
obj := reactor.popChange()
obj := reactor.PopChange()
if obj == nil {
// Nothing was changed, should we exit?
if firstSync || reactor.changedSinceLastSync > 0 {
if firstSync || reactor.ChangedSinceLastSync() > 0 {
// There were some changes after the last "periodic sync".
// Simulate "periodic sync" of everything (until it produces
// no changes).
firstSync = false
klog.V(4).Infof("test %q: simulating periodical sync of all claims and volumes", test.name)
reactor.syncAll()
reactor.SyncAll()
} else {
// Last sync did not produce any updates, the test reached
// stable state -> finish.
@@ -1201,7 +711,7 @@ func runMultisyncTests(t *testing.T, tests []controllerTest, storageClasses []*s
ctrl.claims.Update(claim)
err = ctrl.syncClaim(claim)
if err != nil {
if err == versionConflictError {
if err == pvtesting.VersionConflictError {
// Ignore version errors
klog.V(4).Infof("test intentionaly ignores version error.")
} else {
@@ -1218,7 +728,7 @@ func runMultisyncTests(t *testing.T, tests []controllerTest, storageClasses []*s
ctrl.volumes.store.Update(volume)
err = ctrl.syncVolume(volume)
if err != nil {
if err == versionConflictError {
if err == pvtesting.VersionConflictError {
// Ignore version errors
klog.V(4).Infof("test intentionaly ignores version error.")
} else {
@@ -1231,7 +741,7 @@ func runMultisyncTests(t *testing.T, tests []controllerTest, storageClasses []*s
continue
}
}
evaluateTestResults(ctrl, reactor, test, t)
evaluateTestResults(ctrl, reactor.VolumeReactor, test, t)
klog.V(4).Infof("test %q finished after %d iterations", test.name, counter)
}
}

View File

@@ -21,13 +21,9 @@ import (
"sort"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/cache"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
"k8s.io/kubernetes/pkg/features"
pvutil "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/util"
volumeutil "k8s.io/kubernetes/pkg/volume/util"
)
@@ -96,7 +92,7 @@ func (pvIndex *persistentVolumeOrderedIndex) findByClaim(claim *v1.PersistentVol
return nil, err
}
bestVol, err := findMatchingVolume(claim, volumes, nil /* node for topology binding*/, nil /* exclusion map */, delayBinding)
bestVol, err := pvutil.FindMatchingVolume(claim, volumes, nil /* node for topology binding*/, nil /* exclusion map */, delayBinding)
if err != nil {
return nil, err
}
@@ -108,176 +104,6 @@ func (pvIndex *persistentVolumeOrderedIndex) findByClaim(claim *v1.PersistentVol
return nil, nil
}
// findMatchingVolume goes through the list of volumes to find the best matching volume
// for the claim.
//
// This function is used by both the PV controller and scheduler.
//
// delayBinding is true only in the PV controller path. When set, prebound PVs are still returned
// as a match for the claim, but unbound PVs are skipped.
//
// node is set only in the scheduler path. When set, the PV node affinity is checked against
// the node's labels.
//
// excludedVolumes is only used in the scheduler path, and is needed for evaluating multiple
// unbound PVCs for a single Pod at one time. As each PVC finds a matching PV, the chosen
// PV needs to be excluded from future matching.
func findMatchingVolume(
claim *v1.PersistentVolumeClaim,
volumes []*v1.PersistentVolume,
node *v1.Node,
excludedVolumes map[string]*v1.PersistentVolume,
delayBinding bool) (*v1.PersistentVolume, error) {
var smallestVolume *v1.PersistentVolume
var smallestVolumeQty resource.Quantity
requestedQty := claim.Spec.Resources.Requests[v1.ResourceName(v1.ResourceStorage)]
requestedClass := v1helper.GetPersistentVolumeClaimClass(claim)
var selector labels.Selector
if claim.Spec.Selector != nil {
internalSelector, err := metav1.LabelSelectorAsSelector(claim.Spec.Selector)
if err != nil {
// should be unreachable code due to validation
return nil, fmt.Errorf("error creating internal label selector for claim: %v: %v", claimToClaimKey(claim), err)
}
selector = internalSelector
}
// Go through all available volumes with two goals:
// - find a volume that is either pre-bound by user or dynamically
// provisioned for this claim. Because of this we need to loop through
// all volumes.
// - find the smallest matching one if there is no volume pre-bound to
// the claim.
for _, volume := range volumes {
if _, ok := excludedVolumes[volume.Name]; ok {
// Skip volumes in the excluded list
continue
}
volumeQty := volume.Spec.Capacity[v1.ResourceStorage]
// check if volumeModes do not match (feature gate protected)
isMismatch, err := checkVolumeModeMismatches(&claim.Spec, &volume.Spec)
if err != nil {
return nil, fmt.Errorf("error checking if volumeMode was a mismatch: %v", err)
}
// filter out mismatching volumeModes
if isMismatch {
continue
}
// check if PV's DeletionTimeStamp is set, if so, skip this volume.
if utilfeature.DefaultFeatureGate.Enabled(features.StorageObjectInUseProtection) {
if volume.ObjectMeta.DeletionTimestamp != nil {
continue
}
}
nodeAffinityValid := true
if node != nil {
// Scheduler path, check that the PV NodeAffinity
// is satisfied by the node
err := volumeutil.CheckNodeAffinity(volume, node.Labels)
if err != nil {
nodeAffinityValid = false
}
}
if IsVolumeBoundToClaim(volume, claim) {
// this claim and volume are pre-bound; return
// the volume if the size request is satisfied,
// otherwise continue searching for a match
if volumeQty.Cmp(requestedQty) < 0 {
continue
}
// If PV node affinity is invalid, return no match.
// This means the prebound PV (and therefore PVC)
// is not suitable for this node.
if !nodeAffinityValid {
return nil, nil
}
return volume, nil
}
if node == nil && delayBinding {
// PV controller does not bind this claim.
// Scheduler will handle binding unbound volumes
// Scheduler path will have node != nil
continue
}
// filter out:
// - volumes in non-available phase
// - volumes bound to another claim
// - volumes whose labels don't match the claim's selector, if specified
// - volumes in Class that is not requested
// - volumes whose NodeAffinity does not match the node
if volume.Status.Phase != v1.VolumeAvailable {
// We ignore volumes in non-available phase, because volumes that
// satisfy the matching criteria will be updated to available, and
// binding them now has a high chance of encountering unnecessary
// failures due to API conflicts.
continue
} else if volume.Spec.ClaimRef != nil {
continue
} else if selector != nil && !selector.Matches(labels.Set(volume.Labels)) {
continue
}
if v1helper.GetPersistentVolumeClass(volume) != requestedClass {
continue
}
if !nodeAffinityValid {
continue
}
if node != nil {
// Scheduler path
// Check that the access modes match
if !checkAccessModes(claim, volume) {
continue
}
}
if volumeQty.Cmp(requestedQty) >= 0 {
if smallestVolume == nil || smallestVolumeQty.Cmp(volumeQty) > 0 {
smallestVolume = volume
smallestVolumeQty = volumeQty
}
}
}
if smallestVolume != nil {
// Found a matching volume
return smallestVolume, nil
}
return nil, nil
}
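Both consumers of the exported matcher are visible in this commit: the PV controller calls it with no node earlier in this file, and the index tests below call it scheduler-style. A sketch of the two shapes (matchExamples and its parameters are illustrative stand-ins):

    func matchExamples(claim *v1.PersistentVolumeClaim, volumes []*v1.PersistentVolume,
        node *v1.Node, excludedVolumes map[string]*v1.PersistentVolume, delayBinding bool) {

        // PV controller path: no node, no exclusion map. With delayBinding
        // set, unbound PVs are skipped and left for the scheduler.
        pv, err := pvutil.FindMatchingVolume(claim, volumes, nil, nil, delayBinding)

        // Scheduler path: node labels gate PV node affinity, and PVs already
        // chosen for the pod's other PVCs are excluded. delayBinding no longer
        // skips anything once node is non-nil (see the guard above).
        pv, err = pvutil.FindMatchingVolume(claim, volumes, node, excludedVolumes, true)
        _, _ = pv, err
    }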
// checkVolumeModeMismatches is a convenience method that checks volumeMode for PersistentVolume
// and PersistentVolumeClaims
func checkVolumeModeMismatches(pvcSpec *v1.PersistentVolumeClaimSpec, pvSpec *v1.PersistentVolumeSpec) (bool, error) {
if !utilfeature.DefaultFeatureGate.Enabled(features.BlockVolume) {
return false, nil
}
// In HA upgrades, we cannot guarantee that the apiserver is on a version >= controller-manager.
// So we default a nil volumeMode to filesystem
requestedVolumeMode := v1.PersistentVolumeFilesystem
if pvcSpec.VolumeMode != nil {
requestedVolumeMode = *pvcSpec.VolumeMode
}
pvVolumeMode := v1.PersistentVolumeFilesystem
if pvSpec.VolumeMode != nil {
pvVolumeMode = *pvSpec.VolumeMode
}
return requestedVolumeMode != pvVolumeMode, nil
}
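The nil-defaulting above is the subtle part: in an HA upgrade the apiserver may be older than the controller-manager, so a nil VolumeMode on either side is read as Filesystem instead of being treated as a mismatch. A minimal worked example against the exported name (volumeModeExample and its inline specs are illustrative):

    func volumeModeExample() {
        block := v1.PersistentVolumeBlock
        pvcSpec := v1.PersistentVolumeClaimSpec{VolumeMode: nil} // nil reads as Filesystem
        pvSpec := v1.PersistentVolumeSpec{VolumeMode: &block}

        // With the BlockVolume gate enabled: Filesystem (defaulted) vs Block,
        // so mismatch == true. With the gate off the function always returns
        // false, nil.
        mismatch, err := pvutil.CheckVolumeModeMismatches(&pvcSpec, &pvSpec)
        _, _ = mismatch, err
    }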
// findBestMatchForClaim is a convenience method that finds a volume by the claim's AccessModes and requests for Storage
func (pvIndex *persistentVolumeOrderedIndex) findBestMatchForClaim(claim *v1.PersistentVolumeClaim, delayBinding bool) (*v1.PersistentVolume, error) {
return pvIndex.findByClaim(claim, delayBinding)
@@ -362,19 +188,3 @@ func claimToClaimKey(claim *v1.PersistentVolumeClaim) string {
func claimrefToClaimKey(claimref *v1.ObjectReference) string {
return fmt.Sprintf("%s/%s", claimref.Namespace, claimref.Name)
}
// Returns true if PV satisfies all the PVC's requested AccessModes
func checkAccessModes(claim *v1.PersistentVolumeClaim, volume *v1.PersistentVolume) bool {
pvModesMap := map[v1.PersistentVolumeAccessMode]bool{}
for _, mode := range volume.Spec.AccessModes {
pvModesMap[mode] = true
}
for _, mode := range claim.Spec.AccessModes {
_, ok := pvModesMap[mode]
if !ok {
return false
}
}
return true
}
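Access-mode matching is plain set containment: every mode the claim requests must appear in the PV's list, and extra PV modes are harmless. A tiny example with the exported name (accessModesExample and its inline objects are illustrative):

    func accessModesExample() bool {
        pv := &v1.PersistentVolume{Spec: v1.PersistentVolumeSpec{
            AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce, v1.ReadOnlyMany},
        }}
        pvc := &v1.PersistentVolumeClaim{Spec: v1.PersistentVolumeClaimSpec{
            AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce},
        }}
        // true: {RWO} is a subset of {RWO, ROX}; the PV's extra mode is fine.
        return pvutil.CheckAccessModes(pvc, pv)
    }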

View File

@@ -28,6 +28,7 @@ import (
"k8s.io/client-go/kubernetes/scheme"
ref "k8s.io/client-go/tools/reference"
"k8s.io/kubernetes/pkg/api/testapi"
pvutil "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/util"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/volume/util"
)
@@ -773,7 +774,7 @@ func createTestVolumes() []*v1.PersistentVolume {
v1.ReadOnlyMany,
},
StorageClassName: classWait,
NodeAffinity: getVolumeNodeAffinity("key1", "value1"),
NodeAffinity: pvutil.GetVolumeNodeAffinity("key1", "value1"),
VolumeMode: &fs,
},
Status: v1.PersistentVolumeStatus{
@@ -797,7 +798,7 @@ func createTestVolumes() []*v1.PersistentVolume {
v1.ReadOnlyMany,
},
StorageClassName: classWait,
NodeAffinity: getVolumeNodeAffinity("key1", "value1"),
NodeAffinity: pvutil.GetVolumeNodeAffinity("key1", "value1"),
VolumeMode: &fs,
},
Status: v1.PersistentVolumeStatus{
@@ -822,7 +823,7 @@ func createTestVolumes() []*v1.PersistentVolume {
},
StorageClassName: classWait,
ClaimRef: &v1.ObjectReference{Name: "claim02", Namespace: "myns"},
NodeAffinity: getVolumeNodeAffinity("key1", "value1"),
NodeAffinity: pvutil.GetVolumeNodeAffinity("key1", "value1"),
VolumeMode: &fs,
},
Status: v1.PersistentVolumeStatus{
@@ -846,7 +847,7 @@ func createTestVolumes() []*v1.PersistentVolume {
v1.ReadOnlyMany,
},
StorageClassName: classWait,
NodeAffinity: getVolumeNodeAffinity("key1", "value3"),
NodeAffinity: pvutil.GetVolumeNodeAffinity("key1", "value3"),
VolumeMode: &fs,
},
Status: v1.PersistentVolumeStatus{
@@ -870,7 +871,7 @@ func createTestVolumes() []*v1.PersistentVolume {
v1.ReadOnlyMany,
},
StorageClassName: classWait,
NodeAffinity: getVolumeNodeAffinity("key1", "value4"),
NodeAffinity: pvutil.GetVolumeNodeAffinity("key1", "value4"),
VolumeMode: &fs,
},
Status: v1.PersistentVolumeStatus{
@@ -894,7 +895,7 @@ func createTestVolumes() []*v1.PersistentVolume {
v1.ReadOnlyMany,
},
StorageClassName: classWait,
NodeAffinity: getVolumeNodeAffinity("key1", "value4"),
NodeAffinity: pvutil.GetVolumeNodeAffinity("key1", "value4"),
VolumeMode: &fs,
},
Status: v1.PersistentVolumeStatus{
@@ -918,7 +919,7 @@ func createTestVolumes() []*v1.PersistentVolume {
v1.ReadOnlyMany,
},
StorageClassName: classWait,
NodeAffinity: getVolumeNodeAffinity("key1", "value4"),
NodeAffinity: pvutil.GetVolumeNodeAffinity("key1", "value4"),
VolumeMode: &fs,
},
Status: v1.PersistentVolumeStatus{
@@ -942,7 +943,7 @@ func createTestVolumes() []*v1.PersistentVolume {
v1.ReadOnlyMany,
},
StorageClassName: classWait,
NodeAffinity: getVolumeNodeAffinity("key1", "value4"),
NodeAffinity: pvutil.GetVolumeNodeAffinity("key1", "value4"),
VolumeMode: &fs,
},
},
@@ -968,24 +969,6 @@ func testVolume(name, size string) *v1.PersistentVolume {
}
}
func getVolumeNodeAffinity(key string, value string) *v1.VolumeNodeAffinity {
return &v1.VolumeNodeAffinity{
Required: &v1.NodeSelector{
NodeSelectorTerms: []v1.NodeSelectorTerm{
{
MatchExpressions: []v1.NodeSelectorRequirement{
{
Key: key,
Operator: v1.NodeSelectorOpIn,
Values: []string{value},
},
},
},
},
},
}
}
func createVolumeModeBlockTestVolume() *v1.PersistentVolume {
blockMode := v1.PersistentVolumeBlock
@@ -1163,7 +1146,7 @@ func TestVolumeModeCheck(t *testing.T) {
for name, scenario := range scenarios {
t.Run(name, func(t *testing.T) {
defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.BlockVolume, scenario.enableBlock)()
expectedMismatch, err := checkVolumeModeMismatches(&scenario.pvc.Spec, &scenario.vol.Spec)
expectedMismatch, err := pvutil.CheckVolumeModeMismatches(&scenario.pvc.Spec, &scenario.vol.Spec)
if err != nil {
t.Errorf("Unexpected failure for checkVolumeModeMismatches: %v", err)
}
@@ -1608,7 +1591,7 @@ func TestFindMatchVolumeWithNode(t *testing.T) {
}
for name, scenario := range scenarios {
volume, err := findMatchingVolume(scenario.claim, volumes, scenario.node, scenario.excludedVolumes, true)
volume, err := pvutil.FindMatchingVolume(scenario.claim, volumes, scenario.node, scenario.excludedVolumes, true)
if err != nil {
t.Errorf("Unexpected error matching volume by claim: %v", err)
}
@@ -1662,7 +1645,7 @@ func TestCheckAccessModes(t *testing.T) {
}
for name, scenario := range scenarios {
result := checkAccessModes(scenario.claim, volume)
result := pvutil.CheckAccessModes(scenario.claim, volume)
if result != scenario.shouldSucceed {
t.Errorf("Test %q failed: Expected %v, got %v", name, scenario.shouldSucceed, result)
}

View File

@@ -25,6 +25,7 @@ import (
apierrs "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
api "k8s.io/kubernetes/pkg/apis/core"
pvtesting "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/testing"
)
var class1Parameters = map[string]string{
@@ -191,13 +192,11 @@ func TestProvisionSync(t *testing.T) {
// The claim would be bound in next syncClaim
newClaimArray("claim11-7", "uid11-7", "1Gi", "", v1.ClaimPending, &classGold, annStorageProvisioner),
noevents, noerrors,
wrapTestWithInjectedOperation(wrapTestWithProvisionCalls([]provisionCall{}, testSyncClaim), func(ctrl *PersistentVolumeController, reactor *volumeReactor) {
wrapTestWithInjectedOperation(wrapTestWithProvisionCalls([]provisionCall{}, testSyncClaim), func(ctrl *PersistentVolumeController, reactor *pvtesting.VolumeReactor) {
// Create a volume before provisionClaimOperation starts.
// This simulates a parallel controller provisioning the volume.
reactor.lock.Lock()
volume := newVolume("pvc-uid11-7", "1Gi", "uid11-7", "claim11-7", v1.VolumeBound, v1.PersistentVolumeReclaimDelete, classGold, annBoundByController, annDynamicallyProvisioned)
reactor.volumes[volume.Name] = volume
reactor.lock.Unlock()
reactor.AddVolume(volume)
}),
},
{
@@ -210,11 +209,11 @@ func TestProvisionSync(t *testing.T) {
// Binding will be completed in the next syncClaim
newClaimArray("claim11-8", "uid11-8", "1Gi", "", v1.ClaimPending, &classGold, annStorageProvisioner),
[]string{"Normal ProvisioningSucceeded"},
[]reactorError{
[]pvtesting.ReactorError{
// Inject error to the first
// kubeclient.PersistentVolumes.Create() call. All other calls
// will succeed.
{"create", "persistentvolumes", errors.New("Mock creation error")},
{Verb: "create", Resource: "persistentvolumes", Error: errors.New("Mock creation error")},
},
wrapTestWithProvisionCalls([]provisionCall{provision1Success}, testSyncClaim),
},
@@ -227,14 +226,14 @@ func TestProvisionSync(t *testing.T) {
newClaimArray("claim11-9", "uid11-9", "1Gi", "", v1.ClaimPending, &classGold),
newClaimArray("claim11-9", "uid11-9", "1Gi", "", v1.ClaimPending, &classGold, annStorageProvisioner),
[]string{"Warning ProvisioningFailed"},
[]reactorError{
[]pvtesting.ReactorError{
// Inject error to five kubeclient.PersistentVolumes.Create()
// calls
{"create", "persistentvolumes", errors.New("Mock creation error1")},
{"create", "persistentvolumes", errors.New("Mock creation error2")},
{"create", "persistentvolumes", errors.New("Mock creation error3")},
{"create", "persistentvolumes", errors.New("Mock creation error4")},
{"create", "persistentvolumes", errors.New("Mock creation error5")},
{Verb: "create", Resource: "persistentvolumes", Error: errors.New("Mock creation error1")},
{Verb: "create", Resource: "persistentvolumes", Error: errors.New("Mock creation error2")},
{Verb: "create", Resource: "persistentvolumes", Error: errors.New("Mock creation error3")},
{Verb: "create", Resource: "persistentvolumes", Error: errors.New("Mock creation error4")},
{Verb: "create", Resource: "persistentvolumes", Error: errors.New("Mock creation error5")},
},
wrapTestWithPluginCalls(
nil, // recycle calls
@@ -252,14 +251,14 @@ func TestProvisionSync(t *testing.T) {
newClaimArray("claim11-10", "uid11-10", "1Gi", "", v1.ClaimPending, &classGold),
newClaimArray("claim11-10", "uid11-10", "1Gi", "", v1.ClaimPending, &classGold, annStorageProvisioner),
[]string{"Warning ProvisioningFailed", "Warning ProvisioningCleanupFailed"},
[]reactorError{
[]pvtesting.ReactorError{
// Inject error to five kubeclient.PersistentVolumes.Create()
// calls
{"create", "persistentvolumes", errors.New("Mock creation error1")},
{"create", "persistentvolumes", errors.New("Mock creation error2")},
{"create", "persistentvolumes", errors.New("Mock creation error3")},
{"create", "persistentvolumes", errors.New("Mock creation error4")},
{"create", "persistentvolumes", errors.New("Mock creation error5")},
{Verb: "create", Resource: "persistentvolumes", Error: errors.New("Mock creation error1")},
{Verb: "create", Resource: "persistentvolumes", Error: errors.New("Mock creation error2")},
{Verb: "create", Resource: "persistentvolumes", Error: errors.New("Mock creation error3")},
{Verb: "create", Resource: "persistentvolumes", Error: errors.New("Mock creation error4")},
{Verb: "create", Resource: "persistentvolumes", Error: errors.New("Mock creation error5")},
},
// No deleteCalls are configured, which results in no deleter plugin available for the volume
wrapTestWithProvisionCalls([]provisionCall{provision1Success}, testSyncClaim),
@@ -273,14 +272,14 @@ func TestProvisionSync(t *testing.T) {
newClaimArray("claim11-11", "uid11-11", "1Gi", "", v1.ClaimPending, &classGold),
newClaimArray("claim11-11", "uid11-11", "1Gi", "", v1.ClaimPending, &classGold, annStorageProvisioner),
[]string{"Warning ProvisioningFailed", "Warning ProvisioningCleanupFailed"},
[]reactorError{
[]pvtesting.ReactorError{
// Inject error to five kubeclient.PersistentVolumes.Create()
// calls
{"create", "persistentvolumes", errors.New("Mock creation error1")},
{"create", "persistentvolumes", errors.New("Mock creation error2")},
{"create", "persistentvolumes", errors.New("Mock creation error3")},
{"create", "persistentvolumes", errors.New("Mock creation error4")},
{"create", "persistentvolumes", errors.New("Mock creation error5")},
{Verb: "create", Resource: "persistentvolumes", Error: errors.New("Mock creation error1")},
{Verb: "create", Resource: "persistentvolumes", Error: errors.New("Mock creation error2")},
{Verb: "create", Resource: "persistentvolumes", Error: errors.New("Mock creation error3")},
{Verb: "create", Resource: "persistentvolumes", Error: errors.New("Mock creation error4")},
{Verb: "create", Resource: "persistentvolumes", Error: errors.New("Mock creation error5")},
},
wrapTestWithPluginCalls(
nil, // recycle calls
@@ -303,14 +302,14 @@ func TestProvisionSync(t *testing.T) {
newClaimArray("claim11-12", "uid11-12", "1Gi", "", v1.ClaimPending, &classGold),
newClaimArray("claim11-12", "uid11-12", "1Gi", "", v1.ClaimPending, &classGold, annStorageProvisioner),
[]string{"Warning ProvisioningFailed"},
[]reactorError{
[]pvtesting.ReactorError{
// Inject error to five kubeclient.PersistentVolumes.Create()
// calls
{"create", "persistentvolumes", errors.New("Mock creation error1")},
{"create", "persistentvolumes", errors.New("Mock creation error2")},
{"create", "persistentvolumes", errors.New("Mock creation error3")},
{"create", "persistentvolumes", errors.New("Mock creation error4")},
{"create", "persistentvolumes", errors.New("Mock creation error5")},
{Verb: "create", Resource: "persistentvolumes", Error: errors.New("Mock creation error1")},
{Verb: "create", Resource: "persistentvolumes", Error: errors.New("Mock creation error2")},
{Verb: "create", Resource: "persistentvolumes", Error: errors.New("Mock creation error3")},
{Verb: "create", Resource: "persistentvolumes", Error: errors.New("Mock creation error4")},
{Verb: "create", Resource: "persistentvolumes", Error: errors.New("Mock creation error5")},
},
wrapTestWithPluginCalls(
nil, // recycle calls
@@ -397,11 +396,11 @@ func TestProvisionSync(t *testing.T) {
newClaimArray("claim11-19", "uid11-19", "1Gi", "", v1.ClaimPending, &classGold),
newClaimArray("claim11-19", "uid11-19", "1Gi", "", v1.ClaimPending, &classGold, annStorageProvisioner),
noevents,
[]reactorError{
[]pvtesting.ReactorError{
// Inject errors to simulate crashed API server during
// kubeclient.PersistentVolumes.Create()
{"create", "persistentvolumes", errors.New("Mock creation error1")},
{"create", "persistentvolumes", apierrs.NewAlreadyExists(api.Resource("persistentvolumes"), "")},
{Verb: "create", Resource: "persistentvolumes", Error: errors.New("Mock creation error1")},
{Verb: "create", Resource: "persistentvolumes", Error: apierrs.NewAlreadyExists(api.Resource("persistentvolumes"), "")},
},
wrapTestWithPluginCalls(
nil, // recycle calls

View File

@@ -43,6 +43,7 @@ import (
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
"k8s.io/kubernetes/pkg/controller/volume/events"
"k8s.io/kubernetes/pkg/controller/volume/persistentvolume/metrics"
pvutil "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/util"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/util/goroutinemap"
"k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff"
@@ -276,7 +277,7 @@ func checkVolumeSatisfyClaim(volume *v1.PersistentVolume, claim *v1.PersistentVo
return fmt.Errorf("storageClassName does not match")
}
isMismatch, err := checkVolumeModeMismatches(&claim.Spec, &volume.Spec)
isMismatch, err := pvutil.CheckVolumeModeMismatches(&claim.Spec, &volume.Spec)
if err != nil {
return fmt.Errorf("error checking volumeMode: %v", err)
}
@@ -284,7 +285,7 @@ func checkVolumeSatisfyClaim(volume *v1.PersistentVolume, claim *v1.PersistentVo
return fmt.Errorf("incompatible volumeMode")
}
if !checkAccessModes(claim, volume) {
if !pvutil.CheckAccessModes(claim, volume) {
return fmt.Errorf("incompatible accessMode")
}
@@ -309,7 +310,7 @@ func (ctrl *PersistentVolumeController) shouldDelayBinding(claim *v1.PersistentV
}
// If claim is in delay binding mode.
return IsDelayBindingMode(claim, ctrl.classLister)
return pvutil.IsDelayBindingMode(claim, ctrl.classLister)
}
// syncUnboundClaim is the main controller method to decide what to do with an
@@ -407,7 +408,7 @@ func (ctrl *PersistentVolumeController) syncUnboundClaim(claim *v1.PersistentVol
}
// OBSERVATION: pvc is "Bound", pv is "Bound"
return nil
} else if IsVolumeBoundToClaim(volume, claim) {
} else if pvutil.IsVolumeBoundToClaim(volume, claim) {
// User asked for a PV that is claimed by this PVC
// OBSERVATION: pvc is "Pending", pv is "Bound"
klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume already bound, finishing the binding", claimToClaimKey(claim))
@@ -610,7 +611,7 @@ func (ctrl *PersistentVolumeController) syncVolume(volume *v1.PersistentVolume)
}
return nil
} else if claim.Spec.VolumeName == "" {
if isMismatch, err := checkVolumeModeMismatches(&claim.Spec, &volume.Spec); err != nil || isMismatch {
if isMismatch, err := pvutil.CheckVolumeModeMismatches(&claim.Spec, &volume.Spec); err != nil || isMismatch {
// Binding for the volume won't be called in syncUnboundClaim,
// because findBestMatchForClaim won't return the volume due to volumeMode mismatch.
volumeMsg := fmt.Sprintf("Cannot bind PersistentVolume to requested PersistentVolumeClaim %q due to incompatible volumeMode.", claim.Name)
@@ -851,7 +852,7 @@ func (ctrl *PersistentVolumeController) updateVolumePhaseWithEvent(volume *v1.Pe
func (ctrl *PersistentVolumeController) bindVolumeToClaim(volume *v1.PersistentVolume, claim *v1.PersistentVolumeClaim) (*v1.PersistentVolume, error) {
klog.V(4).Infof("updating PersistentVolume[%s]: binding to %q", volume.Name, claimToClaimKey(claim))
volumeClone, dirty, err := GetBindVolumeToClaim(volume, claim)
volumeClone, dirty, err := pvutil.GetBindVolumeToClaim(volume, claim)
if err != nil {
return nil, err
}

View File

@@ -30,6 +30,7 @@ import (
"k8s.io/client-go/tools/cache"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/controller"
pvtesting "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/testing"
)
var (
@@ -60,9 +61,9 @@ func TestControllerSync(t *testing.T) {
newClaimArray("claim5-2", "uid5-2", "1Gi", "volume5-2", v1.ClaimBound, nil, annBoundByController, annBindCompleted),
noevents, noerrors,
// Custom test function that generates an add event
func(ctrl *PersistentVolumeController, reactor *volumeReactor, test controllerTest) error {
func(ctrl *PersistentVolumeController, reactor *pvtesting.VolumeReactor, test controllerTest) error {
claim := newClaim("claim5-2", "uid5-2", "1Gi", "", v1.ClaimPending, nil)
reactor.addClaimEvent(claim)
reactor.AddClaimEvent(claim)
return nil
},
},
@@ -75,10 +76,10 @@ func TestControllerSync(t *testing.T) {
noclaims,
noevents, noerrors,
// Custom test function that generates a delete event
func(ctrl *PersistentVolumeController, reactor *volumeReactor, test controllerTest) error {
func(ctrl *PersistentVolumeController, reactor *pvtesting.VolumeReactor, test controllerTest) error {
obj := ctrl.claims.List()[0]
claim := obj.(*v1.PersistentVolumeClaim)
reactor.deleteClaimEvent(claim)
reactor.DeleteClaimEvent(claim)
return nil
},
},
@@ -91,10 +92,10 @@ func TestControllerSync(t *testing.T) {
newClaimArray("claim5-4", "uid5-4", "1Gi", "volume5-4", v1.ClaimLost, nil, annBoundByController, annBindCompleted),
[]string{"Warning ClaimLost"}, noerrors,
// Custom test function that generates a delete event
func(ctrl *PersistentVolumeController, reactor *volumeReactor, test controllerTest) error {
func(ctrl *PersistentVolumeController, reactor *pvtesting.VolumeReactor, test controllerTest) error {
obj := ctrl.volumes.store.List()[0]
volume := obj.(*v1.PersistentVolume)
reactor.deleteVolumeEvent(volume)
reactor.DeleteVolumeEvent(volume)
return nil
},
},
@@ -120,13 +121,13 @@ func TestControllerSync(t *testing.T) {
reactor := newVolumeReactor(client, ctrl, fakeVolumeWatch, fakeClaimWatch, test.errors)
for _, claim := range test.initialClaims {
reactor.claims[claim.Name] = claim
reactor.AddClaim(claim)
go func(claim *v1.PersistentVolumeClaim) {
fakeClaimWatch.Add(claim)
}(claim)
}
for _, volume := range test.initialVolumes {
reactor.volumes[volume.Name] = volume
reactor.AddVolume(volume)
go func(volume *v1.PersistentVolume) {
fakeVolumeWatch.Add(volume)
}(volume)
@@ -148,7 +149,7 @@ func TestControllerSync(t *testing.T) {
klog.V(4).Infof("controller synced, starting test")
// Call the tested function
err = test.test(ctrl, reactor, test)
err = test.test(ctrl, reactor.VolumeReactor, test)
if err != nil {
t.Errorf("Test %q initial test call failed: %v", test.name, err)
}
@@ -162,7 +163,7 @@ func TestControllerSync(t *testing.T) {
}
close(stopCh)
evaluateTestResults(ctrl, reactor, test, t)
evaluateTestResults(ctrl, reactor.VolumeReactor, test, t)
}
}

View File

@@ -23,6 +23,7 @@ import (
"k8s.io/api/core/v1"
storage "k8s.io/api/storage/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
pvtesting "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/testing"
)
// Test single call to syncVolume, expecting recycling to happen.
@@ -130,11 +131,9 @@ func TestRecycleSync(t *testing.T) {
noclaims,
noclaims,
noevents, noerrors,
wrapTestWithInjectedOperation(wrapTestWithReclaimCalls(operationRecycle, []error{}, testSyncVolume), func(ctrl *PersistentVolumeController, reactor *volumeReactor) {
wrapTestWithInjectedOperation(wrapTestWithReclaimCalls(operationRecycle, []error{}, testSyncVolume), func(ctrl *PersistentVolumeController, reactor *pvtesting.VolumeReactor) {
// Delete the volume before recycle operation starts
reactor.lock.Lock()
delete(reactor.volumes, "volume6-6")
reactor.lock.Unlock()
reactor.DeleteVolume("volume6-6")
}),
},
{
@@ -147,14 +146,9 @@ func TestRecycleSync(t *testing.T) {
noclaims,
noclaims,
noevents, noerrors,
wrapTestWithInjectedOperation(wrapTestWithReclaimCalls(operationRecycle, []error{}, testSyncVolume), func(ctrl *PersistentVolumeController, reactor *volumeReactor) {
wrapTestWithInjectedOperation(wrapTestWithReclaimCalls(operationRecycle, []error{}, testSyncVolume), func(ctrl *PersistentVolumeController, reactor *pvtesting.VolumeReactor) {
// Mark the volume as Available before the recycler starts
reactor.lock.Lock()
volume := reactor.volumes["volume6-7"]
volume.Spec.ClaimRef = nil
volume.Status.Phase = v1.VolumeAvailable
volume.Annotations = nil
reactor.lock.Unlock()
reactor.MarkVolumeAvailable("volume6-7")
}),
},
{
@@ -164,17 +158,13 @@ func TestRecycleSync(t *testing.T) {
// user.
"6-8 - prebound volume is deleted before recycling",
newVolumeArray("volume6-8", "1Gi", "uid6-8", "claim6-8", v1.VolumeBound, v1.PersistentVolumeReclaimRecycle, classEmpty),
newVolumeArray("volume6-8", "1Gi", "", "claim6-8", v1.VolumeAvailable, v1.PersistentVolumeReclaimRecycle, classEmpty),
newVolumeArray("volume6-8", "1Gi", "", "", v1.VolumeAvailable, v1.PersistentVolumeReclaimRecycle, classEmpty),
noclaims,
noclaims,
noevents, noerrors,
wrapTestWithInjectedOperation(wrapTestWithReclaimCalls(operationRecycle, []error{}, testSyncVolume), func(ctrl *PersistentVolumeController, reactor *volumeReactor) {
wrapTestWithInjectedOperation(wrapTestWithReclaimCalls(operationRecycle, []error{}, testSyncVolume), func(ctrl *PersistentVolumeController, reactor *pvtesting.VolumeReactor) {
// Mark the volume as Available before the recycler starts
reactor.lock.Lock()
volume := reactor.volumes["volume6-8"]
volume.Spec.ClaimRef.UID = ""
volume.Status.Phase = v1.VolumeAvailable
reactor.lock.Unlock()
reactor.MarkVolumeAvailable("volume6-8")
}),
},
{

View File

@@ -0,0 +1,36 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = ["testing.go"],
importpath = "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/testing",
visibility = ["//visibility:public"],
deps = [
"//pkg/features:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
"//staging/src/k8s.io/client-go/testing:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@@ -0,0 +1,589 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package testing
import (
"errors"
"fmt"
"reflect"
"strconv"
"sync"
v1 "k8s.io/api/core/v1"
apierrs "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/diff"
"k8s.io/apimachinery/pkg/watch"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/kubernetes/fake"
core "k8s.io/client-go/testing"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/features"
)
// VersionConflictError is returned when an object is updated with a stale
// ResourceVersion, simulating a real etcd conflict.
var VersionConflictError = errors.New("VersionError")
// VolumeReactor is a core.Reactor that simulates etcd and API server. It
// stores:
// - The latest versions of claims and volumes saved by the controller.
// - A queue of all saves (to simulate "volume/claim updated" events). This
// queue contains all intermediate states of an object - e.g. a
// claim.VolumeName is updated first and claim.Phase second. The queue will
// then contain both updates as separate entries.
// - The number of changes since the last call to VolumeReactor.SyncAll().
// - Optionally, volume and claim fake watchers, which should be the same ones
// used by the controller. Any time an event function like DeleteVolumeEvent
// is called to simulate an event, the reactor's stores are updated and the
// controller is sent the event via the fake watcher.
// - Optionally, a list of errors that should be returned by the reactor,
// simulating etcd / API server failures. These errors are evaluated in order
// and every error is returned only once. I.e. when the reactor finds a
// matching ReactorError, it returns the appropriate error and removes that
// ReactorError from the list.
type VolumeReactor struct {
volumes map[string]*v1.PersistentVolume
claims map[string]*v1.PersistentVolumeClaim
changedObjects []interface{}
changedSinceLastSync int
fakeVolumeWatch *watch.FakeWatcher
fakeClaimWatch *watch.FakeWatcher
lock sync.RWMutex
errors []ReactorError
watchers map[schema.GroupVersionResource]map[string][]*watch.RaceFreeFakeWatcher
}
// ReactorError is an error that is returned by the test reactor (= simulated
// etcd + API server) when an action performed by the reactor matches a given
// verb ("get", "update", "create", "delete" or "*") on a given resource
// ("persistentvolumes", "persistentvolumeclaims" or "*").
type ReactorError struct {
Verb string
Resource string
Error error
}
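// Illustrative usage sketch (hypothetical; client is assumed to be a
// *fake.Clientset): make the first PersistentVolume create call fail once.
// Later creates succeed, because a matched ReactorError is consumed:
//
//	errs := []ReactorError{
//		{Verb: "create", Resource: "persistentvolumes", Error: errors.New("mock creation error")},
//	}
//	reactor := NewVolumeReactor(client, nil, nil, errs)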
// React is a callback called by fake kubeClient from the controller.
// In other words, every claim/volume change performed by the controller ends
// here.
// This callback checks versions of the updated objects and refuses those that
// are too old (simulating real etcd).
// All updated objects are stored locally to keep track of object versions and
// to evaluate test results.
// All updated objects are also inserted into the changedObjects queue and
// optionally sent back to the controller via its watchers.
func (r *VolumeReactor) React(action core.Action) (handled bool, ret runtime.Object, err error) {
r.lock.Lock()
defer r.lock.Unlock()
klog.V(4).Infof("reactor got operation %q on %q", action.GetVerb(), action.GetResource())
// Inject error when requested
err = r.injectReactError(action)
if err != nil {
return true, nil, err
}
// Test did not request to inject an error, continue simulating API server.
switch {
case action.Matches("create", "persistentvolumes"):
obj := action.(core.UpdateAction).GetObject()
volume := obj.(*v1.PersistentVolume)
// check the volume does not exist
_, found := r.volumes[volume.Name]
if found {
return true, nil, fmt.Errorf("Cannot create volume %s: volume already exists", volume.Name)
}
// mimic apiserver defaulting
if volume.Spec.VolumeMode == nil && utilfeature.DefaultFeatureGate.Enabled(features.BlockVolume) {
volume.Spec.VolumeMode = new(v1.PersistentVolumeMode)
*volume.Spec.VolumeMode = v1.PersistentVolumeFilesystem
}
// Store the updated object to appropriate places.
r.volumes[volume.Name] = volume
for _, w := range r.getWatches(action.GetResource(), action.GetNamespace()) {
w.Add(volume)
}
r.changedObjects = append(r.changedObjects, volume)
r.changedSinceLastSync++
klog.V(4).Infof("created volume %s", volume.Name)
return true, volume, nil
case action.Matches("create", "persistentvolumeclaims"):
obj := action.(core.UpdateAction).GetObject()
claim := obj.(*v1.PersistentVolumeClaim)
// check the claim does not exist
_, found := r.claims[claim.Name]
if found {
return true, nil, fmt.Errorf("Cannot create claim %s: claim already exists", claim.Name)
}
// Store the updated object to appropriate places.
r.claims[claim.Name] = claim
for _, w := range r.getWatches(action.GetResource(), action.GetNamespace()) {
w.Add(claim)
}
r.changedObjects = append(r.changedObjects, claim)
r.changedSinceLastSync++
klog.V(4).Infof("created claim %s", claim.Name)
return true, claim, nil
case action.Matches("update", "persistentvolumes"):
obj := action.(core.UpdateAction).GetObject()
volume := obj.(*v1.PersistentVolume)
// Check and bump object version
storedVolume, found := r.volumes[volume.Name]
if found {
storedVer, _ := strconv.Atoi(storedVolume.ResourceVersion)
requestedVer, _ := strconv.Atoi(volume.ResourceVersion)
if storedVer != requestedVer {
return true, obj, VersionConflictError
}
if reflect.DeepEqual(storedVolume, volume) {
klog.V(4).Infof("nothing updated volume %s", volume.Name)
return true, volume, nil
}
// Don't modify the existing object
volume = volume.DeepCopy()
volume.ResourceVersion = strconv.Itoa(storedVer + 1)
} else {
return true, nil, fmt.Errorf("Cannot update volume %s: volume not found", volume.Name)
}
// Store the updated object to appropriate places.
for _, w := range r.getWatches(action.GetResource(), action.GetNamespace()) {
w.Modify(volume)
}
r.volumes[volume.Name] = volume
r.changedObjects = append(r.changedObjects, volume)
r.changedSinceLastSync++
klog.V(4).Infof("saved updated volume %s", volume.Name)
return true, volume, nil
case action.Matches("update", "persistentvolumeclaims"):
obj := action.(core.UpdateAction).GetObject()
claim := obj.(*v1.PersistentVolumeClaim)
// Check and bump object version
storedClaim, found := r.claims[claim.Name]
if found {
storedVer, _ := strconv.Atoi(storedClaim.ResourceVersion)
requestedVer, _ := strconv.Atoi(claim.ResourceVersion)
if storedVer != requestedVer {
return true, obj, VersionConflictError
}
if reflect.DeepEqual(storedClaim, claim) {
klog.V(4).Infof("nothing updated claim %s", claim.Name)
return true, claim, nil
}
// Don't modify the existing object
claim = claim.DeepCopy()
claim.ResourceVersion = strconv.Itoa(storedVer + 1)
} else {
return true, nil, fmt.Errorf("Cannot update claim %s: claim not found", claim.Name)
}
// Store the updated object to appropriate places.
for _, w := range r.getWatches(action.GetResource(), action.GetNamespace()) {
w.Modify(claim)
}
r.claims[claim.Name] = claim
r.changedObjects = append(r.changedObjects, claim)
r.changedSinceLastSync++
klog.V(4).Infof("saved updated claim %s", claim.Name)
return true, claim, nil
case action.Matches("get", "persistentvolumes"):
name := action.(core.GetAction).GetName()
volume, found := r.volumes[name]
if found {
klog.V(4).Infof("GetVolume: found %s", volume.Name)
return true, volume.DeepCopy(), nil
} else {
klog.V(4).Infof("GetVolume: volume %s not found", name)
return true, nil, fmt.Errorf("Cannot find volume %s", name)
}
case action.Matches("get", "persistentvolumeclaims"):
name := action.(core.GetAction).GetName()
claim, found := r.claims[name]
if found {
klog.V(4).Infof("GetClaim: found %s", claim.Name)
return true, claim.DeepCopy(), nil
} else {
klog.V(4).Infof("GetClaim: claim %s not found", name)
return true, nil, apierrs.NewNotFound(action.GetResource().GroupResource(), name)
}
case action.Matches("delete", "persistentvolumes"):
name := action.(core.DeleteAction).GetName()
klog.V(4).Infof("deleted volume %s", name)
obj, found := r.volumes[name]
if found {
delete(r.volumes, name)
for _, w := range r.getWatches(action.GetResource(), action.GetNamespace()) {
w.Delete(obj)
}
r.changedSinceLastSync++
return true, nil, nil
} else {
return true, nil, fmt.Errorf("Cannot delete volume %s: not found", name)
}
case action.Matches("delete", "persistentvolumeclaims"):
name := action.(core.DeleteAction).GetName()
klog.V(4).Infof("deleted claim %s", name)
obj, found := r.claims[name]
if found {
delete(r.claims, name)
for _, w := range r.getWatches(action.GetResource(), action.GetNamespace()) {
w.Delete(obj)
}
r.changedSinceLastSync++
return true, nil, nil
} else {
return true, nil, fmt.Errorf("Cannot delete claim %s: not found", name)
}
}
return false, nil, nil
}
// Watch watches objects from the VolumeReactor. Watch returns a channel that
// pushes added / modified / deleted objects.
func (r *VolumeReactor) Watch(gvr schema.GroupVersionResource, ns string) (watch.Interface, error) {
r.lock.Lock()
defer r.lock.Unlock()
fakewatcher := watch.NewRaceFreeFake()
if _, exists := r.watchers[gvr]; !exists {
r.watchers[gvr] = make(map[string][]*watch.RaceFreeFakeWatcher)
}
r.watchers[gvr][ns] = append(r.watchers[gvr][ns], fakewatcher)
return fakewatcher, nil
}
func (r *VolumeReactor) getWatches(gvr schema.GroupVersionResource, ns string) []*watch.RaceFreeFakeWatcher {
watches := []*watch.RaceFreeFakeWatcher{}
if r.watchers[gvr] != nil {
if w := r.watchers[gvr][ns]; w != nil {
watches = append(watches, w...)
}
if ns != metav1.NamespaceAll {
if w := r.watchers[gvr][metav1.NamespaceAll]; w != nil {
watches = append(watches, w...)
}
}
}
return watches
}
// ChangedSinceLastSync returns the number of changes recorded since the last
// call to SyncAll().
func (r *VolumeReactor) ChangedSinceLastSync() int {
r.lock.RLock()
defer r.lock.RUnlock()
return r.changedSinceLastSync
}
// injectReactError returns an error when the test requested the given action
// to fail; nil is returned otherwise.
func (r *VolumeReactor) injectReactError(action core.Action) error {
if len(r.errors) == 0 {
// No more errors to inject, everything should succeed.
return nil
}
for i, expected := range r.errors {
klog.V(4).Infof("trying to match %q %q with %q %q", expected.Verb, expected.Resource, action.GetVerb(), action.GetResource())
if action.Matches(expected.Verb, expected.Resource) {
// That's the action we're waiting for; remove it from the list of errors.
r.errors = append(r.errors[:i], r.errors[i+1:]...)
klog.V(4).Infof("reactor found matching error at index %d: %q %q, returning %v", i, expected.Verb, expected.Resource, expected.Error)
return expected.Error
}
}
return nil
}
// CheckVolumes compares all expectedVolumes with the set of volumes at the
// end of the test and reports differences.
func (r *VolumeReactor) CheckVolumes(expectedVolumes []*v1.PersistentVolume) error {
r.lock.Lock()
defer r.lock.Unlock()
expectedMap := make(map[string]*v1.PersistentVolume)
gotMap := make(map[string]*v1.PersistentVolume)
// Clear any ResourceVersion from both sets
for _, v := range expectedVolumes {
// Don't modify the existing object
v := v.DeepCopy()
v.ResourceVersion = ""
if v.Spec.ClaimRef != nil {
v.Spec.ClaimRef.ResourceVersion = ""
}
expectedMap[v.Name] = v
}
for _, v := range r.volumes {
// We must clone the volume because of golang race check - it was
// written by the controller without any locks on it.
v := v.DeepCopy()
v.ResourceVersion = ""
if v.Spec.ClaimRef != nil {
v.Spec.ClaimRef.ResourceVersion = ""
}
gotMap[v.Name] = v
}
if !reflect.DeepEqual(expectedMap, gotMap) {
// Print ugly but useful diff of expected and received objects for
// easier debugging.
return fmt.Errorf("Volume check failed [A-expected, B-got]: %s", diff.ObjectDiff(expectedMap, gotMap))
}
return nil
}
// CheckClaims compares all expectedClaims with the set of claims at the end
// of the test and reports differences.
func (r *VolumeReactor) CheckClaims(expectedClaims []*v1.PersistentVolumeClaim) error {
r.lock.Lock()
defer r.lock.Unlock()
expectedMap := make(map[string]*v1.PersistentVolumeClaim)
gotMap := make(map[string]*v1.PersistentVolumeClaim)
for _, c := range expectedClaims {
// Don't modify the existing object
c = c.DeepCopy()
c.ResourceVersion = ""
expectedMap[c.Name] = c
}
for _, c := range r.claims {
// We must clone the claim because of golang race check - it was
// written by the controller without any locks on it.
c = c.DeepCopy()
c.ResourceVersion = ""
gotMap[c.Name] = c
}
if !reflect.DeepEqual(expectedMap, gotMap) {
// Print ugly but useful diff of expected and received objects for
// easier debugging.
return fmt.Errorf("Claim check failed [A-expected, B-got result]: %s", diff.ObjectDiff(expectedMap, gotMap))
}
return nil
}
// PopChange returns one recorded updated object, either *v1.PersistentVolume
// or *v1.PersistentVolumeClaim. Returns nil when there are no changes.
func (r *VolumeReactor) PopChange() interface{} {
r.lock.Lock()
defer r.lock.Unlock()
if len(r.changedObjects) == 0 {
return nil
}
// For debugging purposes, print the queue
for _, obj := range r.changedObjects {
switch obj.(type) {
case *v1.PersistentVolume:
vol, _ := obj.(*v1.PersistentVolume)
klog.V(4).Infof("reactor queue: %s", vol.Name)
case *v1.PersistentVolumeClaim:
claim, _ := obj.(*v1.PersistentVolumeClaim)
klog.V(4).Infof("reactor queue: %s", claim.Name)
}
}
// Pop the first item from the queue and return it
obj := r.changedObjects[0]
r.changedObjects = r.changedObjects[1:]
return obj
}
// SyncAll simulates the controller's periodic sync of volumes and claims. It
// simply adds all these objects to the internal queue of updates. This method
// should be used when the test manually calls syncClaim/syncVolume. Tests
// that use the real controller loop (ctrl.Run()) get the periodic sync
// automatically.
func (r *VolumeReactor) SyncAll() {
r.lock.Lock()
defer r.lock.Unlock()
for _, c := range r.claims {
r.changedObjects = append(r.changedObjects, c)
}
for _, v := range r.volumes {
r.changedObjects = append(r.changedObjects, v)
}
r.changedSinceLastSync = 0
}
// GetChangeCount returns the number of changes recorded since the last call
// to SyncAll().
func (r *VolumeReactor) GetChangeCount() int {
r.lock.Lock()
defer r.lock.Unlock()
return r.changedSinceLastSync
}
// DeleteVolumeEvent simulates that a volume has been deleted in etcd and
// the controller receives 'volume deleted' event.
func (r *VolumeReactor) DeleteVolumeEvent(volume *v1.PersistentVolume) {
r.lock.Lock()
defer r.lock.Unlock()
// Remove the volume from list of resulting volumes.
delete(r.volumes, volume.Name)
// Generate deletion event. Cloned volume is needed to prevent races (and we
// would get a clone from etcd too).
if r.fakeVolumeWatch != nil {
r.fakeVolumeWatch.Delete(volume.DeepCopy())
}
}
// DeleteClaimEvent simulates that a claim has been deleted in etcd and the
// controller receives 'claim deleted' event.
func (r *VolumeReactor) DeleteClaimEvent(claim *v1.PersistentVolumeClaim) {
r.lock.Lock()
defer r.lock.Unlock()
// Remove the claim from list of resulting claims.
delete(r.claims, claim.Name)
// Generate deletion event. A cloned claim is needed to prevent races (and we
// would get a clone from etcd too).
if r.fakeClaimWatch != nil {
r.fakeClaimWatch.Delete(claim.DeepCopy())
}
}
// addVolumeEvent simulates that a volume has been added in etcd and the
// controller receives 'volume added' event.
func (r *VolumeReactor) addVolumeEvent(volume *v1.PersistentVolume) {
r.lock.Lock()
defer r.lock.Unlock()
r.volumes[volume.Name] = volume
// Generate event. No cloning is needed, this volume is not stored in the
// controller cache yet.
if r.fakeVolumeWatch != nil {
r.fakeVolumeWatch.Add(volume)
}
}
// modifyVolumeEvent simulates that a volume has been modified in etcd and the
// controller receives 'volume modified' event.
func (r *VolumeReactor) modifyVolumeEvent(volume *v1.PersistentVolume) {
r.lock.Lock()
defer r.lock.Unlock()
r.volumes[volume.Name] = volume
// Generate modification event. A cloned volume is needed to prevent races (and we
// would get a clone from etcd too).
if r.fakeVolumeWatch != nil {
r.fakeVolumeWatch.Modify(volume.DeepCopy())
}
}
// AddClaimEvent simulates that a claim has been added in etcd and the
// controller receives a 'claim added' event.
func (r *VolumeReactor) AddClaimEvent(claim *v1.PersistentVolumeClaim) {
r.lock.Lock()
defer r.lock.Unlock()
r.claims[claim.Name] = claim
// Generate event. No cloning is needed, this claim is not stored in the
// controller cache yet.
if r.fakeClaimWatch != nil {
r.fakeClaimWatch.Add(claim)
}
}
func (r *VolumeReactor) AddClaims(claims []*v1.PersistentVolumeClaim) {
r.lock.Lock()
defer r.lock.Unlock()
for _, claim := range claims {
r.claims[claim.Name] = claim
}
}
func (r *VolumeReactor) AddVolumes(volumes []*v1.PersistentVolume) {
r.lock.Lock()
defer r.lock.Unlock()
for _, volume := range volumes {
r.volumes[volume.Name] = volume
}
}
func (r *VolumeReactor) AddClaim(claim *v1.PersistentVolumeClaim) {
r.lock.Lock()
defer r.lock.Unlock()
r.claims[claim.Name] = claim
}
func (r *VolumeReactor) AddVolume(volume *v1.PersistentVolume) {
r.lock.Lock()
defer r.lock.Unlock()
r.volumes[volume.Name] = volume
}
// DeleteVolume removes a volume from the reactor's store without generating
// a watch event (compare DeleteVolumeEvent).
func (r *VolumeReactor) DeleteVolume(name string) {
r.lock.Lock()
defer r.lock.Unlock()
delete(r.volumes, name)
}
// AddClaimBoundToVolume stores a claim and, if the referenced volume exists,
// marks that volume as Bound.
func (r *VolumeReactor) AddClaimBoundToVolume(claim *v1.PersistentVolumeClaim) {
r.lock.Lock()
defer r.lock.Unlock()
r.claims[claim.Name] = claim
if volume, ok := r.volumes[claim.Spec.VolumeName]; ok {
volume.Status.Phase = v1.VolumeBound
}
}
// MarkVolumeAvailable resets the given volume to the Available phase, with
// its claim reference and annotations cleared.
func (r *VolumeReactor) MarkVolumeAvailable(name string) {
r.lock.Lock()
defer r.lock.Unlock()
if volume, ok := r.volumes[name]; ok {
volume.Spec.ClaimRef = nil
volume.Status.Phase = v1.VolumeAvailable
volume.Annotations = nil
}
}
func NewVolumeReactor(client *fake.Clientset, fakeVolumeWatch, fakeClaimWatch *watch.FakeWatcher, errors []ReactorError) *VolumeReactor {
reactor := &VolumeReactor{
volumes: make(map[string]*v1.PersistentVolume),
claims: make(map[string]*v1.PersistentVolumeClaim),
fakeVolumeWatch: fakeVolumeWatch,
fakeClaimWatch: fakeClaimWatch,
errors: errors,
watchers: make(map[schema.GroupVersionResource]map[string][]*watch.RaceFreeFakeWatcher),
}
client.AddReactor("create", "persistentvolumes", reactor.React)
client.AddReactor("create", "persistentvolumeclaims", reactor.React)
client.AddReactor("update", "persistentvolumes", reactor.React)
client.AddReactor("update", "persistentvolumeclaims", reactor.React)
client.AddReactor("get", "persistentvolumes", reactor.React)
client.AddReactor("get", "persistentvolumeclaims", reactor.React)
client.AddReactor("delete", "persistentvolumes", reactor.React)
client.AddReactor("delete", "persistentvolumeclaims", reactor.React)
return reactor
}
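// Minimal wiring sketch (hypothetical; volume is an assumed, pre-built
// *v1.PersistentVolume): seed the reactor directly, then read the object back
// through the fake client, which routes the call to React:
//
//	client := &fake.Clientset{}
//	reactor := NewVolumeReactor(client, nil, nil, nil)
//	reactor.AddVolume(volume)
//	pv, err := client.CoreV1().PersistentVolumes().Get(volume.Name, metav1.GetOptions{})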

View File

@@ -1,103 +0,0 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package persistentvolume
import (
"fmt"
"k8s.io/api/core/v1"
storage "k8s.io/api/storage/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/scheme"
storagelisters "k8s.io/client-go/listers/storage/v1"
"k8s.io/client-go/tools/reference"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
)
// IsDelayBindingMode checks if claim is in delay binding mode.
func IsDelayBindingMode(claim *v1.PersistentVolumeClaim, classLister storagelisters.StorageClassLister) (bool, error) {
className := v1helper.GetPersistentVolumeClaimClass(claim)
if className == "" {
return false, nil
}
class, err := classLister.Get(className)
if err != nil {
return false, nil
}
if class.VolumeBindingMode == nil {
return false, fmt.Errorf("VolumeBindingMode not set for StorageClass %q", className)
}
return *class.VolumeBindingMode == storage.VolumeBindingWaitForFirstConsumer, nil
}
// GetBindVolumeToClaim returns a new volume which is bound to given claim. In
// addition, it returns a bool which indicates whether we made modification on
// original volume.
func GetBindVolumeToClaim(volume *v1.PersistentVolume, claim *v1.PersistentVolumeClaim) (*v1.PersistentVolume, bool, error) {
dirty := false
// Check if the volume was already bound (either by user or by controller)
shouldSetBoundByController := false
if !IsVolumeBoundToClaim(volume, claim) {
shouldSetBoundByController = true
}
// The volume from method args can be pointing to watcher cache. We must not
// modify these, therefore create a copy.
volumeClone := volume.DeepCopy()
// Bind the volume to the claim if it is not bound yet
if volume.Spec.ClaimRef == nil ||
volume.Spec.ClaimRef.Name != claim.Name ||
volume.Spec.ClaimRef.Namespace != claim.Namespace ||
volume.Spec.ClaimRef.UID != claim.UID {
claimRef, err := reference.GetReference(scheme.Scheme, claim)
if err != nil {
return nil, false, fmt.Errorf("Unexpected error getting claim reference: %v", err)
}
volumeClone.Spec.ClaimRef = claimRef
dirty = true
}
// Set annBoundByController if it is not set yet
if shouldSetBoundByController && !metav1.HasAnnotation(volumeClone.ObjectMeta, annBoundByController) {
metav1.SetMetaDataAnnotation(&volumeClone.ObjectMeta, annBoundByController, "yes")
dirty = true
}
return volumeClone, dirty, nil
}
// IsVolumeBoundToClaim returns true, if given volume is pre-bound or bound
// to specific claim. Both claim.Name and claim.Namespace must be equal.
// If claim.UID is present in volume.Spec.ClaimRef, it must be equal too.
func IsVolumeBoundToClaim(volume *v1.PersistentVolume, claim *v1.PersistentVolumeClaim) bool {
if volume.Spec.ClaimRef == nil {
return false
}
if claim.Name != volume.Spec.ClaimRef.Name || claim.Namespace != volume.Spec.ClaimRef.Namespace {
return false
}
if volume.Spec.ClaimRef.UID != "" && claim.UID != volume.Spec.ClaimRef.UID {
return false
}
return true
}

View File

@@ -0,0 +1,36 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = ["util.go"],
importpath = "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/util",
visibility = ["//visibility:public"],
deps = [
"//pkg/apis/core/v1/helper:go_default_library",
"//pkg/features:go_default_library",
"//pkg/volume/util:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/storage/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library",
"//staging/src/k8s.io/client-go/listers/storage/v1:go_default_library",
"//staging/src/k8s.io/client-go/tools/reference:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@@ -0,0 +1,342 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package persistentvolume
import (
"fmt"
"k8s.io/api/core/v1"
storage "k8s.io/api/storage/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/kubernetes/scheme"
storagelisters "k8s.io/client-go/listers/storage/v1"
"k8s.io/client-go/tools/reference"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
"k8s.io/kubernetes/pkg/features"
volumeutil "k8s.io/kubernetes/pkg/volume/util"
)
const (
// AnnBindCompleted annotation applies to PVCs. It indicates that the
// lifecycle of the PVC has passed through the initial setup. This information
// changes how we interpret some observations of the state of the objects.
// The value of this annotation does not matter.
AnnBindCompleted = "pv.kubernetes.io/bind-completed"
// AnnBoundByController annotation applies to PVs and PVCs. It indicates that
// the binding (PV->PVC or PVC->PV) was installed by the controller. The
// absence of this annotation means the binding was done by the user (i.e.
// pre-bound). The value of this annotation does not matter.
// External PV binders must bind PVs the same way as the PV controller,
// otherwise the PV controller may not handle them correctly.
AnnBoundByController = "pv.kubernetes.io/bound-by-controller"
// AnnSelectedNode annotation is added to a PVC that has been triggered by the
// scheduler to be dynamically provisioned. Its value is the name of the
// selected node.
AnnSelectedNode = "volume.kubernetes.io/selected-node"
// NotSupportedProvisioner is a special provisioner name which can be set
// in a storage class to indicate that dynamic provisioning is not supported by
// the storage.
NotSupportedProvisioner = "kubernetes.io/no-provisioner"
)
// IsDelayBindingMode checks if claim is in delay binding mode.
func IsDelayBindingMode(claim *v1.PersistentVolumeClaim, classLister storagelisters.StorageClassLister) (bool, error) {
className := v1helper.GetPersistentVolumeClaimClass(claim)
if className == "" {
return false, nil
}
class, err := classLister.Get(className)
if err != nil {
return false, nil
}
if class.VolumeBindingMode == nil {
return false, fmt.Errorf("VolumeBindingMode not set for StorageClass %q", className)
}
return *class.VolumeBindingMode == storage.VolumeBindingWaitForFirstConsumer, nil
}
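// Illustrative sketch (hypothetical class name): a claim referencing the
// StorageClass below is in delay binding mode:
//
//	mode := storage.VolumeBindingWaitForFirstConsumer
//	class := &storage.StorageClass{
//		ObjectMeta:        metav1.ObjectMeta{Name: "topology-aware"},
//		Provisioner:       "kubernetes.io/no-provisioner",
//		VolumeBindingMode: &mode,
//	}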
// GetBindVolumeToClaim returns a new volume which is bound to the given
// claim. In addition, it returns a bool which indicates whether any
// modification was made to the original volume.
func GetBindVolumeToClaim(volume *v1.PersistentVolume, claim *v1.PersistentVolumeClaim) (*v1.PersistentVolume, bool, error) {
dirty := false
// Check if the volume was already bound (either by user or by controller)
shouldSetBoundByController := false
if !IsVolumeBoundToClaim(volume, claim) {
shouldSetBoundByController = true
}
// The volume from the method args can be pointing to the watcher cache. We
// must not modify it, therefore create a copy.
volumeClone := volume.DeepCopy()
// Bind the volume to the claim if it is not bound yet
if volume.Spec.ClaimRef == nil ||
volume.Spec.ClaimRef.Name != claim.Name ||
volume.Spec.ClaimRef.Namespace != claim.Namespace ||
volume.Spec.ClaimRef.UID != claim.UID {
claimRef, err := reference.GetReference(scheme.Scheme, claim)
if err != nil {
return nil, false, fmt.Errorf("Unexpected error getting claim reference: %v", err)
}
volumeClone.Spec.ClaimRef = claimRef
dirty = true
}
// Set AnnBoundByController if it is not set yet
if shouldSetBoundByController && !metav1.HasAnnotation(volumeClone.ObjectMeta, AnnBoundByController) {
metav1.SetMetaDataAnnotation(&volumeClone.ObjectMeta, AnnBoundByController, "yes")
dirty = true
}
return volumeClone, dirty, nil
}
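// Caller sketch (hypothetical; kubeClient is an assumed clientset): an API
// update is only needed when the returned dirty flag is true:
//
//	volumeClone, dirty, err := GetBindVolumeToClaim(volume, claim)
//	if err != nil {
//		return err
//	}
//	if dirty {
//		volumeClone, err = kubeClient.CoreV1().PersistentVolumes().Update(volumeClone)
//	}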
// IsVolumeBoundToClaim returns true, if given volume is pre-bound or bound
// to specific claim. Both claim.Name and claim.Namespace must be equal.
// If claim.UID is present in volume.Spec.ClaimRef, it must be equal too.
func IsVolumeBoundToClaim(volume *v1.PersistentVolume, claim *v1.PersistentVolumeClaim) bool {
if volume.Spec.ClaimRef == nil {
return false
}
if claim.Name != volume.Spec.ClaimRef.Name || claim.Namespace != volume.Spec.ClaimRef.Namespace {
return false
}
if volume.Spec.ClaimRef.UID != "" && claim.UID != volume.Spec.ClaimRef.UID {
return false
}
return true
}
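// Illustrative sketch: a ClaimRef that names the claim but carries an empty
// UID still counts as pre-bound:
//
//	volume.Spec.ClaimRef = &v1.ObjectReference{
//		Name:      claim.Name,
//		Namespace: claim.Namespace, // UID deliberately left empty
//	}
//	bound := IsVolumeBoundToClaim(volume, claim) // true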
// FindMatchingVolume goes through the list of volumes to find the best matching volume
// for the claim.
//
// This function is used by both the PV controller and scheduler.
//
// delayBinding is true only in the PV controller path. When set, prebound PVs are still returned
// as a match for the claim, but unbound PVs are skipped.
//
// node is set only in the scheduler path. When set, the PV node affinity is checked against
// the node's labels.
//
// excludedVolumes is only used in the scheduler path, and is needed for evaluating multiple
// unbound PVCs for a single Pod at one time. As each PVC finds a matching PV, the chosen
// PV needs to be excluded from future matching.
func FindMatchingVolume(
claim *v1.PersistentVolumeClaim,
volumes []*v1.PersistentVolume,
node *v1.Node,
excludedVolumes map[string]*v1.PersistentVolume,
delayBinding bool) (*v1.PersistentVolume, error) {
var smallestVolume *v1.PersistentVolume
var smallestVolumeQty resource.Quantity
requestedQty := claim.Spec.Resources.Requests[v1.ResourceName(v1.ResourceStorage)]
requestedClass := v1helper.GetPersistentVolumeClaimClass(claim)
var selector labels.Selector
if claim.Spec.Selector != nil {
internalSelector, err := metav1.LabelSelectorAsSelector(claim.Spec.Selector)
if err != nil {
// should be unreachable code due to validation
return nil, fmt.Errorf("error creating internal label selector for claim: %v: %v", claimToClaimKey(claim), err)
}
selector = internalSelector
}
// Go through all available volumes with two goals:
// - find a volume that is either pre-bound by user or dynamically
// provisioned for this claim. Because of this we need to loop through
// all volumes.
// - find the smallest matching one if there is no volume pre-bound to
// the claim.
for _, volume := range volumes {
if _, ok := excludedVolumes[volume.Name]; ok {
// Skip volumes in the excluded list
continue
}
volumeQty := volume.Spec.Capacity[v1.ResourceStorage]
// check if volumeModes do not match (feature gate protected)
isMismatch, err := CheckVolumeModeMismatches(&claim.Spec, &volume.Spec)
if err != nil {
return nil, fmt.Errorf("error checking if volumeMode was a mismatch: %v", err)
}
// filter out mismatching volumeModes
if isMismatch {
continue
}
// check if the PV's DeletionTimestamp is set; if so, skip this volume.
if utilfeature.DefaultFeatureGate.Enabled(features.StorageObjectInUseProtection) {
if volume.ObjectMeta.DeletionTimestamp != nil {
continue
}
}
nodeAffinityValid := true
if node != nil {
// Scheduler path, check that the PV NodeAffinity
// is satisfied by the node
err := volumeutil.CheckNodeAffinity(volume, node.Labels)
if err != nil {
nodeAffinityValid = false
}
}
if IsVolumeBoundToClaim(volume, claim) {
// this claim and volume are pre-bound; return
// the volume if the size request is satisfied,
// otherwise continue searching for a match
if volumeQty.Cmp(requestedQty) < 0 {
continue
}
// If PV node affinity is invalid, return no match.
// This means the prebound PV (and therefore PVC)
// is not suitable for this node.
if !nodeAffinityValid {
return nil, nil
}
return volume, nil
}
if node == nil && delayBinding {
// PV controller does not bind this claim.
// Scheduler will handle binding unbound volumes
// Scheduler path will have node != nil
continue
}
// filter out:
// - volumes in non-available phase
// - volumes bound to another claim
// - volumes whose labels don't match the claim's selector, if specified
// - volumes in Class that is not requested
// - volumes whose NodeAffinity does not match the node
if volume.Status.Phase != v1.VolumeAvailable {
// We ignore volumes in a non-available phase, because volumes that
// satisfy the matching criteria will be updated to Available; binding
// them now has a high chance of encountering unnecessary failures
// due to API conflicts.
continue
} else if volume.Spec.ClaimRef != nil {
continue
} else if selector != nil && !selector.Matches(labels.Set(volume.Labels)) {
continue
}
if v1helper.GetPersistentVolumeClass(volume) != requestedClass {
continue
}
if !nodeAffinityValid {
continue
}
if node != nil {
// Scheduler path
// Check that the access modes match
if !CheckAccessModes(claim, volume) {
continue
}
}
if volumeQty.Cmp(requestedQty) >= 0 {
if smallestVolume == nil || smallestVolumeQty.Cmp(volumeQty) > 0 {
smallestVolume = volume
smallestVolumeQty = volumeQty
}
}
}
if smallestVolume != nil {
// Found a matching volume
return smallestVolume, nil
}
return nil, nil
}
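// Call sketch (variable names are assumptions): the PV controller passes no
// node and honors delayed binding, while the scheduler passes the candidate
// node plus the PVs already chosen for other PVCs of the same pod:
//
//	pv, err := FindMatchingVolume(claim, volumes, nil, nil, delayBinding) // PV controller path
//	pv, err = FindMatchingVolume(claim, allPVs, node, chosenPVs, true)    // scheduler path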
// CheckVolumeModeMismatches is a convenience method that checks whether the
// volumeModes of a PersistentVolume and a PersistentVolumeClaim mismatch.
func CheckVolumeModeMismatches(pvcSpec *v1.PersistentVolumeClaimSpec, pvSpec *v1.PersistentVolumeSpec) (bool, error) {
if !utilfeature.DefaultFeatureGate.Enabled(features.BlockVolume) {
return false, nil
}
// In HA upgrades, we cannot guarantee that the apiserver is on a version >= controller-manager.
// So we default a nil volumeMode to filesystem.
requestedVolumeMode := v1.PersistentVolumeFilesystem
if pvcSpec.VolumeMode != nil {
requestedVolumeMode = *pvcSpec.VolumeMode
}
pvVolumeMode := v1.PersistentVolumeFilesystem
if pvSpec.VolumeMode != nil {
pvVolumeMode = *pvSpec.VolumeMode
}
return requestedVolumeMode != pvVolumeMode, nil
}
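// Illustrative sketch (assumes the BlockVolume feature gate is enabled): a
// block-mode claim mismatches a PV whose nil volumeMode defaults to
// filesystem:
//
//	block := v1.PersistentVolumeBlock
//	pvcSpec := &v1.PersistentVolumeClaimSpec{VolumeMode: &block}
//	pvSpec := &v1.PersistentVolumeSpec{} // nil VolumeMode -> filesystem
//	mismatch, _ := CheckVolumeModeMismatches(pvcSpec, pvSpec) // mismatch == true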
// CheckAccessModes returns true if the PV satisfies all of the PVC's
// requested AccessModes.
func CheckAccessModes(claim *v1.PersistentVolumeClaim, volume *v1.PersistentVolume) bool {
pvModesMap := map[v1.PersistentVolumeAccessMode]bool{}
for _, mode := range volume.Spec.AccessModes {
pvModesMap[mode] = true
}
for _, mode := range claim.Spec.AccessModes {
_, ok := pvModesMap[mode]
if !ok {
return false
}
}
return true
}
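// Illustrative sketch: a PV offering ReadWriteOnce and ReadOnlyMany satisfies
// a claim requesting ReadWriteOnce, but not a claim that also requests
// ReadWriteMany:
//
//	volume.Spec.AccessModes = []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce, v1.ReadOnlyMany}
//	claim.Spec.AccessModes = []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce}
//	ok := CheckAccessModes(claim, volume) // true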
func claimToClaimKey(claim *v1.PersistentVolumeClaim) string {
return fmt.Sprintf("%s/%s", claim.Namespace, claim.Name)
}
// GetVolumeNodeAffinity returns a VolumeNodeAffinity for given key and value.
func GetVolumeNodeAffinity(key string, value string) *v1.VolumeNodeAffinity {
return &v1.VolumeNodeAffinity{
Required: &v1.NodeSelector{
NodeSelectorTerms: []v1.NodeSelectorTerm{
{
MatchExpressions: []v1.NodeSelectorRequirement{
{
Key: key,
Operator: v1.NodeSelectorOpIn,
Values: []string{value},
},
},
},
},
},
}
}
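// Usage sketch (hypothetical key and node name): restrict a test PV to a
// single node:
//
//	pv.Spec.NodeAffinity = GetVolumeNodeAffinity("kubernetes.io/hostname", "node-1")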

View File

@@ -0,0 +1,76 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = [
"scheduler_assume_cache.go",
"scheduler_bind_cache_metrics.go",
"scheduler_binder.go",
"scheduler_binder_cache.go",
"scheduler_binder_fake.go",
],
importpath = "k8s.io/kubernetes/pkg/controller/volume/scheduling",
visibility = ["//visibility:public"],
deps = [
"//pkg/apis/core/v1/helper:go_default_library",
"//pkg/controller/volume/persistentvolume/util:go_default_library",
"//pkg/volume/util:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/meta:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/etcd:go_default_library",
"//staging/src/k8s.io/client-go/informers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/informers/storage/v1:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/listers/storage/v1:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//vendor/github.com/prometheus/client_golang/prometheus:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = [
"scheduler_assume_cache_test.go",
"scheduler_binder_cache_test.go",
"scheduler_binder_test.go",
],
embed = [":go_default_library"],
deps = [
"//pkg/api/testapi:go_default_library",
"//pkg/controller:go_default_library",
"//pkg/controller/volume/persistentvolume/testing:go_default_library",
"//pkg/controller/volume/persistentvolume/util:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/storage/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/informers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
"//staging/src/k8s.io/client-go/testing:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package persistentvolume
package scheduling
import (
"fmt"

View File

@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package persistentvolume
package scheduling
import (
"fmt"
@@ -22,6 +22,7 @@ import (
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
pvutil "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/util"
)
func makePV(name, version, storageClass string) *v1.PersistentVolume {
@@ -456,7 +457,7 @@ func TestAssumeUpdatePVCCache(t *testing.T) {
// Assume PVC
newPVC := pvc.DeepCopy()
newPVC.Annotations[annSelectedNode] = "test-node"
newPVC.Annotations[pvutil.AnnSelectedNode] = "test-node"
if err := cache.Assume(newPVC); err != nil {
t.Fatalf("failed to assume PVC: %v", err)
}

View File

@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package persistentvolume
package scheduling
import (
"github.com/prometheus/client_golang/prometheus"

View File

@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package persistentvolume
package scheduling
import (
"fmt"
@@ -32,6 +32,7 @@ import (
storagelisters "k8s.io/client-go/listers/storage/v1"
"k8s.io/klog"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
pvutil "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/util"
volumeutil "k8s.io/kubernetes/pkg/volume/util"
)
@@ -211,7 +212,7 @@ func (b *volumeBinder) FindPodVolumes(pod *v1.Pod, node *v1.Node) (unboundVolume
// Filter out claims to provision
for _, claim := range claimsToBind {
if selectedNode, ok := claim.Annotations[annSelectedNode]; ok {
if selectedNode, ok := claim.Annotations[pvutil.AnnSelectedNode]; ok {
if selectedNode != node.Name {
// Fast path, skip unmatched node
return false, boundVolumesSatisfied, nil
@@ -274,7 +275,7 @@ func (b *volumeBinder) AssumePodVolumes(assumedPod *v1.Pod, nodeName string) (al
// Assume PV
newBindings := []*bindingInfo{}
for _, binding := range claimsToBind {
newPV, dirty, err := GetBindVolumeToClaim(binding.pv, binding.pvc)
newPV, dirty, err := pvutil.GetBindVolumeToClaim(binding.pv, binding.pvc)
klog.V(5).Infof("AssumePodVolumes: GetBindVolumeToClaim for pod %q, PV %q, PVC %q. newPV %p, dirty %v, err: %v",
podName,
binding.pv.Name,
@@ -303,7 +304,7 @@ func (b *volumeBinder) AssumePodVolumes(assumedPod *v1.Pod, nodeName string) (al
// The claims from method args can be pointing to watcher cache. We must not
// modify these, therefore create a copy.
claimClone := claim.DeepCopy()
metav1.SetMetaDataAnnotation(&claimClone.ObjectMeta, annSelectedNode, nodeName)
metav1.SetMetaDataAnnotation(&claimClone.ObjectMeta, pvutil.AnnSelectedNode, nodeName)
err = b.pvcCache.Assume(claimClone)
if err != nil {
b.revertAssumedPVs(newBindings)
@@ -511,7 +512,7 @@ func (b *volumeBinder) checkBindings(pod *v1.Pod, bindings []*bindingInfo, claim
if pvc.Annotations == nil {
return false, fmt.Errorf("selectedNode annotation reset for PVC %q", pvc.Name)
}
selectedNode := pvc.Annotations[annSelectedNode]
selectedNode := pvc.Annotations[pvutil.AnnSelectedNode]
if selectedNode != pod.Spec.NodeName {
return false, fmt.Errorf("selectedNode annotation value %q not set to scheduled node %q", selectedNode, pod.Spec.NodeName)
}
@@ -582,7 +583,7 @@ func (b *volumeBinder) isPVCBound(namespace, pvcName string) (bool, *v1.Persiste
}
func (b *volumeBinder) isPVCFullyBound(pvc *v1.PersistentVolumeClaim) bool {
return pvc.Spec.VolumeName != "" && metav1.HasAnnotation(pvc.ObjectMeta, annBindCompleted)
return pvc.Spec.VolumeName != "" && metav1.HasAnnotation(pvc.ObjectMeta, pvutil.AnnBindCompleted)
}
// arePodVolumesBound returns true if all volumes are fully bound
@@ -614,7 +615,7 @@ func (b *volumeBinder) getPodVolumes(pod *v1.Pod) (boundClaims []*v1.PersistentV
if volumeBound {
boundClaims = append(boundClaims, pvc)
} else {
delayBindingMode, err := IsDelayBindingMode(pvc, b.classLister)
delayBindingMode, err := pvutil.IsDelayBindingMode(pvc, b.classLister)
if err != nil {
return nil, nil, nil, err
}
@@ -674,7 +675,7 @@ func (b *volumeBinder) findMatchingVolumes(pod *v1.Pod, claimsToBind []*v1.Persi
pvcName := getPVCName(pvc)
// Find a matching PV
pv, err := findMatchingVolume(pvc, allPVs, node, chosenPVs, true)
pv, err := pvutil.FindMatchingVolume(pvc, allPVs, node, chosenPVs, true)
if err != nil {
return false, nil, nil, err
}
@@ -717,7 +718,7 @@ func (b *volumeBinder) checkVolumeProvisions(pod *v1.Pod, claimsToProvision []*v
return false, nil, fmt.Errorf("failed to find storage class %q", className)
}
provisioner := class.Provisioner
if provisioner == "" || provisioner == notSupportedProvisioner {
if provisioner == "" || provisioner == pvutil.NotSupportedProvisioner {
klog.V(4).Infof("storage class %q of claim %q does not support dynamic provisioning", className, pvcName)
return false, nil, nil
}
@@ -775,3 +776,7 @@ func (a byPVCSize) Less(i, j int) bool {
// return true if iSize is less than jSize
return iSize.Cmp(jSize) == -1
}
func claimToClaimKey(claim *v1.PersistentVolumeClaim) string {
return fmt.Sprintf("%s/%s", claim.Namespace, claim.Name)
}

View File

@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package persistentvolume
package scheduling
import (
"sync"

View File

@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package persistentvolume
package scheduling
import (
"reflect"

View File

@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package persistentvolume
package scheduling
import "k8s.io/api/core/v1"

View File

@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package persistentvolume
package scheduling
import (
"context"
@@ -39,6 +39,8 @@ import (
"k8s.io/klog"
"k8s.io/kubernetes/pkg/api/testapi"
"k8s.io/kubernetes/pkg/controller"
pvtesting "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/testing"
pvutil "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/util"
)
var (
@@ -97,7 +99,7 @@ var (
type testEnv struct {
client clientset.Interface
reactor *volumeReactor
reactor *pvtesting.VolumeReactor
binder SchedulerVolumeBinder
internalBinder *volumeBinder
internalNodeInformer coreinformers.NodeInformer
@@ -107,7 +109,7 @@ type testEnv struct {
func newTestBinder(t *testing.T, stopCh <-chan struct{}) *testEnv {
client := &fake.Clientset{}
reactor := newVolumeReactor(client, nil, nil, nil, nil)
reactor := pvtesting.NewVolumeReactor(client, nil, nil, nil)
// TODO refactor all tests to use real watch mechanism, see #72327
client.AddWatchReactor("*", func(action k8stesting.Action) (handled bool, ret watch.Interface, err error) {
gvr := action.GetResource()
@@ -238,11 +240,11 @@ func (env *testEnv) initClaims(cachedPVCs []*v1.PersistentVolumeClaim, apiPVCs [
for _, pvc := range cachedPVCs {
internalPVCCache.add(pvc)
if apiPVCs == nil {
- env.reactor.claims[pvc.Name] = pvc
+ env.reactor.AddClaim(pvc)
}
}
for _, pvc := range apiPVCs {
- env.reactor.claims[pvc.Name] = pvc
+ env.reactor.AddClaim(pvc)
}
}
@@ -251,11 +253,11 @@ func (env *testEnv) initVolumes(cachedPVs []*v1.PersistentVolume, apiPVs []*v1.P
for _, pv := range cachedPVs {
internalPVCache.add(pv)
if apiPVs == nil {
- env.reactor.volumes[pv.Name] = pv
+ env.reactor.AddVolume(pv)
}
}
for _, pv := range apiPVs {
- env.reactor.volumes[pv.Name] = pv
+ env.reactor.AddVolume(pv)
}
}
@@ -421,8 +423,8 @@ func (env *testEnv) validateAssume(t *testing.T, name string, pod *v1.Pod, bindi
t.Errorf("Test %q failed: GetPVC %q returned error: %v", name, pvcKey, err)
continue
}
- if pvc.Annotations[annSelectedNode] != nodeLabelValue {
- t.Errorf("Test %q failed: expected annSelectedNode of pvc %q to be %q, but got %q", name, pvcKey, nodeLabelValue, pvc.Annotations[annSelectedNode])
+ if pvc.Annotations[pvutil.AnnSelectedNode] != nodeLabelValue {
+ t.Errorf("Test %q failed: expected pvutil.AnnSelectedNode of pvc %q to be %q, but got %q", name, pvcKey, nodeLabelValue, pvc.Annotations[pvutil.AnnSelectedNode])
}
}
}
@@ -447,8 +449,8 @@ func (env *testEnv) validateFailedAssume(t *testing.T, name string, pod *v1.Pod,
t.Errorf("Test %q failed: GetPVC %q returned error: %v", name, pvcKey, err)
continue
}
- if pvc.Annotations[annSelectedNode] != "" {
- t.Errorf("Test %q failed: expected annSelectedNode of pvc %q empty, but got %q", name, pvcKey, pvc.Annotations[annSelectedNode])
+ if pvc.Annotations[pvutil.AnnSelectedNode] != "" {
+ t.Errorf("Test %q failed: expected pvutil.AnnSelectedNode of pvc %q empty, but got %q", name, pvcKey, pvc.Annotations[pvutil.AnnSelectedNode])
}
}
}
@@ -476,7 +478,7 @@ func (env *testEnv) validateBind(
}
// Check reactor for API updates
- if err := env.reactor.checkVolumes(expectedAPIPVs); err != nil {
+ if err := env.reactor.CheckVolumes(expectedAPIPVs); err != nil {
t.Errorf("Test %q failed: API reactor validation failed: %v", name, err)
}
}
@@ -504,7 +506,7 @@ func (env *testEnv) validateProvision(
}
// Check reactor for API updates
- if err := env.reactor.checkClaims(expectedAPIPVCs); err != nil {
+ if err := env.reactor.CheckClaims(expectedAPIPVCs); err != nil {
t.Errorf("Test %q failed: API reactor validation failed: %v", name, err)
}
}
@@ -543,10 +545,10 @@ func makeTestPVC(name, size, node string, pvcBoundState int, pvName, resourceVer
switch pvcBoundState {
case pvcSelectedNode:
- metav1.SetMetaDataAnnotation(&pvc.ObjectMeta, annSelectedNode, node)
+ metav1.SetMetaDataAnnotation(&pvc.ObjectMeta, pvutil.AnnSelectedNode, node)
// don't fallthrough
case pvcBound:
- metav1.SetMetaDataAnnotation(&pvc.ObjectMeta, annBindCompleted, "yes")
+ metav1.SetMetaDataAnnotation(&pvc.ObjectMeta, pvutil.AnnBindCompleted, "yes")
fallthrough
case pvcPrebound:
pvc.Spec.VolumeName = pvName
@@ -595,7 +597,7 @@ func makeTestPV(name, node, capacity, version string, boundToPVC *v1.PersistentV
},
}
if node != "" {
- pv.Spec.NodeAffinity = getVolumeNodeAffinity(nodeLabelKey, node)
+ pv.Spec.NodeAffinity = pvutil.GetVolumeNodeAffinity(nodeLabelKey, node)
}
if boundToPVC != nil {
@@ -607,7 +609,7 @@ func makeTestPV(name, node, capacity, version string, boundToPVC *v1.PersistentV
Namespace: boundToPVC.Namespace,
UID: boundToPVC.UID,
}
- metav1.SetMetaDataAnnotation(&pv.ObjectMeta, annBoundByController, "yes")
+ metav1.SetMetaDataAnnotation(&pv.ObjectMeta, pvutil.AnnBoundByController, "yes")
}
return pv
@@ -615,7 +617,7 @@ func makeTestPV(name, node, capacity, version string, boundToPVC *v1.PersistentV
func pvcSetSelectedNode(pvc *v1.PersistentVolumeClaim, node string) *v1.PersistentVolumeClaim {
newPVC := pvc.DeepCopy()
- metav1.SetMetaDataAnnotation(&newPVC.ObjectMeta, annSelectedNode, node)
+ metav1.SetMetaDataAnnotation(&newPVC.ObjectMeta, pvutil.AnnSelectedNode, node)
return newPVC
}
@@ -691,7 +693,7 @@ func makeBinding(pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume) *bindin
func addProvisionAnn(pvc *v1.PersistentVolumeClaim) *v1.PersistentVolumeClaim {
res := pvc.DeepCopy()
// Add provision related annotations
- metav1.SetMetaDataAnnotation(&res.ObjectMeta, annSelectedNode, nodeLabelValue)
+ metav1.SetMetaDataAnnotation(&res.ObjectMeta, pvutil.AnnSelectedNode, nodeLabelValue)
return res
}
@@ -1488,7 +1490,7 @@ func TestBindPodVolumes(t *testing.T) {
// Update PVC to be fully bound to PV
newPVC := pvc.DeepCopy()
newPVC.Spec.VolumeName = pv.Name
- metav1.SetMetaDataAnnotation(&newPVC.ObjectMeta, annBindCompleted, "yes")
+ metav1.SetMetaDataAnnotation(&newPVC.ObjectMeta, pvutil.AnnBindCompleted, "yes")
if _, err := testEnv.client.CoreV1().PersistentVolumeClaims(newPVC.Namespace).Update(newPVC); err != nil {
t.Errorf("failed to update PVC %q: %v", newPVC.Name, err)
}
@@ -1512,7 +1514,7 @@ func TestBindPodVolumes(t *testing.T) {
return
}
newPVC.Spec.VolumeName = dynamicPV.Name
- metav1.SetMetaDataAnnotation(&newPVC.ObjectMeta, annBindCompleted, "yes")
+ metav1.SetMetaDataAnnotation(&newPVC.ObjectMeta, pvutil.AnnBindCompleted, "yes")
if _, err := testEnv.client.CoreV1().PersistentVolumeClaims(newPVC.Namespace).Update(newPVC); err != nil {
t.Errorf("failed to update PVC %q: %v", newPVC.Name, err)
}
@@ -1588,7 +1590,7 @@ func TestBindPodVolumes(t *testing.T) {
// Update PVC to be fully bound to a PV with a different node
newPVC := pvcs[0].DeepCopy()
newPVC.Spec.VolumeName = pvNode2.Name
- metav1.SetMetaDataAnnotation(&newPVC.ObjectMeta, annBindCompleted, "yes")
+ metav1.SetMetaDataAnnotation(&newPVC.ObjectMeta, pvutil.AnnBindCompleted, "yes")
if _, err := testEnv.client.CoreV1().PersistentVolumeClaims(newPVC.Namespace).Update(newPVC); err != nil {
t.Errorf("failed to update PVC %q: %v", newPVC.Name, err)
}
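
The test changes above stop reaching into the reactor's internal maps and go through the exported constructor and accessors instead. A hedged sketch of the new seeding pattern (the helper name is hypothetical; the constructor arity and the AddClaim/AddVolume calls match these hunks):

    package scheduling

    import (
        v1 "k8s.io/api/core/v1"
        "k8s.io/client-go/kubernetes/fake"
        pvtesting "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/testing"
    )

    // seedReactor is a hypothetical helper: NewVolumeReactor takes one
    // argument fewer than the old package-private newVolumeReactor, and
    // claims/volumes are seeded via AddClaim/AddVolume rather than by
    // writing to reactor.claims / reactor.volumes directly.
    func seedReactor(pvcs []*v1.PersistentVolumeClaim, pvs []*v1.PersistentVolume) *pvtesting.VolumeReactor {
        client := &fake.Clientset{}
        reactor := pvtesting.NewVolumeReactor(client, nil, nil, nil)
        for _, pvc := range pvcs {
            reactor.AddClaim(pvc)
        }
        for _, pv := range pvs {
            reactor.AddVolume(pv)
        }
        return reactor
    }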


@@ -49,8 +49,7 @@ go_test(
],
embed = [":go_default_library"],
deps = [
"//pkg/api/legacyscheme:go_default_library",
"//pkg/controller/volume/persistentvolume:go_default_library",
"//pkg/controller/volume/scheduling:go_default_library",
"//pkg/scheduler/algorithm:go_default_library",
"//pkg/scheduler/algorithm/predicates:go_default_library",
"//pkg/scheduler/algorithm/priorities:go_default_library",
@@ -75,6 +74,7 @@ go_test(
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library",
"//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//staging/src/k8s.io/client-go/tools/record:go_default_library",


@@ -10,7 +10,7 @@ go_library(
],
importpath = "k8s.io/kubernetes/pkg/scheduler/metrics",
deps = [
"//pkg/controller/volume/persistentvolume:go_default_library",
"//pkg/controller/volume/scheduling:go_default_library",
"//vendor/github.com/prometheus/client_golang/prometheus:go_default_library",
],
)


@@ -21,7 +21,7 @@ import (
"time"
"github.com/prometheus/client_golang/prometheus"
"k8s.io/kubernetes/pkg/controller/volume/persistentvolume"
volumescheduling "k8s.io/kubernetes/pkg/controller/volume/scheduling"
)
const (
@@ -234,7 +234,7 @@ func Register() {
prometheus.MustRegister(metric)
}
- persistentvolume.RegisterVolumeSchedulingMetrics()
+ volumescheduling.RegisterVolumeSchedulingMetrics()
})
}
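
Only the qualifier changes at the registration site. A minimal sketch of the surrounding shape, assuming a sync.Once guard around Register (the guard's real name sits outside this hunk and is invented here):

    package metrics

    import (
        "sync"

        volumescheduling "k8s.io/kubernetes/pkg/controller/volume/scheduling"
    )

    var registerMetrics sync.Once // hypothetical name for the guard outside this hunk

    func Register() {
        registerMetrics.Do(func() {
            // prometheus.MustRegister(...) calls for the scheduler's own
            // metrics are elided here.
            volumescheduling.RegisterVolumeSchedulingMetrics()
        })
    }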


@@ -37,11 +37,11 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
clientsetfake "k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/kubernetes/scheme"
corelister "k8s.io/client-go/listers/core/v1"
clientcache "k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/kubernetes/pkg/api/legacyscheme"
"k8s.io/kubernetes/pkg/controller/volume/persistentvolume"
volumescheduling "k8s.io/kubernetes/pkg/controller/volume/scheduling"
"k8s.io/kubernetes/pkg/scheduler/algorithm"
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
"k8s.io/kubernetes/pkg/scheduler/algorithm/priorities"
@@ -194,7 +194,7 @@ func TestSchedulerCreation(t *testing.T) {
informerFactory.Core().V1().Services(),
informerFactory.Policy().V1beta1().PodDisruptionBudgets(),
informerFactory.Storage().V1().StorageClasses(),
- eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: "scheduler"}),
+ eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "scheduler"}),
kubeschedulerconfig.SchedulerAlgorithmSource{Provider: &testSource},
stopCh,
EmptyPluginRegistry,
@@ -302,8 +302,8 @@ func TestScheduler(t *testing.T) {
return item.sendPod
},
Framework: fwk,
- Recorder: eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: "scheduler"}),
- VolumeBinder: volumebinder.NewFakeVolumeBinder(&persistentvolume.FakeVolumeBinderConfig{AllBound: true}),
+ Recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "scheduler"}),
+ VolumeBinder: volumebinder.NewFakeVolumeBinder(&volumescheduling.FakeVolumeBinderConfig{AllBound: true}),
})
called := make(chan struct{})
events := eventBroadcaster.StartEventWatcher(func(e *v1.Event) {
@@ -679,7 +679,7 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.C
PodConditionUpdater: fakePodConditionUpdater{},
PodPreemptor: fakePodPreemptor{},
Framework: framework,
- VolumeBinder: volumebinder.NewFakeVolumeBinder(&persistentvolume.FakeVolumeBinderConfig{AllBound: true}),
+ VolumeBinder: volumebinder.NewFakeVolumeBinder(&volumescheduling.FakeVolumeBinderConfig{AllBound: true}),
}
if recorder != nil {
@@ -736,7 +736,7 @@ func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, sc
PodPreemptor: fakePodPreemptor{},
StopEverything: stop,
Framework: framework,
- VolumeBinder: volumebinder.NewFakeVolumeBinder(&persistentvolume.FakeVolumeBinderConfig{AllBound: true}),
+ VolumeBinder: volumebinder.NewFakeVolumeBinder(&volumescheduling.FakeVolumeBinderConfig{AllBound: true}),
})
return sched, bindingChan
@@ -760,7 +760,7 @@ func setupTestSchedulerWithVolumeBinding(fakeVolumeBinder *volumebinder.VolumeBi
predicates.CheckVolumeBindingPred: predicates.NewVolumeBindingPredicate(fakeVolumeBinder),
}
- recorder := broadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: "scheduler"})
+ recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "scheduler"})
s, bindingChan, errChan := setupTestScheduler(queuedPodStore, scache, informerFactory, predicateMap, recorder)
informerFactory.Start(stop)
informerFactory.WaitForCacheSync(stop)
@@ -794,11 +794,11 @@ func TestSchedulerWithVolumeBinding(t *testing.T) {
expectAssumeCalled bool
expectBindCalled bool
eventReason string
- volumeBinderConfig *persistentvolume.FakeVolumeBinderConfig
+ volumeBinderConfig *volumescheduling.FakeVolumeBinderConfig
}{
{
name: "all bound",
- volumeBinderConfig: &persistentvolume.FakeVolumeBinderConfig{
+ volumeBinderConfig: &volumescheduling.FakeVolumeBinderConfig{
AllBound: true,
FindUnboundSatsified: true,
FindBoundSatsified: true,
@@ -809,7 +809,7 @@ func TestSchedulerWithVolumeBinding(t *testing.T) {
},
{
name: "bound/invalid pv affinity",
- volumeBinderConfig: &persistentvolume.FakeVolumeBinderConfig{
+ volumeBinderConfig: &volumescheduling.FakeVolumeBinderConfig{
AllBound: true,
FindUnboundSatsified: true,
FindBoundSatsified: false,
@@ -819,7 +819,7 @@ func TestSchedulerWithVolumeBinding(t *testing.T) {
},
{
name: "unbound/no matches",
- volumeBinderConfig: &persistentvolume.FakeVolumeBinderConfig{
+ volumeBinderConfig: &volumescheduling.FakeVolumeBinderConfig{
FindUnboundSatsified: false,
FindBoundSatsified: true,
},
@@ -828,7 +828,7 @@ func TestSchedulerWithVolumeBinding(t *testing.T) {
},
{
name: "bound and unbound unsatisfied",
- volumeBinderConfig: &persistentvolume.FakeVolumeBinderConfig{
+ volumeBinderConfig: &volumescheduling.FakeVolumeBinderConfig{
FindUnboundSatsified: false,
FindBoundSatsified: false,
},
@@ -837,7 +837,7 @@ func TestSchedulerWithVolumeBinding(t *testing.T) {
},
{
name: "unbound/found matches/bind succeeds",
- volumeBinderConfig: &persistentvolume.FakeVolumeBinderConfig{
+ volumeBinderConfig: &volumescheduling.FakeVolumeBinderConfig{
FindUnboundSatsified: true,
FindBoundSatsified: true,
},
@@ -848,7 +848,7 @@ func TestSchedulerWithVolumeBinding(t *testing.T) {
},
{
name: "predicate error",
- volumeBinderConfig: &persistentvolume.FakeVolumeBinderConfig{
+ volumeBinderConfig: &volumescheduling.FakeVolumeBinderConfig{
FindErr: findErr,
},
eventReason: "FailedScheduling",
@@ -856,7 +856,7 @@ func TestSchedulerWithVolumeBinding(t *testing.T) {
},
{
name: "assume error",
- volumeBinderConfig: &persistentvolume.FakeVolumeBinderConfig{
+ volumeBinderConfig: &volumescheduling.FakeVolumeBinderConfig{
FindUnboundSatsified: true,
FindBoundSatsified: true,
AssumeErr: assumeErr,
@@ -867,7 +867,7 @@ func TestSchedulerWithVolumeBinding(t *testing.T) {
},
{
name: "bind error",
- volumeBinderConfig: &persistentvolume.FakeVolumeBinderConfig{
+ volumeBinderConfig: &volumescheduling.FakeVolumeBinderConfig{
FindUnboundSatsified: true,
FindBoundSatsified: true,
BindErr: bindErr,
@@ -883,7 +883,7 @@ func TestSchedulerWithVolumeBinding(t *testing.T) {
t.Run(item.name, func(t *testing.T) {
stop := make(chan struct{})
fakeVolumeBinder := volumebinder.NewFakeVolumeBinder(item.volumeBinderConfig)
- internalBinder, ok := fakeVolumeBinder.Binder.(*persistentvolume.FakeVolumeBinder)
+ internalBinder, ok := fakeVolumeBinder.Binder.(*volumescheduling.FakeVolumeBinder)
if !ok {
t.Fatalf("Failed to get fake volume binder")
}
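
Across all of these test cases the only change is the config type's package. A sketch of constructing the fake binder under the new path (the wrapper below is hypothetical; NewFakeVolumeBinder and FakeVolumeBinderConfig appear verbatim in this diff):

    package scheduler

    import (
        volumescheduling "k8s.io/kubernetes/pkg/controller/volume/scheduling"
        "k8s.io/kubernetes/pkg/scheduler/volumebinder"
    )

    // allBoundBinder returns a fake binder that reports every volume as
    // already bound, matching the "all bound" case above.
    func allBoundBinder() *volumebinder.VolumeBinder {
        return volumebinder.NewFakeVolumeBinder(&volumescheduling.FakeVolumeBinderConfig{
            AllBound: true,
        })
    }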


@@ -6,7 +6,7 @@ go_library(
importpath = "k8s.io/kubernetes/pkg/scheduler/volumebinder",
visibility = ["//visibility:public"],
deps = [
"//pkg/controller/volume/persistentvolume:go_default_library",
"//pkg/controller/volume/scheduling:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/informers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/informers/storage/v1:go_default_library",


@@ -23,12 +23,12 @@ import (
coreinformers "k8s.io/client-go/informers/core/v1"
storageinformers "k8s.io/client-go/informers/storage/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/kubernetes/pkg/controller/volume/persistentvolume"
volumescheduling "k8s.io/kubernetes/pkg/controller/volume/scheduling"
)
// VolumeBinder sets up the volume binding library
type VolumeBinder struct {
- Binder persistentvolume.SchedulerVolumeBinder
+ Binder volumescheduling.SchedulerVolumeBinder
}
// NewVolumeBinder sets up the volume binding library and binding queue
@@ -41,14 +41,14 @@ func NewVolumeBinder(
bindTimeout time.Duration) *VolumeBinder {
return &VolumeBinder{
- Binder: persistentvolume.NewVolumeBinder(client, nodeInformer, pvcInformer, pvInformer, storageClassInformer, bindTimeout),
+ Binder: volumescheduling.NewVolumeBinder(client, nodeInformer, pvcInformer, pvInformer, storageClassInformer, bindTimeout),
}
}
// NewFakeVolumeBinder sets up a fake volume binder and binding queue
- func NewFakeVolumeBinder(config *persistentvolume.FakeVolumeBinderConfig) *VolumeBinder {
+ func NewFakeVolumeBinder(config *volumescheduling.FakeVolumeBinderConfig) *VolumeBinder {
return &VolumeBinder{
- Binder: persistentvolume.NewFakeVolumeBinder(config),
+ Binder: volumescheduling.NewFakeVolumeBinder(config),
}
}
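
For callers, the wiring is unchanged apart from the package. A hedged sketch of standing up the binder from a shared informer factory (the wrapper name and the ten-second timeout are arbitrary examples; the constructor signature matches the hunk above):

    package scheduler

    import (
        "time"

        "k8s.io/client-go/informers"
        clientset "k8s.io/client-go/kubernetes"
        "k8s.io/kubernetes/pkg/scheduler/volumebinder"
    )

    func newBinder(client clientset.Interface, f informers.SharedInformerFactory) *volumebinder.VolumeBinder {
        // Argument order follows NewVolumeBinder in the hunk above:
        // node, PVC, PV, and StorageClass informers, then a bind timeout.
        return volumebinder.NewVolumeBinder(
            client,
            f.Core().V1().Nodes(),
            f.Core().V1().PersistentVolumeClaims(),
            f.Core().V1().PersistentVolumes(),
            f.Storage().V1().StorageClasses(),
            10*time.Second,
        )
    }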