Refactored persistent volume controllers to new packaging structure

This commit is contained in:
markturansky
2015-08-10 21:54:48 -04:00
parent 9036f2cf82
commit 8e0d391b1e
8 changed files with 8 additions and 8 deletions

View File

@@ -0,0 +1,411 @@
/*
Copyright 2014 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package volumeclaimbinder
import (
"fmt"
"sync"
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/client"
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/watch"
"github.com/golang/glog"
)
// PersistentVolumeClaimBinder is a controller that synchronizes PersistentVolumeClaims.
type PersistentVolumeClaimBinder struct {
volumeIndex *persistentVolumeOrderedIndex
volumeController *framework.Controller
claimController *framework.Controller
client binderClient
stopChannels map[string]chan struct{}
lock sync.RWMutex
}
// NewPersistentVolumeClaimBinder creates a new PersistentVolumeClaimBinder
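// A minimal usage sketch (illustrative; kubeClient construction is assumed to
// happen elsewhere):
//
//	binder := NewPersistentVolumeClaimBinder(kubeClient, 10*time.Second)
//	binder.Run()
//	defer binder.Stop()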
func NewPersistentVolumeClaimBinder(kubeClient client.Interface, syncPeriod time.Duration) *PersistentVolumeClaimBinder {
volumeIndex := NewPersistentVolumeOrderedIndex()
binderClient := NewBinderClient(kubeClient)
binder := &PersistentVolumeClaimBinder{
volumeIndex: volumeIndex,
client: binderClient,
}
_, volumeController := framework.NewInformer(
&cache.ListWatch{
ListFunc: func() (runtime.Object, error) {
return kubeClient.PersistentVolumes().List(labels.Everything(), fields.Everything())
},
WatchFunc: func(resourceVersion string) (watch.Interface, error) {
return kubeClient.PersistentVolumes().Watch(labels.Everything(), fields.Everything(), resourceVersion)
},
},
&api.PersistentVolume{},
syncPeriod,
framework.ResourceEventHandlerFuncs{
AddFunc: binder.addVolume,
UpdateFunc: binder.updateVolume,
DeleteFunc: binder.deleteVolume,
},
)
_, claimController := framework.NewInformer(
&cache.ListWatch{
ListFunc: func() (runtime.Object, error) {
return kubeClient.PersistentVolumeClaims(api.NamespaceAll).List(labels.Everything(), fields.Everything())
},
WatchFunc: func(resourceVersion string) (watch.Interface, error) {
return kubeClient.PersistentVolumeClaims(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), resourceVersion)
},
},
&api.PersistentVolumeClaim{},
syncPeriod,
framework.ResourceEventHandlerFuncs{
AddFunc: binder.addClaim,
UpdateFunc: binder.updateClaim,
// no DeleteFunc needed. a claim requires no clean-up.
// syncVolume handles the missing claim
},
)
binder.claimController = claimController
binder.volumeController = volumeController
return binder
}
func (binder *PersistentVolumeClaimBinder) addVolume(obj interface{}) {
binder.lock.Lock()
defer binder.lock.Unlock()
volume := obj.(*api.PersistentVolume)
err := syncVolume(binder.volumeIndex, binder.client, volume)
if err != nil {
glog.Errorf("PVClaimBinder could not add volume %s: %+v", volume.Name, err)
}
}
func (binder *PersistentVolumeClaimBinder) updateVolume(oldObj, newObj interface{}) {
binder.lock.Lock()
defer binder.lock.Unlock()
newVolume := newObj.(*api.PersistentVolume)
binder.volumeIndex.Update(newVolume)
err := syncVolume(binder.volumeIndex, binder.client, newVolume)
if err != nil {
glog.Errorf("PVClaimBinder could not update volume %s: %+v", newVolume.Name, err)
}
}
func (binder *PersistentVolumeClaimBinder) deleteVolume(obj interface{}) {
binder.lock.Lock()
defer binder.lock.Unlock()
volume := obj.(*api.PersistentVolume)
binder.volumeIndex.Delete(volume)
}
func (binder *PersistentVolumeClaimBinder) addClaim(obj interface{}) {
binder.lock.Lock()
defer binder.lock.Unlock()
claim := obj.(*api.PersistentVolumeClaim)
err := syncClaim(binder.volumeIndex, binder.client, claim)
if err != nil {
glog.Errorf("PVClaimBinder could not add claim %s: %+v", claim.Name, err)
}
}
func (binder *PersistentVolumeClaimBinder) updateClaim(oldObj, newObj interface{}) {
binder.lock.Lock()
defer binder.lock.Unlock()
newClaim := newObj.(*api.PersistentVolumeClaim)
err := syncClaim(binder.volumeIndex, binder.client, newClaim)
if err != nil {
glog.Errorf("PVClaimBinder could not update claim %s: %+v", newClaim.Name, err)
}
}
func syncVolume(volumeIndex *persistentVolumeOrderedIndex, binderClient binderClient, volume *api.PersistentVolume) (err error) {
glog.V(5).Infof("Synchronizing PersistentVolume[%s], current phase: %s\n", volume.Name, volume.Status.Phase)
// volumes can be in one of the following states:
//
// VolumePending -- default value -- not bound to a claim and not yet processed through this controller.
// VolumeAvailable -- not bound to a claim, but processed at least once and found in this controller's volumeIndex.
// VolumeBound -- bound to a claim because volume.Spec.ClaimRef != nil. Claim status may not be correct.
// VolumeReleased -- volume.Spec.ClaimRef != nil but the claim has been deleted by the user.
// VolumeFailed -- volume.Spec.ClaimRef != nil and the volume failed processing in the recycler
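//
// Sketch of the transitions the switch below can produce:
//	Pending   -> Available  (indexed; any leftover ClaimRef from recycling is cleared)
//	Available -> Bound      (ClaimRef set and the claim still exists)
//	Available -> Released   (ClaimRef set but the claim was deleted)
//	Bound     -> Released   (the bound claim was deleted)
//	Released, Failed        (no transition here; handled by the recycler or an admin)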
currentPhase := volume.Status.Phase
nextPhase := currentPhase
switch currentPhase {
// pending volumes are available only after indexing in order to be matched to claims.
case api.VolumePending:
if volume.Spec.ClaimRef != nil {
// Pending volumes that have a ClaimRef were recently recycled. The Recycler set the phase to VolumePending
// to start the volume again at the beginning of this lifecycle.
// ClaimRef is the last bind between persistent volume and claim.
// The claim has already been deleted by the user at this point
oldClaimRef := volume.Spec.ClaimRef
volume.Spec.ClaimRef = nil
_, err = binderClient.UpdatePersistentVolume(volume)
if err != nil {
// rollback on error, keep the ClaimRef until we can successfully update the volume
volume.Spec.ClaimRef = oldClaimRef
return fmt.Errorf("Unexpected error saving PersistentVolume: %+v", err)
}
}
_, exists, err := volumeIndex.Get(volume)
if err != nil {
return err
}
if !exists {
volumeIndex.Add(volume)
}
glog.V(5).Infof("PersistentVolume[%s] is now available\n", volume.Name)
nextPhase = api.VolumeAvailable
// available volumes await a claim
case api.VolumeAvailable:
// TODO: remove api.VolumePending phase altogether
_, exists, err := volumeIndex.Get(volume)
if err != nil {
return err
}
if !exists {
volumeIndex.Add(volume)
}
if volume.Spec.ClaimRef != nil {
_, err := binderClient.GetPersistentVolumeClaim(volume.Spec.ClaimRef.Namespace, volume.Spec.ClaimRef.Name)
if err == nil {
// change of phase will trigger an update event with the newly bound volume
glog.V(5).Infof("PersistentVolume[%s] is now bound\n", volume.Name)
nextPhase = api.VolumeBound
} else {
if errors.IsNotFound(err) {
nextPhase = api.VolumeReleased
}
}
}
// bound volumes require verification of their bound claims
case api.VolumeBound:
if volume.Spec.ClaimRef == nil {
return fmt.Errorf("PersistentVolume[%s] expected to be bound but found nil claimRef: %+v", volume.Name, volume)
} else {
_, err := binderClient.GetPersistentVolumeClaim(volume.Spec.ClaimRef.Namespace, volume.Spec.ClaimRef.Name)
if err != nil {
if errors.IsNotFound(err) {
nextPhase = api.VolumeReleased
} else {
return err
}
}
}
// released volumes require recycling
case api.VolumeReleased:
if volume.Spec.ClaimRef == nil {
return fmt.Errorf("PersistentVolume[%s] expected to be bound but found nil claimRef: %+v", volume.Name, volume)
} else {
// another process is watching for released volumes.
// PersistentVolumeReclaimPolicy is set per PersistentVolume
}
// volumes are removed by processes external to this binder and must be removed from the cluster
case api.VolumeFailed:
if volume.Spec.ClaimRef == nil {
return fmt.Errorf("PersistentVolume[%s] expected to be bound but found nil claimRef: %+v", volume)
} else {
glog.V(5).Infof("PersistentVolume[%s] previously failed recycling. Skipping.\n", volume.Name)
}
}
if currentPhase != nextPhase {
volume.Status.Phase = nextPhase
// a change in state will trigger another update through this controller.
// each pass through this controller evaluates current phase and decides whether or not to change to the next phase
glog.V(5).Infof("PersistentVolume[%s] changing phase from %s to %s\n", volume.Name, currentPhase, nextPhase)
updatedVolume, err := binderClient.UpdatePersistentVolumeStatus(volume)
if err != nil {
// Rollback to previous phase; keep the local copy rather than a possibly-nil result
volume.Status.Phase = currentPhase
} else {
volume = updatedVolume
}
volumeIndex.Update(volume)
}
return nil
}
func syncClaim(volumeIndex *persistentVolumeOrderedIndex, binderClient binderClient, claim *api.PersistentVolumeClaim) (err error) {
glog.V(5).Infof("Synchronizing PersistentVolumeClaim[%s]\n", claim.Name)
// claims can be in one of the following states:
//
// ClaimPending -- default value -- not yet bound to a volume. A volume that matches the claim may not exist.
// ClaimBound -- bound to a volume; claim.Spec.VolumeName is set
currentPhase := claim.Status.Phase
nextPhase := currentPhase
switch currentPhase {
// pending claims await a matching volume
case api.ClaimPending:
volume, err := volumeIndex.FindBestMatchForClaim(claim)
if err != nil {
return err
}
if volume == nil {
return fmt.Errorf("A volume match does not exist for persistent claim: %s", claim.Name)
}
// make a binding reference to the claim.
// triggers update of the claim in this controller, which builds claim status
claim.Spec.VolumeName = volume.Name
// TODO: make this similar to Pod's binding both with BindingREST subresource and GuaranteedUpdate helper in etcd.go
claim, err = binderClient.UpdatePersistentVolumeClaim(claim)
if err == nil {
nextPhase = api.ClaimBound
glog.V(5).Infof("PersistentVolumeClaim[%s] is bound\n", claim.Name)
} else {
// Rollback by unsetting the VolumeName on the claim pointer.
// the volume in the index remains unbound and ready to be matched again.
claim.Spec.VolumeName = ""
// Rollback by restoring original phase to claim pointer
nextPhase = api.ClaimPending
return fmt.Errorf("Error updating claim: %+v\n", err)
}
case api.ClaimBound:
volume, err := binderClient.GetPersistentVolume(claim.Spec.VolumeName)
if err != nil {
return fmt.Errorf("Unexpected error getting persistent volume: %v\n", err)
}
if volume.Spec.ClaimRef == nil {
glog.V(5).Infof("Rebuilding bind on pv.Spec.ClaimRef\n")
claimRef, err := api.GetReference(claim)
if err != nil {
return fmt.Errorf("Unexpected error getting claim reference: %v\n", err)
}
volume.Spec.ClaimRef = claimRef
_, err = binderClient.UpdatePersistentVolume(volume)
if err != nil {
return fmt.Errorf("Unexpected error saving PersistentVolume.Status: %+v", err)
}
}
// all "actuals" are transferred from PV to PVC so the user knows what
// type of volume they actually got for their claim.
// Volumes cannot have zero AccessModes, so checking that a claim has access modes
// is sufficient to tell us if these values have already been set.
if len(claim.Status.AccessModes) == 0 {
claim.Status.Phase = api.ClaimBound
claim.Status.AccessModes = volume.Spec.AccessModes
claim.Status.Capacity = volume.Spec.Capacity
_, err := binderClient.UpdatePersistentVolumeClaimStatus(claim)
if err != nil {
return fmt.Errorf("Unexpected error saving claim status: %+v", err)
}
}
}
if currentPhase != nextPhase {
claim.Status.Phase = nextPhase
if _, err := binderClient.UpdatePersistentVolumeClaimStatus(claim); err != nil {
return fmt.Errorf("Unexpected error saving claim status: %+v", err)
}
}
return nil
}
// Run starts all of this binder's control loops
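// Run is idempotent as written: controllers whose stop channel already exists
// are skipped, and Stop removes each channel after closing it, so a Stop/Run
// cycle restarts the informers with fresh channels.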
func (controller *PersistentVolumeClaimBinder) Run() {
glog.V(5).Infof("Starting PersistentVolumeClaimBinder\n")
if controller.stopChannels == nil {
controller.stopChannels = make(map[string]chan struct{})
}
if _, exists := controller.stopChannels["volumes"]; !exists {
controller.stopChannels["volumes"] = make(chan struct{})
go controller.volumeController.Run(controller.stopChannels["volumes"])
}
if _, exists := controller.stopChannels["claims"]; !exists {
controller.stopChannels["claims"] = make(chan struct{})
go controller.claimController.Run(controller.stopChannels["claims"])
}
}
// Stop gracefully shuts down this binder
func (controller *PersistentVolumeClaimBinder) Stop() {
glog.V(5).Infof("Stopping PersistentVolumeClaimBinder\n")
for name, stopChan := range controller.stopChannels {
close(stopChan)
delete(controller.stopChannels, name)
}
}
// binderClient abstracts access to PVs and PVCs
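// Keeping this interface narrow lets tests substitute an in-memory fake (see
// mockBinderClient in the tests) for a real API server.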
type binderClient interface {
GetPersistentVolume(name string) (*api.PersistentVolume, error)
UpdatePersistentVolume(volume *api.PersistentVolume) (*api.PersistentVolume, error)
DeletePersistentVolume(volume *api.PersistentVolume) error
UpdatePersistentVolumeStatus(volume *api.PersistentVolume) (*api.PersistentVolume, error)
GetPersistentVolumeClaim(namespace, name string) (*api.PersistentVolumeClaim, error)
UpdatePersistentVolumeClaim(claim *api.PersistentVolumeClaim) (*api.PersistentVolumeClaim, error)
UpdatePersistentVolumeClaimStatus(claim *api.PersistentVolumeClaim) (*api.PersistentVolumeClaim, error)
}
func NewBinderClient(c client.Interface) binderClient {
return &realBinderClient{c}
}
type realBinderClient struct {
client client.Interface
}
func (c *realBinderClient) GetPersistentVolume(name string) (*api.PersistentVolume, error) {
return c.client.PersistentVolumes().Get(name)
}
func (c *realBinderClient) UpdatePersistentVolume(volume *api.PersistentVolume) (*api.PersistentVolume, error) {
return c.client.PersistentVolumes().Update(volume)
}
func (c *realBinderClient) DeletePersistentVolume(volume *api.PersistentVolume) error {
return c.client.PersistentVolumes().Delete(volume.Name)
}
func (c *realBinderClient) UpdatePersistentVolumeStatus(volume *api.PersistentVolume) (*api.PersistentVolume, error) {
return c.client.PersistentVolumes().UpdateStatus(volume)
}
func (c *realBinderClient) GetPersistentVolumeClaim(namespace, name string) (*api.PersistentVolumeClaim, error) {
return c.client.PersistentVolumeClaims(namespace).Get(name)
}
func (c *realBinderClient) UpdatePersistentVolumeClaim(claim *api.PersistentVolumeClaim) (*api.PersistentVolumeClaim, error) {
return c.client.PersistentVolumeClaims(claim.Namespace).Update(claim)
}
func (c *realBinderClient) UpdatePersistentVolumeClaimStatus(claim *api.PersistentVolumeClaim) (*api.PersistentVolumeClaim, error) {
return c.client.PersistentVolumeClaims(claim.Namespace).UpdateStatus(claim)
}

View File

@@ -0,0 +1,407 @@
/*
Copyright 2014 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package volumeclaimbinder
import (
"reflect"
"testing"
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/resource"
"k8s.io/kubernetes/pkg/client/testclient"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/host_path"
)
func TestRunStop(t *testing.T) {
o := testclient.NewObjects(api.Scheme, api.Scheme)
client := &testclient.Fake{ReactFn: testclient.ObjectReaction(o, api.RESTMapper)}
binder := NewPersistentVolumeClaimBinder(client, 1*time.Second)
if len(binder.stopChannels) != 0 {
t.Errorf("Non-running binder should not have any stopChannels. Got %v", len(binder.stopChannels))
}
binder.Run()
if len(binder.stopChannels) != 2 {
t.Errorf("Running binder should have exactly 2 stopChannels. Got %v", len(binder.stopChannels))
}
binder.Stop()
if len(binder.stopChannels) != 0 {
t.Errorf("Non-running binder should not have any stopChannels. Got %v", len(binder.stopChannels))
}
}
func TestExampleObjects(t *testing.T) {
scenarios := map[string]struct {
expected interface{}
}{
"claims/claim-01.yaml": {
expected: &api.PersistentVolumeClaim{
Spec: api.PersistentVolumeClaimSpec{
AccessModes: []api.PersistentVolumeAccessMode{api.ReadWriteOnce},
Resources: api.ResourceRequirements{
Requests: api.ResourceList{
api.ResourceName(api.ResourceStorage): resource.MustParse("3Gi"),
},
},
},
},
},
"claims/claim-02.yaml": {
expected: &api.PersistentVolumeClaim{
Spec: api.PersistentVolumeClaimSpec{
AccessModes: []api.PersistentVolumeAccessMode{api.ReadWriteOnce},
Resources: api.ResourceRequirements{
Requests: api.ResourceList{
api.ResourceName(api.ResourceStorage): resource.MustParse("8Gi"),
},
},
},
},
},
"volumes/local-01.yaml": {
expected: &api.PersistentVolume{
Spec: api.PersistentVolumeSpec{
AccessModes: []api.PersistentVolumeAccessMode{api.ReadWriteOnce},
Capacity: api.ResourceList{
api.ResourceName(api.ResourceStorage): resource.MustParse("10Gi"),
},
PersistentVolumeSource: api.PersistentVolumeSource{
HostPath: &api.HostPathVolumeSource{
Path: "/tmp/data01",
},
},
},
},
},
"volumes/local-02.yaml": {
expected: &api.PersistentVolume{
Spec: api.PersistentVolumeSpec{
AccessModes: []api.PersistentVolumeAccessMode{api.ReadWriteOnce},
Capacity: api.ResourceList{
api.ResourceName(api.ResourceStorage): resource.MustParse("8Gi"),
},
PersistentVolumeSource: api.PersistentVolumeSource{
HostPath: &api.HostPathVolumeSource{
Path: "/tmp/data02",
},
},
PersistentVolumeReclaimPolicy: api.PersistentVolumeReclaimRecycle,
},
},
},
}
for name, scenario := range scenarios {
o := testclient.NewObjects(api.Scheme, api.Scheme)
if err := testclient.AddObjectsFromPath("../../../docs/user-guide/persistent-volumes/"+name, o, api.Scheme); err != nil {
t.Fatal(err)
}
client := &testclient.Fake{ReactFn: testclient.ObjectReaction(o, api.RESTMapper)}
if reflect.TypeOf(scenario.expected) == reflect.TypeOf(&api.PersistentVolumeClaim{}) {
pvc, err := client.PersistentVolumeClaims("ns").Get("doesntmatter")
if err != nil {
t.Errorf("Error retrieving object: %v", err)
}
expected := scenario.expected.(*api.PersistentVolumeClaim)
if pvc.Spec.AccessModes[0] != expected.Spec.AccessModes[0] {
t.Errorf("Unexpected mismatch. Got %v wanted %v", pvc.Spec.AccessModes[0], expected.Spec.AccessModes[0])
}
aQty := pvc.Spec.Resources.Requests[api.ResourceStorage]
bQty := expected.Spec.Resources.Requests[api.ResourceStorage]
aSize := aQty.Value()
bSize := bQty.Value()
if aSize != bSize {
t.Errorf("Unexpected mismatch. Got %v wanted %v", aSize, bSize)
}
}
if reflect.TypeOf(scenario.expected) == reflect.TypeOf(&api.PersistentVolume{}) {
pv, err := client.PersistentVolumes().Get("doesntmatter")
if err != nil {
t.Errorf("Error retrieving object: %v", err)
}
expected := scenario.expected.(*api.PersistentVolume)
if pv.Spec.AccessModes[0] != expected.Spec.AccessModes[0] {
t.Errorf("Unexpected mismatch. Got %v wanted %v", pv.Spec.AccessModes[0], expected.Spec.AccessModes[0])
}
aQty := pv.Spec.Capacity[api.ResourceStorage]
bQty := expected.Spec.Capacity[api.ResourceStorage]
aSize := aQty.Value()
bSize := bQty.Value()
if aSize != bSize {
t.Errorf("Unexpected mismatch. Got %v wanted %v", aSize, bSize)
}
if pv.Spec.HostPath.Path != expected.Spec.HostPath.Path {
t.Errorf("Unexpected mismatch. Got %v wanted %v", pv.Spec.HostPath.Path, expected.Spec.HostPath.Path)
}
}
}
}
func TestBindingWithExamples(t *testing.T) {
api.ForTesting_ReferencesAllowBlankSelfLinks = true
o := testclient.NewObjects(api.Scheme, api.Scheme)
if err := testclient.AddObjectsFromPath("../../../docs/user-guide/persistent-volumes/claims/claim-01.yaml", o, api.Scheme); err != nil {
t.Fatal(err)
}
if err := testclient.AddObjectsFromPath("../../../docs/user-guide/persistent-volumes/volumes/local-01.yaml", o, api.Scheme); err != nil {
t.Fatal(err)
}
client := &testclient.Fake{ReactFn: testclient.ObjectReaction(o, api.RESTMapper)}
pv, err := client.PersistentVolumes().Get("any")
if err != nil {
t.Fatalf("Unexpected error getting PV from client: %v", err)
}
pv.Spec.PersistentVolumeReclaimPolicy = api.PersistentVolumeReclaimRecycle
claim, err := client.PersistentVolumeClaims("ns").Get("any")
if err != nil {
t.Fatalf("Unexpected error getting PVC from client: %v", err)
}
volumeIndex := NewPersistentVolumeOrderedIndex()
mockClient := &mockBinderClient{
volume: pv,
claim: claim,
}
plugMgr := volume.VolumePluginMgr{}
plugMgr.InitPlugins(host_path.ProbeRecyclableVolumePlugins(newMockRecycler), volume.NewFakeVolumeHost("/tmp/fake", nil, nil))
recycler := &PersistentVolumeRecycler{
kubeClient: client,
client: mockClient,
pluginMgr: plugMgr,
}
// adds the volume to the index, making the volume available
syncVolume(volumeIndex, mockClient, pv)
if pv.Status.Phase != api.VolumeAvailable {
t.Errorf("Expected phase %s but got %s", api.VolumeBound, pv.Status.Phase)
}
// an initial sync for a claim will bind it to an unbound volume, triggers state change
syncClaim(volumeIndex, mockClient, claim)
// state change causes another syncClaim to update statuses
syncClaim(volumeIndex, mockClient, claim)
// claim updated volume's status, causing an update and syncVolume call
syncVolume(volumeIndex, mockClient, pv)
if pv.Spec.ClaimRef == nil {
t.Errorf("Expected ClaimRef but got nil for pv.Status.ClaimRef: %+v\n", pv)
}
if pv.Status.Phase != api.VolumeBound {
t.Errorf("Expected phase %s but got %s", api.VolumeBound, pv.Status.Phase)
}
if claim.Status.Phase != api.ClaimBound {
t.Errorf("Expected phase %s but got %s", api.ClaimBound, claim.Status.Phase)
}
if len(claim.Status.AccessModes) != len(pv.Spec.AccessModes) {
t.Errorf("Expected phase %s but got %s", api.ClaimBound, claim.Status.Phase)
}
if claim.Status.AccessModes[0] != pv.Spec.AccessModes[0] {
t.Errorf("Expected access mode %s but got %s", claim.Status.AccessModes[0], pv.Spec.AccessModes[0])
}
// pretend the user deleted their claim
mockClient.claim = nil
syncVolume(volumeIndex, mockClient, pv)
if pv.Status.Phase != api.VolumeReleased {
t.Errorf("Expected phase %s but got %s", api.VolumeReleased, pv.Status.Phase)
}
if pv.Spec.ClaimRef == nil {
t.Errorf("Expected non-nil ClaimRef: %+v", pv.Spec)
}
mockClient.volume = pv
// released volumes with a PersistentVolumeReclaimPolicy (recycle/delete) can have further processing
err = recycler.reclaimVolume(pv)
if err != nil {
t.Errorf("Unexpected error reclaiming volume: %+v", err)
}
if pv.Status.Phase != api.VolumePending {
t.Errorf("Expected phase %s but got %s", api.VolumePending, pv.Status.Phase)
}
// after the recycling changes the phase to Pending, the binder picks up again
// to remove any vestiges of binding and make the volume Available again
syncVolume(volumeIndex, mockClient, pv)
if pv.Status.Phase != api.VolumeAvailable {
t.Errorf("Expected phase %s but got %s", api.VolumeAvailable, pv.Status.Phase)
}
if pv.Spec.ClaimRef != nil {
t.Errorf("Expected nil ClaimRef: %+v", pv.Spec)
}
}
func TestMissingFromIndex(t *testing.T) {
api.ForTesting_ReferencesAllowBlankSelfLinks = true
o := testclient.NewObjects(api.Scheme, api.Scheme)
if err := testclient.AddObjectsFromPath("../../../docs/user-guide/persistent-volumes/claims/claim-01.yaml", o, api.Scheme); err != nil {
t.Fatal(err)
}
if err := testclient.AddObjectsFromPath("../../../docs/user-guide/persistent-volumes/volumes/local-01.yaml", o, api.Scheme); err != nil {
t.Fatal(err)
}
client := &testclient.Fake{ReactFn: testclient.ObjectReaction(o, api.RESTMapper)}
pv, err := client.PersistentVolumes().Get("any")
if err != nil {
t.Fatalf("Unexpected error getting PV from client: %v", err)
}
claim, err := client.PersistentVolumeClaims("ns").Get("any")
if err != nil {
t.Fatalf("Unexpected error getting PVC from client: %v", err)
}
volumeIndex := NewPersistentVolumeOrderedIndex()
mockClient := &mockBinderClient{
volume: pv,
claim: claim,
}
// the default phase of a PV is Pending.
// if it has previously been processed by the binder, its phase in etcd would be Available.
// previously, only Pending volumes were being indexed and made ready for claims.
pv.Status.Phase = api.VolumeAvailable
// adds the volume to the index, making the volume available
syncVolume(volumeIndex, mockClient, pv)
if pv.Status.Phase != api.VolumeAvailable {
t.Errorf("Expected phase %s but got %s", api.VolumeBound, pv.Status.Phase)
}
// an initial sync for a claim will bind it to an unbound volume, triggers state change
err = syncClaim(volumeIndex, mockClient, claim)
if err != nil {
t.Fatalf("Expected Clam to be bound, instead got an error: %+v\n", err)
}
// state change causes another syncClaim to update statuses
syncClaim(volumeIndex, mockClient, claim)
// claim updated volume's status, causing an update and syncVolume call
syncVolume(volumeIndex, mockClient, pv)
if pv.Spec.ClaimRef == nil {
t.Errorf("Expected ClaimRef but got nil for pv.Status.ClaimRef: %+v\n", pv)
}
if pv.Status.Phase != api.VolumeBound {
t.Errorf("Expected phase %s but got %s", api.VolumeBound, pv.Status.Phase)
}
if claim.Status.Phase != api.ClaimBound {
t.Errorf("Expected phase %s but got %s", api.ClaimBound, claim.Status.Phase)
}
if len(claim.Status.AccessModes) != len(pv.Spec.AccessModes) {
t.Errorf("Expected phase %s but got %s", api.ClaimBound, claim.Status.Phase)
}
if claim.Status.AccessModes[0] != pv.Spec.AccessModes[0] {
t.Errorf("Expected access mode %s but got %s", claim.Status.AccessModes[0], pv.Spec.AccessModes[0])
}
// pretend the user deleted their claim
mockClient.claim = nil
syncVolume(volumeIndex, mockClient, pv)
if pv.Status.Phase != api.VolumeReleased {
t.Errorf("Expected phase %s but got %s", api.VolumeReleased, pv.Status.Phase)
}
}
type mockBinderClient struct {
volume *api.PersistentVolume
claim *api.PersistentVolumeClaim
}
func (c *mockBinderClient) GetPersistentVolume(name string) (*api.PersistentVolume, error) {
return c.volume, nil
}
func (c *mockBinderClient) UpdatePersistentVolume(volume *api.PersistentVolume) (*api.PersistentVolume, error) {
return volume, nil
}
func (c *mockBinderClient) DeletePersistentVolume(volume *api.PersistentVolume) error {
c.volume = nil
return nil
}
func (c *mockBinderClient) UpdatePersistentVolumeStatus(volume *api.PersistentVolume) (*api.PersistentVolume, error) {
return volume, nil
}
func (c *mockBinderClient) GetPersistentVolumeClaim(namespace, name string) (*api.PersistentVolumeClaim, error) {
if c.claim != nil {
return c.claim, nil
} else {
return nil, errors.NewNotFound("persistentVolumeClaim", name)
}
}
func (c *mockBinderClient) UpdatePersistentVolumeClaim(claim *api.PersistentVolumeClaim) (*api.PersistentVolumeClaim, error) {
return claim, nil
}
func (c *mockBinderClient) UpdatePersistentVolumeClaimStatus(claim *api.PersistentVolumeClaim) (*api.PersistentVolumeClaim, error) {
return claim, nil
}
func newMockRecycler(spec *volume.Spec, host volume.VolumeHost) (volume.Recycler, error) {
return &mockRecycler{
path: spec.PersistentVolumeSource.HostPath.Path,
}, nil
}
type mockRecycler struct {
path string
host volume.VolumeHost
}
func (r *mockRecycler) GetPath() string {
return r.path
}
func (r *mockRecycler) Recycle() error {
// return nil means recycle passed
return nil
}

View File

@@ -0,0 +1,356 @@
/*
Copyright 2014 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package volumeclaimbinder
import (
"testing"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/resource"
)
func TestMatchVolume(t *testing.T) {
volList := NewPersistentVolumeOrderedIndex()
for _, pv := range createTestVolumes() {
volList.Add(pv)
}
scenarios := map[string]struct {
expectedMatch string
claim *api.PersistentVolumeClaim
}{
"successful-match-gce-10": {
expectedMatch: "gce-pd-10",
claim: &api.PersistentVolumeClaim{
ObjectMeta: api.ObjectMeta{
Name: "claim01",
Namespace: "myns",
},
Spec: api.PersistentVolumeClaimSpec{
AccessModes: []api.PersistentVolumeAccessMode{api.ReadOnlyMany, api.ReadWriteOnce},
Resources: api.ResourceRequirements{
Requests: api.ResourceList{
api.ResourceName(api.ResourceStorage): resource.MustParse("8G"),
},
},
},
},
},
"successful-match-nfs-5": {
expectedMatch: "nfs-5",
claim: &api.PersistentVolumeClaim{
ObjectMeta: api.ObjectMeta{
Name: "claim01",
Namespace: "myns",
},
Spec: api.PersistentVolumeClaimSpec{
AccessModes: []api.PersistentVolumeAccessMode{api.ReadOnlyMany, api.ReadWriteOnce, api.ReadWriteMany},
Resources: api.ResourceRequirements{
Requests: api.ResourceList{
api.ResourceName(api.ResourceStorage): resource.MustParse("5G"),
},
},
},
},
},
"successful-skip-1g-bound-volume": {
expectedMatch: "gce-pd-5",
claim: &api.PersistentVolumeClaim{
ObjectMeta: api.ObjectMeta{
Name: "claim01",
Namespace: "myns",
},
Spec: api.PersistentVolumeClaimSpec{
AccessModes: []api.PersistentVolumeAccessMode{api.ReadOnlyMany, api.ReadWriteOnce},
Resources: api.ResourceRequirements{
Requests: api.ResourceList{
api.ResourceName(api.ResourceStorage): resource.MustParse("1G"),
},
},
},
},
},
"successful-no-match": {
expectedMatch: "",
claim: &api.PersistentVolumeClaim{
ObjectMeta: api.ObjectMeta{
Name: "claim01",
Namespace: "myns",
},
Spec: api.PersistentVolumeClaimSpec{
AccessModes: []api.PersistentVolumeAccessMode{api.ReadOnlyMany, api.ReadWriteOnce},
Resources: api.ResourceRequirements{
Requests: api.ResourceList{
api.ResourceName(api.ResourceStorage): resource.MustParse("999G"),
},
},
},
},
},
}
for name, scenario := range scenarios {
volume, err := volList.FindBestMatchForClaim(scenario.claim)
if err != nil {
t.Errorf("Unexpected error matching volume by claim: %v", err)
}
if scenario.expectedMatch != "" && volume == nil {
t.Errorf("Expected match but received nil volume for scenario: %s", name)
}
if scenario.expectedMatch != "" && volume != nil && string(volume.UID) != scenario.expectedMatch {
t.Errorf("Expected %s but got volume %s in scenario %s", scenario.expectedMatch, volume.UID, name)
}
if scenario.expectedMatch == "" && volume != nil {
t.Errorf("Unexpected match for scenario: %s", name)
}
}
}
func TestMatchingWithBoundVolumes(t *testing.T) {
volumeIndex := NewPersistentVolumeOrderedIndex()
// two similar volumes, one is bound
pv1 := &api.PersistentVolume{
ObjectMeta: api.ObjectMeta{
UID: "gce-pd-1",
Name: "gce001",
},
Spec: api.PersistentVolumeSpec{
Capacity: api.ResourceList{
api.ResourceName(api.ResourceStorage): resource.MustParse("1G"),
},
PersistentVolumeSource: api.PersistentVolumeSource{
GCEPersistentDisk: &api.GCEPersistentDiskVolumeSource{},
},
AccessModes: []api.PersistentVolumeAccessMode{api.ReadWriteOnce, api.ReadOnlyMany},
// this one we're pretending is already bound
ClaimRef: &api.ObjectReference{UID: "abc123"},
},
}
pv2 := &api.PersistentVolume{
ObjectMeta: api.ObjectMeta{
UID: "gce-pd-2",
Name: "gce002",
},
Spec: api.PersistentVolumeSpec{
Capacity: api.ResourceList{
api.ResourceName(api.ResourceStorage): resource.MustParse("1G"),
},
PersistentVolumeSource: api.PersistentVolumeSource{
GCEPersistentDisk: &api.GCEPersistentDiskVolumeSource{},
},
AccessModes: []api.PersistentVolumeAccessMode{api.ReadWriteOnce, api.ReadOnlyMany},
},
}
volumeIndex.Add(pv1)
volumeIndex.Add(pv2)
claim := &api.PersistentVolumeClaim{
ObjectMeta: api.ObjectMeta{
Name: "claim01",
Namespace: "myns",
},
Spec: api.PersistentVolumeClaimSpec{
AccessModes: []api.PersistentVolumeAccessMode{api.ReadOnlyMany, api.ReadWriteOnce},
Resources: api.ResourceRequirements{
Requests: api.ResourceList{
api.ResourceName(api.ResourceStorage): resource.MustParse("1G"),
},
},
},
}
volume, err := volumeIndex.FindBestMatchForClaim(claim)
if err != nil {
t.Fatalf("Unexpected error matching volume by claim: %v", err)
}
if volume == nil {
t.Fatalf("Unexpected nil volume. Expected %s", pv2.Name)
}
if pv2.Name != volume.Name {
t.Errorf("Expected %s but got volume %s instead", pv2.Name, volume.Name)
}
}
func TestSort(t *testing.T) {
volList := NewPersistentVolumeOrderedIndex()
for _, pv := range createTestVolumes() {
volList.Add(pv)
}
volumes, err := volList.ListByAccessModes([]api.PersistentVolumeAccessMode{api.ReadWriteOnce, api.ReadOnlyMany})
if err != nil {
t.Error("Unexpected error retrieving volumes by access modes:", err)
}
for i, expected := range []string{"gce-pd-1", "gce-pd-5", "gce-pd-10"} {
if string(volumes[i].UID) != expected {
t.Error("Incorrect ordering of persistent volumes. Expected %s but got %s", expected, volumes[i].UID)
}
}
volumes, err = volList.ListByAccessModes([]api.PersistentVolumeAccessMode{api.ReadWriteOnce, api.ReadOnlyMany, api.ReadWriteMany})
if err != nil {
t.Error("Unexpected error retrieving volumes by access modes:", err)
}
for i, expected := range []string{"nfs-1", "nfs-5", "nfs-10"} {
if string(volumes[i].UID) != expected {
t.Error("Incorrect ordering of persistent volumes. Expected %s but got %s", expected, volumes[i].UID)
}
}
}
func createTestVolumes() []*api.PersistentVolume {
// these volumes are deliberately out-of-order to test indexing and sorting
return []*api.PersistentVolume{
{
ObjectMeta: api.ObjectMeta{
UID: "gce-pd-10",
Name: "gce003",
},
Spec: api.PersistentVolumeSpec{
Capacity: api.ResourceList{
api.ResourceName(api.ResourceStorage): resource.MustParse("10G"),
},
PersistentVolumeSource: api.PersistentVolumeSource{
GCEPersistentDisk: &api.GCEPersistentDiskVolumeSource{},
},
AccessModes: []api.PersistentVolumeAccessMode{
api.ReadWriteOnce,
api.ReadOnlyMany,
},
},
},
{
ObjectMeta: api.ObjectMeta{
UID: "gce-pd-20",
Name: "gce004",
},
Spec: api.PersistentVolumeSpec{
Capacity: api.ResourceList{
api.ResourceName(api.ResourceStorage): resource.MustParse("20G"),
},
PersistentVolumeSource: api.PersistentVolumeSource{
GCEPersistentDisk: &api.GCEPersistentDiskVolumeSource{},
},
AccessModes: []api.PersistentVolumeAccessMode{
api.ReadWriteOnce,
api.ReadOnlyMany,
},
// this one we're pretending is already bound
ClaimRef: &api.ObjectReference{UID: "def456"},
},
},
{
ObjectMeta: api.ObjectMeta{
UID: "nfs-5",
Name: "nfs002",
},
Spec: api.PersistentVolumeSpec{
Capacity: api.ResourceList{
api.ResourceName(api.ResourceStorage): resource.MustParse("5G"),
},
PersistentVolumeSource: api.PersistentVolumeSource{
Glusterfs: &api.GlusterfsVolumeSource{},
},
AccessModes: []api.PersistentVolumeAccessMode{
api.ReadWriteOnce,
api.ReadOnlyMany,
api.ReadWriteMany,
},
},
},
{
ObjectMeta: api.ObjectMeta{
UID: "gce-pd-1",
Name: "gce001",
},
Spec: api.PersistentVolumeSpec{
Capacity: api.ResourceList{
api.ResourceName(api.ResourceStorage): resource.MustParse("1G"),
},
PersistentVolumeSource: api.PersistentVolumeSource{
GCEPersistentDisk: &api.GCEPersistentDiskVolumeSource{},
},
AccessModes: []api.PersistentVolumeAccessMode{
api.ReadWriteOnce,
api.ReadOnlyMany,
},
// this one we're pretending is already bound
ClaimRef: &api.ObjectReference{UID: "abc123"},
},
},
{
ObjectMeta: api.ObjectMeta{
UID: "nfs-10",
Name: "nfs003",
},
Spec: api.PersistentVolumeSpec{
Capacity: api.ResourceList{
api.ResourceName(api.ResourceStorage): resource.MustParse("10G"),
},
PersistentVolumeSource: api.PersistentVolumeSource{
Glusterfs: &api.GlusterfsVolumeSource{},
},
AccessModes: []api.PersistentVolumeAccessMode{
api.ReadWriteOnce,
api.ReadOnlyMany,
api.ReadWriteMany,
},
},
},
{
ObjectMeta: api.ObjectMeta{
UID: "gce-pd-5",
Name: "gce002",
},
Spec: api.PersistentVolumeSpec{
Capacity: api.ResourceList{
api.ResourceName(api.ResourceStorage): resource.MustParse("5G"),
},
PersistentVolumeSource: api.PersistentVolumeSource{
GCEPersistentDisk: &api.GCEPersistentDiskVolumeSource{},
},
AccessModes: []api.PersistentVolumeAccessMode{
api.ReadWriteOnce,
api.ReadOnlyMany,
},
},
},
{
ObjectMeta: api.ObjectMeta{
UID: "nfs-1",
Name: "nfs001",
},
Spec: api.PersistentVolumeSpec{
Capacity: api.ResourceList{
api.ResourceName(api.ResourceStorage): resource.MustParse("1G"),
},
PersistentVolumeSource: api.PersistentVolumeSource{
Glusterfs: &api.GlusterfsVolumeSource{},
},
AccessModes: []api.PersistentVolumeAccessMode{
api.ReadWriteOnce,
api.ReadOnlyMany,
api.ReadWriteMany,
},
},
},
}
}

View File

@@ -0,0 +1,231 @@
/*
Copyright 2014 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package volumeclaimbinder
import (
"fmt"
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client"
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/watch"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util/mount"
)
// PersistentVolumeRecycler is a controller that watches for PersistentVolumes that are released from their claims.
// This controller will Recycle those volumes whose reclaim policy is set to PersistentVolumeReclaimRecycle and make them
// available again for a new claim.
type PersistentVolumeRecycler struct {
volumeController *framework.Controller
stopChannel chan struct{}
client recyclerClient
kubeClient client.Interface
pluginMgr volume.VolumePluginMgr
}
// NewPersistentVolumeRecycler creates a new PersistentVolumeRecycler
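//
// A hypothetical wiring sketch (the plugin probing shown is illustrative; the
// caller chooses the actual plugin set):
//
//	recycler, err := NewPersistentVolumeRecycler(kubeClient, 10*time.Second, host_path.ProbeRecyclableVolumePlugins(nil))
//	if err != nil {
//		glog.Fatalf("failed to start recycler: %v", err)
//	}
//	recycler.Run()
//	defer recycler.Stop()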
func NewPersistentVolumeRecycler(kubeClient client.Interface, syncPeriod time.Duration, plugins []volume.VolumePlugin) (*PersistentVolumeRecycler, error) {
recyclerClient := NewRecyclerClient(kubeClient)
recycler := &PersistentVolumeRecycler{
client: recyclerClient,
kubeClient: kubeClient,
}
if err := recycler.pluginMgr.InitPlugins(plugins, recycler); err != nil {
return nil, fmt.Errorf("Could not initialize volume plugins for PVClaimBinder: %+v", err)
}
_, volumeController := framework.NewInformer(
&cache.ListWatch{
ListFunc: func() (runtime.Object, error) {
return kubeClient.PersistentVolumes().List(labels.Everything(), fields.Everything())
},
WatchFunc: func(resourceVersion string) (watch.Interface, error) {
return kubeClient.PersistentVolumes().Watch(labels.Everything(), fields.Everything(), resourceVersion)
},
},
&api.PersistentVolume{},
syncPeriod,
framework.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
pv := obj.(*api.PersistentVolume)
recycler.reclaimVolume(pv)
},
UpdateFunc: func(oldObj, newObj interface{}) {
pv := newObj.(*api.PersistentVolume)
recycler.reclaimVolume(pv)
},
},
)
recycler.volumeController = volumeController
return recycler, nil
}
func (recycler *PersistentVolumeRecycler) reclaimVolume(pv *api.PersistentVolume) error {
if pv.Status.Phase == api.VolumeReleased && pv.Spec.ClaimRef != nil {
glog.V(5).Infof("Reclaiming PersistentVolume[%s]\n", pv.Name)
latest, err := recycler.client.GetPersistentVolume(pv.Name)
if err != nil {
return fmt.Errorf("Could not find PersistentVolume %s", pv.Name)
}
if latest.Status.Phase != api.VolumeReleased {
return fmt.Errorf("PersistentVolume[%s] phase is %s, expected %s. Skipping.", pv.Name, latest.Status.Phase, api.VolumeReleased)
}
// handleRecycle blocks until completion
// TODO: allow parallel recycling operations to increase throughput
// TODO implement handleDelete in a separate PR w/ cloud volumes
switch pv.Spec.PersistentVolumeReclaimPolicy {
case api.PersistentVolumeReclaimRecycle:
err = recycler.handleRecycle(pv)
case api.PersistentVolumeReclaimRetain:
glog.V(5).Infof("Volume %s is set to retain after release. Skipping.\n", pv.Name)
default:
err = fmt.Errorf("No PersistentVolumeReclaimPolicy defined for spec: %+v", pv)
}
if err != nil {
glog.Errorf("Could not recycle volume spec: %+v", err)
return fmt.Errorf("Could not recycle volume spec: %+v", err)
}
}
return nil
}
func (recycler *PersistentVolumeRecycler) handleRecycle(pv *api.PersistentVolume) error {
glog.V(5).Infof("Recycling PersistentVolume[%s]\n", pv.Name)
currentPhase := pv.Status.Phase
nextPhase := currentPhase
spec := volume.NewSpecFromPersistentVolume(pv, false)
plugin, err := recycler.pluginMgr.FindRecyclablePluginBySpec(spec)
if err != nil {
return fmt.Errorf("Could not find recyclable volume plugin for spec: %+v", err)
}
volRecycler, err := plugin.NewRecycler(spec)
if err != nil {
return fmt.Errorf("Could not obtain Recycler for spec: %+v", err)
}
// blocks until completion
err = volRecycler.Recycle()
if err != nil {
glog.Errorf("PersistentVolume[%s] failed recycling: %+v", err)
pv.Status.Message = fmt.Sprintf("Recycling error: %s", err)
nextPhase = api.VolumeFailed
} else {
glog.V(5).Infof("PersistentVolume[%s] successfully recycled\n", pv.Name)
nextPhase = api.VolumePending
}
if currentPhase != nextPhase {
glog.V(5).Infof("PersistentVolume[%s] changing phase from %s to %s\n", pv.Name, currentPhase, nextPhase)
pv.Status.Phase = nextPhase
_, err := recycler.client.UpdatePersistentVolumeStatus(pv)
if err != nil {
// Rollback to previous phase
pv.Status.Phase = currentPhase
}
}
return nil
}
// Run starts this recycler's control loops
func (recycler *PersistentVolumeRecycler) Run() {
glog.V(5).Infof("Starting PersistentVolumeRecycler\n")
if recycler.stopChannel == nil {
recycler.stopChannel = make(chan struct{})
go recycler.volumeController.Run(recycler.stopChannel)
}
}
// Stop gracefully shuts down this binder
func (recycler *PersistentVolumeRecycler) Stop() {
glog.V(5).Infof("Stopping PersistentVolumeRecycler\n")
if recycler.stopChannel != nil {
close(recycler.stopChannel)
recycler.stopChannel = nil
}
}
// recyclerClient abstracts access to PVs
type recyclerClient interface {
GetPersistentVolume(name string) (*api.PersistentVolume, error)
UpdatePersistentVolume(volume *api.PersistentVolume) (*api.PersistentVolume, error)
UpdatePersistentVolumeStatus(volume *api.PersistentVolume) (*api.PersistentVolume, error)
}
func NewRecyclerClient(c client.Interface) recyclerClient {
return &realRecyclerClient{c}
}
type realRecyclerClient struct {
client client.Interface
}
func (c *realRecyclerClient) GetPersistentVolume(name string) (*api.PersistentVolume, error) {
return c.client.PersistentVolumes().Get(name)
}
func (c *realRecyclerClient) UpdatePersistentVolume(volume *api.PersistentVolume) (*api.PersistentVolume, error) {
return c.client.PersistentVolumes().Update(volume)
}
func (c *realRecyclerClient) UpdatePersistentVolumeStatus(volume *api.PersistentVolume) (*api.PersistentVolume, error) {
return c.client.PersistentVolumes().UpdateStatus(volume)
}
// PersistentVolumeRecycler is host to the volume plugins, but does not actually mount any volumes.
// Because no mounting is performed, most of the VolumeHost methods are not implemented.
func (f *PersistentVolumeRecycler) GetPluginDir(podUID string) string {
return ""
}
func (f *PersistentVolumeRecycler) GetPodVolumeDir(podUID types.UID, pluginName, volumeName string) string {
return ""
}
func (f *PersistentVolumeRecycler) GetPodPluginDir(podUID types.UID, pluginName string) string {
return ""
}
func (f *PersistentVolumeRecycler) GetKubeClient() client.Interface {
return f.kubeClient
}
func (f *PersistentVolumeRecycler) NewWrapperBuilder(spec *volume.Spec, pod *api.Pod, opts volume.VolumeOptions, mounter mount.Interface) (volume.Builder, error) {
return nil, fmt.Errorf("NewWrapperBuilder not supported by PVClaimBinder's VolumeHost implementation")
}
func (f *PersistentVolumeRecycler) NewWrapperCleaner(spec *volume.Spec, podUID types.UID, mounter mount.Interface) (volume.Cleaner, error) {
return nil, fmt.Errorf("NewWrapperCleaner not supported by PVClaimBinder's VolumeHost implementation")
}

View File

@@ -0,0 +1,141 @@
/*
Copyright 2014 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package volumeclaimbinder
import (
"fmt"
"sort"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/resource"
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/volume"
)
// persistentVolumeOrderedIndex is a cache.Store that keeps persistent volumes indexed by AccessModes and ordered by storage capacity.
type persistentVolumeOrderedIndex struct {
cache.Indexer
}
var _ cache.Store = &persistentVolumeOrderedIndex{} // persistentVolumeOrderedIndex is a Store
func NewPersistentVolumeOrderedIndex() *persistentVolumeOrderedIndex {
return &persistentVolumeOrderedIndex{
cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"accessmodes": accessModesIndexFunc}),
}
}
// accessModesIndexFunc is an indexing function that returns a persistent volume's AccessModes as a string
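// For example (illustrative), every volume whose mode set is
// {ReadWriteOnce, ReadOnlyMany} lands under the single key produced by
// volume.GetAccessModesAsString for that set, so a claim requesting exactly
// those modes retrieves exactly that bucket.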
func accessModesIndexFunc(obj interface{}) ([]string, error) {
if pv, ok := obj.(*api.PersistentVolume); ok {
modes := volume.GetAccessModesAsString(pv.Spec.AccessModes)
return []string{modes}, nil
}
return []string{""}, fmt.Errorf("object is not a persistent volume: %v", obj)
}
// ListByAccessModes returns all volumes with the given set of AccessModeTypes *in order* of their storage capacity (low to high)
func (pvIndex *persistentVolumeOrderedIndex) ListByAccessModes(modes []api.PersistentVolumeAccessMode) ([]*api.PersistentVolume, error) {
pv := &api.PersistentVolume{
Spec: api.PersistentVolumeSpec{
AccessModes: modes,
},
}
objs, err := pvIndex.Index("accessmodes", pv)
if err != nil {
return nil, err
}
volumes := make([]*api.PersistentVolume, len(objs))
for i, obj := range objs {
volumes[i] = obj.(*api.PersistentVolume)
}
sort.Sort(byCapacity{volumes})
return volumes, nil
}
// matchPredicate is a function that indicates that a persistent volume matches another
type matchPredicate func(compareThis, toThis *api.PersistentVolume) bool
// Find returns the nearest PV from the ordered list or nil if a match is not found
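// Worked example (illustrative): with unbound capacities 1G, 5G and 10G sorted
// ascending and a query volume requesting 3G, matchStorageCapacity(query, candidate)
// first succeeds at 5G, so sort.Search returns the smallest volume large enough
// to satisfy the request.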
func (pvIndex *persistentVolumeOrderedIndex) Find(pv *api.PersistentVolume, matchPredicate matchPredicate) (*api.PersistentVolume, error) {
volumes, err := pvIndex.ListByAccessModes(pv.Spec.AccessModes)
if err != nil {
return nil, err
}
// volumes are sorted by size but some may be bound.
// remove bound volumes for easy binary search by size
unboundVolumes := []*api.PersistentVolume{}
for _, v := range volumes {
if v.Spec.ClaimRef == nil {
unboundVolumes = append(unboundVolumes, v)
}
}
i := sort.Search(len(unboundVolumes), func(i int) bool { return matchPredicate(pv, unboundVolumes[i]) })
if i < len(unboundVolumes) {
return unboundVolumes[i], nil
}
return nil, nil
}
// FindByAccessModesAndStorageCapacity is a convenience method that calls Find w/ requisite matchPredicate for storage
func (pvIndex *persistentVolumeOrderedIndex) FindByAccessModesAndStorageCapacity(modes []api.PersistentVolumeAccessMode, qty resource.Quantity) (*api.PersistentVolume, error) {
pv := &api.PersistentVolume{
Spec: api.PersistentVolumeSpec{
AccessModes: modes,
Capacity: api.ResourceList{
api.ResourceName(api.ResourceStorage): qty,
},
},
}
return pvIndex.Find(pv, matchStorageCapacity)
}
// FindBestMatchForClaim is a convenience method that finds a volume by the claim's AccessModes and requests for Storage
func (pvIndex *persistentVolumeOrderedIndex) FindBestMatchForClaim(claim *api.PersistentVolumeClaim) (*api.PersistentVolume, error) {
return pvIndex.FindByAccessModesAndStorageCapacity(claim.Spec.AccessModes, claim.Spec.Resources.Requests[api.ResourceName(api.ResourceStorage)])
}
// byCapacity is used to order volumes by ascending storage size
type byCapacity struct {
volumes []*api.PersistentVolume
}
func (c byCapacity) Less(i, j int) bool {
return matchStorageCapacity(c.volumes[i], c.volumes[j])
}
func (c byCapacity) Swap(i, j int) {
c.volumes[i], c.volumes[j] = c.volumes[j], c.volumes[i]
}
func (c byCapacity) Len() int {
return len(c.volumes)
}
// matchStorageCapacity is a matchPredicate used to sort and find volumes
func matchStorageCapacity(pvA, pvB *api.PersistentVolume) bool {
aQty := pvA.Spec.Capacity[api.ResourceStorage]
bQty := pvB.Spec.Capacity[api.ResourceStorage]
aSize := aQty.Value()
bSize := bQty.Value()
return aSize <= bSize
}