Fix AtomicWriter failing to create user-visible files after kubelet is restarted
This commit is contained in:
		@@ -101,9 +101,9 @@ const (
 | 
			
		||||
//     portion of the payload was deleted and is still present on disk.
 | 
			
		||||
//
 | 
			
		||||
//  4. The data in the current timestamped directory is compared to the projected
 | 
			
		||||
//     data to determine if an update is required.
 | 
			
		||||
//     data to determine if an update to the data directory is required.
 | 
			
		||||
//
 | 
			
		||||
//  5. A new timestamped dir is created.
 | 
			
		||||
//  5. A new timestamped dir is created if an update is required.
 | 
			
		||||
//
 | 
			
		||||
//  6. The payload is written to the new timestamped directory.
 | 
			
		||||
//
 | 
			
		||||
@@ -159,6 +159,7 @@ func (w *AtomicWriter) Write(payload map[string]FileProjection, setPerms func(su
 | 
			
		||||
	oldTsPath := filepath.Join(w.targetDir, oldTsDir)
 | 
			
		||||
 | 
			
		||||
	var pathsToRemove sets.String
 | 
			
		||||
	shouldWrite := true
 | 
			
		||||
	// if there was no old version, there's nothing to remove
 | 
			
		||||
	if len(oldTsDir) != 0 {
 | 
			
		||||
		// (3)
 | 
			
		||||
@@ -173,57 +174,74 @@ func (w *AtomicWriter) Write(payload map[string]FileProjection, setPerms func(su
 | 
			
		||||
			klog.Errorf("%s: error determining whether payload should be written to disk: %v", w.logContext, err)
 | 
			
		||||
			return err
 | 
			
		||||
		} else if !should && len(pathsToRemove) == 0 {
 | 
			
		||||
			klog.V(4).Infof("%s: no update required for target directory %v", w.logContext, w.targetDir)
 | 
			
		||||
			return nil
 | 
			
		||||
			klog.V(4).Infof("%s: write not required for data directory %v", w.logContext, oldTsDir)
 | 
			
		||||
			// data directory is already up to date, but we need to make sure that
 | 
			
		||||
			// the user-visible symlinks are created.
 | 
			
		||||
			// See https://github.com/kubernetes/kubernetes/issues/121472 for more details.
 | 
			
		||||
			// Reset oldTsDir to empty string to avoid removing the data directory.
 | 
			
		||||
			shouldWrite = false
 | 
			
		||||
			oldTsDir = ""
 | 
			
		||||
		} else {
 | 
			
		||||
			klog.V(4).Infof("%s: write required for target directory %v", w.logContext, w.targetDir)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// (5)
 | 
			
		||||
	tsDir, err := w.newTimestampDir()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		klog.V(4).Infof("%s: error creating new ts data directory: %v", w.logContext, err)
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	tsDirName := filepath.Base(tsDir)
 | 
			
		||||
 | 
			
		||||
	// (6)
 | 
			
		||||
	if err = w.writePayloadToDir(cleanPayload, tsDir); err != nil {
 | 
			
		||||
		klog.Errorf("%s: error writing payload to ts data directory %s: %v", w.logContext, tsDir, err)
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	klog.V(4).Infof("%s: performed write of new data to ts data directory: %s", w.logContext, tsDir)
 | 
			
		||||
 | 
			
		||||
	// (7)
 | 
			
		||||
	if setPerms != nil {
 | 
			
		||||
		if err := setPerms(tsDirName); err != nil {
 | 
			
		||||
			klog.Errorf("%s: error applying ownership settings: %v", w.logContext, err)
 | 
			
		||||
	if shouldWrite {
 | 
			
		||||
		// (5)
 | 
			
		||||
		tsDir, err := w.newTimestampDir()
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			klog.V(4).Infof("%s: error creating new ts data directory: %v", w.logContext, err)
 | 
			
		||||
			return err
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
		tsDirName := filepath.Base(tsDir)
 | 
			
		||||
 | 
			
		||||
	// (8)
 | 
			
		||||
	newDataDirPath := filepath.Join(w.targetDir, newDataDirName)
 | 
			
		||||
	if err = os.Symlink(tsDirName, newDataDirPath); err != nil {
 | 
			
		||||
		os.RemoveAll(tsDir)
 | 
			
		||||
		klog.Errorf("%s: error creating symbolic link for atomic update: %v", w.logContext, err)
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
		// (6)
 | 
			
		||||
		if err = w.writePayloadToDir(cleanPayload, tsDir); err != nil {
 | 
			
		||||
			klog.Errorf("%s: error writing payload to ts data directory %s: %v", w.logContext, tsDir, err)
 | 
			
		||||
			return err
 | 
			
		||||
		}
 | 
			
		||||
		klog.V(4).Infof("%s: performed write of new data to ts data directory: %s", w.logContext, tsDir)
 | 
			
		||||
 | 
			
		||||
	// (9)
 | 
			
		||||
	if runtime.GOOS == "windows" {
 | 
			
		||||
		os.Remove(dataDirPath)
 | 
			
		||||
		err = os.Symlink(tsDirName, dataDirPath)
 | 
			
		||||
		os.Remove(newDataDirPath)
 | 
			
		||||
	} else {
 | 
			
		||||
		err = os.Rename(newDataDirPath, dataDirPath)
 | 
			
		||||
	}
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		os.Remove(newDataDirPath)
 | 
			
		||||
		os.RemoveAll(tsDir)
 | 
			
		||||
		klog.Errorf("%s: error renaming symbolic link for data directory %s: %v", w.logContext, newDataDirPath, err)
 | 
			
		||||
		return err
 | 
			
		||||
		// (7)
 | 
			
		||||
		if setPerms != nil {
 | 
			
		||||
			if err := setPerms(tsDirName); err != nil {
 | 
			
		||||
				klog.Errorf("%s: error applying ownership settings: %v", w.logContext, err)
 | 
			
		||||
				return err
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		// (8)
 | 
			
		||||
		newDataDirPath := filepath.Join(w.targetDir, newDataDirName)
 | 
			
		||||
		if err = os.Symlink(tsDirName, newDataDirPath); err != nil {
 | 
			
		||||
			if err := os.RemoveAll(tsDir); err != nil {
 | 
			
		||||
				klog.Errorf("%s: error removing new ts directory %s: %v", w.logContext, tsDir, err)
 | 
			
		||||
			}
 | 
			
		||||
			klog.Errorf("%s: error creating symbolic link for atomic update: %v", w.logContext, err)
 | 
			
		||||
			return err
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		// (9)
 | 
			
		||||
		if runtime.GOOS == "windows" {
 | 
			
		||||
			if err := os.Remove(dataDirPath); err != nil {
 | 
			
		||||
				klog.Errorf("%s: error removing data dir directory %s: %v", w.logContext, dataDirPath, err)
 | 
			
		||||
			}
 | 
			
		||||
			err = os.Symlink(tsDirName, dataDirPath)
 | 
			
		||||
			if err := os.Remove(newDataDirPath); err != nil {
 | 
			
		||||
				klog.Errorf("%s: error removing new data dir directory %s: %v", w.logContext, newDataDirPath, err)
 | 
			
		||||
			}
 | 
			
		||||
		} else {
 | 
			
		||||
			err = os.Rename(newDataDirPath, dataDirPath)
 | 
			
		||||
		}
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			if err := os.Remove(newDataDirPath); err != nil && err != os.ErrNotExist {
 | 
			
		||||
				klog.Errorf("%s: error removing new data dir directory %s: %v", w.logContext, newDataDirPath, err)
 | 
			
		||||
			}
 | 
			
		||||
			if err := os.RemoveAll(tsDir); err != nil {
 | 
			
		||||
				klog.Errorf("%s: error removing new ts directory %s: %v", w.logContext, tsDir, err)
 | 
			
		||||
			}
 | 
			
		||||
			klog.Errorf("%s: error renaming symbolic link for data directory %s: %v", w.logContext, newDataDirPath, err)
 | 
			
		||||
			return err
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// (10)
 | 
			
		||||
 
 | 
			
		||||
@@ -1035,3 +1035,61 @@ func TestSetPerms(t *testing.T) {
 | 
			
		||||
		t.Fatalf("unexpected error while writing: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestWriteAgainAfterUnexpectedExit(t *testing.T) {
 | 
			
		||||
	testCases := []struct {
 | 
			
		||||
		name       string
 | 
			
		||||
		payload    map[string]FileProjection
 | 
			
		||||
		simulateFn func(targetDir string, payload map[string]FileProjection) error
 | 
			
		||||
	}{
 | 
			
		||||
		{
 | 
			
		||||
			name: "process killed before creating user visible files",
 | 
			
		||||
			payload: map[string]FileProjection{
 | 
			
		||||
				"foo": {Mode: 0644, Data: []byte("foo")},
 | 
			
		||||
				"bar": {Mode: 0644, Data: []byte("bar")},
 | 
			
		||||
			},
 | 
			
		||||
			simulateFn: func(targetDir string, payload map[string]FileProjection) error {
 | 
			
		||||
				for filename := range payload {
 | 
			
		||||
					path := filepath.Join(targetDir, filename)
 | 
			
		||||
					if err := os.RemoveAll(path); err != nil {
 | 
			
		||||
						return err
 | 
			
		||||
					}
 | 
			
		||||
				}
 | 
			
		||||
				return nil
 | 
			
		||||
			},
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for _, tc := range testCases {
 | 
			
		||||
		tc := tc
 | 
			
		||||
		t.Run(tc.name, func(t *testing.T) {
 | 
			
		||||
			targetDir, err := utiltesting.MkTmpdir("atomic-write")
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				t.Fatalf("unexpected error creating tmp dir: %v", err)
 | 
			
		||||
			}
 | 
			
		||||
			defer func() {
 | 
			
		||||
				err := os.RemoveAll(targetDir)
 | 
			
		||||
				if err != nil {
 | 
			
		||||
					t.Errorf("%v: unexpected error removing tmp dir: %v", tc.name, err)
 | 
			
		||||
				}
 | 
			
		||||
			}()
 | 
			
		||||
 | 
			
		||||
			writer := &AtomicWriter{targetDir: targetDir, logContext: "-test-"}
 | 
			
		||||
			err = writer.Write(tc.payload, nil)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				t.Fatalf("unexpected error writing payload: %v", err)
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			err = tc.simulateFn(targetDir, tc.payload)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				t.Fatalf("failed to simulate the unexpected exit: %v", err)
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			err = writer.Write(tc.payload, nil)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				t.Fatalf("unexpected error writing payload again: %v", err)
 | 
			
		||||
			}
 | 
			
		||||
			checkVolumeContents(targetDir, tc.name, tc.payload, t)
 | 
			
		||||
		})
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user