Fix volume states out of sync problem after kubelet restarts
When kubelet restarts, all the information about the volumes will be gone from the actual/desired states. When the node status is updated with the mounted volumes, the volume list might be empty although there are still volumes mounted, which in turn causes the master to detach those volumes since they are not in the mounted volumes list. This fix makes sure the mounted volumes list is only updated after the reconciler starts the sync states process. This sync states process will scan the existing volume directories and reconstruct the actual states if they are missing. This PR also fixes a problem with cleaning up orphaned pods' directories. In case the pod directory is unmounted but has not yet been deleted (e.g., interrupted by a kubelet restart), the cleanup routine will delete the directory so that the pod directory can be cleaned up (it is safe to delete the directory since it is no longer mounted). The third issue this PR fixes is that when reconstructing a volume in the actual state, the mounter must not be nil since it is required for creating container.VolumeMap. If it is nil, it might cause a nil pointer exception in kubelet. Details are in proposal PR #33203
This commit is contained in:
		| @@ -239,9 +239,9 @@ func (kl *Kubelet) GetExtraSupplementalGroupsForPod(pod *api.Pod) []int64 { | ||||
| 	return kl.volumeManager.GetExtraSupplementalGroupsForPod(pod) | ||||
| } | ||||
|  | ||||
| // getPodVolumeNameListFromDisk returns a list of the volume names by reading the | ||||
| // getPodVolumePathListFromDisk returns a list of the volume paths by reading the | ||||
| // volume directories for the given pod from the disk. | ||||
| func (kl *Kubelet) getPodVolumeNameListFromDisk(podUID types.UID) ([]string, error) { | ||||
| func (kl *Kubelet) getPodVolumePathListFromDisk(podUID types.UID) ([]string, error) { | ||||
| 	volumes := []string{} | ||||
| 	podVolDir := kl.getPodVolumesDir(podUID) | ||||
| 	volumePluginDirs, err := ioutil.ReadDir(podVolDir) | ||||
| @@ -254,9 +254,11 @@ func (kl *Kubelet) getPodVolumeNameListFromDisk(podUID types.UID) ([]string, err | ||||
| 		volumePluginPath := path.Join(podVolDir, volumePluginName) | ||||
| 		volumeDirs, err := util.ReadDirNoStat(volumePluginPath) | ||||
| 		if err != nil { | ||||
| 			return volumes, err | ||||
| 			return volumes, fmt.Errorf("Could not read directory %s: %v", volumePluginPath, err) | ||||
| 		} | ||||
| 		for _, volumeDir := range volumeDirs { | ||||
| 			volumes = append(volumes, path.Join(volumePluginPath, volumeDir)) | ||||
| 		} | ||||
| 		volumes = append(volumes, volumeDirs...) | ||||
| 	} | ||||
| 	return volumes, nil | ||||
| } | ||||
|   | ||||
| @@ -342,6 +342,8 @@ func (kl *Kubelet) tryUpdateNodeStatus() error { | ||||
| 	} | ||||
| 	// Update the current status on the API server | ||||
| 	updatedNode, err := kl.kubeClient.Core().Nodes().UpdateStatus(node) | ||||
| 	// If update finishes successfully, mark the volumeInUse as reportedInUse to indicate | ||||
| 	// those volumes are already updated in the node's status | ||||
| 	if err == nil { | ||||
| 		kl.volumeManager.MarkVolumesAsReportedInUse( | ||||
| 			updatedNode.Status.VolumesInUse) | ||||
| @@ -882,9 +884,13 @@ func (kl *Kubelet) recordNodeSchedulableEvent(node *api.Node) { | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // Update VolumesInUse field in Node Status | ||||
| // Update VolumesInUse field in Node Status only after states are synced up at least once | ||||
| // in volume reconciler. | ||||
| func (kl *Kubelet) setNodeVolumesInUseStatus(node *api.Node) { | ||||
| 	// Make sure to only update node status after reconciler starts syncing up states | ||||
| 	if kl.volumeManager.ReconcilerStatesHasBeenSynced() { | ||||
| 		node.Status.VolumesInUse = kl.volumeManager.GetVolumesInUse() | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // setNodeStatus fills in the Status fields of the given Node, overwriting | ||||
|   | ||||
| @@ -27,6 +27,7 @@ import ( | ||||
| 	"k8s.io/kubernetes/pkg/securitycontext" | ||||
| 	"k8s.io/kubernetes/pkg/types" | ||||
| 	utilerrors "k8s.io/kubernetes/pkg/util/errors" | ||||
| 	"k8s.io/kubernetes/pkg/util/mount" | ||||
| 	"k8s.io/kubernetes/pkg/util/selinux" | ||||
| 	"k8s.io/kubernetes/pkg/util/sets" | ||||
| 	"k8s.io/kubernetes/pkg/volume" | ||||
| @@ -153,8 +154,20 @@ func (kl *Kubelet) cleanupOrphanedPodDirs( | ||||
| 			continue | ||||
| 		} | ||||
| 		// Check whether volume is still mounted on disk. If so, do not delete directory | ||||
| 		if volumeNames, err := kl.getPodVolumeNameListFromDisk(uid); err != nil || len(volumeNames) != 0 { | ||||
| 			glog.V(3).Infof("Orphaned pod %q found, but volumes are still mounted; err: %v, volumes: %v ", uid, err, volumeNames) | ||||
| 		volumePaths, err := kl.getPodVolumePathListFromDisk(uid) | ||||
| 		if err != nil { | ||||
| 			glog.Errorf("Orphaned pod %q found, but error %v occured during reading volume dir from disk", uid, err) | ||||
| 			continue | ||||
| 		} else if len(volumePaths) > 0 { | ||||
| 			for _, path := range volumePaths { | ||||
| 				notMount, err := mount.IsNotMountPoint(path) | ||||
| 				if err == nil && notMount { | ||||
| 					glog.V(2).Infof("Volume path %q is no longer mounted, remove it", path) | ||||
| 					os.Remove(path) | ||||
| 				} else { | ||||
| 					glog.Errorf("Orphaned pod %q found, but it might still mounted with error %v", uid, err) | ||||
| 				} | ||||
| 			} | ||||
| 			continue | ||||
| 		} | ||||
|  | ||||
|   | ||||
| @@ -186,6 +186,7 @@ func IsRemountRequiredError(err error) bool { | ||||
| type actualStateOfWorld struct { | ||||
| 	// nodeName is the name of this node. This value is passed to Attach/Detach | ||||
| 	nodeName types.NodeName | ||||
|  | ||||
| 	// attachedVolumes is a map containing the set of volumes the kubelet volume | ||||
| 	// manager believes to be successfully attached to this node. Volume types | ||||
| 	// that do not implement an attacher interface are assumed to be in this | ||||
| @@ -193,6 +194,7 @@ type actualStateOfWorld struct { | ||||
| 	// The key in this map is the name of the volume and the value is an object | ||||
| 	// containing more information about the attached volume. | ||||
| 	attachedVolumes map[api.UniqueVolumeName]attachedVolume | ||||
|  | ||||
| 	// volumePluginMgr is the volume plugin manager used to create volume | ||||
| 	// plugin objects. | ||||
| 	volumePluginMgr *volume.VolumePluginMgr | ||||
|   | ||||
| @@ -58,7 +58,8 @@ type DesiredStateOfWorld interface { | ||||
| 	// ReportedInUse value is reset to false. The default ReportedInUse value | ||||
| 	// for a newly created volume is false. | ||||
| 	// When set to true this value indicates that the volume was successfully | ||||
| 	// added to the VolumesInUse field in the node's status. | ||||
| 	// added to the VolumesInUse field in the node's status. Mount operation needs | ||||
| 	// to check this value before issuing the operation. | ||||
| 	// If a volume in the reportedVolumes list does not exist in the list of | ||||
| 	// volumes that should be attached to this node, it is skipped without error. | ||||
| 	MarkVolumesReportedInUse(reportedVolumes []api.UniqueVolumeName) | ||||
|   | ||||
| @@ -37,7 +37,7 @@ import ( | ||||
| 	"k8s.io/kubernetes/pkg/util/mount" | ||||
| 	"k8s.io/kubernetes/pkg/util/strings" | ||||
| 	"k8s.io/kubernetes/pkg/util/wait" | ||||
| 	"k8s.io/kubernetes/pkg/volume" | ||||
| 	volumepkg "k8s.io/kubernetes/pkg/volume" | ||||
| 	"k8s.io/kubernetes/pkg/volume/util/nestedpendingoperations" | ||||
| 	"k8s.io/kubernetes/pkg/volume/util/operationexecutor" | ||||
| 	volumetypes "k8s.io/kubernetes/pkg/volume/util/types" | ||||
| @@ -59,6 +59,10 @@ type Reconciler interface { | ||||
| 	// volumes that should be attached are attached and volumes that should | ||||
| 	// be detached are detached and trigger attach/detach operations as needed. | ||||
| 	Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) | ||||
|  | ||||
| 	// StatesHasBeenSynced returns true only after syncStates process starts to sync | ||||
| 	// states at least once after kubelet starts | ||||
| 	StatesHasBeenSynced() bool | ||||
| } | ||||
|  | ||||
| // NewReconciler returns a new instance of Reconciler. | ||||
| @@ -68,7 +72,7 @@ type Reconciler interface { | ||||
| //   this node, and therefore the volume manager should not | ||||
| // loopSleepDuration - the amount of time the reconciler loop sleeps between | ||||
| //   successive executions | ||||
| //   reconstructDuration - the amount of time the reconstruct sleeps between | ||||
| //   syncDuration - the amount of time the syncStates sleeps between | ||||
| //   successive executions | ||||
| // waitForAttachTimeout - the amount of time the Mount function will wait for | ||||
| //   the volume to be attached | ||||
| @@ -84,20 +88,20 @@ func NewReconciler( | ||||
| 	kubeClient internalclientset.Interface, | ||||
| 	controllerAttachDetachEnabled bool, | ||||
| 	loopSleepDuration time.Duration, | ||||
| 	reconstructDuration time.Duration, | ||||
| 	syncDuration time.Duration, | ||||
| 	waitForAttachTimeout time.Duration, | ||||
| 	nodeName types.NodeName, | ||||
| 	desiredStateOfWorld cache.DesiredStateOfWorld, | ||||
| 	actualStateOfWorld cache.ActualStateOfWorld, | ||||
| 	operationExecutor operationexecutor.OperationExecutor, | ||||
| 	mounter mount.Interface, | ||||
| 	volumePluginMgr *volume.VolumePluginMgr, | ||||
| 	volumePluginMgr *volumepkg.VolumePluginMgr, | ||||
| 	kubeletPodsDir string) Reconciler { | ||||
| 	return &reconciler{ | ||||
| 		kubeClient:                    kubeClient, | ||||
| 		controllerAttachDetachEnabled: controllerAttachDetachEnabled, | ||||
| 		loopSleepDuration:             loopSleepDuration, | ||||
| 		reconstructDuration:           reconstructDuration, | ||||
| 		syncDuration:                  syncDuration, | ||||
| 		waitForAttachTimeout:          waitForAttachTimeout, | ||||
| 		nodeName:                      nodeName, | ||||
| 		desiredStateOfWorld:           desiredStateOfWorld, | ||||
| @@ -106,7 +110,7 @@ func NewReconciler( | ||||
| 		mounter:                       mounter, | ||||
| 		volumePluginMgr:               volumePluginMgr, | ||||
| 		kubeletPodsDir:                kubeletPodsDir, | ||||
| 		timeOfLastReconstruct:         time.Now(), | ||||
| 		timeOfLastSync:                time.Time{}, | ||||
| 	} | ||||
| } | ||||
|  | ||||
| @@ -114,16 +118,16 @@ type reconciler struct { | ||||
| 	kubeClient                    internalclientset.Interface | ||||
| 	controllerAttachDetachEnabled bool | ||||
| 	loopSleepDuration             time.Duration | ||||
| 	reconstructDuration           time.Duration | ||||
| 	syncDuration                  time.Duration | ||||
| 	waitForAttachTimeout          time.Duration | ||||
| 	nodeName                      types.NodeName | ||||
| 	desiredStateOfWorld           cache.DesiredStateOfWorld | ||||
| 	actualStateOfWorld            cache.ActualStateOfWorld | ||||
| 	operationExecutor             operationexecutor.OperationExecutor | ||||
| 	mounter                       mount.Interface | ||||
| 	volumePluginMgr               *volume.VolumePluginMgr | ||||
| 	volumePluginMgr               *volumepkg.VolumePluginMgr | ||||
| 	kubeletPodsDir                string | ||||
| 	timeOfLastReconstruct         time.Time | ||||
| 	timeOfLastSync                time.Time | ||||
| } | ||||
|  | ||||
| func (rc *reconciler) Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) { | ||||
| @@ -139,9 +143,9 @@ func (rc *reconciler) reconciliationLoopFunc(sourcesReady config.SourcesReady) f | ||||
| 		// reconciler's reconstruct process may add incomplete volume information and cause confusion. | ||||
| 		// In addition, if some sources are not ready, the reconstruct process may clean up pods' volumes | ||||
| 		// that are still in use because desired states could not get a complete list of pods. | ||||
| 		if sourcesReady.AllReady() && time.Since(rc.timeOfLastReconstruct) > rc.reconstructDuration { | ||||
| 		if sourcesReady.AllReady() && time.Since(rc.timeOfLastSync) > rc.syncDuration { | ||||
| 			glog.V(5).Infof("Sources are all ready, starting reconstruct state function") | ||||
| 			rc.reconstruct() | ||||
| 			rc.sync() | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
| @@ -292,12 +296,17 @@ func (rc *reconciler) reconcile() { | ||||
| 					err) | ||||
| 			} | ||||
| 			if err == nil { | ||||
| 				glog.Infof("MountVolume operation started for volume %q (spec.Name: %q) to pod %q (UID: %q). %s", | ||||
| 				logMsg := fmt.Sprintf("MountVolume operation started for volume %q (spec.Name: %q) to pod %q (UID: %q). %s", | ||||
| 					volumeToMount.VolumeName, | ||||
| 					volumeToMount.VolumeSpec.Name(), | ||||
| 					volumeToMount.PodName, | ||||
| 					volumeToMount.Pod.UID, | ||||
| 					remountingLogStr) | ||||
| 				if remountingLogStr == "" { | ||||
| 					glog.V(1).Infof(logMsg) | ||||
| 				} else { | ||||
| 					glog.V(5).Infof(logMsg) | ||||
| 				} | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
| @@ -366,18 +375,22 @@ func (rc *reconciler) reconcile() { | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // reconstruct process tries to observe the real world by scanning all pods' volume directories from the disk. | ||||
| // sync process tries to observe the real world by scanning all pods' volume directories from the disk. | ||||
| // If the actual and desired state of worlds are not consistent with the observed world, it means that some | ||||
| // mounted volumes are left out probably during kubelet restart. This process will reconstruct | ||||
| // the volumes and update the actual and desired states. In the following reconciler loop, those volumes will | ||||
| // be cleaned up. | ||||
| func (rc *reconciler) reconstruct() { | ||||
| 	defer rc.updateReconstructTime() | ||||
| 	rc.reconstructStates(rc.kubeletPodsDir) | ||||
| func (rc *reconciler) sync() { | ||||
| 	defer rc.updateLastSyncTime() | ||||
| 	rc.syncStates(rc.kubeletPodsDir) | ||||
| } | ||||
|  | ||||
| func (rc *reconciler) updateReconstructTime() { | ||||
| 	rc.timeOfLastReconstruct = time.Now() | ||||
| func (rc *reconciler) updateLastSyncTime() { | ||||
| 	rc.timeOfLastSync = time.Now() | ||||
| } | ||||
|  | ||||
| func (rc *reconciler) StatesHasBeenSynced() bool { | ||||
| 	return !rc.timeOfLastSync.IsZero() | ||||
| } | ||||
|  | ||||
| type podVolume struct { | ||||
| @@ -387,25 +400,39 @@ type podVolume struct { | ||||
| 	pluginName     string | ||||
| } | ||||
|  | ||||
| type reconstructedVolume struct { | ||||
| 	volumeName          api.UniqueVolumeName | ||||
| 	podName             volumetypes.UniquePodName | ||||
| 	volumeSpec          *volumepkg.Spec | ||||
| 	outerVolumeSpecName string | ||||
| 	pod                 *api.Pod | ||||
| 	pluginIsAttachable  bool | ||||
| 	volumeGidValue      string | ||||
| 	devicePath          string | ||||
| 	reportedInUse       bool | ||||
| 	mounter             volumepkg.Mounter | ||||
| } | ||||
|  | ||||
| // reconstructFromDisk scans the volume directories under the given pod directory. If the volume is not | ||||
| // in either actual or desired state of world, or pending operation, this function will reconstruct | ||||
| // the volume spec and put it in both the actual and desired state of worlds. If no running | ||||
| // container is mounting the volume, the volume will be removed by desired state of world's populator and | ||||
| // cleaned up by the reconciler. | ||||
| func (rc *reconciler) reconstructStates(podsDir string) { | ||||
| func (rc *reconciler) syncStates(podsDir string) { | ||||
| 	// Get volumes information by reading the pod's directory | ||||
| 	podVolumes, err := getVolumesFromPodDir(podsDir) | ||||
| 	if err != nil { | ||||
| 		glog.Errorf("Cannot get volumes from disk %v", err) | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	volumesNeedUpdate := make(map[api.UniqueVolumeName]*reconstructedVolume) | ||||
| 	for _, volume := range podVolumes { | ||||
| 		volumeToMount, err := rc.reconstructVolume(volume) | ||||
| 		reconstructedVolume, err := rc.reconstructVolume(volume) | ||||
| 		if err != nil { | ||||
| 			glog.Errorf("Could not construct volume information: %v", err) | ||||
| 			continue | ||||
| 		} | ||||
|  | ||||
| 		// Check if there is an pending operation for the given pod and volume. | ||||
| 		// Need to check pending operation before checking the actual and desired | ||||
| 		// states to avoid race condition during checking. For example, the following | ||||
| @@ -413,26 +440,50 @@ func (rc *reconciler) reconstructStates(podsDir string) { | ||||
| 		// 1. Checking the pod and it does not exist in either actual or desired state. | ||||
| 		// 2. An operation for the given pod finishes and the actual state is updated. | ||||
| 		// 3. Checking and there is no pending operation for the given pod. | ||||
| 		if rc.operationExecutor.IsOperationPending(volumeToMount.VolumeName, volumeToMount.PodName) { | ||||
| 		// During state reconstruction period, no new volume operations could be issued. If the | ||||
| 		// mounted path is not in either pending operation, or actual or desired states, this | ||||
| 		// volume needs to be reconstructed back to the states. | ||||
| 		pending := rc.operationExecutor.IsOperationPending(reconstructedVolume.volumeName, reconstructedVolume.podName) | ||||
| 		dswExist := rc.desiredStateOfWorld.PodExistsInVolume(reconstructedVolume.podName, reconstructedVolume.volumeName) | ||||
| 		aswExist, _, _ := rc.actualStateOfWorld.PodExistsInVolume(reconstructedVolume.podName, reconstructedVolume.volumeName) | ||||
|  | ||||
| 		if !rc.StatesHasBeenSynced() { | ||||
| 			// In case this is the first time to reconstruct state after kubelet starts, for a persistent volume, it must have | ||||
| 			// been mounted before kubelet restarts because no mount operations could be started at this time (node | ||||
| 			// status has not yet been updated before this very first syncStates finishes, so that VerifyControllerAttachedVolume will fail), | ||||
| 			// In this case, the volume state should be put back to actual state now no matter desired state has it or not. | ||||
| 			// This is to prevent node status from being updated to empty for attachable volumes. This might happen because | ||||
| 			// in the case that a volume is discovered on disk, and it is part of desired state, but is then quickly deleted | ||||
| 			// from the desired state. If in such situation, the volume is not added to the actual state, the node status updater will | ||||
| 			// not get this volume from either actual or desired state. In turn, this might cause master controller | ||||
| 			// detaching while the volume is still mounted. | ||||
| 			if aswExist || !reconstructedVolume.pluginIsAttachable { | ||||
| 				continue | ||||
| 			} | ||||
| 		desiredPods := rc.desiredStateOfWorld.GetPods() | ||||
| 		actualPods := rc.actualStateOfWorld.GetPods() | ||||
| 		if desiredPods[volume.podName] || actualPods[volume.podName] { | ||||
| 		} else { | ||||
| 			// Check pending first since no new operations could be started at this point. | ||||
| 			// Otherwise there might a race condition in checking actual states and pending operations | ||||
| 			if pending || dswExist || aswExist { | ||||
| 				continue | ||||
| 			} | ||||
| 		} | ||||
|  | ||||
| 		glog.V(3).Infof( | ||||
| 			"Could not find pod information in desired or actual states or pending operation, update it in both states: %+v", | ||||
| 			volumeToMount) | ||||
| 		if err = rc.updateStates(volumeToMount); err != nil { | ||||
| 		glog.V(2).Infof( | ||||
| 			"Reconciler sync states: could not find pod information in desired or actual states or pending operation, update it in both states: %+v", | ||||
| 			reconstructedVolume) | ||||
| 		volumesNeedUpdate[reconstructedVolume.volumeName] = reconstructedVolume | ||||
|  | ||||
| 	} | ||||
| 	if len(volumesNeedUpdate) > 0 { | ||||
| 		if err = rc.updateStates(volumesNeedUpdate); err != nil { | ||||
| 			glog.Errorf("Error occurred during reconstruct volume from disk: %v", err) | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| } | ||||
|  | ||||
| // Reconstruct Volume object and volumeToMount data structure by reading the pod's volume directories | ||||
| func (rc *reconciler) reconstructVolume(volume podVolume) (*operationexecutor.VolumeToMount, error) { | ||||
| // Reconstruct Volume object and reconstructedVolume data structure by reading the pod's volume directories | ||||
| func (rc *reconciler) reconstructVolume(volume podVolume) (*reconstructedVolume, error) { | ||||
| 	plugin, err := rc.volumePluginMgr.FindPluginByName(volume.pluginName) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| @@ -461,48 +512,83 @@ func (rc *reconciler) reconstructVolume(volume podVolume) (*operationexecutor.Vo | ||||
| 		uniqueVolumeName = volumehelper.GetUniqueVolumeNameForNonAttachableVolume(volume.podName, plugin, volumeSpec) | ||||
| 	} | ||||
|  | ||||
| 	volumeToMount := &operationexecutor.VolumeToMount{ | ||||
| 		VolumeName:          uniqueVolumeName, | ||||
| 		PodName:             volume.podName, | ||||
| 		VolumeSpec:          volumeSpec, | ||||
| 		OuterVolumeSpecName: volumeName, /*volumeName is InnerVolumeSpecName. But this information will not be used for cleanup*/ | ||||
| 		Pod:                 pod, | ||||
| 		PluginIsAttachable:  attachablePlugin != nil, | ||||
| 		VolumeGidValue:      "", | ||||
| 		DevicePath:          "", | ||||
| 	volumeMounter, newMounterErr := plugin.NewMounter( | ||||
| 		volumeSpec, | ||||
| 		pod, | ||||
| 		volumepkg.VolumeOptions{}) | ||||
| 	if newMounterErr != nil { | ||||
| 		return nil, fmt.Errorf( | ||||
| 			"MountVolume.NewMounter failed for volume %q (spec.Name: %q) pod %q (UID: %q) with: %v", | ||||
| 			uniqueVolumeName, | ||||
| 			volumeSpec.Name(), | ||||
| 			volume.podName, | ||||
| 			pod.UID, | ||||
| 			newMounterErr) | ||||
| 	} | ||||
| 	return volumeToMount, nil | ||||
|  | ||||
| 	reconstructedVolume := &reconstructedVolume{ | ||||
| 		volumeName:          uniqueVolumeName, | ||||
| 		podName:             volume.podName, | ||||
| 		volumeSpec:          volumeSpec, | ||||
| 		outerVolumeSpecName: volumeName, /* volumeName is InnerVolumeSpecName. But this information will not be used for cleanup */ | ||||
| 		pod:                 pod, | ||||
| 		pluginIsAttachable:  attachablePlugin != nil, | ||||
| 		volumeGidValue:      "", | ||||
| 		devicePath:          "", | ||||
| 		mounter:             volumeMounter, | ||||
| 	} | ||||
| 	return reconstructedVolume, nil | ||||
| } | ||||
|  | ||||
| func (rc *reconciler) updateStates(volumeToMount *operationexecutor.VolumeToMount) error { | ||||
| func (rc *reconciler) updateStates(volumesNeedUpdate map[api.UniqueVolumeName]*reconstructedVolume) error { | ||||
| 	// Get the node status to retrieve volume device path information. | ||||
| 	node, fetchErr := rc.kubeClient.Core().Nodes().Get(string(rc.nodeName)) | ||||
| 	if fetchErr != nil { | ||||
| 		glog.Errorf("updateStates in reconciler: could not get node status with error %v", fetchErr) | ||||
| 	} else { | ||||
| 		for _, attachedVolume := range node.Status.VolumesAttached { | ||||
| 			if volume, exists := volumesNeedUpdate[attachedVolume.Name]; exists { | ||||
| 				volume.devicePath = attachedVolume.DevicePath | ||||
| 				volumesNeedUpdate[attachedVolume.Name] = volume | ||||
| 				glog.V(4).Infof("Get devicePath from node status for volume (%q): %q", attachedVolume.Name, volume.devicePath) | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	for _, volume := range volumesNeedUpdate { | ||||
| 		err := rc.actualStateOfWorld.MarkVolumeAsAttached( | ||||
| 		volumeToMount.VolumeName, volumeToMount.VolumeSpec, "", volumeToMount.DevicePath) | ||||
| 			volume.volumeName, volume.volumeSpec, "" /* nodeName */, volume.devicePath) | ||||
| 		if err != nil { | ||||
| 		return fmt.Errorf("Could not add volume information to actual state of world: %v", err) | ||||
| 			glog.Errorf("Could not add volume information to actual state of world: %v", err) | ||||
| 			continue | ||||
| 		} | ||||
|  | ||||
| 		err = rc.actualStateOfWorld.AddPodToVolume( | ||||
| 		volumeToMount.PodName, | ||||
| 		types.UID(volumeToMount.PodName), | ||||
| 		volumeToMount.VolumeName, | ||||
| 		nil, | ||||
| 		volumeToMount.OuterVolumeSpecName, | ||||
| 		volumeToMount.DevicePath) | ||||
| 			volume.podName, | ||||
| 			types.UID(volume.podName), | ||||
| 			volume.volumeName, | ||||
| 			volume.mounter, | ||||
| 			volume.outerVolumeSpecName, | ||||
| 			volume.devicePath) | ||||
| 		if err != nil { | ||||
| 		return fmt.Errorf("Could not add pod to volume information to actual state of world: %v", err) | ||||
| 			glog.Errorf("Could not add pod to volume information to actual state of world: %v", err) | ||||
| 			continue | ||||
| 		} | ||||
| 	if volumeToMount.PluginIsAttachable { | ||||
| 		err = rc.actualStateOfWorld.MarkDeviceAsMounted(volumeToMount.VolumeName) | ||||
| 		if volume.pluginIsAttachable { | ||||
| 			err = rc.actualStateOfWorld.MarkDeviceAsMounted(volume.volumeName) | ||||
| 			if err != nil { | ||||
| 			return fmt.Errorf("Could not mark device is mounted to actual state of world: %v", err) | ||||
| 				glog.Errorf("Could not mark device is mounted to actual state of world: %v", err) | ||||
| 				continue | ||||
| 			} | ||||
| 		} | ||||
| 	_, err = rc.desiredStateOfWorld.AddPodToVolume(volumeToMount.PodName, | ||||
| 		volumeToMount.Pod, | ||||
| 		volumeToMount.VolumeSpec, | ||||
| 		volumeToMount.OuterVolumeSpecName, | ||||
| 		volumeToMount.VolumeGidValue) | ||||
| 		_, err = rc.desiredStateOfWorld.AddPodToVolume(volume.podName, | ||||
| 			volume.pod, | ||||
| 			volume.volumeSpec, | ||||
| 			volume.outerVolumeSpecName, | ||||
| 			volume.volumeGidValue) | ||||
| 		if err != nil { | ||||
| 		return fmt.Errorf("Could not add pod to volume information to desired state of world: %v", err) | ||||
| 			glog.Errorf("Could not add pod to volume information to desired state of world: %v", err) | ||||
| 		} | ||||
| 	} | ||||
| 	return nil | ||||
| } | ||||
|   | ||||
| @@ -43,7 +43,7 @@ const ( | ||||
| 	// reconcilerLoopSleepDuration is the amount of time the reconciler loop | ||||
| 	// waits between successive executions | ||||
| 	reconcilerLoopSleepDuration     time.Duration = 0 * time.Millisecond | ||||
| 	reconcilerReconstructSleepPeriod time.Duration = 10 * time.Minute | ||||
| 	reconcilerSyncStatesSleepPeriod time.Duration = 10 * time.Minute | ||||
| 	// waitForAttachTimeout is the maximum amount of time a | ||||
| 	// operationexecutor.Mount call will wait for a volume to be attached. | ||||
| 	waitForAttachTimeout time.Duration     = 1 * time.Second | ||||
| @@ -65,7 +65,7 @@ func Test_Run_Positive_DoNothing(t *testing.T) { | ||||
| 		kubeClient, | ||||
| 		false, /* controllerAttachDetachEnabled */ | ||||
| 		reconcilerLoopSleepDuration, | ||||
| 		reconcilerReconstructSleepPeriod, | ||||
| 		reconcilerSyncStatesSleepPeriod, | ||||
| 		waitForAttachTimeout, | ||||
| 		nodeName, | ||||
| 		dsw, | ||||
| @@ -102,7 +102,7 @@ func Test_Run_Positive_VolumeAttachAndMount(t *testing.T) { | ||||
| 		kubeClient, | ||||
| 		false, /* controllerAttachDetachEnabled */ | ||||
| 		reconcilerLoopSleepDuration, | ||||
| 		reconcilerReconstructSleepPeriod, | ||||
| 		reconcilerSyncStatesSleepPeriod, | ||||
| 		waitForAttachTimeout, | ||||
| 		nodeName, | ||||
| 		dsw, | ||||
| @@ -173,7 +173,7 @@ func Test_Run_Positive_VolumeMountControllerAttachEnabled(t *testing.T) { | ||||
| 		kubeClient, | ||||
| 		true, /* controllerAttachDetachEnabled */ | ||||
| 		reconcilerLoopSleepDuration, | ||||
| 		reconcilerReconstructSleepPeriod, | ||||
| 		reconcilerSyncStatesSleepPeriod, | ||||
| 		waitForAttachTimeout, | ||||
| 		nodeName, | ||||
| 		dsw, | ||||
| @@ -245,7 +245,7 @@ func Test_Run_Positive_VolumeAttachMountUnmountDetach(t *testing.T) { | ||||
| 		kubeClient, | ||||
| 		false, /* controllerAttachDetachEnabled */ | ||||
| 		reconcilerLoopSleepDuration, | ||||
| 		reconcilerReconstructSleepPeriod, | ||||
| 		reconcilerSyncStatesSleepPeriod, | ||||
| 		waitForAttachTimeout, | ||||
| 		nodeName, | ||||
| 		dsw, | ||||
| @@ -328,7 +328,7 @@ func Test_Run_Positive_VolumeUnmountControllerAttachEnabled(t *testing.T) { | ||||
| 		kubeClient, | ||||
| 		true, /* controllerAttachDetachEnabled */ | ||||
| 		reconcilerLoopSleepDuration, | ||||
| 		reconcilerReconstructSleepPeriod, | ||||
| 		reconcilerSyncStatesSleepPeriod, | ||||
| 		waitForAttachTimeout, | ||||
| 		nodeName, | ||||
| 		dsw, | ||||
|   | ||||
| @@ -49,9 +49,9 @@ const ( | ||||
| 	// between successive executions | ||||
| 	reconcilerLoopSleepPeriod time.Duration = 100 * time.Millisecond | ||||
|  | ||||
| 	// reconcilerReconstructSleepPeriod is the amount of time the reconciler reconstruct process | ||||
| 	// reconcilerSyncStatesSleepPeriod is the amount of time the reconciler reconstruct process | ||||
| 	// waits between successive executions | ||||
| 	reconcilerReconstructSleepPeriod time.Duration = 3 * time.Minute | ||||
| 	reconcilerSyncStatesSleepPeriod time.Duration = 3 * time.Minute | ||||
|  | ||||
| 	// desiredStateOfWorldPopulatorLoopSleepPeriod is the amount of time the | ||||
| 	// DesiredStateOfWorldPopulator loop waits between successive executions | ||||
| @@ -115,7 +115,7 @@ type VolumeManager interface { | ||||
| 	// from annotations on persistent volumes that the pod depends on. | ||||
| 	GetExtraSupplementalGroupsForPod(pod *api.Pod) []int64 | ||||
|  | ||||
| 	// Returns a list of all volumes that implement the volume.Attacher | ||||
| 	// GetVolumesInUse returns a list of all volumes that implement the volume.Attacher | ||||
| 	// interface and are currently in use according to the actual and desired | ||||
| 	// state of the world caches. A volume is considered "in use" as soon as it | ||||
| 	// is added to the desired state of world, indicating it *should* be | ||||
| @@ -126,6 +126,11 @@ type VolumeManager interface { | ||||
| 	// restarts. | ||||
| 	GetVolumesInUse() []api.UniqueVolumeName | ||||
|  | ||||
| 	// ReconcilerStatesHasBeenSynced returns true only after the actual states in reconciler | ||||
| 	// has been synced at least once after kubelet starts so that it is safe to update mounted | ||||
| 	// volume list retrieved from actual state. | ||||
| 	ReconcilerStatesHasBeenSynced() bool | ||||
|  | ||||
| 	// VolumeIsAttached returns true if the given volume is attached to this | ||||
| 	// node. | ||||
| 	VolumeIsAttached(volumeName api.UniqueVolumeName) bool | ||||
| @@ -168,7 +173,7 @@ func NewVolumeManager( | ||||
| 		kubeClient, | ||||
| 		controllerAttachDetachEnabled, | ||||
| 		reconcilerLoopSleepPeriod, | ||||
| 		reconcilerReconstructSleepPeriod, | ||||
| 		reconcilerSyncStatesSleepPeriod, | ||||
| 		waitForAttachTimeout, | ||||
| 		nodeName, | ||||
| 		vm.desiredStateOfWorld, | ||||
| @@ -305,6 +310,10 @@ func (vm *volumeManager) GetVolumesInUse() []api.UniqueVolumeName { | ||||
| 	return volumesToReportInUse | ||||
| } | ||||
|  | ||||
| func (vm *volumeManager) ReconcilerStatesHasBeenSynced() bool { | ||||
| 	return vm.reconciler.StatesHasBeenSynced() | ||||
| } | ||||
|  | ||||
| func (vm *volumeManager) VolumeIsAttached( | ||||
| 	volumeName api.UniqueVolumeName) bool { | ||||
| 	return vm.actualStateOfWorld.VolumeExists(volumeName) | ||||
|   | ||||
| @@ -101,9 +101,10 @@ func isBind(options []string) (bool, []string) { | ||||
|  | ||||
| // doMount runs the mount command. | ||||
| func doMount(mountCmd string, source string, target string, fstype string, options []string) error { | ||||
| 	glog.V(5).Infof("Mounting %s %s %s %v with command: %q", source, target, fstype, options, mountCmd) | ||||
| 	glog.V(4).Infof("Mounting %s %s %s %v with command: %q", source, target, fstype, options, mountCmd) | ||||
| 	mountArgs := makeMountArgs(source, target, fstype, options) | ||||
|  | ||||
| 	glog.V(4).Infof("Mounting cmd (%s) with arguments (%s)", mountCmd, mountArgs) | ||||
| 	command := exec.Command(mountCmd, mountArgs...) | ||||
| 	output, err := command.CombinedOutput() | ||||
| 	if err != nil { | ||||
| @@ -135,7 +136,7 @@ func makeMountArgs(source, target, fstype string, options []string) []string { | ||||
|  | ||||
| // Unmount unmounts the target. | ||||
| func (mounter *Mounter) Unmount(target string) error { | ||||
| 	glog.V(5).Infof("Unmounting %s", target) | ||||
| 	glog.V(4).Infof("Unmounting %s", target) | ||||
| 	command := exec.Command("umount", target) | ||||
| 	output, err := command.CombinedOutput() | ||||
| 	if err != nil { | ||||
| @@ -156,6 +157,10 @@ func (*Mounter) List() ([]MountPoint, error) { | ||||
| // will return true. When in fact /tmp/b is a mount point. If this situation | ||||
| // is of interest to you, don't use this function... | ||||
| func (mounter *Mounter) IsLikelyNotMountPoint(file string) (bool, error) { | ||||
| 	return IsNotMountPoint(file) | ||||
| } | ||||
|  | ||||
| func IsNotMountPoint(file string) (bool, error) { | ||||
| 	stat, err := os.Stat(file) | ||||
| 	if err != nil { | ||||
| 		return true, err | ||||
| @@ -173,9 +178,10 @@ func (mounter *Mounter) IsLikelyNotMountPoint(file string) (bool, error) { | ||||
| } | ||||
|  | ||||
| // DeviceOpened checks if block device in use by calling Open with O_EXCL flag. | ||||
| // Returns true if open returns errno EBUSY, and false if errno is nil. | ||||
| // Returns an error if errno is any error other than EBUSY. | ||||
| // Returns with error if pathname is not a device. | ||||
| // If pathname is not a device, log and return false with nil error. | ||||
| // If open returns errno EBUSY, return true with nil error. | ||||
| // If open returns nil, return false with nil error. | ||||
| // Otherwise, return false with error | ||||
| func (mounter *Mounter) DeviceOpened(pathname string) (bool, error) { | ||||
| 	return exclusiveOpenFailsOnDevice(pathname) | ||||
| } | ||||
| @@ -187,12 +193,17 @@ func (mounter *Mounter) PathIsDevice(pathname string) (bool, error) { | ||||
| } | ||||
|  | ||||
| func exclusiveOpenFailsOnDevice(pathname string) (bool, error) { | ||||
| 	if isDevice, err := pathIsDevice(pathname); !isDevice { | ||||
| 	isDevice, err := pathIsDevice(pathname) | ||||
| 	if err != nil { | ||||
| 		return false, fmt.Errorf( | ||||
| 			"PathIsDevice failed for path %q: %v", | ||||
| 			pathname, | ||||
| 			err) | ||||
| 	} | ||||
| 	if !isDevice { | ||||
| 		glog.Errorf("Path %q is not refering to a device.", pathname) | ||||
| 		return false, nil | ||||
| 	} | ||||
| 	fd, errno := syscall.Open(pathname, syscall.O_RDONLY|syscall.O_EXCL, 0) | ||||
| 	// If the device is in use, open will return an invalid fd. | ||||
| 	// When this happens, it is expected that Close will fail and throw an error. | ||||
|   | ||||
| @@ -247,7 +247,7 @@ func (b *gcePersistentDiskMounter) SetUp(fsGroup *int64) error { | ||||
| func (b *gcePersistentDiskMounter) SetUpAt(dir string, fsGroup *int64) error { | ||||
| 	// TODO: handle failed mounts here. | ||||
| 	notMnt, err := b.mounter.IsLikelyNotMountPoint(dir) | ||||
| 	glog.V(4).Infof("PersistentDisk set up: %s %v %v, pd name %v readOnly %v", dir, !notMnt, err, b.pdName, b.readOnly) | ||||
| 	glog.V(4).Infof("GCE PersistentDisk set up: Dir (%s) PD name (%q) Mounted (%t) Error (%v), ReadOnly (%t)", dir, b.pdName, !notMnt, err, b.readOnly) | ||||
| 	if err != nil && !os.IsNotExist(err) { | ||||
| 		glog.Errorf("cannot validate mount point: %s %v", dir, err) | ||||
| 		return err | ||||
|   | ||||
| @@ -758,11 +758,12 @@ func (oe *operationExecutor) generateMountVolumeFunc( | ||||
| 			} | ||||
|  | ||||
| 			glog.Infof( | ||||
| 				"MountVolume.MountDevice succeeded for volume %q (spec.Name: %q) pod %q (UID: %q).", | ||||
| 				"MountVolume.MountDevice succeeded for volume %q (spec.Name: %q) pod %q (UID: %q) device mount path %q", | ||||
| 				volumeToMount.VolumeName, | ||||
| 				volumeToMount.VolumeSpec.Name(), | ||||
| 				volumeToMount.PodName, | ||||
| 				volumeToMount.Pod.UID) | ||||
| 				volumeToMount.Pod.UID, | ||||
| 				deviceMountPath) | ||||
|  | ||||
| 			// Update actual state of world to reflect volume is globally mounted | ||||
| 			markDeviceMountedErr := actualStateOfWorld.MarkDeviceAsMounted( | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 Jing Xu
					Jing Xu