Merge pull request #62827 from linyouchong/data-reace-csi-20180418
Automatic merge from submit-queue (batch tested with PRs 62876, 62733, 62827). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>. fix csi data race in csi_attacher_test.go **What this PR does / why we need it**: fix csi data race in csi_attacher_test.go#TestAttacherWaitForVolumeAttachment **Which issue(s) this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close the issue(s) when PR gets merged)*: Fixes #62630 **Special notes for your reviewer**: run `stress -p 500 ./csi.test -v 5 -alsologtostderr` , There is another failure I think we should fix it in another PR. ``` --- FAIL: TestAttacherMountDevice (0.07s) csi_attacher_test.go:495: Running test case: normal csi_attacher_test.go:534: test should not fail, but error occurred: mkdir path2: file exists ``` **Release note**: ```release-note NONE ``` /sig storage
This commit is contained in:
		| @@ -165,17 +165,7 @@ func TestAttacherAttach(t *testing.T) { | |||||||
| } | } | ||||||
|  |  | ||||||
| func TestAttacherWaitForVolumeAttachment(t *testing.T) { | func TestAttacherWaitForVolumeAttachment(t *testing.T) { | ||||||
|  |  | ||||||
| 	plug, fakeWatcher, tmpDir := newTestWatchPlugin(t) |  | ||||||
| 	defer os.RemoveAll(tmpDir) |  | ||||||
|  |  | ||||||
| 	attacher, err := plug.NewAttacher() |  | ||||||
| 	if err != nil { |  | ||||||
| 		t.Fatalf("failed to create new attacher: %v", err) |  | ||||||
| 	} |  | ||||||
| 	csiAttacher := attacher.(*csiAttacher) |  | ||||||
| 	nodeName := "test-node" | 	nodeName := "test-node" | ||||||
|  |  | ||||||
| 	testCases := []struct { | 	testCases := []struct { | ||||||
| 		name                 string | 		name                 string | ||||||
| 		initAttached         bool | 		initAttached         bool | ||||||
| @@ -183,21 +173,18 @@ func TestAttacherWaitForVolumeAttachment(t *testing.T) { | |||||||
| 		trigerWatchEventTime time.Duration | 		trigerWatchEventTime time.Duration | ||||||
| 		initAttachErr        *storage.VolumeError | 		initAttachErr        *storage.VolumeError | ||||||
| 		finalAttachErr       *storage.VolumeError | 		finalAttachErr       *storage.VolumeError | ||||||
| 		sleepTime            time.Duration |  | ||||||
| 		timeout              time.Duration | 		timeout              time.Duration | ||||||
| 		shouldFail           bool | 		shouldFail           bool | ||||||
| 	}{ | 	}{ | ||||||
| 		{ | 		{ | ||||||
| 			name:         "attach success at get", | 			name:         "attach success at get", | ||||||
| 			initAttached: true, | 			initAttached: true, | ||||||
| 			sleepTime:    10 * time.Millisecond, |  | ||||||
| 			timeout:      50 * time.Millisecond, | 			timeout:      50 * time.Millisecond, | ||||||
| 			shouldFail:   false, | 			shouldFail:   false, | ||||||
| 		}, | 		}, | ||||||
| 		{ | 		{ | ||||||
| 			name:          "attachment error ant get", | 			name:          "attachment error ant get", | ||||||
| 			initAttachErr: &storage.VolumeError{Message: "missing volume"}, | 			initAttachErr: &storage.VolumeError{Message: "missing volume"}, | ||||||
| 			sleepTime:     10 * time.Millisecond, |  | ||||||
| 			timeout:       30 * time.Millisecond, | 			timeout:       30 * time.Millisecond, | ||||||
| 			shouldFail:    true, | 			shouldFail:    true, | ||||||
| 		}, | 		}, | ||||||
| @@ -207,7 +194,6 @@ func TestAttacherWaitForVolumeAttachment(t *testing.T) { | |||||||
| 			finalAttached:        true, | 			finalAttached:        true, | ||||||
| 			trigerWatchEventTime: 5 * time.Millisecond, | 			trigerWatchEventTime: 5 * time.Millisecond, | ||||||
| 			timeout:              50 * time.Millisecond, | 			timeout:              50 * time.Millisecond, | ||||||
| 			sleepTime:            5 * time.Millisecond, |  | ||||||
| 			shouldFail:           false, | 			shouldFail:           false, | ||||||
| 		}, | 		}, | ||||||
| 		{ | 		{ | ||||||
| @@ -216,7 +202,6 @@ func TestAttacherWaitForVolumeAttachment(t *testing.T) { | |||||||
| 			finalAttached:        false, | 			finalAttached:        false, | ||||||
| 			finalAttachErr:       &storage.VolumeError{Message: "missing volume"}, | 			finalAttachErr:       &storage.VolumeError{Message: "missing volume"}, | ||||||
| 			trigerWatchEventTime: 5 * time.Millisecond, | 			trigerWatchEventTime: 5 * time.Millisecond, | ||||||
| 			sleepTime:            10 * time.Millisecond, |  | ||||||
| 			timeout:              30 * time.Millisecond, | 			timeout:              30 * time.Millisecond, | ||||||
| 			shouldFail:           true, | 			shouldFail:           true, | ||||||
| 		}, | 		}, | ||||||
| @@ -226,13 +211,19 @@ func TestAttacherWaitForVolumeAttachment(t *testing.T) { | |||||||
| 			finalAttached:        true, | 			finalAttached:        true, | ||||||
| 			trigerWatchEventTime: 100 * time.Millisecond, | 			trigerWatchEventTime: 100 * time.Millisecond, | ||||||
| 			timeout:              50 * time.Millisecond, | 			timeout:              50 * time.Millisecond, | ||||||
| 			sleepTime:            5 * time.Millisecond, |  | ||||||
| 			shouldFail:           true, | 			shouldFail:           true, | ||||||
| 		}, | 		}, | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	for i, tc := range testCases { | 	for i, tc := range testCases { | ||||||
| 		fakeWatcher.Reset() | 		plug, fakeWatcher, tmpDir := newTestWatchPlugin(t) | ||||||
|  | 		defer os.RemoveAll(tmpDir) | ||||||
|  |  | ||||||
|  | 		attacher, err := plug.NewAttacher() | ||||||
|  | 		if err != nil { | ||||||
|  | 			t.Fatalf("failed to create new attacher: %v", err) | ||||||
|  | 		} | ||||||
|  | 		csiAttacher := attacher.(*csiAttacher) | ||||||
| 		t.Logf("running test: %v", tc.name) | 		t.Logf("running test: %v", tc.name) | ||||||
| 		pvName := fmt.Sprintf("test-pv-%d", i) | 		pvName := fmt.Sprintf("test-pv-%d", i) | ||||||
| 		volID := fmt.Sprintf("test-vol-%d", i) | 		volID := fmt.Sprintf("test-vol-%d", i) | ||||||
| @@ -240,18 +231,21 @@ func TestAttacherWaitForVolumeAttachment(t *testing.T) { | |||||||
| 		attachment := makeTestAttachment(attachID, nodeName, pvName) | 		attachment := makeTestAttachment(attachID, nodeName, pvName) | ||||||
| 		attachment.Status.Attached = tc.initAttached | 		attachment.Status.Attached = tc.initAttached | ||||||
| 		attachment.Status.AttachError = tc.initAttachErr | 		attachment.Status.AttachError = tc.initAttachErr | ||||||
| 		csiAttacher.waitSleepTime = tc.sleepTime | 		_, err = csiAttacher.k8s.StorageV1beta1().VolumeAttachments().Create(attachment) | ||||||
| 		_, err := csiAttacher.k8s.StorageV1beta1().VolumeAttachments().Create(attachment) |  | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			t.Fatalf("failed to attach: %v", err) | 			t.Fatalf("failed to attach: %v", err) | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
|  | 		trigerWatchEventTime := tc.trigerWatchEventTime | ||||||
|  | 		finalAttached := tc.finalAttached | ||||||
|  | 		finalAttachErr := tc.finalAttachErr | ||||||
| 		// after timeout, fakeWatcher will be closed by csiAttacher.waitForVolumeAttachment | 		// after timeout, fakeWatcher will be closed by csiAttacher.waitForVolumeAttachment | ||||||
| 		if tc.trigerWatchEventTime > 0 && tc.trigerWatchEventTime < tc.timeout { | 		if tc.trigerWatchEventTime > 0 && tc.trigerWatchEventTime < tc.timeout { | ||||||
| 			go func() { | 			go func() { | ||||||
| 				time.Sleep(tc.trigerWatchEventTime) | 				time.Sleep(trigerWatchEventTime) | ||||||
| 				attachment.Status.Attached = tc.finalAttached | 				attachment := makeTestAttachment(attachID, nodeName, pvName) | ||||||
| 				attachment.Status.AttachError = tc.finalAttachErr | 				attachment.Status.Attached = finalAttached | ||||||
|  | 				attachment.Status.AttachError = finalAttachErr | ||||||
| 				fakeWatcher.Modify(attachment) | 				fakeWatcher.Modify(attachment) | ||||||
| 			}() | 			}() | ||||||
| 		} | 		} | ||||||
| @@ -684,14 +678,14 @@ func TestAttacherUnmountDevice(t *testing.T) { | |||||||
| } | } | ||||||
|  |  | ||||||
| // create a plugin mgr to load plugins and setup a fake client | // create a plugin mgr to load plugins and setup a fake client | ||||||
| func newTestWatchPlugin(t *testing.T) (*csiPlugin, *watch.FakeWatcher, string) { | func newTestWatchPlugin(t *testing.T) (*csiPlugin, *watch.RaceFreeFakeWatcher, string) { | ||||||
| 	tmpDir, err := utiltesting.MkTmpdir("csi-test") | 	tmpDir, err := utiltesting.MkTmpdir("csi-test") | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		t.Fatalf("can't create temp dir: %v", err) | 		t.Fatalf("can't create temp dir: %v", err) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	fakeClient := fakeclient.NewSimpleClientset() | 	fakeClient := fakeclient.NewSimpleClientset() | ||||||
| 	fakeWatcher := watch.NewFake() | 	fakeWatcher := watch.NewRaceFreeFake() | ||||||
| 	fakeClient.Fake.PrependWatchReactor("*", core.DefaultWatchReactor(fakeWatcher, nil)) | 	fakeClient.Fake.PrependWatchReactor("*", core.DefaultWatchReactor(fakeWatcher, nil)) | ||||||
| 	fakeClient.Fake.WatchReactionChain = fakeClient.Fake.WatchReactionChain[:1] | 	fakeClient.Fake.WatchReactionChain = fakeClient.Fake.WatchReactionChain[:1] | ||||||
| 	host := volumetest.NewFakeVolumeHost( | 	host := volumetest.NewFakeVolumeHost( | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user
	 Kubernetes Submit Queue
					Kubernetes Submit Queue