Merge pull request #36922 from rkouj/refactor-operation-executor
Automatic merge from submit-queue Refactor operation_executor to make it testable **What this PR does / why we need it**: To refactor operation_executor to make it unit testable **Release note**: `NONE`
This commit is contained in:
		| @@ -124,11 +124,11 @@ func NewAttachDetachController( | ||||
| 	adc.desiredStateOfWorld = cache.NewDesiredStateOfWorld(&adc.volumePluginMgr) | ||||
| 	adc.actualStateOfWorld = cache.NewActualStateOfWorld(&adc.volumePluginMgr) | ||||
| 	adc.attacherDetacher = | ||||
| 		operationexecutor.NewOperationExecutor( | ||||
| 		operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator( | ||||
| 			kubeClient, | ||||
| 			&adc.volumePluginMgr, | ||||
| 			recorder, | ||||
| 			false) // flag for experimental binary check for volume mount | ||||
| 			false)) // flag for experimental binary check for volume mount | ||||
| 	adc.nodeStatusUpdater = statusupdater.NewNodeStatusUpdater( | ||||
| 		kubeClient, nodeInformer, adc.actualStateOfWorld) | ||||
| 	adc.reconciler = reconciler.NewReconciler( | ||||
|   | ||||
| @@ -49,8 +49,7 @@ func Test_Run_Positive_DoNothing(t *testing.T) { | ||||
| 	asw := cache.NewActualStateOfWorld(volumePluginMgr) | ||||
| 	fakeKubeClient := controllervolumetesting.CreateTestClient() | ||||
| 	fakeRecorder := &record.FakeRecorder{} | ||||
| 	ad := operationexecutor.NewOperationExecutor( | ||||
| 		fakeKubeClient, volumePluginMgr, fakeRecorder, false) | ||||
| 	ad := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(fakeKubeClient, volumePluginMgr, fakeRecorder, false /* checkNodeCapabilitiesBeforeMount */)) | ||||
| 	nodeInformer := informers.NewNodeInformer( | ||||
| 		fakeKubeClient, resyncPeriod) | ||||
| 	nsu := statusupdater.NewNodeStatusUpdater( | ||||
| @@ -81,7 +80,7 @@ func Test_Run_Positive_OneDesiredVolumeAttach(t *testing.T) { | ||||
| 	asw := cache.NewActualStateOfWorld(volumePluginMgr) | ||||
| 	fakeKubeClient := controllervolumetesting.CreateTestClient() | ||||
| 	fakeRecorder := &record.FakeRecorder{} | ||||
| 	ad := operationexecutor.NewOperationExecutor(fakeKubeClient, volumePluginMgr, fakeRecorder, false) | ||||
| 	ad := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(fakeKubeClient, volumePluginMgr, fakeRecorder, false /* checkNodeCapabilitiesBeforeMount */)) | ||||
| 	nsu := statusupdater.NewFakeNodeStatusUpdater(false /* returnError */) | ||||
| 	reconciler := NewReconciler( | ||||
| 		reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, dsw, asw, ad, nsu) | ||||
| @@ -127,7 +126,7 @@ func Test_Run_Positive_OneDesiredVolumeAttachThenDetachWithUnmountedVolume(t *te | ||||
| 	asw := cache.NewActualStateOfWorld(volumePluginMgr) | ||||
| 	fakeKubeClient := controllervolumetesting.CreateTestClient() | ||||
| 	fakeRecorder := &record.FakeRecorder{} | ||||
| 	ad := operationexecutor.NewOperationExecutor(fakeKubeClient, volumePluginMgr, fakeRecorder, false) | ||||
| 	ad := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(fakeKubeClient, volumePluginMgr, fakeRecorder, false /* checkNodeCapabilitiesBeforeMount */)) | ||||
| 	nsu := statusupdater.NewFakeNodeStatusUpdater(false /* returnError */) | ||||
| 	reconciler := NewReconciler( | ||||
| 		reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, dsw, asw, ad, nsu) | ||||
| @@ -194,7 +193,7 @@ func Test_Run_Positive_OneDesiredVolumeAttachThenDetachWithMountedVolume(t *test | ||||
| 	asw := cache.NewActualStateOfWorld(volumePluginMgr) | ||||
| 	fakeKubeClient := controllervolumetesting.CreateTestClient() | ||||
| 	fakeRecorder := &record.FakeRecorder{} | ||||
| 	ad := operationexecutor.NewOperationExecutor(fakeKubeClient, volumePluginMgr, fakeRecorder, false) | ||||
| 	ad := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(fakeKubeClient, volumePluginMgr, fakeRecorder, false /* checkNodeCapabilitiesBeforeMount */)) | ||||
| 	nsu := statusupdater.NewFakeNodeStatusUpdater(false /* returnError */) | ||||
| 	reconciler := NewReconciler( | ||||
| 		reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, dsw, asw, ad, nsu) | ||||
| @@ -261,7 +260,7 @@ func Test_Run_Negative_OneDesiredVolumeAttachThenDetachWithUnmountedVolumeUpdate | ||||
| 	asw := cache.NewActualStateOfWorld(volumePluginMgr) | ||||
| 	fakeKubeClient := controllervolumetesting.CreateTestClient() | ||||
| 	fakeRecorder := &record.FakeRecorder{} | ||||
| 	ad := operationexecutor.NewOperationExecutor(fakeKubeClient, volumePluginMgr, fakeRecorder, false) | ||||
| 	ad := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(fakeKubeClient, volumePluginMgr, fakeRecorder, false /* checkNodeCapabilitiesBeforeMount */)) | ||||
| 	nsu := statusupdater.NewFakeNodeStatusUpdater(true /* returnError */) | ||||
| 	reconciler := NewReconciler( | ||||
| 		reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, dsw, asw, ad, nsu) | ||||
|   | ||||
| @@ -103,7 +103,7 @@ func TestRunOnce(t *testing.T) { | ||||
| 		kb.mounter, | ||||
| 		kb.getPodsDir(), | ||||
| 		kb.recorder, | ||||
| 		false /* experimentalCheckNodeCapabilitiesBeforeMount*/) | ||||
| 		false /* experimentalCheckNodeCapabilitiesBeforeMount */) | ||||
|  | ||||
| 	kb.networkPlugin, _ = network.InitNetworkPlugin([]network.NetworkPlugin{}, "", nettest.NewFakeHost(nil), componentconfig.HairpinNone, kb.nonMasqueradeCIDR, network.UseDefaultMTU) | ||||
| 	// TODO: Factor out "StatsProvider" from Kubelet so we don't have a cyclic dependency | ||||
|   | ||||
| @@ -60,7 +60,7 @@ func Test_Run_Positive_DoNothing(t *testing.T) { | ||||
| 	asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr) | ||||
| 	kubeClient := createTestClient() | ||||
| 	fakeRecorder := &record.FakeRecorder{} | ||||
| 	oex := operationexecutor.NewOperationExecutor(kubeClient, volumePluginMgr, fakeRecorder, false /* checkNodeCapabilitiesBeforeMount*/) | ||||
| 	oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(kubeClient, volumePluginMgr, fakeRecorder, false /* checkNodeCapabilitiesBeforeMount */)) | ||||
| 	reconciler := NewReconciler( | ||||
| 		kubeClient, | ||||
| 		false, /* controllerAttachDetachEnabled */ | ||||
| @@ -97,7 +97,7 @@ func Test_Run_Positive_VolumeAttachAndMount(t *testing.T) { | ||||
| 	asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr) | ||||
| 	kubeClient := createTestClient() | ||||
| 	fakeRecorder := &record.FakeRecorder{} | ||||
| 	oex := operationexecutor.NewOperationExecutor(kubeClient, volumePluginMgr, fakeRecorder, false) | ||||
| 	oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(kubeClient, volumePluginMgr, fakeRecorder, false /* checkNodeCapabilitiesBeforeMount */)) | ||||
| 	reconciler := NewReconciler( | ||||
| 		kubeClient, | ||||
| 		false, /* controllerAttachDetachEnabled */ | ||||
| @@ -168,7 +168,7 @@ func Test_Run_Positive_VolumeMountControllerAttachEnabled(t *testing.T) { | ||||
| 	asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr) | ||||
| 	kubeClient := createTestClient() | ||||
| 	fakeRecorder := &record.FakeRecorder{} | ||||
| 	oex := operationexecutor.NewOperationExecutor(kubeClient, volumePluginMgr, fakeRecorder, false) | ||||
| 	oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(kubeClient, volumePluginMgr, fakeRecorder, false /* checkNodeCapabilitiesBeforeMount */)) | ||||
| 	reconciler := NewReconciler( | ||||
| 		kubeClient, | ||||
| 		true, /* controllerAttachDetachEnabled */ | ||||
| @@ -240,7 +240,7 @@ func Test_Run_Positive_VolumeAttachMountUnmountDetach(t *testing.T) { | ||||
| 	asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr) | ||||
| 	kubeClient := createTestClient() | ||||
| 	fakeRecorder := &record.FakeRecorder{} | ||||
| 	oex := operationexecutor.NewOperationExecutor(kubeClient, volumePluginMgr, fakeRecorder, false) | ||||
| 	oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(kubeClient, volumePluginMgr, fakeRecorder, false /* checkNodeCapabilitiesBeforeMount */)) | ||||
| 	reconciler := NewReconciler( | ||||
| 		kubeClient, | ||||
| 		false, /* controllerAttachDetachEnabled */ | ||||
| @@ -323,7 +323,7 @@ func Test_Run_Positive_VolumeUnmountControllerAttachEnabled(t *testing.T) { | ||||
| 	asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr) | ||||
| 	kubeClient := createTestClient() | ||||
| 	fakeRecorder := &record.FakeRecorder{} | ||||
| 	oex := operationexecutor.NewOperationExecutor(kubeClient, volumePluginMgr, fakeRecorder, false) | ||||
| 	oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(kubeClient, volumePluginMgr, fakeRecorder, false /* checkNodeCapabilitiesBeforeMount */)) | ||||
| 	reconciler := NewReconciler( | ||||
| 		kubeClient, | ||||
| 		true, /* controllerAttachDetachEnabled */ | ||||
|   | ||||
| @@ -164,11 +164,12 @@ func NewVolumeManager( | ||||
| 		volumePluginMgr:     volumePluginMgr, | ||||
| 		desiredStateOfWorld: cache.NewDesiredStateOfWorld(volumePluginMgr), | ||||
| 		actualStateOfWorld:  cache.NewActualStateOfWorld(nodeName, volumePluginMgr), | ||||
| 		operationExecutor: operationexecutor.NewOperationExecutor( | ||||
| 		operationExecutor: operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator( | ||||
| 			kubeClient, | ||||
| 			volumePluginMgr, | ||||
| 			recorder, | ||||
| 			checkNodeCapabilitiesBeforeMount), | ||||
| 		), | ||||
| 	} | ||||
|  | ||||
| 	vm.reconciler = reconciler.NewReconciler( | ||||
|   | ||||
| @@ -196,7 +196,7 @@ func newTestVolumeManager( | ||||
| 		&mount.FakeMounter{}, | ||||
| 		"", | ||||
| 		fakeRecorder, | ||||
| 		false /* experimentalCheckNodeCapabilitiesBeforeMount*/) | ||||
| 		false /* experimentalCheckNodeCapabilitiesBeforeMount */) | ||||
|  | ||||
| 	return vm, err | ||||
| } | ||||
|   | ||||
| @@ -5,11 +5,15 @@ licenses(["notice"]) | ||||
| load( | ||||
|     "@io_bazel_rules_go//go:def.bzl", | ||||
|     "go_library", | ||||
|     "go_test", | ||||
| ) | ||||
|  | ||||
| go_library( | ||||
|     name = "go_default_library", | ||||
|     srcs = ["operation_executor.go"], | ||||
|     srcs = [ | ||||
|         "operation_executor.go", | ||||
|         "operation_generator.go", | ||||
|     ], | ||||
|     tags = ["automanaged"], | ||||
|     deps = [ | ||||
|         "//pkg/api/errors:go_default_library", | ||||
| @@ -27,3 +31,16 @@ go_library( | ||||
|         "//vendor:github.com/golang/glog", | ||||
|     ], | ||||
| ) | ||||
|  | ||||
| go_test( | ||||
|     name = "go_default_test", | ||||
|     srcs = ["operation_executor_test.go"], | ||||
|     library = "go_default_library", | ||||
|     tags = ["automanaged"], | ||||
|     deps = [ | ||||
|         "//pkg/api/v1:go_default_library", | ||||
|         "//pkg/types:go_default_library", | ||||
|         "//pkg/util/mount:go_default_library", | ||||
|         "//vendor:k8s.io/client-go/_vendor/github.com/pborman/uuid", | ||||
|     ], | ||||
| ) | ||||
|   | ||||
| @@ -21,17 +21,10 @@ limitations under the License. | ||||
| package operationexecutor | ||||
|  | ||||
| import ( | ||||
| 	"fmt" | ||||
| 	"strings" | ||||
| 	"time" | ||||
|  | ||||
| 	"github.com/golang/glog" | ||||
| 	"k8s.io/kubernetes/pkg/api/errors" | ||||
| 	"k8s.io/kubernetes/pkg/api/v1" | ||||
| 	metav1 "k8s.io/kubernetes/pkg/apis/meta/v1" | ||||
| 	clientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" | ||||
| 	"k8s.io/kubernetes/pkg/client/record" | ||||
| 	kevents "k8s.io/kubernetes/pkg/kubelet/events" | ||||
| 	"k8s.io/kubernetes/pkg/types" | ||||
| 	"k8s.io/kubernetes/pkg/util/mount" | ||||
| 	"k8s.io/kubernetes/pkg/volume" | ||||
| @@ -119,18 +112,12 @@ type OperationExecutor interface { | ||||
|  | ||||
| // NewOperationExecutor returns a new instance of OperationExecutor. | ||||
| func NewOperationExecutor( | ||||
| 	kubeClient clientset.Interface, | ||||
| 	volumePluginMgr *volume.VolumePluginMgr, | ||||
| 	recorder record.EventRecorder, | ||||
| 	checkNodeCapabilitiesBeforeMount bool) OperationExecutor { | ||||
| 	operationGenerator OperationGenerator) OperationExecutor { | ||||
|  | ||||
| 	return &operationExecutor{ | ||||
| 		kubeClient:      kubeClient, | ||||
| 		volumePluginMgr: volumePluginMgr, | ||||
| 		pendingOperations: nestedpendingoperations.NewNestedPendingOperations( | ||||
| 			true /* exponentialBackOffOnError */), | ||||
| 		recorder: recorder, | ||||
| 		checkNodeCapabilitiesBeforeMount: checkNodeCapabilitiesBeforeMount, | ||||
| 		operationGenerator: operationGenerator, | ||||
| 	} | ||||
| } | ||||
|  | ||||
| @@ -361,25 +348,13 @@ type MountedVolume struct { | ||||
| } | ||||
|  | ||||
| type operationExecutor struct { | ||||
| 	// Used to fetch objects from the API server like Node in the | ||||
| 	// VerifyControllerAttachedVolume operation. | ||||
| 	kubeClient clientset.Interface | ||||
|  | ||||
| 	// volumePluginMgr is the volume plugin manager used to create volume | ||||
| 	// plugin objects. | ||||
| 	volumePluginMgr *volume.VolumePluginMgr | ||||
|  | ||||
| 	// pendingOperations keeps track of pending attach and detach operations so | ||||
| 	// multiple operations are not started on the same volume | ||||
| 	pendingOperations nestedpendingoperations.NestedPendingOperations | ||||
|  | ||||
| 	// recorder is used to record events in the API server | ||||
| 	recorder record.EventRecorder | ||||
|  | ||||
| 	// checkNodeCapabilitiesBeforeMount, if set, enables the CanMount check, | ||||
| 	// which verifies that the components (binaries, etc.) required to mount | ||||
| 	// the volume are available on the underlying node before attempting mount. | ||||
| 	checkNodeCapabilitiesBeforeMount bool | ||||
| 	// operationGenerator is an interface that provides implementations for | ||||
| 	// generating volume function | ||||
| 	operationGenerator OperationGenerator | ||||
| } | ||||
|  | ||||
| func (oe *operationExecutor) IsOperationPending(volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName) bool { | ||||
| @@ -390,7 +365,7 @@ func (oe *operationExecutor) AttachVolume( | ||||
| 	volumeToAttach VolumeToAttach, | ||||
| 	actualStateOfWorld ActualStateOfWorldAttacherUpdater) error { | ||||
| 	attachFunc, err := | ||||
| 		oe.generateAttachVolumeFunc(volumeToAttach, actualStateOfWorld) | ||||
| 		oe.operationGenerator.GenerateAttachVolumeFunc(volumeToAttach, actualStateOfWorld) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| @@ -404,7 +379,7 @@ func (oe *operationExecutor) DetachVolume( | ||||
| 	verifySafeToDetach bool, | ||||
| 	actualStateOfWorld ActualStateOfWorldAttacherUpdater) error { | ||||
| 	detachFunc, err := | ||||
| 		oe.generateDetachVolumeFunc(volumeToDetach, verifySafeToDetach, actualStateOfWorld) | ||||
| 		oe.operationGenerator.GenerateDetachVolumeFunc(volumeToDetach, verifySafeToDetach, actualStateOfWorld) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| @@ -418,7 +393,7 @@ func (oe *operationExecutor) VerifyVolumesAreAttached( | ||||
| 	nodeName types.NodeName, | ||||
| 	actualStateOfWorld ActualStateOfWorldAttacherUpdater) error { | ||||
| 	volumesAreAttachedFunc, err := | ||||
| 		oe.generateVolumesAreAttachedFunc(attachedVolumes, nodeName, actualStateOfWorld) | ||||
| 		oe.operationGenerator.GenerateVolumesAreAttachedFunc(attachedVolumes, nodeName, actualStateOfWorld) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| @@ -430,7 +405,7 @@ func (oe *operationExecutor) MountVolume( | ||||
| 	waitForAttachTimeout time.Duration, | ||||
| 	volumeToMount VolumeToMount, | ||||
| 	actualStateOfWorld ActualStateOfWorldMounterUpdater) error { | ||||
| 	mountFunc, err := oe.generateMountVolumeFunc( | ||||
| 	mountFunc, err := oe.operationGenerator.GenerateMountVolumeFunc( | ||||
| 		waitForAttachTimeout, volumeToMount, actualStateOfWorld) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| @@ -453,7 +428,7 @@ func (oe *operationExecutor) UnmountVolume( | ||||
| 	actualStateOfWorld ActualStateOfWorldMounterUpdater) error { | ||||
|  | ||||
| 	unmountFunc, err := | ||||
| 		oe.generateUnmountVolumeFunc(volumeToUnmount, actualStateOfWorld) | ||||
| 		oe.operationGenerator.GenerateUnmountVolumeFunc(volumeToUnmount, actualStateOfWorld) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| @@ -471,7 +446,7 @@ func (oe *operationExecutor) UnmountDevice( | ||||
| 	actualStateOfWorld ActualStateOfWorldMounterUpdater, | ||||
| 	mounter mount.Interface) error { | ||||
| 	unmountDeviceFunc, err := | ||||
| 		oe.generateUnmountDeviceFunc(deviceToDetach, actualStateOfWorld, mounter) | ||||
| 		oe.operationGenerator.GenerateUnmountDeviceFunc(deviceToDetach, actualStateOfWorld, mounter) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| @@ -485,7 +460,7 @@ func (oe *operationExecutor) VerifyControllerAttachedVolume( | ||||
| 	nodeName types.NodeName, | ||||
| 	actualStateOfWorld ActualStateOfWorldAttacherUpdater) error { | ||||
| 	verifyControllerAttachedVolumeFunc, err := | ||||
| 		oe.generateVerifyControllerAttachedVolumeFunc(volumeToMount, nodeName, actualStateOfWorld) | ||||
| 		oe.operationGenerator.GenerateVerifyControllerAttachedVolumeFunc(volumeToMount, nodeName, actualStateOfWorld) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| @@ -494,639 +469,6 @@ func (oe *operationExecutor) VerifyControllerAttachedVolume( | ||||
| 		volumeToMount.VolumeName, "" /* podName */, verifyControllerAttachedVolumeFunc) | ||||
| } | ||||
|  | ||||
| func (oe *operationExecutor) generateVolumesAreAttachedFunc( | ||||
| 	attachedVolumes []AttachedVolume, | ||||
| 	nodeName types.NodeName, | ||||
| 	actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, error) { | ||||
|  | ||||
| 	// volumesPerPlugin maps from a volume plugin to a list of volume specs which belong | ||||
| 	// to this type of plugin | ||||
| 	volumesPerPlugin := make(map[string][]*volume.Spec) | ||||
| 	// volumeSpecMap maps from a volume spec to its unique volumeName which will be used | ||||
| 	// when calling MarkVolumeAsDetached | ||||
| 	volumeSpecMap := make(map[*volume.Spec]v1.UniqueVolumeName) | ||||
| 	// Iterate each volume spec and put them into a map index by the pluginName | ||||
| 	for _, volumeAttached := range attachedVolumes { | ||||
| 		volumePlugin, err := | ||||
| 			oe.volumePluginMgr.FindPluginBySpec(volumeAttached.VolumeSpec) | ||||
| 		if err != nil || volumePlugin == nil { | ||||
| 			glog.Errorf( | ||||
| 				"VolumesAreAttached.FindPluginBySpec failed for volume %q (spec.Name: %q) on node %q with error: %v", | ||||
| 				volumeAttached.VolumeName, | ||||
| 				volumeAttached.VolumeSpec.Name(), | ||||
| 				volumeAttached.NodeName, | ||||
| 				err) | ||||
| 		} | ||||
| 		volumeSpecList, pluginExists := volumesPerPlugin[volumePlugin.GetPluginName()] | ||||
| 		if !pluginExists { | ||||
| 			volumeSpecList = []*volume.Spec{} | ||||
| 		} | ||||
| 		volumeSpecList = append(volumeSpecList, volumeAttached.VolumeSpec) | ||||
| 		volumesPerPlugin[volumePlugin.GetPluginName()] = volumeSpecList | ||||
| 		volumeSpecMap[volumeAttached.VolumeSpec] = volumeAttached.VolumeName | ||||
| 	} | ||||
|  | ||||
| 	return func() error { | ||||
|  | ||||
| 		// For each volume plugin, pass the list of volume specs to VolumesAreAttached to check | ||||
| 		// whether the volumes are still attached. | ||||
| 		for pluginName, volumesSpecs := range volumesPerPlugin { | ||||
| 			attachableVolumePlugin, err := | ||||
| 				oe.volumePluginMgr.FindAttachablePluginByName(pluginName) | ||||
| 			if err != nil || attachableVolumePlugin == nil { | ||||
| 				glog.Errorf( | ||||
| 					"VolumeAreAttached.FindAttachablePluginBySpec failed for plugin %q with: %v", | ||||
| 					pluginName, | ||||
| 					err) | ||||
| 				continue | ||||
| 			} | ||||
|  | ||||
| 			volumeAttacher, newAttacherErr := attachableVolumePlugin.NewAttacher() | ||||
| 			if newAttacherErr != nil { | ||||
| 				glog.Errorf( | ||||
| 					"VolumesAreAttached failed for getting plugin %q with: %v", | ||||
| 					pluginName, | ||||
| 					newAttacherErr) | ||||
| 				continue | ||||
| 			} | ||||
|  | ||||
| 			attached, areAttachedErr := volumeAttacher.VolumesAreAttached(volumesSpecs, nodeName) | ||||
| 			if areAttachedErr != nil { | ||||
| 				glog.Errorf( | ||||
| 					"VolumesAreAttached failed for checking on node %q with: %v", | ||||
| 					nodeName, | ||||
| 					areAttachedErr) | ||||
| 				continue | ||||
| 			} | ||||
|  | ||||
| 			for spec, check := range attached { | ||||
| 				if !check { | ||||
| 					actualStateOfWorld.MarkVolumeAsDetached(volumeSpecMap[spec], nodeName) | ||||
| 					glog.V(1).Infof("VerifyVolumesAreAttached determined volume %q (spec.Name: %q) is no longer attached to node %q, therefore it was marked as detached.", | ||||
| 						volumeSpecMap[spec], spec.Name(), nodeName) | ||||
| 				} | ||||
| 			} | ||||
| 		} | ||||
| 		return nil | ||||
| 	}, nil | ||||
| } | ||||
|  | ||||
| func (oe *operationExecutor) generateAttachVolumeFunc( | ||||
| 	volumeToAttach VolumeToAttach, | ||||
| 	actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, error) { | ||||
| 	// Get attacher plugin | ||||
| 	attachableVolumePlugin, err := | ||||
| 		oe.volumePluginMgr.FindAttachablePluginBySpec(volumeToAttach.VolumeSpec) | ||||
| 	if err != nil || attachableVolumePlugin == nil { | ||||
| 		return nil, fmt.Errorf( | ||||
| 			"AttachVolume.FindAttachablePluginBySpec failed for volume %q (spec.Name: %q) from node %q with: %v", | ||||
| 			volumeToAttach.VolumeName, | ||||
| 			volumeToAttach.VolumeSpec.Name(), | ||||
| 			volumeToAttach.NodeName, | ||||
| 			err) | ||||
| 	} | ||||
|  | ||||
| 	volumeAttacher, newAttacherErr := attachableVolumePlugin.NewAttacher() | ||||
| 	if newAttacherErr != nil { | ||||
| 		return nil, fmt.Errorf( | ||||
| 			"AttachVolume.NewAttacher failed for volume %q (spec.Name: %q) from node %q with: %v", | ||||
| 			volumeToAttach.VolumeName, | ||||
| 			volumeToAttach.VolumeSpec.Name(), | ||||
| 			volumeToAttach.NodeName, | ||||
| 			newAttacherErr) | ||||
| 	} | ||||
|  | ||||
| 	return func() error { | ||||
| 		// Execute attach | ||||
| 		devicePath, attachErr := volumeAttacher.Attach( | ||||
| 			volumeToAttach.VolumeSpec, volumeToAttach.NodeName) | ||||
|  | ||||
| 		if attachErr != nil { | ||||
| 			// On failure, return error. Caller will log and retry. | ||||
| 			err := fmt.Errorf( | ||||
| 				"Failed to attach volume %q on node %q with: %v", | ||||
| 				volumeToAttach.VolumeSpec.Name(), | ||||
| 				volumeToAttach.NodeName, | ||||
| 				attachErr) | ||||
| 			for _, pod := range volumeToAttach.ScheduledPods { | ||||
| 				oe.recorder.Eventf(pod, v1.EventTypeWarning, kevents.FailedMountVolume, err.Error()) | ||||
| 			} | ||||
| 			return err | ||||
| 		} | ||||
|  | ||||
| 		glog.Infof( | ||||
| 			"AttachVolume.Attach succeeded for volume %q (spec.Name: %q) from node %q.", | ||||
| 			volumeToAttach.VolumeName, | ||||
| 			volumeToAttach.VolumeSpec.Name(), | ||||
| 			volumeToAttach.NodeName) | ||||
|  | ||||
| 		// Update actual state of world | ||||
| 		addVolumeNodeErr := actualStateOfWorld.MarkVolumeAsAttached( | ||||
| 			v1.UniqueVolumeName(""), volumeToAttach.VolumeSpec, volumeToAttach.NodeName, devicePath) | ||||
| 		if addVolumeNodeErr != nil { | ||||
| 			// On failure, return error. Caller will log and retry. | ||||
| 			return fmt.Errorf( | ||||
| 				"AttachVolume.MarkVolumeAsAttached failed for volume %q (spec.Name: %q) from node %q with: %v", | ||||
| 				volumeToAttach.VolumeName, | ||||
| 				volumeToAttach.VolumeSpec.Name(), | ||||
| 				volumeToAttach.NodeName, | ||||
| 				addVolumeNodeErr) | ||||
| 		} | ||||
|  | ||||
| 		return nil | ||||
| 	}, nil | ||||
| } | ||||
|  | ||||
| func (oe *operationExecutor) generateDetachVolumeFunc( | ||||
| 	volumeToDetach AttachedVolume, | ||||
| 	verifySafeToDetach bool, | ||||
| 	actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, error) { | ||||
| 	// Get attacher plugin | ||||
| 	attachableVolumePlugin, err := | ||||
| 		oe.volumePluginMgr.FindAttachablePluginBySpec(volumeToDetach.VolumeSpec) | ||||
| 	if err != nil || attachableVolumePlugin == nil { | ||||
| 		return nil, fmt.Errorf( | ||||
| 			"DetachVolume.FindAttachablePluginBySpec failed for volume %q (spec.Name: %q) from node %q with: %v", | ||||
| 			volumeToDetach.VolumeName, | ||||
| 			volumeToDetach.VolumeSpec.Name(), | ||||
| 			volumeToDetach.NodeName, | ||||
| 			err) | ||||
| 	} | ||||
|  | ||||
| 	volumeName, err := | ||||
| 		attachableVolumePlugin.GetVolumeName(volumeToDetach.VolumeSpec) | ||||
| 	if err != nil { | ||||
| 		return nil, fmt.Errorf( | ||||
| 			"DetachVolume.GetVolumeName failed for volume %q (spec.Name: %q) from node %q with: %v", | ||||
| 			volumeToDetach.VolumeName, | ||||
| 			volumeToDetach.VolumeSpec.Name(), | ||||
| 			volumeToDetach.NodeName, | ||||
| 			err) | ||||
| 	} | ||||
|  | ||||
| 	volumeDetacher, err := attachableVolumePlugin.NewDetacher() | ||||
| 	if err != nil { | ||||
| 		return nil, fmt.Errorf( | ||||
| 			"DetachVolume.NewDetacher failed for volume %q (spec.Name: %q) from node %q with: %v", | ||||
| 			volumeToDetach.VolumeName, | ||||
| 			volumeToDetach.VolumeSpec.Name(), | ||||
| 			volumeToDetach.NodeName, | ||||
| 			err) | ||||
| 	} | ||||
|  | ||||
| 	return func() error { | ||||
| 		var err error | ||||
| 		if verifySafeToDetach { | ||||
| 			err = oe.verifyVolumeIsSafeToDetach(volumeToDetach) | ||||
| 		} | ||||
| 		if err == nil { | ||||
| 			err = volumeDetacher.Detach(volumeName, volumeToDetach.NodeName) | ||||
| 		} | ||||
| 		if err != nil { | ||||
| 			// On failure, add volume back to ReportAsAttached list | ||||
| 			actualStateOfWorld.AddVolumeToReportAsAttached( | ||||
| 				volumeToDetach.VolumeName, volumeToDetach.NodeName) | ||||
| 			return fmt.Errorf( | ||||
| 				"DetachVolume.Detach failed for volume %q (spec.Name: %q) from node %q with: %v", | ||||
| 				volumeToDetach.VolumeName, | ||||
| 				volumeToDetach.VolumeSpec.Name(), | ||||
| 				volumeToDetach.NodeName, | ||||
| 				err) | ||||
| 		} | ||||
|  | ||||
| 		glog.Infof( | ||||
| 			"DetachVolume.Detach succeeded for volume %q (spec.Name: %q) from node %q.", | ||||
| 			volumeToDetach.VolumeName, | ||||
| 			volumeToDetach.VolumeSpec.Name(), | ||||
| 			volumeToDetach.NodeName) | ||||
|  | ||||
| 		// Update actual state of world | ||||
| 		actualStateOfWorld.MarkVolumeAsDetached( | ||||
| 			volumeToDetach.VolumeName, volumeToDetach.NodeName) | ||||
|  | ||||
| 		return nil | ||||
| 	}, nil | ||||
| } | ||||
|  | ||||
| func (oe *operationExecutor) verifyVolumeIsSafeToDetach( | ||||
| 	volumeToDetach AttachedVolume) error { | ||||
| 	// Fetch current node object | ||||
| 	node, fetchErr := oe.kubeClient.Core().Nodes().Get(string(volumeToDetach.NodeName), metav1.GetOptions{}) | ||||
| 	if fetchErr != nil { | ||||
| 		if errors.IsNotFound(fetchErr) { | ||||
| 			glog.Warningf("Node %q not found on API server. DetachVolume will skip safe to detach check.", | ||||
| 				volumeToDetach.NodeName, | ||||
| 				volumeToDetach.VolumeName, | ||||
| 				volumeToDetach.VolumeSpec.Name()) | ||||
| 			return nil | ||||
| 		} | ||||
|  | ||||
| 		// On failure, return error. Caller will log and retry. | ||||
| 		return fmt.Errorf( | ||||
| 			"DetachVolume failed fetching node from API server for volume %q (spec.Name: %q) from node %q with: %v", | ||||
| 			volumeToDetach.VolumeName, | ||||
| 			volumeToDetach.VolumeSpec.Name(), | ||||
| 			volumeToDetach.NodeName, | ||||
| 			fetchErr) | ||||
| 	} | ||||
|  | ||||
| 	if node == nil { | ||||
| 		// On failure, return error. Caller will log and retry. | ||||
| 		return fmt.Errorf( | ||||
| 			"DetachVolume failed fetching node from API server for volume %q (spec.Name: %q) from node %q. Error: node object retrieved from API server is nil", | ||||
| 			volumeToDetach.VolumeName, | ||||
| 			volumeToDetach.VolumeSpec.Name(), | ||||
| 			volumeToDetach.NodeName) | ||||
| 	} | ||||
|  | ||||
| 	for _, inUseVolume := range node.Status.VolumesInUse { | ||||
| 		if inUseVolume == volumeToDetach.VolumeName { | ||||
| 			return fmt.Errorf("DetachVolume failed for volume %q (spec.Name: %q) from node %q. Error: volume is still in use by node, according to Node status", | ||||
| 				volumeToDetach.VolumeName, | ||||
| 				volumeToDetach.VolumeSpec.Name(), | ||||
| 				volumeToDetach.NodeName) | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	// Volume is not marked as in use by node | ||||
| 	glog.Infof("Verified volume is safe to detach for volume %q (spec.Name: %q) from node %q.", | ||||
| 		volumeToDetach.VolumeName, | ||||
| 		volumeToDetach.VolumeSpec.Name(), | ||||
| 		volumeToDetach.NodeName) | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (oe *operationExecutor) generateMountVolumeFunc( | ||||
| 	waitForAttachTimeout time.Duration, | ||||
| 	volumeToMount VolumeToMount, | ||||
| 	actualStateOfWorld ActualStateOfWorldMounterUpdater) (func() error, error) { | ||||
| 	// Get mounter plugin | ||||
| 	volumePlugin, err := | ||||
| 		oe.volumePluginMgr.FindPluginBySpec(volumeToMount.VolumeSpec) | ||||
| 	if err != nil || volumePlugin == nil { | ||||
| 		return nil, fmt.Errorf( | ||||
| 			"MountVolume.FindPluginBySpec failed for volume %q (spec.Name: %q) pod %q (UID: %q) with: %v", | ||||
| 			volumeToMount.VolumeName, | ||||
| 			volumeToMount.VolumeSpec.Name(), | ||||
| 			volumeToMount.PodName, | ||||
| 			volumeToMount.Pod.UID, | ||||
| 			err) | ||||
| 	} | ||||
|  | ||||
| 	volumeMounter, newMounterErr := volumePlugin.NewMounter( | ||||
| 		volumeToMount.VolumeSpec, | ||||
| 		volumeToMount.Pod, | ||||
| 		volume.VolumeOptions{}) | ||||
| 	if newMounterErr != nil { | ||||
| 		return nil, fmt.Errorf( | ||||
| 			"MountVolume.NewMounter failed for volume %q (spec.Name: %q) pod %q (UID: %q) with: %v", | ||||
| 			volumeToMount.VolumeName, | ||||
| 			volumeToMount.VolumeSpec.Name(), | ||||
| 			volumeToMount.PodName, | ||||
| 			volumeToMount.Pod.UID, | ||||
| 			newMounterErr) | ||||
| 	} | ||||
|  | ||||
| 	// Get attacher, if possible | ||||
| 	attachableVolumePlugin, _ := | ||||
| 		oe.volumePluginMgr.FindAttachablePluginBySpec(volumeToMount.VolumeSpec) | ||||
| 	var volumeAttacher volume.Attacher | ||||
| 	if attachableVolumePlugin != nil { | ||||
| 		volumeAttacher, _ = attachableVolumePlugin.NewAttacher() | ||||
| 	} | ||||
|  | ||||
| 	var fsGroup *int64 | ||||
| 	if volumeToMount.Pod.Spec.SecurityContext != nil && | ||||
| 		volumeToMount.Pod.Spec.SecurityContext.FSGroup != nil { | ||||
| 		fsGroup = volumeToMount.Pod.Spec.SecurityContext.FSGroup | ||||
| 	} | ||||
|  | ||||
| 	return func() error { | ||||
| 		if volumeAttacher != nil { | ||||
| 			// Wait for attachable volumes to finish attaching | ||||
| 			glog.Infof( | ||||
| 				"Entering MountVolume.WaitForAttach for volume %q (spec.Name: %q) pod %q (UID: %q) DevicePath: %q", | ||||
| 				volumeToMount.VolumeName, | ||||
| 				volumeToMount.VolumeSpec.Name(), | ||||
| 				volumeToMount.PodName, | ||||
| 				volumeToMount.Pod.UID, | ||||
| 				volumeToMount.DevicePath) | ||||
|  | ||||
| 			devicePath, err := volumeAttacher.WaitForAttach( | ||||
| 				volumeToMount.VolumeSpec, volumeToMount.DevicePath, waitForAttachTimeout) | ||||
| 			if err != nil { | ||||
| 				// On failure, return error. Caller will log and retry. | ||||
| 				return fmt.Errorf( | ||||
| 					"MountVolume.WaitForAttach failed for volume %q (spec.Name: %q) pod %q (UID: %q) with: %v", | ||||
| 					volumeToMount.VolumeName, | ||||
| 					volumeToMount.VolumeSpec.Name(), | ||||
| 					volumeToMount.PodName, | ||||
| 					volumeToMount.Pod.UID, | ||||
| 					err) | ||||
| 			} | ||||
|  | ||||
| 			glog.Infof( | ||||
| 				"MountVolume.WaitForAttach succeeded for volume %q (spec.Name: %q) pod %q (UID: %q).", | ||||
| 				volumeToMount.VolumeName, | ||||
| 				volumeToMount.VolumeSpec.Name(), | ||||
| 				volumeToMount.PodName, | ||||
| 				volumeToMount.Pod.UID) | ||||
|  | ||||
| 			deviceMountPath, err := | ||||
| 				volumeAttacher.GetDeviceMountPath(volumeToMount.VolumeSpec) | ||||
| 			if err != nil { | ||||
| 				// On failure, return error. Caller will log and retry. | ||||
| 				return fmt.Errorf( | ||||
| 					"MountVolume.GetDeviceMountPath failed for volume %q (spec.Name: %q) pod %q (UID: %q) with: %v", | ||||
| 					volumeToMount.VolumeName, | ||||
| 					volumeToMount.VolumeSpec.Name(), | ||||
| 					volumeToMount.PodName, | ||||
| 					volumeToMount.Pod.UID, | ||||
| 					err) | ||||
| 			} | ||||
|  | ||||
| 			// Mount device to global mount path | ||||
| 			err = volumeAttacher.MountDevice( | ||||
| 				volumeToMount.VolumeSpec, | ||||
| 				devicePath, | ||||
| 				deviceMountPath) | ||||
| 			if err != nil { | ||||
| 				// On failure, return error. Caller will log and retry. | ||||
| 				err := fmt.Errorf( | ||||
| 					"MountVolume.MountDevice failed for volume %q (spec.Name: %q) pod %q (UID: %q) with: %v", | ||||
| 					volumeToMount.VolumeName, | ||||
| 					volumeToMount.VolumeSpec.Name(), | ||||
| 					volumeToMount.PodName, | ||||
| 					volumeToMount.Pod.UID, | ||||
| 					err) | ||||
| 				oe.recorder.Eventf(volumeToMount.Pod, v1.EventTypeWarning, kevents.FailedMountVolume, err.Error()) | ||||
| 				return err | ||||
| 			} | ||||
|  | ||||
| 			glog.Infof( | ||||
| 				"MountVolume.MountDevice succeeded for volume %q (spec.Name: %q) pod %q (UID: %q) device mount path %q", | ||||
| 				volumeToMount.VolumeName, | ||||
| 				volumeToMount.VolumeSpec.Name(), | ||||
| 				volumeToMount.PodName, | ||||
| 				volumeToMount.Pod.UID, | ||||
| 				deviceMountPath) | ||||
|  | ||||
| 			// Update actual state of world to reflect volume is globally mounted | ||||
| 			markDeviceMountedErr := actualStateOfWorld.MarkDeviceAsMounted( | ||||
| 				volumeToMount.VolumeName) | ||||
| 			if markDeviceMountedErr != nil { | ||||
| 				// On failure, return error. Caller will log and retry. | ||||
| 				return fmt.Errorf( | ||||
| 					"MountVolume.MarkDeviceAsMounted failed for volume %q (spec.Name: %q) pod %q (UID: %q) with: %v", | ||||
| 					volumeToMount.VolumeName, | ||||
| 					volumeToMount.VolumeSpec.Name(), | ||||
| 					volumeToMount.PodName, | ||||
| 					volumeToMount.Pod.UID, | ||||
| 					markDeviceMountedErr) | ||||
| 			} | ||||
| 		} | ||||
|  | ||||
| 		if oe.checkNodeCapabilitiesBeforeMount { | ||||
| 			if canMountErr := volumeMounter.CanMount(); canMountErr != nil { | ||||
| 				errMsg := fmt.Sprintf("Unable to mount volume %v (spec.Name: %v) on pod %v (UID: %v). Verify that your node machine has the required components before attempting to mount this volume type. %s", volumeToMount.VolumeName, volumeToMount.VolumeSpec.Name(), volumeToMount.Pod.Name, volumeToMount.Pod.UID, canMountErr.Error()) | ||||
| 				oe.recorder.Eventf(volumeToMount.Pod, v1.EventTypeWarning, kevents.FailedMountVolume, errMsg) | ||||
| 				glog.Errorf(errMsg) | ||||
| 				return fmt.Errorf(errMsg) | ||||
| 			} | ||||
| 		} | ||||
|  | ||||
| 		// Execute mount | ||||
| 		mountErr := volumeMounter.SetUp(fsGroup) | ||||
| 		if mountErr != nil { | ||||
| 			// On failure, return error. Caller will log and retry. | ||||
| 			err := fmt.Errorf( | ||||
| 				"MountVolume.SetUp failed for volume %q (spec.Name: %q) pod %q (UID: %q) with: %v", | ||||
| 				volumeToMount.VolumeName, | ||||
| 				volumeToMount.VolumeSpec.Name(), | ||||
| 				volumeToMount.PodName, | ||||
| 				volumeToMount.Pod.UID, | ||||
| 				mountErr) | ||||
| 			oe.recorder.Eventf(volumeToMount.Pod, v1.EventTypeWarning, kevents.FailedMountVolume, err.Error()) | ||||
| 			return err | ||||
| 		} | ||||
|  | ||||
| 		glog.Infof( | ||||
| 			"MountVolume.SetUp succeeded for volume %q (spec.Name: %q) pod %q (UID: %q).", | ||||
| 			volumeToMount.VolumeName, | ||||
| 			volumeToMount.VolumeSpec.Name(), | ||||
| 			volumeToMount.PodName, | ||||
| 			volumeToMount.Pod.UID) | ||||
|  | ||||
| 		// Update actual state of world | ||||
| 		markVolMountedErr := actualStateOfWorld.MarkVolumeAsMounted( | ||||
| 			volumeToMount.PodName, | ||||
| 			volumeToMount.Pod.UID, | ||||
| 			volumeToMount.VolumeName, | ||||
| 			volumeMounter, | ||||
| 			volumeToMount.OuterVolumeSpecName, | ||||
| 			volumeToMount.VolumeGidValue) | ||||
| 		if markVolMountedErr != nil { | ||||
| 			// On failure, return error. Caller will log and retry. | ||||
| 			return fmt.Errorf( | ||||
| 				"MountVolume.MarkVolumeAsMounted failed for volume %q (spec.Name: %q) pod %q (UID: %q) with: %v", | ||||
| 				volumeToMount.VolumeName, | ||||
| 				volumeToMount.VolumeSpec.Name(), | ||||
| 				volumeToMount.PodName, | ||||
| 				volumeToMount.Pod.UID, | ||||
| 				markVolMountedErr) | ||||
| 		} | ||||
|  | ||||
| 		return nil | ||||
| 	}, nil | ||||
| } | ||||
|  | ||||
| func (oe *operationExecutor) generateUnmountVolumeFunc( | ||||
| 	volumeToUnmount MountedVolume, | ||||
| 	actualStateOfWorld ActualStateOfWorldMounterUpdater) (func() error, error) { | ||||
| 	// Get mountable plugin | ||||
| 	volumePlugin, err := | ||||
| 		oe.volumePluginMgr.FindPluginByName(volumeToUnmount.PluginName) | ||||
| 	if err != nil || volumePlugin == nil { | ||||
| 		return nil, fmt.Errorf( | ||||
| 			"UnmountVolume.FindPluginByName failed for volume %q (volume.spec.Name: %q) pod %q (UID: %q) err=%v", | ||||
| 			volumeToUnmount.VolumeName, | ||||
| 			volumeToUnmount.OuterVolumeSpecName, | ||||
| 			volumeToUnmount.PodName, | ||||
| 			volumeToUnmount.PodUID, | ||||
| 			err) | ||||
| 	} | ||||
|  | ||||
| 	volumeUnmounter, newUnmounterErr := volumePlugin.NewUnmounter( | ||||
| 		volumeToUnmount.InnerVolumeSpecName, volumeToUnmount.PodUID) | ||||
| 	if newUnmounterErr != nil { | ||||
| 		return nil, fmt.Errorf( | ||||
| 			"UnmountVolume.NewUnmounter failed for volume %q (volume.spec.Name: %q) pod %q (UID: %q) err=%v", | ||||
| 			volumeToUnmount.VolumeName, | ||||
| 			volumeToUnmount.OuterVolumeSpecName, | ||||
| 			volumeToUnmount.PodName, | ||||
| 			volumeToUnmount.PodUID, | ||||
| 			newUnmounterErr) | ||||
| 	} | ||||
|  | ||||
| 	return func() error { | ||||
| 		// Execute unmount | ||||
| 		unmountErr := volumeUnmounter.TearDown() | ||||
| 		if unmountErr != nil { | ||||
| 			// On failure, return error. Caller will log and retry. | ||||
| 			return fmt.Errorf( | ||||
| 				"UnmountVolume.TearDown failed for volume %q (volume.spec.Name: %q) pod %q (UID: %q) with: %v", | ||||
| 				volumeToUnmount.VolumeName, | ||||
| 				volumeToUnmount.OuterVolumeSpecName, | ||||
| 				volumeToUnmount.PodName, | ||||
| 				volumeToUnmount.PodUID, | ||||
| 				unmountErr) | ||||
| 		} | ||||
|  | ||||
| 		glog.Infof( | ||||
| 			"UnmountVolume.TearDown succeeded for volume %q (OuterVolumeSpecName: %q) pod %q (UID: %q). InnerVolumeSpecName %q. PluginName %q, VolumeGidValue %q", | ||||
| 			volumeToUnmount.VolumeName, | ||||
| 			volumeToUnmount.OuterVolumeSpecName, | ||||
| 			volumeToUnmount.PodName, | ||||
| 			volumeToUnmount.PodUID, | ||||
| 			volumeToUnmount.InnerVolumeSpecName, | ||||
| 			volumeToUnmount.PluginName, | ||||
| 			volumeToUnmount.VolumeGidValue) | ||||
|  | ||||
| 		// Update actual state of world | ||||
| 		markVolMountedErr := actualStateOfWorld.MarkVolumeAsUnmounted( | ||||
| 			volumeToUnmount.PodName, volumeToUnmount.VolumeName) | ||||
| 		if markVolMountedErr != nil { | ||||
| 			// On failure, just log and exit | ||||
| 			glog.Errorf( | ||||
| 				"UnmountVolume.MarkVolumeAsUnmounted failed for volume %q (volume.spec.Name: %q) pod %q (UID: %q) with: %v", | ||||
| 				volumeToUnmount.VolumeName, | ||||
| 				volumeToUnmount.OuterVolumeSpecName, | ||||
| 				volumeToUnmount.PodName, | ||||
| 				volumeToUnmount.PodUID, | ||||
| 				markVolMountedErr) | ||||
| 		} | ||||
|  | ||||
| 		return nil | ||||
| 	}, nil | ||||
| } | ||||
|  | ||||
| func (oe *operationExecutor) generateUnmountDeviceFunc( | ||||
| 	deviceToDetach AttachedVolume, | ||||
| 	actualStateOfWorld ActualStateOfWorldMounterUpdater, | ||||
| 	mounter mount.Interface) (func() error, error) { | ||||
| 	// Get attacher plugin | ||||
| 	attachableVolumePlugin, err := | ||||
| 		oe.volumePluginMgr.FindAttachablePluginBySpec(deviceToDetach.VolumeSpec) | ||||
| 	if err != nil || attachableVolumePlugin == nil { | ||||
| 		return nil, fmt.Errorf( | ||||
| 			"UnmountDevice.FindAttachablePluginBySpec failed for volume %q (spec.Name: %q) with: %v", | ||||
| 			deviceToDetach.VolumeName, | ||||
| 			deviceToDetach.VolumeSpec.Name(), | ||||
| 			err) | ||||
| 	} | ||||
|  | ||||
| 	volumeDetacher, err := attachableVolumePlugin.NewDetacher() | ||||
| 	if err != nil { | ||||
| 		return nil, fmt.Errorf( | ||||
| 			"UnmountDevice.NewDetacher failed for volume %q (spec.Name: %q) with: %v", | ||||
| 			deviceToDetach.VolumeName, | ||||
| 			deviceToDetach.VolumeSpec.Name(), | ||||
| 			err) | ||||
| 	} | ||||
|  | ||||
| 	volumeAttacher, err := attachableVolumePlugin.NewAttacher() | ||||
| 	if err != nil { | ||||
| 		return nil, fmt.Errorf( | ||||
| 			"UnmountDevice.NewAttacher failed for volume %q (spec.Name: %q) with: %v", | ||||
| 			deviceToDetach.VolumeName, | ||||
| 			deviceToDetach.VolumeSpec.Name(), | ||||
| 			err) | ||||
| 	} | ||||
|  | ||||
| 	return func() error { | ||||
| 		deviceMountPath, err := | ||||
| 			volumeAttacher.GetDeviceMountPath(deviceToDetach.VolumeSpec) | ||||
| 		if err != nil { | ||||
| 			// On failure, return error. Caller will log and retry. | ||||
| 			return fmt.Errorf( | ||||
| 				"GetDeviceMountPath failed for volume %q (spec.Name: %q) with: %v", | ||||
| 				deviceToDetach.VolumeName, | ||||
| 				deviceToDetach.VolumeSpec.Name(), | ||||
| 				err) | ||||
| 		} | ||||
| 		refs, err := attachableVolumePlugin.GetDeviceMountRefs(deviceMountPath) | ||||
|  | ||||
| 		if err != nil || hasMountRefs(deviceMountPath, refs) { | ||||
| 			if err == nil { | ||||
| 				err = fmt.Errorf("The device mount path %q is still mounted by other references %v", deviceMountPath, refs) | ||||
| 			} | ||||
| 			return fmt.Errorf( | ||||
| 				"GetDeviceMountRefs check failed for volume %q (spec.Name: %q) with: %v", | ||||
| 				deviceToDetach.VolumeName, | ||||
| 				deviceToDetach.VolumeSpec.Name(), | ||||
| 				err) | ||||
| 		} | ||||
| 		// Execute unmount | ||||
| 		unmountDeviceErr := volumeDetacher.UnmountDevice(deviceMountPath) | ||||
| 		if unmountDeviceErr != nil { | ||||
| 			// On failure, return error. Caller will log and retry. | ||||
| 			return fmt.Errorf( | ||||
| 				"UnmountDevice failed for volume %q (spec.Name: %q) with: %v", | ||||
| 				deviceToDetach.VolumeName, | ||||
| 				deviceToDetach.VolumeSpec.Name(), | ||||
| 				unmountDeviceErr) | ||||
| 		} | ||||
| 		// Before logging that UnmountDevice succeeded and moving on, | ||||
| 		// use mounter.PathIsDevice to check if the path is a device, | ||||
| 		// if so use mounter.DeviceOpened to check if the device is in use anywhere | ||||
| 		// else on the system. Retry if it returns true. | ||||
| 		isDevicePath, devicePathErr := mounter.PathIsDevice(deviceToDetach.DevicePath) | ||||
| 		var deviceOpened bool | ||||
| 		var deviceOpenedErr error | ||||
| 		if !isDevicePath && devicePathErr == nil { | ||||
| 			// not a device path or path doesn't exist | ||||
| 			//TODO: refer to #36092 | ||||
| 			glog.V(3).Infof("Not checking device path %s", deviceToDetach.DevicePath) | ||||
| 			deviceOpened = false | ||||
| 		} else { | ||||
| 			deviceOpened, deviceOpenedErr = mounter.DeviceOpened(deviceToDetach.DevicePath) | ||||
| 			if deviceOpenedErr != nil { | ||||
| 				return fmt.Errorf( | ||||
| 					"UnmountDevice.DeviceOpened failed for volume %q (spec.Name: %q) with: %v", | ||||
| 					deviceToDetach.VolumeName, | ||||
| 					deviceToDetach.VolumeSpec.Name(), | ||||
| 					deviceOpenedErr) | ||||
| 			} | ||||
| 		} | ||||
| 		// The device is still in use elsewhere. Caller will log and retry. | ||||
| 		if deviceOpened { | ||||
| 			return fmt.Errorf( | ||||
| 				"UnmountDevice failed for volume %q (spec.Name: %q) because the device is in use when it was no longer expected to be in use", | ||||
| 				deviceToDetach.VolumeName, | ||||
| 				deviceToDetach.VolumeSpec.Name()) | ||||
| 		} | ||||
|  | ||||
| 		glog.Infof( | ||||
| 			"UnmountDevice succeeded for volume %q (spec.Name: %q).", | ||||
| 			deviceToDetach.VolumeName, | ||||
| 			deviceToDetach.VolumeSpec.Name()) | ||||
|  | ||||
| 		// Update actual state of world | ||||
| 		markDeviceUnmountedErr := actualStateOfWorld.MarkDeviceAsUnmounted( | ||||
| 			deviceToDetach.VolumeName) | ||||
| 		if markDeviceUnmountedErr != nil { | ||||
| 			// On failure, return error. Caller will log and retry. | ||||
| 			return fmt.Errorf( | ||||
| 				"MarkDeviceAsUnmounted failed for device %q (spec.Name: %q) with: %v", | ||||
| 				deviceToDetach.VolumeName, | ||||
| 				deviceToDetach.VolumeSpec.Name(), | ||||
| 				markDeviceUnmountedErr) | ||||
| 		} | ||||
|  | ||||
| 		return nil | ||||
| 	}, nil | ||||
| } | ||||
|  | ||||
| // TODO: this is a workaround for the unmount device issue caused by gci mounter. | ||||
| // In GCI cluster, if gci mounter is used for mounting, the container started by mounter | ||||
| // script will cause additional mounts created in the container. Since these mounts are | ||||
| @@ -1144,100 +486,3 @@ func hasMountRefs(mountPath string, mountRefs []string) bool { | ||||
| 	} | ||||
| 	return count > 0 | ||||
| } | ||||
|  | ||||
| func (oe *operationExecutor) generateVerifyControllerAttachedVolumeFunc( | ||||
| 	volumeToMount VolumeToMount, | ||||
| 	nodeName types.NodeName, | ||||
| 	actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, error) { | ||||
| 	return func() error { | ||||
| 		if !volumeToMount.PluginIsAttachable { | ||||
| 			// If the volume does not implement the attacher interface, it is | ||||
| 			// assumed to be attached and the actual state of the world is | ||||
| 			// updated accordingly. | ||||
|  | ||||
| 			addVolumeNodeErr := actualStateOfWorld.MarkVolumeAsAttached( | ||||
| 				volumeToMount.VolumeName, volumeToMount.VolumeSpec, nodeName, "" /* devicePath */) | ||||
| 			if addVolumeNodeErr != nil { | ||||
| 				// On failure, return error. Caller will log and retry. | ||||
| 				return fmt.Errorf( | ||||
| 					"VerifyControllerAttachedVolume.MarkVolumeAsAttachedByUniqueVolumeName failed for volume %q (spec.Name: %q) pod %q (UID: %q) with: %v", | ||||
| 					volumeToMount.VolumeName, | ||||
| 					volumeToMount.VolumeSpec.Name(), | ||||
| 					volumeToMount.PodName, | ||||
| 					volumeToMount.Pod.UID, | ||||
| 					addVolumeNodeErr) | ||||
| 			} | ||||
|  | ||||
| 			return nil | ||||
| 		} | ||||
|  | ||||
| 		if !volumeToMount.ReportedInUse { | ||||
| 			// If the given volume has not yet been added to the list of | ||||
| 			// VolumesInUse in the node's volume status, do not proceed, return | ||||
| 			// error. Caller will log and retry. The node status is updated | ||||
| 			// periodically by kubelet, so it may take as much as 10 seconds | ||||
| 			// before this clears. | ||||
| 			// Issue #28141 to enable on demand status updates. | ||||
| 			return fmt.Errorf("Volume %q (spec.Name: %q) pod %q (UID: %q) has not yet been added to the list of VolumesInUse in the node's volume status", | ||||
| 				volumeToMount.VolumeName, | ||||
| 				volumeToMount.VolumeSpec.Name(), | ||||
| 				volumeToMount.PodName, | ||||
| 				volumeToMount.Pod.UID) | ||||
| 		} | ||||
|  | ||||
| 		// Fetch current node object | ||||
| 		node, fetchErr := oe.kubeClient.Core().Nodes().Get(string(nodeName), metav1.GetOptions{}) | ||||
| 		if fetchErr != nil { | ||||
| 			// On failure, return error. Caller will log and retry. | ||||
| 			return fmt.Errorf( | ||||
| 				"VerifyControllerAttachedVolume failed fetching node from API server. Volume %q (spec.Name: %q) pod %q (UID: %q). Error: %v", | ||||
| 				volumeToMount.VolumeName, | ||||
| 				volumeToMount.VolumeSpec.Name(), | ||||
| 				volumeToMount.PodName, | ||||
| 				volumeToMount.Pod.UID, | ||||
| 				fetchErr) | ||||
| 		} | ||||
|  | ||||
| 		if node == nil { | ||||
| 			// On failure, return error. Caller will log and retry. | ||||
| 			return fmt.Errorf( | ||||
| 				"VerifyControllerAttachedVolume failed. Volume %q (spec.Name: %q) pod %q (UID: %q). Error: node object retrieved from API server is nil", | ||||
| 				volumeToMount.VolumeName, | ||||
| 				volumeToMount.VolumeSpec.Name(), | ||||
| 				volumeToMount.PodName, | ||||
| 				volumeToMount.Pod.UID) | ||||
| 		} | ||||
|  | ||||
| 		for _, attachedVolume := range node.Status.VolumesAttached { | ||||
| 			if attachedVolume.Name == volumeToMount.VolumeName { | ||||
| 				addVolumeNodeErr := actualStateOfWorld.MarkVolumeAsAttached( | ||||
| 					v1.UniqueVolumeName(""), volumeToMount.VolumeSpec, nodeName, attachedVolume.DevicePath) | ||||
| 				glog.Infof("Controller successfully attached volume %q (spec.Name: %q) pod %q (UID: %q) devicePath: %q", | ||||
| 					volumeToMount.VolumeName, | ||||
| 					volumeToMount.VolumeSpec.Name(), | ||||
| 					volumeToMount.PodName, | ||||
| 					volumeToMount.Pod.UID, | ||||
| 					attachedVolume.DevicePath) | ||||
|  | ||||
| 				if addVolumeNodeErr != nil { | ||||
| 					// On failure, return error. Caller will log and retry. | ||||
| 					return fmt.Errorf( | ||||
| 						"VerifyControllerAttachedVolume.MarkVolumeAsAttached failed for volume %q (spec.Name: %q) pod %q (UID: %q) with: %v", | ||||
| 						volumeToMount.VolumeName, | ||||
| 						volumeToMount.VolumeSpec.Name(), | ||||
| 						volumeToMount.PodName, | ||||
| 						volumeToMount.Pod.UID, | ||||
| 						addVolumeNodeErr) | ||||
| 				} | ||||
| 				return nil | ||||
| 			} | ||||
| 		} | ||||
|  | ||||
| 		// Volume not attached, return error. Caller will log and retry. | ||||
| 		return fmt.Errorf("Volume %q (spec.Name: %q) pod %q (UID: %q) is not yet attached according to node status", | ||||
| 			volumeToMount.VolumeName, | ||||
| 			volumeToMount.VolumeSpec.Name(), | ||||
| 			volumeToMount.PodName, | ||||
| 			volumeToMount.Pod.UID) | ||||
| 	}, nil | ||||
| } | ||||
|   | ||||
							
								
								
									
										226
									
								
								pkg/volume/util/operationexecutor/operation_executor_test.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										226
									
								
								pkg/volume/util/operationexecutor/operation_executor_test.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,226 @@ | ||||
| /* | ||||
| Copyright 2016 The Kubernetes Authors. | ||||
|  | ||||
| Licensed under the Apache License, Version 2.0 (the "License"); | ||||
| you may not use this file except in compliance with the License. | ||||
| You may obtain a copy of the License at | ||||
|  | ||||
|     http://www.apache.org/licenses/LICENSE-2.0 | ||||
|  | ||||
| Unless required by applicable law or agreed to in writing, software | ||||
| distributed under the License is distributed on an "AS IS" BASIS, | ||||
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
| See the License for the specific language governing permissions and | ||||
| limitations under the License. | ||||
| */ | ||||
|  | ||||
| package operationexecutor | ||||
|  | ||||
| import ( | ||||
| 	"k8s.io/client-go/_vendor/github.com/pborman/uuid" | ||||
| 	"k8s.io/kubernetes/pkg/api/v1" | ||||
| 	"k8s.io/kubernetes/pkg/types" | ||||
| 	"k8s.io/kubernetes/pkg/util/mount" | ||||
| 	"strconv" | ||||
| 	"testing" | ||||
| 	"time" | ||||
| ) | ||||
|  | ||||
| const numVolumesToMount = 2 | ||||
|  | ||||
| var _ OperationGenerator = &mockOperationGenerator{} | ||||
|  | ||||
| func TestOperationExecutor_MountVolume_ParallelMountForNonAttachablePlugins(t *testing.T) { | ||||
| 	// Arrange | ||||
| 	ch, quit := make(chan interface{}), make(chan interface{}) | ||||
| 	numMountOperationsStarted := 0 | ||||
| 	mopg := newMockOperationGenerator(ch, quit) | ||||
| 	oe := NewOperationExecutor(mopg) | ||||
| 	volumesToMount := make([]VolumeToMount, numVolumesToMount) | ||||
| 	secretName := "secret-volume" | ||||
| 	volumeName := v1.UniqueVolumeName(secretName) | ||||
|  | ||||
| 	// Act | ||||
| 	for i := range volumesToMount { | ||||
| 		podName := "pod-" + strconv.Itoa((i + 1)) | ||||
| 		pod := getTestPodWithSecret(podName, secretName) | ||||
| 		volumesToMount[i] = VolumeToMount{ | ||||
| 			Pod:                pod, | ||||
| 			VolumeName:         volumeName, | ||||
| 			PluginIsAttachable: false, // this field determines whether the plugin is attachable | ||||
| 			ReportedInUse:      true, | ||||
| 		} | ||||
| 		oe.MountVolume(0 /* waitForAttachTimeOut */, volumesToMount[i], nil /* actualStateOfWorldMounterUpdater */) | ||||
| 	} | ||||
|  | ||||
| 	// Assert | ||||
| loop: | ||||
| 	for { | ||||
| 		select { | ||||
| 		case <-ch: | ||||
| 			numMountOperationsStarted++ | ||||
| 			if numMountOperationsStarted == numVolumesToMount { | ||||
| 				break loop | ||||
| 			} | ||||
| 		case <-time.After(5 * time.Second): | ||||
| 			t.Fatalf("Unable to start mount operations in parallel for non-attachable volumes") | ||||
| 			break loop | ||||
| 		} | ||||
| 	} | ||||
| 	close(quit) | ||||
| } | ||||
|  | ||||
| func TestOperationExecutor_MountVolume_ParallelMountForAttachablePlugins(t *testing.T) { | ||||
| 	// Arrange | ||||
| 	ch, quit := make(chan interface{}), make(chan interface{}) | ||||
| 	numMountOperationsStarted := 0 | ||||
| 	mopg := newMockOperationGenerator(ch, quit) | ||||
| 	oe := NewOperationExecutor(mopg) | ||||
| 	volumesToMount := make([]VolumeToMount, numVolumesToMount) | ||||
| 	pdName := "pd-volume" | ||||
| 	volumeName := v1.UniqueVolumeName(pdName) | ||||
|  | ||||
| 	// Act | ||||
| 	for i := range volumesToMount { | ||||
| 		podName := "pod-" + strconv.Itoa((i + 1)) | ||||
| 		pod := getTestPodWithGCEPD(podName, pdName) | ||||
| 		volumesToMount[i] = VolumeToMount{ | ||||
| 			Pod:                pod, | ||||
| 			VolumeName:         volumeName, | ||||
| 			PluginIsAttachable: true, // this field determines whether the plugin is attachable | ||||
| 			ReportedInUse:      true, | ||||
| 		} | ||||
| 		oe.MountVolume(0 /* waitForAttachTimeout */, volumesToMount[i], nil /* actualStateOfWorldMounterUpdater */) | ||||
| 	} | ||||
|  | ||||
| 	// Assert | ||||
| loop: | ||||
| 	for { | ||||
| 		select { | ||||
| 		case <-ch: | ||||
| 			numMountOperationsStarted++ | ||||
| 			if numMountOperationsStarted > 1 { | ||||
| 				t.Fatalf("Mount operations should not start in parallel for attachable volumes") | ||||
| 				break loop | ||||
| 			} | ||||
| 		case <-time.After(5 * time.Second): | ||||
| 			break loop | ||||
| 		} | ||||
| 	} | ||||
| 	close(quit) | ||||
| } | ||||
|  | ||||
| type mockOperationGenerator struct { | ||||
| 	ch   chan interface{} | ||||
| 	quit chan interface{} | ||||
| } | ||||
|  | ||||
| func newMockOperationGenerator(ch, quit chan interface{}) OperationGenerator { | ||||
| 	return &mockOperationGenerator{ | ||||
| 		ch:   ch, | ||||
| 		quit: quit, | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (mopg *mockOperationGenerator) GenerateMountVolumeFunc(waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorldMounterUpdater ActualStateOfWorldMounterUpdater) (func() error, error) { | ||||
| 	return func() error { | ||||
| 		mopg.ch <- nil | ||||
| 		// Blocks until the assertion is complete | ||||
| 		<-mopg.quit | ||||
| 		return nil | ||||
| 	}, nil | ||||
| } | ||||
| func (mopg *mockOperationGenerator) GenerateUnmountVolumeFunc(volumeToUnmount MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater) (func() error, error) { | ||||
| 	return func() error { return nil }, nil | ||||
| } | ||||
| func (mopg *mockOperationGenerator) GenerateAttachVolumeFunc(volumeToAttach VolumeToAttach, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, error) { | ||||
| 	return func() error { return nil }, nil | ||||
| } | ||||
| func (mopg *mockOperationGenerator) GenerateDetachVolumeFunc(volumeToDetach AttachedVolume, verifySafeToDetach bool, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, error) { | ||||
| 	return func() error { return nil }, nil | ||||
| } | ||||
| func (mopg *mockOperationGenerator) GenerateVolumesAreAttachedFunc(attachedVolumes []AttachedVolume, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, error) { | ||||
| 	return func() error { return nil }, nil | ||||
| } | ||||
| func (mopg *mockOperationGenerator) GenerateUnmountDeviceFunc(deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, mounter mount.Interface) (func() error, error) { | ||||
| 	return func() error { return nil }, nil | ||||
| } | ||||
| func (mopg *mockOperationGenerator) GenerateVerifyControllerAttachedVolumeFunc(volumeToMount VolumeToMount, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, error) { | ||||
| 	return func() error { return nil }, nil | ||||
| } | ||||
|  | ||||
| func getTestPodWithSecret(podName, secretName string) *v1.Pod { | ||||
| 	return &v1.Pod{ | ||||
| 		ObjectMeta: v1.ObjectMeta{ | ||||
| 			Name: podName, | ||||
| 			UID:  types.UID(podName), | ||||
| 		}, | ||||
| 		Spec: v1.PodSpec{ | ||||
| 			Volumes: []v1.Volume{ | ||||
| 				{ | ||||
| 					Name: secretName, | ||||
| 					VolumeSource: v1.VolumeSource{ | ||||
| 						Secret: &v1.SecretVolumeSource{ | ||||
| 							SecretName: secretName, | ||||
| 						}, | ||||
| 					}, | ||||
| 				}, | ||||
| 			}, | ||||
| 			Containers: []v1.Container{ | ||||
| 				{ | ||||
| 					Name:  "secret-volume-test", | ||||
| 					Image: "gcr.io/google_containers/mounttest:0.7", | ||||
| 					Args: []string{ | ||||
| 						"--file_content=/etc/secret-volume/data-1", | ||||
| 						"--file_mode=/etc/secret-volume/data-1"}, | ||||
| 					VolumeMounts: []v1.VolumeMount{ | ||||
| 						{ | ||||
| 							Name:      secretName, | ||||
| 							MountPath: "/data", | ||||
| 						}, | ||||
| 					}, | ||||
| 				}, | ||||
| 			}, | ||||
| 			RestartPolicy: v1.RestartPolicyNever, | ||||
| 		}, | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func getTestPodWithGCEPD(podName, pdName string) *v1.Pod { | ||||
| 	return &v1.Pod{ | ||||
| 		ObjectMeta: v1.ObjectMeta{ | ||||
| 			Name: podName, | ||||
| 			UID:  types.UID(podName + string(uuid.New())), | ||||
| 		}, | ||||
| 		Spec: v1.PodSpec{ | ||||
| 			Volumes: []v1.Volume{ | ||||
| 				{ | ||||
| 					Name: pdName, | ||||
| 					VolumeSource: v1.VolumeSource{ | ||||
| 						GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{ | ||||
| 							PDName:   pdName, | ||||
| 							FSType:   "ext4", | ||||
| 							ReadOnly: false, | ||||
| 						}, | ||||
| 					}, | ||||
| 				}, | ||||
| 			}, | ||||
| 			Containers: []v1.Container{ | ||||
| 				{ | ||||
| 					Name:  "pd-volume-test", | ||||
| 					Image: "gcr.io/google_containers/mounttest:0.7", | ||||
| 					Args: []string{ | ||||
| 						"--file_content=/etc/pd-volume/data-1", | ||||
| 					}, | ||||
| 					VolumeMounts: []v1.VolumeMount{ | ||||
| 						{ | ||||
| 							Name:      pdName, | ||||
| 							MountPath: "/data", | ||||
| 						}, | ||||
| 					}, | ||||
| 				}, | ||||
| 			}, | ||||
| 			RestartPolicy: v1.RestartPolicyNever, | ||||
| 		}, | ||||
| 	} | ||||
| } | ||||
							
								
								
									
										869
									
								
								pkg/volume/util/operationexecutor/operation_generator.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										869
									
								
								pkg/volume/util/operationexecutor/operation_generator.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,869 @@ | ||||
| /* | ||||
| Copyright 2016 The Kubernetes Authors. | ||||
|  | ||||
| Licensed under the Apache License, Version 2.0 (the "License"); | ||||
| you may not use this file except in compliance with the License. | ||||
| You may obtain a copy of the License at | ||||
|  | ||||
|     http://www.apache.org/licenses/LICENSE-2.0 | ||||
|  | ||||
| Unless required by applicable law or agreed to in writing, software | ||||
| distributed under the License is distributed on an "AS IS" BASIS, | ||||
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
| See the License for the specific language governing permissions and | ||||
| limitations under the License. | ||||
| */ | ||||
|  | ||||
| package operationexecutor | ||||
|  | ||||
| import ( | ||||
| 	"fmt" | ||||
| 	"time" | ||||
|  | ||||
| 	"github.com/golang/glog" | ||||
| 	"k8s.io/kubernetes/pkg/api/errors" | ||||
| 	"k8s.io/kubernetes/pkg/api/v1" | ||||
| 	metav1 "k8s.io/kubernetes/pkg/apis/meta/v1" | ||||
| 	"k8s.io/kubernetes/pkg/client/clientset_generated/clientset" | ||||
| 	"k8s.io/kubernetes/pkg/client/record" | ||||
| 	kevents "k8s.io/kubernetes/pkg/kubelet/events" | ||||
| 	"k8s.io/kubernetes/pkg/types" | ||||
| 	"k8s.io/kubernetes/pkg/util/mount" | ||||
| 	"k8s.io/kubernetes/pkg/volume" | ||||
| ) | ||||
|  | ||||
| var _ OperationGenerator = &operationGenerator{} | ||||
|  | ||||
| type operationGenerator struct { | ||||
| 	// Used to fetch objects from the API server like Node in the | ||||
| 	// VerifyControllerAttachedVolume operation. | ||||
| 	kubeClient clientset.Interface | ||||
|  | ||||
| 	// volumePluginMgr is the volume plugin manager used to create volume | ||||
| 	// plugin objects. | ||||
| 	volumePluginMgr *volume.VolumePluginMgr | ||||
|  | ||||
| 	// recorder is used to record events in the API server | ||||
| 	recorder record.EventRecorder | ||||
|  | ||||
| 	// checkNodeCapabilitiesBeforeMount, if set, enables the CanMount check, | ||||
| 	// which verifies that the components (binaries, etc.) required to mount | ||||
| 	// the volume are available on the underlying node before attempting mount. | ||||
| 	checkNodeCapabilitiesBeforeMount bool | ||||
| } | ||||
|  | ||||
| // NewOperationGenerator is returns instance of operationGenerator | ||||
| func NewOperationGenerator(kubeClient clientset.Interface, | ||||
| 	volumePluginMgr *volume.VolumePluginMgr, | ||||
| 	recorder record.EventRecorder, | ||||
| 	checkNodeCapabilitiesBeforeMount bool) OperationGenerator { | ||||
|  | ||||
| 	return &operationGenerator{ | ||||
| 		kubeClient:      kubeClient, | ||||
| 		volumePluginMgr: volumePluginMgr, | ||||
| 		recorder:        recorder, | ||||
| 		checkNodeCapabilitiesBeforeMount: checkNodeCapabilitiesBeforeMount, | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // OperationGenerator interface that extracts out the functions from operation_executor to make it dependency injectable | ||||
| type OperationGenerator interface { | ||||
| 	// Generates the MountVolume function needed to perform the mount of a volume plugin | ||||
| 	GenerateMountVolumeFunc(waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorldMounterUpdater ActualStateOfWorldMounterUpdater) (func() error, error) | ||||
|  | ||||
| 	// Generates the UnmountVolume function needed to perform the unmount of a volume plugin | ||||
| 	GenerateUnmountVolumeFunc(volumeToUnmount MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater) (func() error, error) | ||||
|  | ||||
| 	// Generates the AttachVolume function needed to perform attach of a volume plugin | ||||
| 	GenerateAttachVolumeFunc(volumeToAttach VolumeToAttach, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, error) | ||||
|  | ||||
| 	// Generates the DetachVolume function needed to perform the detach of a volume plugin | ||||
| 	GenerateDetachVolumeFunc(volumeToDetach AttachedVolume, verifySafeToDetach bool, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, error) | ||||
|  | ||||
| 	// Generates the VolumesAreAttached function needed to verify if volume plugins are attached | ||||
| 	GenerateVolumesAreAttachedFunc(attachedVolumes []AttachedVolume, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, error) | ||||
|  | ||||
| 	// Generates the UnMountDevice function needed to perform the unmount of a device | ||||
| 	GenerateUnmountDeviceFunc(deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, mounter mount.Interface) (func() error, error) | ||||
|  | ||||
| 	// Generates the function needed to check if the attach_detach controller has attached the volume plugin | ||||
| 	GenerateVerifyControllerAttachedVolumeFunc(volumeToMount VolumeToMount, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, error) | ||||
| } | ||||
|  | ||||
| func (og *operationGenerator) GenerateVolumesAreAttachedFunc( | ||||
| 	attachedVolumes []AttachedVolume, | ||||
| 	nodeName types.NodeName, | ||||
| 	actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, error) { | ||||
|  | ||||
| 	// volumesPerPlugin maps from a volume plugin to a list of volume specs which belong | ||||
| 	// to this type of plugin | ||||
| 	volumesPerPlugin := make(map[string][]*volume.Spec) | ||||
| 	// volumeSpecMap maps from a volume spec to its unique volumeName which will be used | ||||
| 	// when calling MarkVolumeAsDetached | ||||
| 	volumeSpecMap := make(map[*volume.Spec]v1.UniqueVolumeName) | ||||
| 	// Iterate each volume spec and put them into a map index by the pluginName | ||||
| 	for _, volumeAttached := range attachedVolumes { | ||||
| 		volumePlugin, err := | ||||
| 			og.volumePluginMgr.FindPluginBySpec(volumeAttached.VolumeSpec) | ||||
| 		if err != nil || volumePlugin == nil { | ||||
| 			glog.Errorf( | ||||
| 				"VolumesAreAttached.FindPluginBySpec failed for volume %q (spec.Name: %q) on node %q with error: %v", | ||||
| 				volumeAttached.VolumeName, | ||||
| 				volumeAttached.VolumeSpec.Name(), | ||||
| 				volumeAttached.NodeName, | ||||
| 				err) | ||||
| 		} | ||||
| 		volumeSpecList, pluginExists := volumesPerPlugin[volumePlugin.GetPluginName()] | ||||
| 		if !pluginExists { | ||||
| 			volumeSpecList = []*volume.Spec{} | ||||
| 		} | ||||
| 		volumeSpecList = append(volumeSpecList, volumeAttached.VolumeSpec) | ||||
| 		volumesPerPlugin[volumePlugin.GetPluginName()] = volumeSpecList | ||||
| 		volumeSpecMap[volumeAttached.VolumeSpec] = volumeAttached.VolumeName | ||||
| 	} | ||||
|  | ||||
| 	return func() error { | ||||
|  | ||||
| 		// For each volume plugin, pass the list of volume specs to VolumesAreAttached to check | ||||
| 		// whether the volumes are still attached. | ||||
| 		for pluginName, volumesSpecs := range volumesPerPlugin { | ||||
| 			attachableVolumePlugin, err := | ||||
| 				og.volumePluginMgr.FindAttachablePluginByName(pluginName) | ||||
| 			if err != nil || attachableVolumePlugin == nil { | ||||
| 				glog.Errorf( | ||||
| 					"VolumeAreAttached.FindAttachablePluginBySpec failed for plugin %q with: %v", | ||||
| 					pluginName, | ||||
| 					err) | ||||
| 				continue | ||||
| 			} | ||||
|  | ||||
| 			volumeAttacher, newAttacherErr := attachableVolumePlugin.NewAttacher() | ||||
| 			if newAttacherErr != nil { | ||||
| 				glog.Errorf( | ||||
| 					"VolumesAreAttached failed for getting plugin %q with: %v", | ||||
| 					pluginName, | ||||
| 					newAttacherErr) | ||||
| 				continue | ||||
| 			} | ||||
|  | ||||
| 			attached, areAttachedErr := volumeAttacher.VolumesAreAttached(volumesSpecs, nodeName) | ||||
| 			if areAttachedErr != nil { | ||||
| 				glog.Errorf( | ||||
| 					"VolumesAreAttached failed for checking on node %q with: %v", | ||||
| 					nodeName, | ||||
| 					areAttachedErr) | ||||
| 				continue | ||||
| 			} | ||||
|  | ||||
| 			for spec, check := range attached { | ||||
| 				if !check { | ||||
| 					actualStateOfWorld.MarkVolumeAsDetached(volumeSpecMap[spec], nodeName) | ||||
| 					glog.V(1).Infof("VerifyVolumesAreAttached determined volume %q (spec.Name: %q) is no longer attached to node %q, therefore it was marked as detached.", | ||||
| 						volumeSpecMap[spec], spec.Name(), nodeName) | ||||
| 				} | ||||
| 			} | ||||
| 		} | ||||
| 		return nil | ||||
| 	}, nil | ||||
| } | ||||
|  | ||||
| func (og *operationGenerator) GenerateAttachVolumeFunc( | ||||
| 	volumeToAttach VolumeToAttach, | ||||
| 	actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, error) { | ||||
| 	// Get attacher plugin | ||||
| 	attachableVolumePlugin, err := | ||||
| 		og.volumePluginMgr.FindAttachablePluginBySpec(volumeToAttach.VolumeSpec) | ||||
| 	if err != nil || attachableVolumePlugin == nil { | ||||
| 		return nil, fmt.Errorf( | ||||
| 			"AttachVolume.FindAttachablePluginBySpec failed for volume %q (spec.Name: %q) from node %q with: %v", | ||||
| 			volumeToAttach.VolumeName, | ||||
| 			volumeToAttach.VolumeSpec.Name(), | ||||
| 			volumeToAttach.NodeName, | ||||
| 			err) | ||||
| 	} | ||||
|  | ||||
| 	volumeAttacher, newAttacherErr := attachableVolumePlugin.NewAttacher() | ||||
| 	if newAttacherErr != nil { | ||||
| 		return nil, fmt.Errorf( | ||||
| 			"AttachVolume.NewAttacher failed for volume %q (spec.Name: %q) from node %q with: %v", | ||||
| 			volumeToAttach.VolumeName, | ||||
| 			volumeToAttach.VolumeSpec.Name(), | ||||
| 			volumeToAttach.NodeName, | ||||
| 			newAttacherErr) | ||||
| 	} | ||||
|  | ||||
| 	return func() error { | ||||
| 		// Execute attach | ||||
| 		devicePath, attachErr := volumeAttacher.Attach( | ||||
| 			volumeToAttach.VolumeSpec, volumeToAttach.NodeName) | ||||
|  | ||||
| 		if attachErr != nil { | ||||
| 			// On failure, return error. Caller will log and retry. | ||||
| 			err := fmt.Errorf( | ||||
| 				"Failed to attach volume %q on node %q with: %v", | ||||
| 				volumeToAttach.VolumeSpec.Name(), | ||||
| 				volumeToAttach.NodeName, | ||||
| 				attachErr) | ||||
| 			for _, pod := range volumeToAttach.ScheduledPods { | ||||
| 				og.recorder.Eventf(pod, v1.EventTypeWarning, kevents.FailedMountVolume, err.Error()) | ||||
| 			} | ||||
| 			return err | ||||
| 		} | ||||
|  | ||||
| 		glog.Infof( | ||||
| 			"AttachVolume.Attach succeeded for volume %q (spec.Name: %q) from node %q.", | ||||
| 			volumeToAttach.VolumeName, | ||||
| 			volumeToAttach.VolumeSpec.Name(), | ||||
| 			volumeToAttach.NodeName) | ||||
|  | ||||
| 		// Update actual state of world | ||||
| 		addVolumeNodeErr := actualStateOfWorld.MarkVolumeAsAttached( | ||||
| 			v1.UniqueVolumeName(""), volumeToAttach.VolumeSpec, volumeToAttach.NodeName, devicePath) | ||||
| 		if addVolumeNodeErr != nil { | ||||
| 			// On failure, return error. Caller will log and retry. | ||||
| 			return fmt.Errorf( | ||||
| 				"AttachVolume.MarkVolumeAsAttached failed for volume %q (spec.Name: %q) from node %q with: %v", | ||||
| 				volumeToAttach.VolumeName, | ||||
| 				volumeToAttach.VolumeSpec.Name(), | ||||
| 				volumeToAttach.NodeName, | ||||
| 				addVolumeNodeErr) | ||||
| 		} | ||||
|  | ||||
| 		return nil | ||||
| 	}, nil | ||||
| } | ||||
|  | ||||
| func (og *operationGenerator) GenerateDetachVolumeFunc( | ||||
| 	volumeToDetach AttachedVolume, | ||||
| 	verifySafeToDetach bool, | ||||
| 	actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, error) { | ||||
| 	// Get attacher plugin | ||||
| 	attachableVolumePlugin, err := | ||||
| 		og.volumePluginMgr.FindAttachablePluginBySpec(volumeToDetach.VolumeSpec) | ||||
| 	if err != nil || attachableVolumePlugin == nil { | ||||
| 		return nil, fmt.Errorf( | ||||
| 			"DetachVolume.FindAttachablePluginBySpec failed for volume %q (spec.Name: %q) from node %q with: %v", | ||||
| 			volumeToDetach.VolumeName, | ||||
| 			volumeToDetach.VolumeSpec.Name(), | ||||
| 			volumeToDetach.NodeName, | ||||
| 			err) | ||||
| 	} | ||||
|  | ||||
| 	volumeName, err := | ||||
| 		attachableVolumePlugin.GetVolumeName(volumeToDetach.VolumeSpec) | ||||
| 	if err != nil { | ||||
| 		return nil, fmt.Errorf( | ||||
| 			"DetachVolume.GetVolumeName failed for volume %q (spec.Name: %q) from node %q with: %v", | ||||
| 			volumeToDetach.VolumeName, | ||||
| 			volumeToDetach.VolumeSpec.Name(), | ||||
| 			volumeToDetach.NodeName, | ||||
| 			err) | ||||
| 	} | ||||
|  | ||||
| 	volumeDetacher, err := attachableVolumePlugin.NewDetacher() | ||||
| 	if err != nil { | ||||
| 		return nil, fmt.Errorf( | ||||
| 			"DetachVolume.NewDetacher failed for volume %q (spec.Name: %q) from node %q with: %v", | ||||
| 			volumeToDetach.VolumeName, | ||||
| 			volumeToDetach.VolumeSpec.Name(), | ||||
| 			volumeToDetach.NodeName, | ||||
| 			err) | ||||
| 	} | ||||
|  | ||||
| 	return func() error { | ||||
| 		var err error | ||||
| 		if verifySafeToDetach { | ||||
| 			err = og.verifyVolumeIsSafeToDetach(volumeToDetach) | ||||
| 		} | ||||
| 		if err == nil { | ||||
| 			err = volumeDetacher.Detach(volumeName, volumeToDetach.NodeName) | ||||
| 		} | ||||
| 		if err != nil { | ||||
| 			// On failure, add volume back to ReportAsAttached list | ||||
| 			actualStateOfWorld.AddVolumeToReportAsAttached( | ||||
| 				volumeToDetach.VolumeName, volumeToDetach.NodeName) | ||||
| 			return fmt.Errorf( | ||||
| 				"DetachVolume.Detach failed for volume %q (spec.Name: %q) from node %q with: %v", | ||||
| 				volumeToDetach.VolumeName, | ||||
| 				volumeToDetach.VolumeSpec.Name(), | ||||
| 				volumeToDetach.NodeName, | ||||
| 				err) | ||||
| 		} | ||||
|  | ||||
| 		glog.Infof( | ||||
| 			"DetachVolume.Detach succeeded for volume %q (spec.Name: %q) from node %q.", | ||||
| 			volumeToDetach.VolumeName, | ||||
| 			volumeToDetach.VolumeSpec.Name(), | ||||
| 			volumeToDetach.NodeName) | ||||
|  | ||||
| 		// Update actual state of world | ||||
| 		actualStateOfWorld.MarkVolumeAsDetached( | ||||
| 			volumeToDetach.VolumeName, volumeToDetach.NodeName) | ||||
|  | ||||
| 		return nil | ||||
| 	}, nil | ||||
| } | ||||
|  | ||||
| func (og *operationGenerator) GerifyVolumeIsSafeToDetach( | ||||
| 	volumeToDetach AttachedVolume) error { | ||||
| 	// Fetch current node object | ||||
| 	node, fetchErr := og.kubeClient.Core().Nodes().Get(string(volumeToDetach.NodeName), metav1.GetOptions{}) | ||||
| 	if fetchErr != nil { | ||||
| 		if errors.IsNotFound(fetchErr) { | ||||
| 			glog.Warningf("Node %q not found on API server. DetachVolume will skip safe to detach check.", | ||||
| 				volumeToDetach.NodeName, | ||||
| 				volumeToDetach.VolumeName, | ||||
| 				volumeToDetach.VolumeSpec.Name()) | ||||
| 			return nil | ||||
| 		} | ||||
|  | ||||
| 		// On failure, return error. Caller will log and retry. | ||||
| 		return fmt.Errorf( | ||||
| 			"DetachVolume failed fetching node from API server for volume %q (spec.Name: %q) from node %q with: %v", | ||||
| 			volumeToDetach.VolumeName, | ||||
| 			volumeToDetach.VolumeSpec.Name(), | ||||
| 			volumeToDetach.NodeName, | ||||
| 			fetchErr) | ||||
| 	} | ||||
|  | ||||
| 	if node == nil { | ||||
| 		// On failure, return error. Caller will log and retry. | ||||
| 		return fmt.Errorf( | ||||
| 			"DetachVolume failed fetching node from API server for volume %q (spec.Name: %q) from node %q. Error: node object retrieved from API server is nil", | ||||
| 			volumeToDetach.VolumeName, | ||||
| 			volumeToDetach.VolumeSpec.Name(), | ||||
| 			volumeToDetach.NodeName) | ||||
| 	} | ||||
|  | ||||
| 	for _, inUseVolume := range node.Status.VolumesInUse { | ||||
| 		if inUseVolume == volumeToDetach.VolumeName { | ||||
| 			return fmt.Errorf("DetachVolume failed for volume %q (spec.Name: %q) from node %q. Error: volume is still in use by node, according to Node status", | ||||
| 				volumeToDetach.VolumeName, | ||||
| 				volumeToDetach.VolumeSpec.Name(), | ||||
| 				volumeToDetach.NodeName) | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	// Volume is not marked as in use by node | ||||
| 	glog.Infof("Verified volume is safe to detach for volume %q (spec.Name: %q) from node %q.", | ||||
| 		volumeToDetach.VolumeName, | ||||
| 		volumeToDetach.VolumeSpec.Name(), | ||||
| 		volumeToDetach.NodeName) | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (og *operationGenerator) GenerateMountVolumeFunc( | ||||
| 	waitForAttachTimeout time.Duration, | ||||
| 	volumeToMount VolumeToMount, | ||||
| 	actualStateOfWorld ActualStateOfWorldMounterUpdater) (func() error, error) { | ||||
| 	// Get mounter plugin | ||||
| 	volumePlugin, err := | ||||
| 		og.volumePluginMgr.FindPluginBySpec(volumeToMount.VolumeSpec) | ||||
| 	if err != nil || volumePlugin == nil { | ||||
| 		return nil, fmt.Errorf( | ||||
| 			"MountVolume.FindPluginBySpec failed for volume %q (spec.Name: %q) pod %q (UID: %q) with: %v", | ||||
| 			volumeToMount.VolumeName, | ||||
| 			volumeToMount.VolumeSpec.Name(), | ||||
| 			volumeToMount.PodName, | ||||
| 			volumeToMount.Pod.UID, | ||||
| 			err) | ||||
| 	} | ||||
|  | ||||
| 	volumeMounter, newMounterErr := volumePlugin.NewMounter( | ||||
| 		volumeToMount.VolumeSpec, | ||||
| 		volumeToMount.Pod, | ||||
| 		volume.VolumeOptions{}) | ||||
| 	if newMounterErr != nil { | ||||
| 		return nil, fmt.Errorf( | ||||
| 			"MountVolume.NewMounter failed for volume %q (spec.Name: %q) pod %q (UID: %q) with: %v", | ||||
| 			volumeToMount.VolumeName, | ||||
| 			volumeToMount.VolumeSpec.Name(), | ||||
| 			volumeToMount.PodName, | ||||
| 			volumeToMount.Pod.UID, | ||||
| 			newMounterErr) | ||||
| 	} | ||||
|  | ||||
| 	// Get attacher, if possible | ||||
| 	attachableVolumePlugin, _ := | ||||
| 		og.volumePluginMgr.FindAttachablePluginBySpec(volumeToMount.VolumeSpec) | ||||
| 	var volumeAttacher volume.Attacher | ||||
| 	if attachableVolumePlugin != nil { | ||||
| 		volumeAttacher, _ = attachableVolumePlugin.NewAttacher() | ||||
| 	} | ||||
|  | ||||
| 	var fsGroup *int64 | ||||
| 	if volumeToMount.Pod.Spec.SecurityContext != nil && | ||||
| 		volumeToMount.Pod.Spec.SecurityContext.FSGroup != nil { | ||||
| 		fsGroup = volumeToMount.Pod.Spec.SecurityContext.FSGroup | ||||
| 	} | ||||
|  | ||||
| 	return func() error { | ||||
| 		if volumeAttacher != nil { | ||||
| 			// Wait for attachable volumes to finish attaching | ||||
| 			glog.Infof( | ||||
| 				"Entering MountVolume.WaitForAttach for volume %q (spec.Name: %q) pod %q (UID: %q) DevicePath: %q", | ||||
| 				volumeToMount.VolumeName, | ||||
| 				volumeToMount.VolumeSpec.Name(), | ||||
| 				volumeToMount.PodName, | ||||
| 				volumeToMount.Pod.UID, | ||||
| 				volumeToMount.DevicePath) | ||||
|  | ||||
| 			devicePath, err := volumeAttacher.WaitForAttach( | ||||
| 				volumeToMount.VolumeSpec, volumeToMount.DevicePath, waitForAttachTimeout) | ||||
| 			if err != nil { | ||||
| 				// On failure, return error. Caller will log and retry. | ||||
| 				return fmt.Errorf( | ||||
| 					"MountVolume.WaitForAttach failed for volume %q (spec.Name: %q) pod %q (UID: %q) with: %v", | ||||
| 					volumeToMount.VolumeName, | ||||
| 					volumeToMount.VolumeSpec.Name(), | ||||
| 					volumeToMount.PodName, | ||||
| 					volumeToMount.Pod.UID, | ||||
| 					err) | ||||
| 			} | ||||
|  | ||||
| 			glog.Infof( | ||||
| 				"MountVolume.WaitForAttach succeeded for volume %q (spec.Name: %q) pod %q (UID: %q).", | ||||
| 				volumeToMount.VolumeName, | ||||
| 				volumeToMount.VolumeSpec.Name(), | ||||
| 				volumeToMount.PodName, | ||||
| 				volumeToMount.Pod.UID) | ||||
|  | ||||
| 			deviceMountPath, err := | ||||
| 				volumeAttacher.GetDeviceMountPath(volumeToMount.VolumeSpec) | ||||
| 			if err != nil { | ||||
| 				// On failure, return error. Caller will log and retry. | ||||
| 				return fmt.Errorf( | ||||
| 					"MountVolume.GetDeviceMountPath failed for volume %q (spec.Name: %q) pod %q (UID: %q) with: %v", | ||||
| 					volumeToMount.VolumeName, | ||||
| 					volumeToMount.VolumeSpec.Name(), | ||||
| 					volumeToMount.PodName, | ||||
| 					volumeToMount.Pod.UID, | ||||
| 					err) | ||||
| 			} | ||||
|  | ||||
| 			// Mount device to global mount path | ||||
| 			err = volumeAttacher.MountDevice( | ||||
| 				volumeToMount.VolumeSpec, | ||||
| 				devicePath, | ||||
| 				deviceMountPath) | ||||
| 			if err != nil { | ||||
| 				// On failure, return error. Caller will log and retry. | ||||
| 				err := fmt.Errorf( | ||||
| 					"MountVolume.MountDevice failed for volume %q (spec.Name: %q) pod %q (UID: %q) with: %v", | ||||
| 					volumeToMount.VolumeName, | ||||
| 					volumeToMount.VolumeSpec.Name(), | ||||
| 					volumeToMount.PodName, | ||||
| 					volumeToMount.Pod.UID, | ||||
| 					err) | ||||
| 				og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeWarning, kevents.FailedMountVolume, err.Error()) | ||||
| 				return err | ||||
| 			} | ||||
|  | ||||
| 			glog.Infof( | ||||
| 				"MountVolume.MountDevice succeeded for volume %q (spec.Name: %q) pod %q (UID: %q) device mount path %q", | ||||
| 				volumeToMount.VolumeName, | ||||
| 				volumeToMount.VolumeSpec.Name(), | ||||
| 				volumeToMount.PodName, | ||||
| 				volumeToMount.Pod.UID, | ||||
| 				deviceMountPath) | ||||
|  | ||||
| 			// Update actual state of world to reflect volume is globally mounted | ||||
| 			markDeviceMountedErr := actualStateOfWorld.MarkDeviceAsMounted( | ||||
| 				volumeToMount.VolumeName) | ||||
| 			if markDeviceMountedErr != nil { | ||||
| 				// On failure, return error. Caller will log and retry. | ||||
| 				return fmt.Errorf( | ||||
| 					"MountVolume.MarkDeviceAsMounted failed for volume %q (spec.Name: %q) pod %q (UID: %q) with: %v", | ||||
| 					volumeToMount.VolumeName, | ||||
| 					volumeToMount.VolumeSpec.Name(), | ||||
| 					volumeToMount.PodName, | ||||
| 					volumeToMount.Pod.UID, | ||||
| 					markDeviceMountedErr) | ||||
| 			} | ||||
| 		} | ||||
|  | ||||
| 		if og.checkNodeCapabilitiesBeforeMount { | ||||
| 			if canMountErr := volumeMounter.CanMount(); canMountErr != nil { | ||||
| 				errMsg := fmt.Sprintf("Unable to mount volume %v (spec.Name: %v) on pod %v (UID: %v). Verify that your node machine has the required components before attempting to mount this volume type. %s", volumeToMount.VolumeName, volumeToMount.VolumeSpec.Name(), volumeToMount.Pod.Name, volumeToMount.Pod.UID, canMountErr.Error()) | ||||
| 				og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeWarning, kevents.FailedMountVolume, errMsg) | ||||
| 				glog.Errorf(errMsg) | ||||
| 				return fmt.Errorf(errMsg) | ||||
| 			} | ||||
| 		} | ||||
|  | ||||
| 		// Execute mount | ||||
| 		mountErr := volumeMounter.SetUp(fsGroup) | ||||
| 		if mountErr != nil { | ||||
| 			// On failure, return error. Caller will log and retry. | ||||
| 			err := fmt.Errorf( | ||||
| 				"MountVolume.SetUp failed for volume %q (spec.Name: %q) pod %q (UID: %q) with: %v", | ||||
| 				volumeToMount.VolumeName, | ||||
| 				volumeToMount.VolumeSpec.Name(), | ||||
| 				volumeToMount.PodName, | ||||
| 				volumeToMount.Pod.UID, | ||||
| 				mountErr) | ||||
| 			og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeWarning, kevents.FailedMountVolume, err.Error()) | ||||
| 			return err | ||||
| 		} | ||||
|  | ||||
| 		glog.Infof( | ||||
| 			"MountVolume.SetUp succeeded for volume %q (spec.Name: %q) pod %q (UID: %q).", | ||||
| 			volumeToMount.VolumeName, | ||||
| 			volumeToMount.VolumeSpec.Name(), | ||||
| 			volumeToMount.PodName, | ||||
| 			volumeToMount.Pod.UID) | ||||
|  | ||||
| 		// Update actual state of world | ||||
| 		markVolMountedErr := actualStateOfWorld.MarkVolumeAsMounted( | ||||
| 			volumeToMount.PodName, | ||||
| 			volumeToMount.Pod.UID, | ||||
| 			volumeToMount.VolumeName, | ||||
| 			volumeMounter, | ||||
| 			volumeToMount.OuterVolumeSpecName, | ||||
| 			volumeToMount.VolumeGidValue) | ||||
| 		if markVolMountedErr != nil { | ||||
| 			// On failure, return error. Caller will log and retry. | ||||
| 			return fmt.Errorf( | ||||
| 				"MountVolume.MarkVolumeAsMounted failed for volume %q (spec.Name: %q) pod %q (UID: %q) with: %v", | ||||
| 				volumeToMount.VolumeName, | ||||
| 				volumeToMount.VolumeSpec.Name(), | ||||
| 				volumeToMount.PodName, | ||||
| 				volumeToMount.Pod.UID, | ||||
| 				markVolMountedErr) | ||||
| 		} | ||||
|  | ||||
| 		return nil | ||||
| 	}, nil | ||||
| } | ||||
|  | ||||
| func (og *operationGenerator) GenerateUnmountVolumeFunc( | ||||
| 	volumeToUnmount MountedVolume, | ||||
| 	actualStateOfWorld ActualStateOfWorldMounterUpdater) (func() error, error) { | ||||
| 	// Get mountable plugin | ||||
| 	volumePlugin, err := | ||||
| 		og.volumePluginMgr.FindPluginByName(volumeToUnmount.PluginName) | ||||
| 	if err != nil || volumePlugin == nil { | ||||
| 		return nil, fmt.Errorf( | ||||
| 			"UnmountVolume.FindPluginByName failed for volume %q (volume.spec.Name: %q) pod %q (UID: %q) err=%v", | ||||
| 			volumeToUnmount.VolumeName, | ||||
| 			volumeToUnmount.OuterVolumeSpecName, | ||||
| 			volumeToUnmount.PodName, | ||||
| 			volumeToUnmount.PodUID, | ||||
| 			err) | ||||
| 	} | ||||
|  | ||||
| 	volumeUnmounter, newUnmounterErr := volumePlugin.NewUnmounter( | ||||
| 		volumeToUnmount.InnerVolumeSpecName, volumeToUnmount.PodUID) | ||||
| 	if newUnmounterErr != nil { | ||||
| 		return nil, fmt.Errorf( | ||||
| 			"UnmountVolume.NewUnmounter failed for volume %q (volume.spec.Name: %q) pod %q (UID: %q) err=%v", | ||||
| 			volumeToUnmount.VolumeName, | ||||
| 			volumeToUnmount.OuterVolumeSpecName, | ||||
| 			volumeToUnmount.PodName, | ||||
| 			volumeToUnmount.PodUID, | ||||
| 			newUnmounterErr) | ||||
| 	} | ||||
|  | ||||
| 	return func() error { | ||||
| 		// Execute unmount | ||||
| 		unmountErr := volumeUnmounter.TearDown() | ||||
| 		if unmountErr != nil { | ||||
| 			// On failure, return error. Caller will log and retry. | ||||
| 			return fmt.Errorf( | ||||
| 				"UnmountVolume.TearDown failed for volume %q (volume.spec.Name: %q) pod %q (UID: %q) with: %v", | ||||
| 				volumeToUnmount.VolumeName, | ||||
| 				volumeToUnmount.OuterVolumeSpecName, | ||||
| 				volumeToUnmount.PodName, | ||||
| 				volumeToUnmount.PodUID, | ||||
| 				unmountErr) | ||||
| 		} | ||||
|  | ||||
| 		glog.Infof( | ||||
| 			"UnmountVolume.TearDown succeeded for volume %q (OuterVolumeSpecName: %q) pod %q (UID: %q). InnerVolumeSpecName %q. PluginName %q, VolumeGidValue %q", | ||||
| 			volumeToUnmount.VolumeName, | ||||
| 			volumeToUnmount.OuterVolumeSpecName, | ||||
| 			volumeToUnmount.PodName, | ||||
| 			volumeToUnmount.PodUID, | ||||
| 			volumeToUnmount.InnerVolumeSpecName, | ||||
| 			volumeToUnmount.PluginName, | ||||
| 			volumeToUnmount.VolumeGidValue) | ||||
|  | ||||
| 		// Update actual state of world | ||||
| 		markVolMountedErr := actualStateOfWorld.MarkVolumeAsUnmounted( | ||||
| 			volumeToUnmount.PodName, volumeToUnmount.VolumeName) | ||||
| 		if markVolMountedErr != nil { | ||||
| 			// On failure, just log and exit | ||||
| 			glog.Errorf( | ||||
| 				"UnmountVolume.MarkVolumeAsUnmounted failed for volume %q (volume.spec.Name: %q) pod %q (UID: %q) with: %v", | ||||
| 				volumeToUnmount.VolumeName, | ||||
| 				volumeToUnmount.OuterVolumeSpecName, | ||||
| 				volumeToUnmount.PodName, | ||||
| 				volumeToUnmount.PodUID, | ||||
| 				markVolMountedErr) | ||||
| 		} | ||||
|  | ||||
| 		return nil | ||||
| 	}, nil | ||||
| } | ||||
|  | ||||
| func (og *operationGenerator) GenerateUnmountDeviceFunc( | ||||
| 	deviceToDetach AttachedVolume, | ||||
| 	actualStateOfWorld ActualStateOfWorldMounterUpdater, | ||||
| 	mounter mount.Interface) (func() error, error) { | ||||
| 	// Get attacher plugin | ||||
| 	attachableVolumePlugin, err := | ||||
| 		og.volumePluginMgr.FindAttachablePluginBySpec(deviceToDetach.VolumeSpec) | ||||
| 	if err != nil || attachableVolumePlugin == nil { | ||||
| 		return nil, fmt.Errorf( | ||||
| 			"UnmountDevice.FindAttachablePluginBySpec failed for volume %q (spec.Name: %q) with: %v", | ||||
| 			deviceToDetach.VolumeName, | ||||
| 			deviceToDetach.VolumeSpec.Name(), | ||||
| 			err) | ||||
| 	} | ||||
|  | ||||
| 	volumeDetacher, err := attachableVolumePlugin.NewDetacher() | ||||
| 	if err != nil { | ||||
| 		return nil, fmt.Errorf( | ||||
| 			"UnmountDevice.NewDetacher failed for volume %q (spec.Name: %q) with: %v", | ||||
| 			deviceToDetach.VolumeName, | ||||
| 			deviceToDetach.VolumeSpec.Name(), | ||||
| 			err) | ||||
| 	} | ||||
|  | ||||
| 	volumeAttacher, err := attachableVolumePlugin.NewAttacher() | ||||
| 	if err != nil { | ||||
| 		return nil, fmt.Errorf( | ||||
| 			"UnmountDevice.NewAttacher failed for volume %q (spec.Name: %q) with: %v", | ||||
| 			deviceToDetach.VolumeName, | ||||
| 			deviceToDetach.VolumeSpec.Name(), | ||||
| 			err) | ||||
| 	} | ||||
|  | ||||
| 	return func() error { | ||||
| 		deviceMountPath, err := | ||||
| 			volumeAttacher.GetDeviceMountPath(deviceToDetach.VolumeSpec) | ||||
| 		if err != nil { | ||||
| 			// On failure, return error. Caller will log and retry. | ||||
| 			return fmt.Errorf( | ||||
| 				"GetDeviceMountPath failed for volume %q (spec.Name: %q) with: %v", | ||||
| 				deviceToDetach.VolumeName, | ||||
| 				deviceToDetach.VolumeSpec.Name(), | ||||
| 				err) | ||||
| 		} | ||||
| 		refs, err := attachableVolumePlugin.GetDeviceMountRefs(deviceMountPath) | ||||
|  | ||||
| 		if err != nil || hasMountRefs(deviceMountPath, refs) { | ||||
| 			if err == nil { | ||||
| 				err = fmt.Errorf("The device mount path %q is still mounted by other references %v", deviceMountPath, refs) | ||||
| 			} | ||||
| 			return fmt.Errorf( | ||||
| 				"GetDeviceMountRefs check failed for volume %q (spec.Name: %q) with: %v", | ||||
| 				deviceToDetach.VolumeName, | ||||
| 				deviceToDetach.VolumeSpec.Name(), | ||||
| 				err) | ||||
| 		} | ||||
| 		// Execute unmount | ||||
| 		unmountDeviceErr := volumeDetacher.UnmountDevice(deviceMountPath) | ||||
| 		if unmountDeviceErr != nil { | ||||
| 			// On failure, return error. Caller will log and retry. | ||||
| 			return fmt.Errorf( | ||||
| 				"UnmountDevice failed for volume %q (spec.Name: %q) with: %v", | ||||
| 				deviceToDetach.VolumeName, | ||||
| 				deviceToDetach.VolumeSpec.Name(), | ||||
| 				unmountDeviceErr) | ||||
| 		} | ||||
| 		// Before logging that UnmountDevice succeeded and moving on, | ||||
| 		// use mounter.PathIsDevice to check if the path is a device, | ||||
| 		// if so use mounter.DeviceOpened to check if the device is in use anywhere | ||||
| 		// else on the system. Retry if it returns true. | ||||
| 		isDevicePath, devicePathErr := mounter.PathIsDevice(deviceToDetach.DevicePath) | ||||
| 		var deviceOpened bool | ||||
| 		var deviceOpenedErr error | ||||
| 		if !isDevicePath && devicePathErr == nil { | ||||
| 			// not a device path or path doesn't exist | ||||
| 			//TODO: refer to #36092 | ||||
| 			glog.V(3).Infof("Not checking device path %s", deviceToDetach.DevicePath) | ||||
| 			deviceOpened = false | ||||
| 		} else { | ||||
| 			deviceOpened, deviceOpenedErr = mounter.DeviceOpened(deviceToDetach.DevicePath) | ||||
| 			if deviceOpenedErr != nil { | ||||
| 				return fmt.Errorf( | ||||
| 					"UnmountDevice.DeviceOpened failed for volume %q (spec.Name: %q) with: %v", | ||||
| 					deviceToDetach.VolumeName, | ||||
| 					deviceToDetach.VolumeSpec.Name(), | ||||
| 					deviceOpenedErr) | ||||
| 			} | ||||
| 		} | ||||
| 		// The device is still in use elsewhere. Caller will log and retry. | ||||
| 		if deviceOpened { | ||||
| 			return fmt.Errorf( | ||||
| 				"UnmountDevice failed for volume %q (spec.Name: %q) because the device is in use when it was no longer expected to be in use", | ||||
| 				deviceToDetach.VolumeName, | ||||
| 				deviceToDetach.VolumeSpec.Name()) | ||||
| 		} | ||||
|  | ||||
| 		glog.Infof( | ||||
| 			"UnmountDevice succeeded for volume %q (spec.Name: %q).", | ||||
| 			deviceToDetach.VolumeName, | ||||
| 			deviceToDetach.VolumeSpec.Name()) | ||||
|  | ||||
| 		// Update actual state of world | ||||
| 		markDeviceUnmountedErr := actualStateOfWorld.MarkDeviceAsUnmounted( | ||||
| 			deviceToDetach.VolumeName) | ||||
| 		if markDeviceUnmountedErr != nil { | ||||
| 			// On failure, return error. Caller will log and retry. | ||||
| 			return fmt.Errorf( | ||||
| 				"MarkDeviceAsUnmounted failed for device %q (spec.Name: %q) with: %v", | ||||
| 				deviceToDetach.VolumeName, | ||||
| 				deviceToDetach.VolumeSpec.Name(), | ||||
| 				markDeviceUnmountedErr) | ||||
| 		} | ||||
|  | ||||
| 		return nil | ||||
| 	}, nil | ||||
| } | ||||
|  | ||||
| func (og *operationGenerator) GenerateVerifyControllerAttachedVolumeFunc( | ||||
| 	volumeToMount VolumeToMount, | ||||
| 	nodeName types.NodeName, | ||||
| 	actualStateOfWorld ActualStateOfWorldAttacherUpdater) (func() error, error) { | ||||
| 	return func() error { | ||||
| 		if !volumeToMount.PluginIsAttachable { | ||||
| 			// If the volume does not implement the attacher interface, it is | ||||
| 			// assumed to be attached and the actual state of the world is | ||||
| 			// updated accordingly. | ||||
|  | ||||
| 			addVolumeNodeErr := actualStateOfWorld.MarkVolumeAsAttached( | ||||
| 				volumeToMount.VolumeName, volumeToMount.VolumeSpec, nodeName, "" /* devicePath */) | ||||
| 			if addVolumeNodeErr != nil { | ||||
| 				// On failure, return error. Caller will log and retry. | ||||
| 				return fmt.Errorf( | ||||
| 					"VerifyControllerAttachedVolume.MarkVolumeAsAttachedByUniqueVolumeName failed for volume %q (spec.Name: %q) pod %q (UID: %q) with: %v", | ||||
| 					volumeToMount.VolumeName, | ||||
| 					volumeToMount.VolumeSpec.Name(), | ||||
| 					volumeToMount.PodName, | ||||
| 					volumeToMount.Pod.UID, | ||||
| 					addVolumeNodeErr) | ||||
| 			} | ||||
|  | ||||
| 			return nil | ||||
| 		} | ||||
|  | ||||
| 		if !volumeToMount.ReportedInUse { | ||||
| 			// If the given volume has not yet been added to the list of | ||||
| 			// VolumesInUse in the node's volume status, do not proceed, return | ||||
| 			// error. Caller will log and retry. The node status is updated | ||||
| 			// periodically by kubelet, so it may take as much as 10 seconds | ||||
| 			// before this clears. | ||||
| 			// Issue #28141 to enable on demand status updates. | ||||
| 			return fmt.Errorf("Volume %q (spec.Name: %q) pod %q (UID: %q) has not yet been added to the list of VolumesInUse in the node's volume status", | ||||
| 				volumeToMount.VolumeName, | ||||
| 				volumeToMount.VolumeSpec.Name(), | ||||
| 				volumeToMount.PodName, | ||||
| 				volumeToMount.Pod.UID) | ||||
| 		} | ||||
|  | ||||
| 		// Fetch current node object | ||||
| 		node, fetchErr := og.kubeClient.Core().Nodes().Get(string(nodeName), metav1.GetOptions{}) | ||||
| 		if fetchErr != nil { | ||||
| 			// On failure, return error. Caller will log and retry. | ||||
| 			return fmt.Errorf( | ||||
| 				"VerifyControllerAttachedVolume failed fetching node from API server. Volume %q (spec.Name: %q) pod %q (UID: %q). Error: %v", | ||||
| 				volumeToMount.VolumeName, | ||||
| 				volumeToMount.VolumeSpec.Name(), | ||||
| 				volumeToMount.PodName, | ||||
| 				volumeToMount.Pod.UID, | ||||
| 				fetchErr) | ||||
| 		} | ||||
|  | ||||
| 		if node == nil { | ||||
| 			// On failure, return error. Caller will log and retry. | ||||
| 			return fmt.Errorf( | ||||
| 				"VerifyControllerAttachedVolume failed. Volume %q (spec.Name: %q) pod %q (UID: %q). Error: node object retrieved from API server is nil", | ||||
| 				volumeToMount.VolumeName, | ||||
| 				volumeToMount.VolumeSpec.Name(), | ||||
| 				volumeToMount.PodName, | ||||
| 				volumeToMount.Pod.UID) | ||||
| 		} | ||||
|  | ||||
| 		for _, attachedVolume := range node.Status.VolumesAttached { | ||||
| 			if attachedVolume.Name == volumeToMount.VolumeName { | ||||
| 				addVolumeNodeErr := actualStateOfWorld.MarkVolumeAsAttached( | ||||
| 					v1.UniqueVolumeName(""), volumeToMount.VolumeSpec, nodeName, attachedVolume.DevicePath) | ||||
| 				glog.Infof("Controller successfully attached volume %q (spec.Name: %q) pod %q (UID: %q) devicePath: %q", | ||||
| 					volumeToMount.VolumeName, | ||||
| 					volumeToMount.VolumeSpec.Name(), | ||||
| 					volumeToMount.PodName, | ||||
| 					volumeToMount.Pod.UID, | ||||
| 					attachedVolume.DevicePath) | ||||
|  | ||||
| 				if addVolumeNodeErr != nil { | ||||
| 					// On failure, return error. Caller will log and retry. | ||||
| 					return fmt.Errorf( | ||||
| 						"VerifyControllerAttachedVolume.MarkVolumeAsAttached failed for volume %q (spec.Name: %q) pod %q (UID: %q) with: %v", | ||||
| 						volumeToMount.VolumeName, | ||||
| 						volumeToMount.VolumeSpec.Name(), | ||||
| 						volumeToMount.PodName, | ||||
| 						volumeToMount.Pod.UID, | ||||
| 						addVolumeNodeErr) | ||||
| 				} | ||||
| 				return nil | ||||
| 			} | ||||
| 		} | ||||
|  | ||||
| 		// Volume not attached, return error. Caller will log and retry. | ||||
| 		return fmt.Errorf("Volume %q (spec.Name: %q) pod %q (UID: %q) is not yet attached according to node status", | ||||
| 			volumeToMount.VolumeName, | ||||
| 			volumeToMount.VolumeSpec.Name(), | ||||
| 			volumeToMount.PodName, | ||||
| 			volumeToMount.Pod.UID) | ||||
| 	}, nil | ||||
| } | ||||
|  | ||||
| func (og *operationGenerator) verifyVolumeIsSafeToDetach( | ||||
| 	volumeToDetach AttachedVolume) error { | ||||
| 	// Fetch current node object | ||||
| 	node, fetchErr := og.kubeClient.Core().Nodes().Get(string(volumeToDetach.NodeName), metav1.GetOptions{}) | ||||
| 	if fetchErr != nil { | ||||
| 		if errors.IsNotFound(fetchErr) { | ||||
| 			glog.Warningf("Node %q not found on API server. DetachVolume will skip safe to detach check.", | ||||
| 				volumeToDetach.NodeName, | ||||
| 				volumeToDetach.VolumeName, | ||||
| 				volumeToDetach.VolumeSpec.Name()) | ||||
| 			return nil | ||||
| 		} | ||||
|  | ||||
| 		// On failure, return error. Caller will log and retry. | ||||
| 		return fmt.Errorf( | ||||
| 			"DetachVolume failed fetching node from API server for volume %q (spec.Name: %q) from node %q with: %v", | ||||
| 			volumeToDetach.VolumeName, | ||||
| 			volumeToDetach.VolumeSpec.Name(), | ||||
| 			volumeToDetach.NodeName, | ||||
| 			fetchErr) | ||||
| 	} | ||||
|  | ||||
| 	if node == nil { | ||||
| 		// On failure, return error. Caller will log and retry. | ||||
| 		return fmt.Errorf( | ||||
| 			"DetachVolume failed fetching node from API server for volume %q (spec.Name: %q) from node %q. Error: node object retrieved from API server is nil", | ||||
| 			volumeToDetach.VolumeName, | ||||
| 			volumeToDetach.VolumeSpec.Name(), | ||||
| 			volumeToDetach.NodeName) | ||||
| 	} | ||||
|  | ||||
| 	for _, inUseVolume := range node.Status.VolumesInUse { | ||||
| 		if inUseVolume == volumeToDetach.VolumeName { | ||||
| 			return fmt.Errorf("DetachVolume failed for volume %q (spec.Name: %q) from node %q. Error: volume is still in use by node, according to Node status", | ||||
| 				volumeToDetach.VolumeName, | ||||
| 				volumeToDetach.VolumeSpec.Name(), | ||||
| 				volumeToDetach.NodeName) | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	// Volume is not marked as in use by node | ||||
| 	glog.Infof("Verified volume is safe to detach for volume %q (spec.Name: %q) from node %q.", | ||||
| 		volumeToDetach.VolumeName, | ||||
| 		volumeToDetach.VolumeSpec.Name(), | ||||
| 		volumeToDetach.NodeName) | ||||
| 	return nil | ||||
| } | ||||
| @@ -179,14 +179,16 @@ Federation secrets Secret objects should be created and deleted successfully,pmo | ||||
| Federation secrets Secret objects should be deleted from underlying clusters when OrphanDependents is false,nikhiljindal,0 | ||||
| Federation secrets Secret objects should not be deleted from underlying clusters when OrphanDependents is nil,nikhiljindal,0 | ||||
| Federation secrets Secret objects should not be deleted from underlying clusters when OrphanDependents is true,nikhiljindal,0 | ||||
| Firewall rule should create valid firewall rules for LoadBalancer type service,brendandburns,0 | ||||
| Firewall rule should have correct firewall rules for e2e cluster,brendandburns,0 | ||||
| Firewall rule should create valid firewall rules for LoadBalancer type service,rkouj,0 | ||||
| Firewall rule should have correct firewall rules for e2e cluster,rkouj,0 | ||||
| GCP Volumes GlusterFS should be mountable,nikhiljindal,0 | ||||
| GCP Volumes NFSv4 should be mountable for NFSv4,nikhiljindal,0 | ||||
| GKE local SSD should write and read from node local SSD,fabioy,0 | ||||
| GKE node pools should create a cluster with multiple node pools,fabioy,1 | ||||
| Garbage Collection Test: * Should eventually garbage collect containers when we exceed the number of dead containers per container,Random-Liu,0 | ||||
| Garbage collector should delete RS created by deployment when not orphaning,rkouj,0 | ||||
| Garbage collector should delete pods created by rc when not orphaning,justinsb,1 | ||||
| Garbage collector should orphan RS created by deployment when deleteOptions.OrphanDependents is true,rkouj,0 | ||||
| Garbage collector should orphan pods created by rc if delete options say so,fabioy,1 | ||||
| Garbage collector should orphan pods created by rc if deleteOptions.OrphanDependents is nil,zmerlynn,1 | ||||
| "Generated release_1_5 clientset should create pods, delete pods, watch pods",rrati,0 | ||||
| @@ -247,6 +249,7 @@ Kubectl client Kubectl taint should update the taint on a node,pwittrock,0 | ||||
| Kubectl client Kubectl version should check is all data is printed,janetkuo,0 | ||||
| Kubectl client Proxy server should support --unix-socket=/path,zmerlynn,1 | ||||
| Kubectl client Proxy server should support proxy with --port 0,ncdc,1 | ||||
| Kubectl client Simple pod should handle in-cluster config,rkouj,0 | ||||
| Kubectl client Simple pod should return command exit codes,yifan-gu,1 | ||||
| Kubectl client Simple pod should support exec,ncdc,0 | ||||
| Kubectl client Simple pod should support exec through an HTTP proxy,ncdc,0 | ||||
| @@ -264,9 +267,7 @@ Kubelet Container Manager Validate OOM score adjustments once the node is setup | ||||
| Kubelet Container Manager Validate OOM score adjustments once the node is setup docker daemon's oom-score-adj should be -999,thockin,1 | ||||
| Kubelet Container Manager Validate OOM score adjustments once the node is setup guaranteed container's oom-score-adj should be -998,kargakis,1 | ||||
| Kubelet Container Manager Validate OOM score adjustments once the node is setup pod infra containers oom-score-adj should be -998 and best effort container's should be 1000,timothysc,1 | ||||
| Kubelet Eviction Manager hard eviction test pod using the most disk space gets evicted when the node disk usage is above the eviction hard threshold should evict the pod using the most disk space,rrati,0 | ||||
| Kubelet Volume Manager Volume Manager On terminatation of pod with memory backed volume should remove the volume from the node,derekwaynecarr,0 | ||||
| Kubelet Eviction Manager hard eviction test pod using the most disk space gets evicted when the node disk usage is above the eviction hard threshold should evict the pod using the most disk space,karlkfi,1 | ||||
| Kubelet Eviction Manager hard eviction test pod using the most disk space gets evicted when the node disk usage is above the eviction hard threshold should evict the pod using the most disk space,rkouj,0 | ||||
| Kubelet Volume Manager Volume Manager On terminatation of pod with memory backed volume should remove the volume from the node,rkouj,0 | ||||
| Kubelet experimental resource usage tracking resource tracking for * pods per node,yujuhong,0 | ||||
| Kubelet regular resource usage tracking resource tracking for * pods per node,yujuhong,0 | ||||
| @@ -344,11 +345,10 @@ PersistentVolumes PersistentVolumes:NFS with multiple PVs and PVCs all in same n | ||||
| PersistentVolumes PersistentVolumes:NFS with multiple PVs and PVCs all in same ns should create 3 PVs and 3 PVCs: test write access,copejon,0 | ||||
| PersistentVolumes PersistentVolumes:NFS with multiple PVs and PVCs all in same ns should create 4 PVs and 2 PVCs: test write access,copejon,0 | ||||
| Pet Store should scale to persist a nominal number ( * ) of transactions in * seconds,xiang90,1 | ||||
| "Pod Disks Should schedule a pod w/ a RW PD, gracefully remove it, then schedule it on another host",alex-mohr,1 | ||||
| "Pod Disks Should schedule a pod w/ a readonly PD on two hosts, then remove both gracefully.",rrati,0 | ||||
| "Pod Disks Should schedule a pod w/ a RW PD, gracefully remove it, then schedule it on another host",saad-ali,0 | ||||
| "Pod Disks Should schedule a pod w/ a readonly PD on two hosts, then remove both gracefully.",saad-ali,0 | ||||
| Pod Disks should be able to detach from a node which was deleted,rkouj,0 | ||||
| Pod Disks should be able to detach from a node whose api object was deleted,rkouj,0 | ||||
| "Pod Disks should schedule a pod w/ a RW PD shared between multiple containers, write to PD, delete pod, verify contents, and repeat in rapid succession",saad-ali,0 | ||||
| "Pod Disks should schedule a pod w/ a RW PD, ungracefully remove it, then schedule it on another host",mml,1 | ||||
| "Pod Disks should schedule a pod w/ a readonly PD on two hosts, then remove both ungracefully.",saad-ali,1 | ||||
| @@ -471,14 +471,10 @@ SimpleMount should be able to mount an emptydir on a container,rrati,0 | ||||
| "Spark should start spark master, driver and workers",jszczepkowski,1 | ||||
| "Staging client repo client should create pods, delete pods, watch pods",jbeda,1 | ||||
| StatefulSet Basic StatefulSet functionality Scaling down before scale up is finished should wait until current pod will be running and ready before it will be removed,derekwaynecarr,0 | ||||
| StatefulSet Basic StatefulSet functionality Scaling should happen in predictable order and halt if any stateful pod is unhealthy,brendandburns,0 | ||||
| StatefulSet Basic StatefulSet functionality Should recreate evicted statefulset,brendandburns,0 | ||||
| StatefulSet Basic StatefulSet functionality should allow template updates,derekwaynecarr,0 | ||||
| StatefulSet Basic StatefulSet functionality should handle healthy stateful pod restarts during scale,kevin-wangzefeng,1 | ||||
| StatefulSet Basic StatefulSet functionality Scaling should happen in predictable order and halt if any pet is unhealthy,rkouj,0 | ||||
| StatefulSet Basic StatefulSet functionality Scaling should happen in predictable order and halt if any stateful pod is unhealthy,derekwaynecarr,0 | ||||
| StatefulSet Basic StatefulSet functionality Should recreate evicted statefulset,rrati,0 | ||||
| StatefulSet Basic StatefulSet functionality should allow template updates,rkouj,0 | ||||
| StatefulSet Basic StatefulSet functionality should handle healthy pet restarts during scale,girishkalele,1 | ||||
| StatefulSet Basic StatefulSet functionality should handle healthy stateful pod restarts during scale,brendandburns,0 | ||||
| StatefulSet Basic StatefulSet functionality should handle healthy stateful pod restarts during scale,kevin-wangzefeng,1 | ||||
| StatefulSet Basic StatefulSet functionality should provide basic identity,bprashanth,1 | ||||
| StatefulSet Deploy clustered applications should creating a working CockroachDB cluster,rkouj,0 | ||||
| StatefulSet Deploy clustered applications should creating a working mysql cluster,yujuhong,1 | ||||
| @@ -502,6 +498,7 @@ Variable Expansion should allow substituting values in a container's command,mml | ||||
| Volumes Ceph RBD should be mountable,fabioy,1 | ||||
| Volumes CephFS should be mountable,Q-Lee,1 | ||||
| Volumes Cinder should be mountable,cjcullen,1 | ||||
| Volumes ConfigMap should be mountable,rkouj,0 | ||||
| Volumes GlusterFS should be mountable,eparis,1 | ||||
| Volumes NFS should be mountable,rrati,0 | ||||
| Volumes PD should be mountable,caesarxuchao,1 | ||||
| @@ -561,15 +558,8 @@ k8s.io/kubernetes/pkg/api/meta,fabioy,1 | ||||
| k8s.io/kubernetes/pkg/api/resource,smarterclayton,1 | ||||
| k8s.io/kubernetes/pkg/api/service,spxtr,1 | ||||
| k8s.io/kubernetes/pkg/api/testapi,caesarxuchao,1 | ||||
| k8s.io/kubernetes/pkg/api/util,rrati,0 | ||||
| k8s.io/kubernetes/pkg/api/v1,rrati,0 | ||||
| k8s.io/kubernetes/pkg/api/v1/endpoints,madhusudancs,0 | ||||
| k8s.io/kubernetes/pkg/api/v1/pod,madhusudancs,0 | ||||
| k8s.io/kubernetes/pkg/api/v1/service,madhusudancs,0 | ||||
| k8s.io/kubernetes/pkg/api/unversioned,kevin-wangzefeng,1 | ||||
| k8s.io/kubernetes/pkg/api/unversioned/validation,brendandburns,1 | ||||
| k8s.io/kubernetes/pkg/api/util,ghodss,1 | ||||
| k8s.io/kubernetes/pkg/api/v1,vulpecula,1 | ||||
| k8s.io/kubernetes/pkg/api/util,rkouj,0 | ||||
| k8s.io/kubernetes/pkg/api/v1,rkouj,0 | ||||
| k8s.io/kubernetes/pkg/api/v1/endpoints,rkouj,0 | ||||
| k8s.io/kubernetes/pkg/api/v1/pod,rkouj,0 | ||||
| k8s.io/kubernetes/pkg/api/v1/service,rkouj,0 | ||||
| @@ -599,6 +589,9 @@ k8s.io/kubernetes/pkg/apis/rbac/validation,erictune,0 | ||||
| k8s.io/kubernetes/pkg/apis/storage/validation,caesarxuchao,1 | ||||
| k8s.io/kubernetes/pkg/apiserver,nikhiljindal,0 | ||||
| k8s.io/kubernetes/pkg/apiserver/filters,dchen1107,1 | ||||
| k8s.io/kubernetes/pkg/apiserver/handlers,rkouj,0 | ||||
| k8s.io/kubernetes/pkg/apiserver/handlers/errors,rkouj,0 | ||||
| k8s.io/kubernetes/pkg/apiserver/handlers/negotiation,rkouj,0 | ||||
| k8s.io/kubernetes/pkg/apiserver/request,lavalamp,1 | ||||
| k8s.io/kubernetes/pkg/auth/authenticator/bearertoken,liggitt,0 | ||||
| k8s.io/kubernetes/pkg/auth/authorizer/abac,liggitt,0 | ||||
| @@ -634,6 +627,7 @@ k8s.io/kubernetes/pkg/cloudprovider/providers/photon,luomiao,0 | ||||
| k8s.io/kubernetes/pkg/cloudprovider/providers/rackspace,caesarxuchao,1 | ||||
| k8s.io/kubernetes/pkg/cloudprovider/providers/vsphere,apelisse,1 | ||||
| k8s.io/kubernetes/pkg/controller,mikedanese,1 | ||||
| k8s.io/kubernetes/pkg/controller/cloud,rkouj,0 | ||||
| k8s.io/kubernetes/pkg/controller/cronjob,soltysh,1 | ||||
| k8s.io/kubernetes/pkg/controller/daemon,Q-Lee,1 | ||||
| k8s.io/kubernetes/pkg/controller/deployment,asalkeld,0 | ||||
| @@ -665,10 +659,7 @@ k8s.io/kubernetes/pkg/credentialprovider,justinsb,1 | ||||
| k8s.io/kubernetes/pkg/credentialprovider/aws,zmerlynn,1 | ||||
| k8s.io/kubernetes/pkg/credentialprovider/azure,brendandburns,0 | ||||
| k8s.io/kubernetes/pkg/credentialprovider/gcp,mml,1 | ||||
| k8s.io/kubernetes/pkg/dns,rrati,0 | ||||
| k8s.io/kubernetes/pkg/dns/config,derekwaynecarr,0 | ||||
| k8s.io/kubernetes/pkg/dns/federation,derekwaynecarr,0 | ||||
| k8s.io/kubernetes/pkg/dns,jdef,1 | ||||
| k8s.io/kubernetes/pkg/dns,rkouj,0 | ||||
| k8s.io/kubernetes/pkg/dns/config,rkouj,0 | ||||
| k8s.io/kubernetes/pkg/dns/federation,rkouj,0 | ||||
| k8s.io/kubernetes/pkg/dns/treecache,bowei,0 | ||||
| @@ -681,6 +672,7 @@ k8s.io/kubernetes/pkg/genericapiserver/mux,spxtr,1 | ||||
| k8s.io/kubernetes/pkg/genericapiserver/openapi,davidopp,1 | ||||
| k8s.io/kubernetes/pkg/healthz,thockin,1 | ||||
| k8s.io/kubernetes/pkg/httplog,mtaufen,1 | ||||
| k8s.io/kubernetes/pkg/kubeapiserver/authorizer,rkouj,0 | ||||
| k8s.io/kubernetes/pkg/kubectl,madhusudancs,1 | ||||
| k8s.io/kubernetes/pkg/kubectl/cmd,rmmh,1 | ||||
| k8s.io/kubernetes/pkg/kubectl/cmd/config,asalkeld,0 | ||||
| @@ -786,6 +778,7 @@ k8s.io/kubernetes/pkg/registry/core/service/ipallocator,eparis,1 | ||||
| k8s.io/kubernetes/pkg/registry/core/service/ipallocator/controller,mtaufen,1 | ||||
| k8s.io/kubernetes/pkg/registry/core/service/ipallocator/etcd,kargakis,1 | ||||
| k8s.io/kubernetes/pkg/registry/core/service/portallocator,rrati,0 | ||||
| k8s.io/kubernetes/pkg/registry/core/service/portallocator/controller,rkouj,0 | ||||
| k8s.io/kubernetes/pkg/registry/core/serviceaccount,caesarxuchao,1 | ||||
| k8s.io/kubernetes/pkg/registry/core/serviceaccount/etcd,bprashanth,1 | ||||
| k8s.io/kubernetes/pkg/registry/extensions/controller/etcd,mwielgus,1 | ||||
| @@ -812,7 +805,6 @@ k8s.io/kubernetes/pkg/registry/policy/poddisruptionbudget/etcd,xiang90,1 | ||||
| k8s.io/kubernetes/pkg/registry/storage/storageclass,brendandburns,1 | ||||
| k8s.io/kubernetes/pkg/registry/storage/storageclass/etcd,eparis,1 | ||||
| k8s.io/kubernetes/pkg/runtime,wojtek-t,0 | ||||
| k8s.io/kubernetes/pkg/runtime/schema,madhusudancs,0 | ||||
| k8s.io/kubernetes/pkg/runtime/schema,rkouj,0 | ||||
| k8s.io/kubernetes/pkg/runtime/serializer,wojtek-t,0 | ||||
| k8s.io/kubernetes/pkg/runtime/serializer/json,wojtek-t,0 | ||||
| @@ -918,6 +910,7 @@ k8s.io/kubernetes/pkg/volume/rbd,piosz,1 | ||||
| k8s.io/kubernetes/pkg/volume/secret,rmmh,1 | ||||
| k8s.io/kubernetes/pkg/volume/util,saad-ali,0 | ||||
| k8s.io/kubernetes/pkg/volume/util/nestedpendingoperations,freehan,1 | ||||
| k8s.io/kubernetes/pkg/volume/util/operationexecutor,rkouj,0 | ||||
| k8s.io/kubernetes/pkg/volume/vsphere_volume,deads2k,1 | ||||
| k8s.io/kubernetes/pkg/watch,mwielgus,1 | ||||
| k8s.io/kubernetes/pkg/watch/versioned,childsb,1 | ||||
|   | ||||
| 
 | 
		Reference in New Issue
	
	Block a user
	 Kubernetes Submit Queue
					Kubernetes Submit Queue