Merge pull request #56836 from vladimirvivien/csi-teardown-fix
Automatic merge from submit-queue. If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>. CSI bug fixes for teardown, nodepublish probe, target path creation **What this PR does / why we need it**: This PR addresses several critical bug fixes in CSI including mounter teardown fix, create nodeprobe prior to mount, and pre-create target path. **Which issue(s) this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close the issue(s) when PR gets merged)*: Fixes #56817, #56815, #56813 **Special notes for your reviewer**: **Release note**: ```release-note NONE ```
This commit is contained in:
		| @@ -14,6 +14,7 @@ go_library( | ||||
|         "//pkg/util/mount:go_default_library", | ||||
|         "//pkg/util/strings:go_default_library", | ||||
|         "//pkg/volume:go_default_library", | ||||
|         "//pkg/volume/util:go_default_library", | ||||
|         "//vendor/github.com/container-storage-interface/spec/lib/go/csi:go_default_library", | ||||
|         "//vendor/github.com/golang/glog:go_default_library", | ||||
|         "//vendor/golang.org/x/net/context:go_default_library", | ||||
| @@ -38,7 +39,6 @@ go_test( | ||||
|     importpath = "k8s.io/kubernetes/pkg/volume/csi", | ||||
|     library = ":go_default_library", | ||||
|     deps = [ | ||||
|         "//pkg/util/strings:go_default_library", | ||||
|         "//pkg/volume:go_default_library", | ||||
|         "//pkg/volume/csi/fake:go_default_library", | ||||
|         "//pkg/volume/testing:go_default_library", | ||||
|   | ||||
| @@ -32,6 +32,7 @@ import ( | ||||
|  | ||||
| type csiClient interface { | ||||
| 	AssertSupportedVersion(ctx grpctx.Context, ver *csipb.Version) error | ||||
| 	NodeProbe(ctx grpctx.Context, ver *csipb.Version) error | ||||
| 	NodePublishVolume( | ||||
| 		ctx grpctx.Context, | ||||
| 		volumeid string, | ||||
| @@ -135,6 +136,13 @@ func (c *csiDriverClient) AssertSupportedVersion(ctx grpctx.Context, ver *csipb. | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (c *csiDriverClient) NodeProbe(ctx grpctx.Context, ver *csipb.Version) error { | ||||
| 	glog.V(4).Info(log("sending NodeProbe rpc call to csi driver: [version %v]", ver)) | ||||
| 	req := &csipb.NodeProbeRequest{Version: ver} | ||||
| 	_, err := c.nodeClient.NodeProbe(ctx, req) | ||||
| 	return err | ||||
| } | ||||
|  | ||||
| func (c *csiDriverClient) NodePublishVolume( | ||||
| 	ctx grpctx.Context, | ||||
| 	volID string, | ||||
| @@ -145,7 +153,7 @@ func (c *csiDriverClient) NodePublishVolume( | ||||
| 	volumeAttribs map[string]string, | ||||
| 	fsType string, | ||||
| ) error { | ||||
|  | ||||
| 	glog.V(4).Info(log("calling NodePublishVolume rpc [volid=%s,target_path=%s]", volID, targetPath)) | ||||
| 	if volID == "" { | ||||
| 		return errors.New("missing volume id") | ||||
| 	} | ||||
| @@ -182,7 +190,7 @@ func (c *csiDriverClient) NodePublishVolume( | ||||
| } | ||||
|  | ||||
| func (c *csiDriverClient) NodeUnpublishVolume(ctx grpctx.Context, volID string, targetPath string) error { | ||||
|  | ||||
| 	glog.V(4).Info(log("calling NodeUnpublishVolume rpc: [volid=%s, target_path=%s", volID, targetPath)) | ||||
| 	if volID == "" { | ||||
| 		return errors.New("missing volume id") | ||||
| 	} | ||||
|   | ||||
| @@ -62,6 +62,28 @@ func TestClientAssertSupportedVersion(t *testing.T) { | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestClientNodeProbe(t *testing.T) { | ||||
| 	testCases := []struct { | ||||
| 		testName string | ||||
| 		ver      *csipb.Version | ||||
| 		mustFail bool | ||||
| 		err      error | ||||
| 	}{ | ||||
| 		{testName: "supported version", ver: &csipb.Version{Major: 0, Minor: 1, Patch: 0}}, | ||||
| 		{testName: "grpc error", ver: &csipb.Version{Major: 0, Minor: 1, Patch: 0}, mustFail: true, err: errors.New("grpc error")}, | ||||
| 	} | ||||
|  | ||||
| 	for _, tc := range testCases { | ||||
| 		t.Log("case: ", tc.testName) | ||||
| 		client := setupClient(t) | ||||
| 		client.nodeClient.(*fake.NodeClient).SetNextError(tc.err) | ||||
| 		err := client.NodeProbe(grpctx.Background(), tc.ver) | ||||
| 		if tc.mustFail && err == nil { | ||||
| 			t.Error("must fail, but err = nil") | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestClientNodePublishVolume(t *testing.T) { | ||||
| 	testCases := []struct { | ||||
| 		name       string | ||||
|   | ||||
| @@ -20,6 +20,7 @@ import ( | ||||
| 	"encoding/json" | ||||
| 	"errors" | ||||
| 	"fmt" | ||||
| 	"os" | ||||
| 	"path" | ||||
|  | ||||
| 	"github.com/golang/glog" | ||||
| @@ -30,20 +31,39 @@ import ( | ||||
| 	"k8s.io/client-go/kubernetes" | ||||
| 	kstrings "k8s.io/kubernetes/pkg/util/strings" | ||||
| 	"k8s.io/kubernetes/pkg/volume" | ||||
| 	"k8s.io/kubernetes/pkg/volume/util" | ||||
| ) | ||||
|  | ||||
| //TODO (vladimirvivien) move this in a central loc later | ||||
| var ( | ||||
| 	volDataKey = struct { | ||||
| 		specVolID, | ||||
| 		volHandle, | ||||
| 		driverName, | ||||
| 		nodeName, | ||||
| 		attachmentID string | ||||
| 	}{ | ||||
| 		"specVolID", | ||||
| 		"volumeHandle", | ||||
| 		"driverName", | ||||
| 		"nodeName", | ||||
| 		"attachmentID", | ||||
| 	} | ||||
| ) | ||||
|  | ||||
| type csiMountMgr struct { | ||||
| 	k8s        kubernetes.Interface | ||||
| 	csiClient  csiClient | ||||
| 	plugin     *csiPlugin | ||||
| 	driverName string | ||||
| 	volumeID   string | ||||
| 	readOnly   bool | ||||
| 	spec       *volume.Spec | ||||
| 	pod        *api.Pod | ||||
| 	podUID     types.UID | ||||
| 	options    volume.VolumeOptions | ||||
| 	volumeInfo map[string]string | ||||
| 	k8s          kubernetes.Interface | ||||
| 	csiClient    csiClient | ||||
| 	plugin       *csiPlugin | ||||
| 	driverName   string | ||||
| 	volumeID     string | ||||
| 	specVolumeID string | ||||
| 	readOnly     bool | ||||
| 	spec         *volume.Spec | ||||
| 	pod          *api.Pod | ||||
| 	podUID       types.UID | ||||
| 	options      volume.VolumeOptions | ||||
| 	volumeInfo   map[string]string | ||||
| 	volume.MetricsNil | ||||
| } | ||||
|  | ||||
| @@ -51,14 +71,14 @@ type csiMountMgr struct { | ||||
| var _ volume.Volume = &csiMountMgr{} | ||||
|  | ||||
| func (c *csiMountMgr) GetPath() string { | ||||
| 	return getTargetPath(c.podUID, c.driverName, c.volumeID, c.plugin.host) | ||||
| 	dir := path.Join(getTargetPath(c.podUID, c.specVolumeID, c.plugin.host), "/mount") | ||||
| 	glog.V(4).Info(log("mounter.GetPath generated [%s]", dir)) | ||||
| 	return dir | ||||
| } | ||||
|  | ||||
| func getTargetPath(uid types.UID, driverName string, volID string, host volume.VolumeHost) string { | ||||
| 	// driverName validated at Mounter creation | ||||
| 	// sanitize (replace / with ~) in volumeID before it's appended to path:w | ||||
| 	driverPath := fmt.Sprintf("%s/%s", driverName, kstrings.EscapeQualifiedNameForDisk(volID)) | ||||
| 	return host.GetPodVolumeDir(uid, kstrings.EscapeQualifiedNameForDisk(csiPluginName), driverPath) | ||||
| func getTargetPath(uid types.UID, specVolumeID string, host volume.VolumeHost) string { | ||||
| 	specVolID := kstrings.EscapeQualifiedNameForDisk(specVolumeID) | ||||
| 	return host.GetPodVolumeDir(uid, kstrings.EscapeQualifiedNameForDisk(csiPluginName), specVolID) | ||||
| } | ||||
|  | ||||
| // volume.Mounter methods | ||||
| @@ -77,6 +97,17 @@ func (c *csiMountMgr) SetUp(fsGroup *int64) error { | ||||
| func (c *csiMountMgr) SetUpAt(dir string, fsGroup *int64) error { | ||||
| 	glog.V(4).Infof(log("Mounter.SetUpAt(%s)", dir)) | ||||
|  | ||||
| 	mounted, err := isDirMounted(c.plugin, dir) | ||||
| 	if err != nil { | ||||
| 		glog.Error(log("mounter.SetUpAt failed while checking mount status for dir [%s]", dir)) | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	if mounted { | ||||
| 		glog.V(4).Info(log("mounter.SetUpAt skipping mount, dir already mounted [%s]", dir)) | ||||
| 		return nil | ||||
| 	} | ||||
|  | ||||
| 	csiSource, err := getCSISourceFromSpec(c.spec) | ||||
| 	if err != nil { | ||||
| 		glog.Error(log("mounter.SetupAt failed to get CSI persistent source: %v", err)) | ||||
| @@ -92,13 +123,19 @@ func (c *csiMountMgr) SetUpAt(dir string, fsGroup *int64) error { | ||||
|  | ||||
| 	// ensure version is supported | ||||
| 	if err := csi.AssertSupportedVersion(ctx, csiVersion); err != nil { | ||||
| 		glog.Errorf(log("failed to assert version: %v", err)) | ||||
| 		glog.Error(log("mounter.SetUpAt failed to assert version: %v", err)) | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	// probe driver | ||||
| 	// TODO (vladimirvivien) move probe call where it is done only when it is needed. | ||||
| 	if err := csi.NodeProbe(ctx, csiVersion); err != nil { | ||||
| 		glog.Error(log("mounter.SetUpAt failed to probe driver: %v", err)) | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	// search for attachment by VolumeAttachment.Spec.Source.PersistentVolumeName | ||||
| 	if c.volumeInfo == nil { | ||||
|  | ||||
| 		attachment, err := c.k8s.StorageV1alpha1().VolumeAttachments().Get(attachID, meta.GetOptions{}) | ||||
| 		if err != nil { | ||||
| 			glog.Error(log("mounter.SetupAt failed while getting volume attachment [id=%v]: %v", attachID, err)) | ||||
| @@ -121,6 +158,31 @@ func (c *csiMountMgr) SetUpAt(dir string, fsGroup *int64) error { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	// create target_dir before call to NodePublish | ||||
| 	if err := os.MkdirAll(dir, 0750); err != nil { | ||||
| 		glog.Error(log("mouter.SetUpAt failed to create dir %#v:  %v", dir, err)) | ||||
| 		return err | ||||
| 	} | ||||
| 	glog.V(4).Info(log("created target path successfully [%s]", dir)) | ||||
|  | ||||
| 	// persist volume info data for teardown | ||||
| 	volData := map[string]string{ | ||||
| 		volDataKey.specVolID:    c.spec.Name(), | ||||
| 		volDataKey.volHandle:    csiSource.VolumeHandle, | ||||
| 		volDataKey.driverName:   csiSource.Driver, | ||||
| 		volDataKey.nodeName:     nodeName, | ||||
| 		volDataKey.attachmentID: attachID, | ||||
| 	} | ||||
|  | ||||
| 	if err := saveVolumeData(c.plugin, c.podUID, c.spec.Name(), volData); err != nil { | ||||
| 		glog.Error(log("mounter.SetUpAt failed to save volume info data: %v", err)) | ||||
| 		if err := removeMountDir(c.plugin, dir); err != nil { | ||||
| 			glog.Error(log("mounter.SetUpAt failed to remove mount dir after a saveVolumeData() error [%s]: %v", dir, err)) | ||||
| 			return err | ||||
| 		} | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	//TODO (vladimirvivien) implement better AccessModes mapping between k8s and CSI | ||||
| 	accessMode := api.ReadWriteOnce | ||||
| 	if c.spec.PersistentVolume.Spec.AccessModes != nil { | ||||
| @@ -139,11 +201,15 @@ func (c *csiMountMgr) SetUpAt(dir string, fsGroup *int64) error { | ||||
| 	) | ||||
|  | ||||
| 	if err != nil { | ||||
| 		glog.Errorf(log("Mounter.SetupAt failed: %v", err)) | ||||
| 		glog.Errorf(log("mounter.SetupAt failed: %v", err)) | ||||
| 		if err := removeMountDir(c.plugin, dir); err != nil { | ||||
| 			glog.Error(log("mounter.SetuAt failed to remove mount dir after a NodePublish() error [%s]: %v", dir, err)) | ||||
| 			return err | ||||
| 		} | ||||
| 		return err | ||||
| 	} | ||||
| 	glog.V(4).Infof(log("successfully mounted %s", dir)) | ||||
|  | ||||
| 	glog.V(4).Infof(log("mounter.SetUp successfully requested NodePublish [%s]", dir)) | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| @@ -164,10 +230,30 @@ func (c *csiMountMgr) TearDown() error { | ||||
| func (c *csiMountMgr) TearDownAt(dir string) error { | ||||
| 	glog.V(4).Infof(log("Unmounter.TearDown(%s)", dir)) | ||||
|  | ||||
| 	// extract driverName and volID from path | ||||
| 	base, volID := path.Split(dir) | ||||
| 	volID = kstrings.UnescapeQualifiedNameForDisk(volID) | ||||
| 	driverName := path.Base(base) | ||||
| 	// is dir even mounted ? | ||||
| 	// TODO (vladimirvivien) this check may not work for an emptyDir or local storage | ||||
| 	// see https://github.com/kubernetes/kubernetes/pull/56836#discussion_r155834524 | ||||
| 	mounted, err := isDirMounted(c.plugin, dir) | ||||
| 	if err != nil { | ||||
| 		glog.Error(log("unmounter.Teardown failed while checking mount status for dir [%s]: %v", dir, err)) | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	if !mounted { | ||||
| 		glog.V(4).Info(log("unmounter.Teardown skipping unmout, dir not mounted [%s]", dir)) | ||||
| 		return nil | ||||
| 	} | ||||
|  | ||||
| 	// load volume info from file | ||||
| 	dataDir := path.Dir(dir) // dropoff /mount at end | ||||
| 	data, err := loadVolumeData(dataDir, volDataFileName) | ||||
| 	if err != nil { | ||||
| 		glog.Error(log("unmounter.Teardown failed to load volume data file using dir [%s]: %v", dir, err)) | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	volID := data[volDataKey.volHandle] | ||||
| 	driverName := data[volDataKey.driverName] | ||||
|  | ||||
| 	if c.csiClient == nil { | ||||
| 		addr := fmt.Sprintf(csiAddrTemplate, driverName) | ||||
| @@ -183,18 +269,21 @@ func (c *csiMountMgr) TearDownAt(dir string) error { | ||||
|  | ||||
| 	// TODO make all assertion calls private within the client itself | ||||
| 	if err := csi.AssertSupportedVersion(ctx, csiVersion); err != nil { | ||||
| 		glog.Errorf(log("failed to assert version: %v", err)) | ||||
| 		glog.Errorf(log("mounter.SetUpAt failed to assert version: %v", err)) | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	err := csi.NodeUnpublishVolume(ctx, volID, dir) | ||||
|  | ||||
| 	if err != nil { | ||||
| 		glog.Errorf(log("Mounter.Setup failed: %v", err)) | ||||
| 	if err := csi.NodeUnpublishVolume(ctx, volID, dir); err != nil { | ||||
| 		glog.Errorf(log("mounter.SetUpAt failed: %v", err)) | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	glog.V(4).Infof(log("successfully unmounted %s", dir)) | ||||
| 	// clean mount point dir | ||||
| 	if err := removeMountDir(c.plugin, dir); err != nil { | ||||
| 		glog.Error(log("mounter.SetUpAt failed to clean mount dir [%s]: %v", dir, err)) | ||||
| 		return err | ||||
| 	} | ||||
| 	glog.V(4).Infof(log("mounte.SetUpAt successfully unmounted dir [%s]", dir)) | ||||
|  | ||||
| 	return nil | ||||
| } | ||||
| @@ -221,3 +310,92 @@ func getVolAttribsFromSpec(spec *volume.Spec) (map[string]string, error) { | ||||
| 	} | ||||
| 	return attribs, nil | ||||
| } | ||||
|  | ||||
| // saveVolumeData persists parameter data as json file using the locagion | ||||
| // generated by /var/lib/kubelet/pods/<podID>/volumes/kubernetes.io~csi/<specVolId>/volume_data.json | ||||
| func saveVolumeData(p *csiPlugin, podUID types.UID, specVolID string, data map[string]string) error { | ||||
| 	dir := getTargetPath(podUID, specVolID, p.host) | ||||
| 	dataFilePath := path.Join(dir, volDataFileName) | ||||
|  | ||||
| 	file, err := os.Create(dataFilePath) | ||||
| 	if err != nil { | ||||
| 		glog.Error(log("failed to save volume data file %s: %v", dataFilePath, err)) | ||||
| 		return err | ||||
| 	} | ||||
| 	defer file.Close() | ||||
| 	if err := json.NewEncoder(file).Encode(data); err != nil { | ||||
| 		glog.Error(log("failed to save volume data file %s: %v", dataFilePath, err)) | ||||
| 		return err | ||||
| 	} | ||||
| 	glog.V(4).Info(log("volume data file saved successfully [%s]", dataFilePath)) | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // loadVolumeData uses the directory returned by mounter.GetPath with value | ||||
| // /var/lib/kubelet/pods/<podID>/volumes/kubernetes.io~csi/<specVolumeId>/mount. | ||||
| // The function extracts specVolumeID and uses it to load the json data file from dir | ||||
| // /var/lib/kubelet/pods/<podID>/volumes/kubernetes.io~csi/<specVolId>/volume_data.json | ||||
| func loadVolumeData(dir string, fileName string) (map[string]string, error) { | ||||
| 	// remove /mount at the end | ||||
| 	dataFileName := path.Join(dir, fileName) | ||||
| 	glog.V(4).Info(log("loading volume data file [%s]", dataFileName)) | ||||
|  | ||||
| 	file, err := os.Open(dataFileName) | ||||
| 	if err != nil { | ||||
| 		glog.Error(log("failed to open volume data file [%s]: %v", dataFileName, err)) | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	defer file.Close() | ||||
| 	data := map[string]string{} | ||||
| 	if err := json.NewDecoder(file).Decode(&data); err != nil { | ||||
| 		glog.Error(log("failed to parse volume data file [%s]: %v", dataFileName, err)) | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | ||||
| 	return data, nil | ||||
| } | ||||
|  | ||||
| // isDirMounted returns the !notMounted result from IsLikelyNotMountPoint check | ||||
| func isDirMounted(plug *csiPlugin, dir string) (bool, error) { | ||||
| 	mounter := plug.host.GetMounter(plug.GetPluginName()) | ||||
| 	notMnt, err := mounter.IsLikelyNotMountPoint(dir) | ||||
| 	if err != nil && !os.IsNotExist(err) { | ||||
| 		glog.Error(log("isDirMounted IsLikelyNotMountPoint test failed for dir [%v]", dir)) | ||||
| 		return false, err | ||||
| 	} | ||||
| 	return !notMnt, nil | ||||
| } | ||||
|  | ||||
| // removeMountDir cleans the mount dir when dir is not mounted and removed the volume data file in dir | ||||
| func removeMountDir(plug *csiPlugin, mountPath string) error { | ||||
| 	glog.V(4).Info(log("removing mount path [%s]", mountPath)) | ||||
| 	if pathExists, pathErr := util.PathExists(mountPath); pathErr != nil { | ||||
| 		glog.Error(log("failed while checking mount path stat [%s]", pathErr)) | ||||
| 		return pathErr | ||||
| 	} else if !pathExists { | ||||
| 		glog.Warning(log("skipping mount dir removal, path does not exist [%v]", mountPath)) | ||||
| 		return nil | ||||
| 	} | ||||
|  | ||||
| 	mounter := plug.host.GetMounter(plug.GetPluginName()) | ||||
| 	notMnt, err := mounter.IsLikelyNotMountPoint(mountPath) | ||||
| 	if err != nil { | ||||
| 		glog.Error(log("mount dir removal failed [%s]: %v", mountPath, err)) | ||||
| 		return err | ||||
| 	} | ||||
| 	if notMnt { | ||||
| 		glog.V(4).Info(log("dir not mounted, deleting it [%s]", mountPath)) | ||||
| 		if err := os.Remove(mountPath); err != nil && !os.IsNotExist(err) { | ||||
| 			glog.Error(log("failed to remove dir [%s]: %v", mountPath, err)) | ||||
| 			return err | ||||
| 		} | ||||
| 		// remove volume data file as well | ||||
| 		dataFile := path.Join(path.Dir(mountPath), volDataFileName) | ||||
| 		glog.V(4).Info(log("also deleting volume info data file [%s]", dataFile)) | ||||
| 		if err := os.Remove(dataFile); err != nil && !os.IsNotExist(err) { | ||||
| 			glog.Error(log("failed to delete volume data file [%s]: %v", dataFile, err)) | ||||
| 			return err | ||||
| 		} | ||||
| 	} | ||||
| 	return nil | ||||
| } | ||||
|   | ||||
| @@ -17,7 +17,10 @@ limitations under the License. | ||||
| package csi | ||||
|  | ||||
| import ( | ||||
| 	"bytes" | ||||
| 	"encoding/json" | ||||
| 	"fmt" | ||||
| 	"io/ioutil" | ||||
| 	"os" | ||||
| 	"path" | ||||
| 	"testing" | ||||
| @@ -43,28 +46,44 @@ func TestMounterGetPath(t *testing.T) { | ||||
| 	plug, tmpDir := newTestPlugin(t) | ||||
| 	defer os.RemoveAll(tmpDir) | ||||
|  | ||||
| 	pv := makeTestPV("test-pv", 10, testDriver, testVol) | ||||
|  | ||||
| 	mounter, err := plug.NewMounter( | ||||
| 		volume.NewSpecFromPersistentVolume(pv, pv.Spec.PersistentVolumeSource.CSI.ReadOnly), | ||||
| 		&api.Pod{ObjectMeta: meta.ObjectMeta{UID: testPodUID, Namespace: testns}}, | ||||
| 		volume.VolumeOptions{}, | ||||
| 	) | ||||
| 	if err != nil { | ||||
| 		t.Fatalf("Failed to make a new Mounter: %v", err) | ||||
| 	} | ||||
| 	csiMounter := mounter.(*csiMountMgr) | ||||
| 	expectedPath := path.Join(tmpDir, fmt.Sprintf( | ||||
| 		"pods/%s/volumes/kubernetes.io~csi/%s/%s", | ||||
| 		testPodUID, | ||||
| 		csiMounter.driverName, | ||||
| 		csiMounter.volumeID, | ||||
| 	)) | ||||
| 	mountPath := csiMounter.GetPath() | ||||
| 	if mountPath != expectedPath { | ||||
| 		t.Errorf("Got unexpected path: %s", mountPath) | ||||
| 	// TODO (vladimirvivien) specName with slashes will not work | ||||
| 	testCases := []struct { | ||||
| 		name           string | ||||
| 		specVolumeName string | ||||
| 		path           string | ||||
| 	}{ | ||||
| 		{ | ||||
| 			name:           "simple specName", | ||||
| 			specVolumeName: "spec-0", | ||||
| 			path:           path.Join(tmpDir, fmt.Sprintf("pods/%s/volumes/kubernetes.io~csi/%s/%s", testPodUID, "spec-0", "/mount")), | ||||
| 		}, | ||||
| 		{ | ||||
| 			name:           "specName with dots", | ||||
| 			specVolumeName: "test.spec.1", | ||||
| 			path:           path.Join(tmpDir, fmt.Sprintf("pods/%s/volumes/kubernetes.io~csi/%s/%s", testPodUID, "test.spec.1", "/mount")), | ||||
| 		}, | ||||
| 	} | ||||
| 	for _, tc := range testCases { | ||||
| 		t.Log("test case:", tc.name) | ||||
| 		pv := makeTestPV(tc.specVolumeName, 10, testDriver, testVol) | ||||
| 		spec := volume.NewSpecFromPersistentVolume(pv, pv.Spec.PersistentVolumeSource.CSI.ReadOnly) | ||||
| 		mounter, err := plug.NewMounter( | ||||
| 			spec, | ||||
| 			&api.Pod{ObjectMeta: meta.ObjectMeta{UID: testPodUID, Namespace: testns}}, | ||||
| 			volume.VolumeOptions{}, | ||||
| 		) | ||||
| 		if err != nil { | ||||
| 			t.Fatalf("Failed to make a new Mounter: %v", err) | ||||
| 		} | ||||
| 		csiMounter := mounter.(*csiMountMgr) | ||||
|  | ||||
| 		path := csiMounter.GetPath() | ||||
| 		t.Log("*** GetPath: ", path) | ||||
|  | ||||
| 		if tc.path != path { | ||||
| 			t.Errorf("expecting path %s, got %s", tc.path, path) | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestMounterSetUp(t *testing.T) { | ||||
| @@ -125,6 +144,14 @@ func TestMounterSetUp(t *testing.T) { | ||||
| 	if err := csiMounter.SetUp(nil); err != nil { | ||||
| 		t.Fatalf("mounter.Setup failed: %v", err) | ||||
| 	} | ||||
| 	path := csiMounter.GetPath() | ||||
| 	if _, err := os.Stat(path); err != nil { | ||||
| 		if os.IsNotExist(err) { | ||||
| 			t.Errorf("SetUp() failed, volume path not created: %s", path) | ||||
| 		} else { | ||||
| 			t.Errorf("SetUp() failed: %v", err) | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	// ensure call went all the way | ||||
| 	pubs := csiMounter.csiClient.(*csiDriverClient).nodeClient.(*fake.NodeClient).GetNodePublishedVolumes() | ||||
| @@ -149,6 +176,19 @@ func TestUnmounterTeardown(t *testing.T) { | ||||
|  | ||||
| 	dir := csiUnmounter.GetPath() | ||||
|  | ||||
| 	// save the data file prior to unmount | ||||
| 	if err := os.MkdirAll(dir, 0755); err != nil && !os.IsNotExist(err) { | ||||
| 		t.Errorf("failed to create dir [%s]: %v", dir, err) | ||||
| 	} | ||||
| 	if err := saveVolumeData( | ||||
| 		plug, | ||||
| 		testPodUID, | ||||
| 		"test-pv", | ||||
| 		map[string]string{volDataKey.specVolID: "test-pv", volDataKey.driverName: "driver", volDataKey.volHandle: "vol-handle"}, | ||||
| 	); err != nil { | ||||
| 		t.Fatal("failed to save volume data:", err) | ||||
| 	} | ||||
|  | ||||
| 	err = csiUnmounter.TearDownAt(dir) | ||||
| 	if err != nil { | ||||
| 		t.Fatal(err) | ||||
| @@ -208,3 +248,51 @@ func TestGetVolAttribsFromSpec(t *testing.T) { | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestSaveVolumeData(t *testing.T) { | ||||
| 	plug, tmpDir := newTestPlugin(t) | ||||
| 	defer os.RemoveAll(tmpDir) | ||||
| 	testCases := []struct { | ||||
| 		name       string | ||||
| 		data       map[string]string | ||||
| 		shouldFail bool | ||||
| 	}{ | ||||
| 		{name: "test with data ok", data: map[string]string{"key0": "val0", "_key1": "val1", "key2": "val2"}}, | ||||
| 		{name: "test with data ok 2 ", data: map[string]string{"_key0_": "val0", "&key1": "val1", "key2": "val2"}}, | ||||
| 	} | ||||
|  | ||||
| 	for i, tc := range testCases { | ||||
| 		t.Log("test case:", tc.name) | ||||
| 		specVolID := fmt.Sprintf("spec-volid-%d", i) | ||||
| 		mountDir := path.Join(getTargetPath(testPodUID, specVolID, plug.host), "/mount") | ||||
| 		if err := os.MkdirAll(mountDir, 0755); err != nil && !os.IsNotExist(err) { | ||||
| 			t.Errorf("failed to create dir [%s]: %v", mountDir, err) | ||||
| 		} | ||||
|  | ||||
| 		err := saveVolumeData(plug, testPodUID, specVolID, tc.data) | ||||
|  | ||||
| 		if !tc.shouldFail && err != nil { | ||||
| 			t.Error("unexpected failure: ", err) | ||||
| 		} | ||||
| 		// did file get created | ||||
| 		dataDir := getTargetPath(testPodUID, specVolID, plug.host) | ||||
| 		file := path.Join(dataDir, volDataFileName) | ||||
| 		if _, err := os.Stat(file); err != nil { | ||||
| 			t.Error("failed to create data dir:", err) | ||||
| 		} | ||||
|  | ||||
| 		// validate content | ||||
| 		data, err := ioutil.ReadFile(file) | ||||
| 		if !tc.shouldFail && err != nil { | ||||
| 			t.Error("failed to read data file:", err) | ||||
| 		} | ||||
|  | ||||
| 		jsonData := new(bytes.Buffer) | ||||
| 		if err := json.NewEncoder(jsonData).Encode(tc.data); err != nil { | ||||
| 			t.Error("failed to encode json:", err) | ||||
| 		} | ||||
| 		if string(data) != jsonData.String() { | ||||
| 			t.Errorf("expecting encoded data %v, got %v", string(data), jsonData) | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|   | ||||
| @@ -19,7 +19,6 @@ package csi | ||||
| import ( | ||||
| 	"errors" | ||||
| 	"fmt" | ||||
| 	"path" | ||||
| 	"regexp" | ||||
| 	"time" | ||||
|  | ||||
| @@ -29,7 +28,6 @@ import ( | ||||
| 	meta "k8s.io/apimachinery/pkg/apis/meta/v1" | ||||
| 	"k8s.io/apimachinery/pkg/types" | ||||
| 	"k8s.io/kubernetes/pkg/util/mount" | ||||
| 	kstrings "k8s.io/kubernetes/pkg/util/strings" | ||||
| 	"k8s.io/kubernetes/pkg/volume" | ||||
| ) | ||||
|  | ||||
| @@ -44,6 +42,7 @@ const ( | ||||
| 	csiAddrTemplate = "/var/lib/kubelet/plugins/%v/csi.sock" | ||||
| 	csiTimeout      = 15 * time.Second | ||||
| 	volNameSep      = "^" | ||||
| 	volDataFileName = "vol_data.json" | ||||
| ) | ||||
|  | ||||
| var ( | ||||
| @@ -134,14 +133,15 @@ func (p *csiPlugin) NewMounter( | ||||
| 	} | ||||
|  | ||||
| 	mounter := &csiMountMgr{ | ||||
| 		plugin:     p, | ||||
| 		k8s:        k8s, | ||||
| 		spec:       spec, | ||||
| 		pod:        pod, | ||||
| 		podUID:     pod.UID, | ||||
| 		driverName: pvSource.Driver, | ||||
| 		volumeID:   pvSource.VolumeHandle, | ||||
| 		csiClient:  client, | ||||
| 		plugin:       p, | ||||
| 		k8s:          k8s, | ||||
| 		spec:         spec, | ||||
| 		pod:          pod, | ||||
| 		podUID:       pod.UID, | ||||
| 		driverName:   pvSource.Driver, | ||||
| 		volumeID:     pvSource.VolumeHandle, | ||||
| 		specVolumeID: spec.Name(), | ||||
| 		csiClient:    client, | ||||
| 	} | ||||
| 	return mounter, nil | ||||
| } | ||||
| @@ -149,37 +149,33 @@ func (p *csiPlugin) NewMounter( | ||||
| func (p *csiPlugin) NewUnmounter(specName string, podUID types.UID) (volume.Unmounter, error) { | ||||
| 	glog.V(4).Infof(log("setting up unmounter for [name=%v, podUID=%v]", specName, podUID)) | ||||
| 	unmounter := &csiMountMgr{ | ||||
| 		plugin: p, | ||||
| 		podUID: podUID, | ||||
| 		plugin:       p, | ||||
| 		podUID:       podUID, | ||||
| 		specVolumeID: specName, | ||||
| 	} | ||||
| 	return unmounter, nil | ||||
| } | ||||
|  | ||||
| func (p *csiPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) { | ||||
| 	glog.V(4).Infof(log("constructing volume spec [pv.Name=%v, path=%v]", volumeName, mountPath)) | ||||
| 	glog.V(4).Info(log("plugin.ConstructVolumeSpec [pv.Name=%v, path=%v]", volumeName, mountPath)) | ||||
|  | ||||
| 	// extract driverName/volumeId from end of mountPath | ||||
| 	dir, volID := path.Split(mountPath) | ||||
| 	volID = kstrings.UnescapeQualifiedNameForDisk(volID) | ||||
| 	driverName := path.Base(dir) | ||||
|  | ||||
| 	// TODO (vladimirvivien) consider moving this check in API validation | ||||
| 	if !isDriverNameValid(driverName) { | ||||
| 		glog.Error(log("failed while reconstructing volume spec csi: driver name extracted from path is invalid: [path=%s; driverName=%s]", mountPath, driverName)) | ||||
| 		return nil, errors.New("invalid csi driver name from path") | ||||
| 	volData, err := loadVolumeData(mountPath, volDataFileName) | ||||
| 	if err != nil { | ||||
| 		glog.Error(log("plugin.ConstructVolumeSpec failed loading volume data using [%s]: %v", mountPath, err)) | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | ||||
| 	glog.V(4).Info(log("plugin.ConstructVolumeSpec extracted [volumeID=%s; driverName=%s]", volID, driverName)) | ||||
| 	glog.V(4).Info(log("plugin.ConstructVolumeSpec extracted [%#v]", volData)) | ||||
|  | ||||
| 	pv := &api.PersistentVolume{ | ||||
| 		ObjectMeta: meta.ObjectMeta{ | ||||
| 			Name: volumeName, | ||||
| 			Name: volData[volDataKey.specVolID], | ||||
| 		}, | ||||
| 		Spec: api.PersistentVolumeSpec{ | ||||
| 			PersistentVolumeSource: api.PersistentVolumeSource{ | ||||
| 				CSI: &api.CSIPersistentVolumeSource{ | ||||
| 					Driver:       driverName, | ||||
| 					VolumeHandle: volID, | ||||
| 					Driver:       volData[volDataKey.driverName], | ||||
| 					VolumeHandle: volData[volDataKey.volHandle], | ||||
| 				}, | ||||
| 			}, | ||||
| 		}, | ||||
|   | ||||
| @@ -19,6 +19,7 @@ package csi | ||||
| import ( | ||||
| 	"fmt" | ||||
| 	"os" | ||||
| 	"path" | ||||
| 	"testing" | ||||
|  | ||||
| 	api "k8s.io/api/core/v1" | ||||
| @@ -27,7 +28,6 @@ import ( | ||||
| 	"k8s.io/apimachinery/pkg/types" | ||||
| 	fakeclient "k8s.io/client-go/kubernetes/fake" | ||||
| 	utiltesting "k8s.io/client-go/util/testing" | ||||
| 	kstrings "k8s.io/kubernetes/pkg/util/strings" | ||||
| 	"k8s.io/kubernetes/pkg/volume" | ||||
| 	volumetest "k8s.io/kubernetes/pkg/volume/testing" | ||||
| ) | ||||
| @@ -140,17 +140,31 @@ func TestPluginConstructVolumeSpec(t *testing.T) { | ||||
|  | ||||
| 	testCases := []struct { | ||||
| 		name       string | ||||
| 		driverName string | ||||
| 		volID      string | ||||
| 		specVolID  string | ||||
| 		data       map[string]string | ||||
| 		shouldFail bool | ||||
| 	}{ | ||||
| 		{"valid driver and vol", "test.csi-driver", "abc-cde", false}, | ||||
| 		{"valid driver + vol with slash", "test.csi-driver", "a/b/c/d", false}, | ||||
| 		{"invalid driver name", "_test.csi.driver>", "a/b/c/d", true}, | ||||
| 		{ | ||||
| 			name:      "valid spec name", | ||||
| 			specVolID: "test.vol.id", | ||||
| 			data:      map[string]string{volDataKey.specVolID: "test.vol.id", volDataKey.volHandle: "test-vol0", volDataKey.driverName: "test-driver0"}, | ||||
| 		}, | ||||
| 	} | ||||
|  | ||||
| 	for _, tc := range testCases { | ||||
| 		dir := getTargetPath(testPodUID, tc.driverName, tc.volID, plug.host) | ||||
| 		t.Logf("test case: %s", tc.name) | ||||
| 		dir := getTargetPath(testPodUID, tc.specVolID, plug.host) | ||||
|  | ||||
| 		// create the data file | ||||
| 		if tc.data != nil { | ||||
| 			mountDir := path.Join(getTargetPath(testPodUID, tc.specVolID, plug.host), "/mount") | ||||
| 			if err := os.MkdirAll(mountDir, 0755); err != nil && !os.IsNotExist(err) { | ||||
| 				t.Errorf("failed to create dir [%s]: %v", mountDir, err) | ||||
| 			} | ||||
| 			if err := saveVolumeData(plug, testPodUID, tc.specVolID, tc.data); err != nil { | ||||
| 				t.Fatal(err) | ||||
| 			} | ||||
| 		} | ||||
|  | ||||
| 		// rebuild spec | ||||
| 		spec, err := plug.ConstructVolumeSpec("test-pv", dir) | ||||
| @@ -161,13 +175,12 @@ func TestPluginConstructVolumeSpec(t *testing.T) { | ||||
| 			continue | ||||
| 		} | ||||
|  | ||||
| 		volID := spec.PersistentVolume.Spec.CSI.VolumeHandle | ||||
| 		unsanitizedVolID := kstrings.UnescapeQualifiedNameForDisk(tc.volID) | ||||
| 		if volID != unsanitizedVolID { | ||||
| 			t.Errorf("expected unsanitized volID %s, got volID %s", unsanitizedVolID, volID) | ||||
| 		volHandle := spec.PersistentVolume.Spec.CSI.VolumeHandle | ||||
| 		if volHandle != tc.data[volDataKey.volHandle] { | ||||
| 			t.Errorf("expected volID %s, got volID %s", tc.data[volDataKey.volHandle], volHandle) | ||||
| 		} | ||||
|  | ||||
| 		if spec.Name() != "test-pv" { | ||||
| 		if spec.Name() != tc.specVolID { | ||||
| 			t.Errorf("Unexpected spec name %s", spec.Name()) | ||||
| 		} | ||||
| 	} | ||||
|   | ||||
| @@ -109,6 +109,17 @@ func (f *NodeClient) NodePublishVolume(ctx grpctx.Context, req *csipb.NodePublis | ||||
| 	return &csipb.NodePublishVolumeResponse{}, nil | ||||
| } | ||||
|  | ||||
| // NodeProbe implements csi NodeProbe | ||||
| func (f *NodeClient) NodeProbe(ctx context.Context, req *csipb.NodeProbeRequest, opts ...grpc.CallOption) (*csipb.NodeProbeResponse, error) { | ||||
| 	if f.nextErr != nil { | ||||
| 		return nil, f.nextErr | ||||
| 	} | ||||
| 	if req.Version == nil { | ||||
| 		return nil, errors.New("missing version") | ||||
| 	} | ||||
| 	return &csipb.NodeProbeResponse{}, nil | ||||
| } | ||||
|  | ||||
| // NodeUnpublishVolume implements csi method | ||||
| func (f *NodeClient) NodeUnpublishVolume(ctx context.Context, req *csipb.NodeUnpublishVolumeRequest, opts ...grpc.CallOption) (*csipb.NodeUnpublishVolumeResponse, error) { | ||||
| 	if f.nextErr != nil { | ||||
| @@ -130,11 +141,6 @@ func (f *NodeClient) GetNodeID(ctx context.Context, in *csipb.GetNodeIDRequest, | ||||
| 	return nil, nil | ||||
| } | ||||
|  | ||||
| // NodeProbe implements csi method | ||||
| func (f *NodeClient) NodeProbe(ctx context.Context, in *csipb.NodeProbeRequest, opts ...grpc.CallOption) (*csipb.NodeProbeResponse, error) { | ||||
| 	return nil, nil | ||||
| } | ||||
|  | ||||
| // NodeGetCapabilities implements csi method | ||||
| func (f *NodeClient) NodeGetCapabilities(ctx context.Context, in *csipb.NodeGetCapabilitiesRequest, opts ...grpc.CallOption) (*csipb.NodeGetCapabilitiesResponse, error) { | ||||
| 	return nil, nil | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 Kubernetes Submit Queue
					Kubernetes Submit Queue