Merge pull request #688 from Sarsate/volume-cleanup-loop
Volume reconciliation loop
This commit is contained in:
		| @@ -21,6 +21,7 @@ import ( | |||||||
| 	"errors" | 	"errors" | ||||||
| 	"fmt" | 	"fmt" | ||||||
| 	"net/http" | 	"net/http" | ||||||
|  | 	"path" | ||||||
| 	"strconv" | 	"strconv" | ||||||
| 	"strings" | 	"strings" | ||||||
| 	"sync" | 	"sync" | ||||||
| @@ -257,7 +258,7 @@ func milliCPUToShares(milliCPU int) int { | |||||||
| func (kl *Kubelet) mountExternalVolumes(manifest *api.ContainerManifest) (volumeMap, error) { | func (kl *Kubelet) mountExternalVolumes(manifest *api.ContainerManifest) (volumeMap, error) { | ||||||
| 	podVolumes := make(volumeMap) | 	podVolumes := make(volumeMap) | ||||||
| 	for _, vol := range manifest.Volumes { | 	for _, vol := range manifest.Volumes { | ||||||
| 		extVolume, err := volume.CreateVolume(&vol, manifest.ID, kl.rootDirectory) | 		extVolume, err := volume.CreateVolumeBuilder(&vol, manifest.ID, kl.rootDirectory) | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			return nil, err | 			return nil, err | ||||||
| 		} | 		} | ||||||
| @@ -449,6 +450,39 @@ type podContainer struct { | |||||||
| 	containerName string | 	containerName string | ||||||
| } | } | ||||||
|  |  | ||||||
|  | // Stores all volumes defined by the set of pods into a map. | ||||||
|  | // Keys for each entry are in the format (POD_ID)/(VOLUME_NAME) | ||||||
|  | func getDesiredVolumes(pods []Pod) map[string]api.Volume { | ||||||
|  | 	desiredVolumes := make(map[string]api.Volume) | ||||||
|  | 	for _, pod := range pods { | ||||||
|  | 		for _, volume := range pod.Manifest.Volumes { | ||||||
|  | 			identifier := path.Join(pod.Manifest.ID, volume.Name) | ||||||
|  | 			desiredVolumes[identifier] = volume | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 	return desiredVolumes | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // Compares the map of current volumes to the map of desired volumes. | ||||||
|  | // If an active volume does not have a respective desired volume, clean it up. | ||||||
|  | func (kl *Kubelet) reconcileVolumes(pods []Pod) error { | ||||||
|  | 	desiredVolumes := getDesiredVolumes(pods) | ||||||
|  | 	currentVolumes := volume.GetCurrentVolumes(kl.rootDirectory) | ||||||
|  | 	for name, vol := range currentVolumes { | ||||||
|  | 		if _, ok := desiredVolumes[name]; !ok { | ||||||
|  | 			//TODO (jonesdl) We should somehow differentiate between volumes that are supposed | ||||||
|  | 			//to be deleted and volumes that are leftover after a crash. | ||||||
|  | 			glog.Infof("Orphaned volume %s found, tearing down volume", name) | ||||||
|  | 			//TODO (jonesdl) This should not block other kubelet synchronization procedures | ||||||
|  | 			err := vol.TearDown() | ||||||
|  | 			if err != nil { | ||||||
|  | 				glog.Infof("Could not tear down volume %s (%s)", name, err) | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  |  | ||||||
| // SyncPods synchronizes the configured list of pods (desired state) with the host current state. | // SyncPods synchronizes the configured list of pods (desired state) with the host current state. | ||||||
| func (kl *Kubelet) SyncPods(pods []Pod) error { | func (kl *Kubelet) SyncPods(pods []Pod) error { | ||||||
| 	glog.Infof("Desired [%s]: %+v", kl.hostname, pods) | 	glog.Infof("Desired [%s]: %+v", kl.hostname, pods) | ||||||
| @@ -497,6 +531,10 @@ func (kl *Kubelet) SyncPods(pods []Pod) error { | |||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|  | 	// Remove any orphaned volumes. | ||||||
|  | 	kl.reconcileVolumes(pods) | ||||||
|  |  | ||||||
| 	return err | 	return err | ||||||
| } | } | ||||||
|  |  | ||||||
|   | |||||||
| @@ -18,23 +18,33 @@ package volume | |||||||
|  |  | ||||||
| import ( | import ( | ||||||
| 	"errors" | 	"errors" | ||||||
|  | 	"io/ioutil" | ||||||
| 	"os" | 	"os" | ||||||
| 	"path" | 	"path" | ||||||
|  |  | ||||||
| 	"github.com/GoogleCloudPlatform/kubernetes/pkg/api" | 	"github.com/GoogleCloudPlatform/kubernetes/pkg/api" | ||||||
|  | 	"github.com/golang/glog" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| var ErrUnsupportedVolumeType = errors.New("unsupported volume type") | var ErrUnsupportedVolumeType = errors.New("unsupported volume type") | ||||||
|  |  | ||||||
| // Interface is a directory used by pods or hosts. Interface implementations | // Interface is a directory used by pods or hosts. | ||||||
| // must be idempotent. | // All method implementations of methods in the volume interface must be idempotent | ||||||
| type Interface interface { | type Interface interface { | ||||||
| 	// SetUp prepares and mounts/unpacks the volume to a directory path. |  | ||||||
| 	SetUp() error |  | ||||||
|  |  | ||||||
| 	// GetPath returns the directory path the volume is mounted to. | 	// GetPath returns the directory path the volume is mounted to. | ||||||
| 	GetPath() string | 	GetPath() string | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // The Builder interface provides the method to set up/mount the volume. | ||||||
|  | type Builder interface { | ||||||
|  | 	// Uses Interface to provide the path for Docker binds. | ||||||
|  | 	Interface | ||||||
|  | 	// SetUp prepares and mounts/unpacks the volume to a directory path. | ||||||
|  | 	SetUp() error | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // The Cleaner interface provides the method to cleanup/unmount the volumes. | ||||||
|  | type Cleaner interface { | ||||||
| 	// TearDown unmounts the volume and removes traces of the SetUp procedure. | 	// TearDown unmounts the volume and removes traces of the SetUp procedure. | ||||||
| 	TearDown() error | 	TearDown() error | ||||||
| } | } | ||||||
| @@ -51,10 +61,6 @@ func (hostVol *HostDirectory) SetUp() error { | |||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
|  |  | ||||||
| func (hostVol *HostDirectory) TearDown() error { |  | ||||||
| 	return nil |  | ||||||
| } |  | ||||||
|  |  | ||||||
| func (hostVol *HostDirectory) GetPath() string { | func (hostVol *HostDirectory) GetPath() string { | ||||||
| 	return hostVol.Path | 	return hostVol.Path | ||||||
| } | } | ||||||
| @@ -77,16 +83,36 @@ func (emptyDir *EmptyDirectory) SetUp() error { | |||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
|  |  | ||||||
| // TODO(jonesdl) when we can properly invoke TearDown(), we should delete |  | ||||||
| // the directory created by SetUp. |  | ||||||
| func (emptyDir *EmptyDirectory) TearDown() error { |  | ||||||
| 	return nil |  | ||||||
| } |  | ||||||
|  |  | ||||||
| func (emptyDir *EmptyDirectory) GetPath() string { | func (emptyDir *EmptyDirectory) GetPath() string { | ||||||
| 	return path.Join(emptyDir.RootDir, emptyDir.PodID, "volumes", "empty", emptyDir.Name) | 	return path.Join(emptyDir.RootDir, emptyDir.PodID, "volumes", "empty", emptyDir.Name) | ||||||
| } | } | ||||||
|  |  | ||||||
|  | func (emptyDir *EmptyDirectory) renameDirectory() (string, error) { | ||||||
|  | 	oldPath := emptyDir.GetPath() | ||||||
|  | 	newPath, err := ioutil.TempDir(path.Dir(oldPath), emptyDir.Name+".deleting~") | ||||||
|  | 	if err != nil { | ||||||
|  | 		return "", err | ||||||
|  | 	} | ||||||
|  | 	err = os.Rename(oldPath, newPath) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return "", err | ||||||
|  | 	} | ||||||
|  | 	return newPath, nil | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // Simply delete everything in the directory. | ||||||
|  | func (emptyDir *EmptyDirectory) TearDown() error { | ||||||
|  | 	tmpDir, err := emptyDir.renameDirectory() | ||||||
|  | 	if err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  | 	err = os.RemoveAll(tmpDir) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  |  | ||||||
| // Interprets API volume as a HostDirectory | // Interprets API volume as a HostDirectory | ||||||
| func createHostDirectory(volume *api.Volume) *HostDirectory { | func createHostDirectory(volume *api.Volume) *HostDirectory { | ||||||
| 	return &HostDirectory{volume.Source.HostDirectory.Path} | 	return &HostDirectory{volume.Source.HostDirectory.Path} | ||||||
| @@ -97,16 +123,16 @@ func createEmptyDirectory(volume *api.Volume, podID string, rootDir string) *Emp | |||||||
| 	return &EmptyDirectory{volume.Name, podID, rootDir} | 	return &EmptyDirectory{volume.Name, podID, rootDir} | ||||||
| } | } | ||||||
|  |  | ||||||
| // CreateVolume returns an Interface capable of mounting a volume described by an | // CreateVolumeBuilder returns a Builder capable of mounting a volume described by an | ||||||
| // *api.Volume and whether or not it is mounted, or an error. | // *api.Volume, or an error. | ||||||
| func CreateVolume(volume *api.Volume, podID string, rootDir string) (Interface, error) { | func CreateVolumeBuilder(volume *api.Volume, podID string, rootDir string) (Builder, error) { | ||||||
| 	source := volume.Source | 	source := volume.Source | ||||||
| 	// TODO(jonesdl) We will want to throw an error here when we no longer | 	// TODO(jonesdl) We will want to throw an error here when we no longer | ||||||
| 	// support the default behavior. | 	// support the default behavior. | ||||||
| 	if source == nil { | 	if source == nil { | ||||||
| 		return nil, nil | 		return nil, nil | ||||||
| 	} | 	} | ||||||
| 	var vol Interface | 	var vol Builder | ||||||
| 	// TODO(jonesdl) We should probably not check every pointer and directly | 	// TODO(jonesdl) We should probably not check every pointer and directly | ||||||
| 	// resolve these types instead. | 	// resolve these types instead. | ||||||
| 	if source.HostDirectory != nil { | 	if source.HostDirectory != nil { | ||||||
| @@ -118,3 +144,54 @@ func CreateVolume(volume *api.Volume, podID string, rootDir string) (Interface, | |||||||
| 	} | 	} | ||||||
| 	return vol, nil | 	return vol, nil | ||||||
| } | } | ||||||
|  |  | ||||||
|  | // CreateVolumeCleaner returns a Cleaner capable of tearing down a volume. | ||||||
|  | func CreateVolumeCleaner(kind string, name string, podID string, rootDir string) (Cleaner, error) { | ||||||
|  | 	switch kind { | ||||||
|  | 	case "empty": | ||||||
|  | 		return &EmptyDirectory{name, podID, rootDir}, nil | ||||||
|  | 	default: | ||||||
|  | 		return nil, ErrUnsupportedVolumeType | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // Examines directory structure to determine volumes that are presently | ||||||
|  | // active and mounted. Returns a map of Cleaner types. | ||||||
|  | func GetCurrentVolumes(rootDirectory string) map[string]Cleaner { | ||||||
|  | 	currentVolumes := make(map[string]Cleaner) | ||||||
|  | 	mountPath := rootDirectory | ||||||
|  | 	podIDDirs, err := ioutil.ReadDir(mountPath) | ||||||
|  | 	if err != nil { | ||||||
|  | 		glog.Errorf("Could not read directory: %s, (%s)", mountPath, err) | ||||||
|  | 	} | ||||||
|  | 	// Volume information is extracted from the directory structure: | ||||||
|  | 	// (ROOT_DIR)/(POD_ID)/volumes/(VOLUME_KIND)/(VOLUME_NAME) | ||||||
|  | 	for _, podIDDir := range podIDDirs { | ||||||
|  | 		podID := podIDDir.Name() | ||||||
|  | 		podIDPath := path.Join(mountPath, podID, "volumes") | ||||||
|  | 		volumeKindDirs, err := ioutil.ReadDir(podIDPath) | ||||||
|  | 		if err != nil { | ||||||
|  | 			glog.Errorf("Could not read directory: %s, (%s)", podIDPath, err) | ||||||
|  | 		} | ||||||
|  | 		for _, volumeKindDir := range volumeKindDirs { | ||||||
|  | 			volumeKind := volumeKindDir.Name() | ||||||
|  | 			volumeKindPath := path.Join(podIDPath, volumeKind) | ||||||
|  | 			volumeNameDirs, err := ioutil.ReadDir(volumeKindPath) | ||||||
|  | 			if err != nil { | ||||||
|  | 				glog.Errorf("Could not read directory: %s, (%s)", volumeKindPath, err) | ||||||
|  | 			} | ||||||
|  | 			for _, volumeNameDir := range volumeNameDirs { | ||||||
|  | 				volumeName := volumeNameDir.Name() | ||||||
|  | 				identifier := path.Join(podID, volumeName) | ||||||
|  | 				// TODO(thockin) This should instead return a reference to an extant volume object | ||||||
|  | 				cleaner, err := CreateVolumeCleaner(volumeKind, volumeName, podID, rootDirectory) | ||||||
|  | 				if err != nil { | ||||||
|  | 					glog.Errorf("Could not create volume cleaner: %s, (%s)", volumeNameDirs, err) | ||||||
|  | 					continue | ||||||
|  | 				} | ||||||
|  | 				currentVolumes[identifier] = cleaner | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 	return currentVolumes | ||||||
|  | } | ||||||
|   | |||||||
| @@ -25,7 +25,7 @@ import ( | |||||||
| 	"github.com/GoogleCloudPlatform/kubernetes/pkg/api" | 	"github.com/GoogleCloudPlatform/kubernetes/pkg/api" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| func TestCreateVolumes(t *testing.T) { | func TestCreateVolumeBuilders(t *testing.T) { | ||||||
| 	tempDir, err := ioutil.TempDir("", "CreateVolumes") | 	tempDir, err := ioutil.TempDir("", "CreateVolumes") | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		t.Errorf("Unexpected error: %v", err) | 		t.Errorf("Unexpected error: %v", err) | ||||||
| @@ -35,6 +35,7 @@ func TestCreateVolumes(t *testing.T) { | |||||||
| 		volume api.Volume | 		volume api.Volume | ||||||
| 		path   string | 		path   string | ||||||
| 		podID  string | 		podID  string | ||||||
|  | 		kind   string | ||||||
| 	}{ | 	}{ | ||||||
| 		{ | 		{ | ||||||
| 			api.Volume{ | 			api.Volume{ | ||||||
| @@ -45,6 +46,7 @@ func TestCreateVolumes(t *testing.T) { | |||||||
| 			}, | 			}, | ||||||
| 			"/dir/path", | 			"/dir/path", | ||||||
| 			"my-id", | 			"my-id", | ||||||
|  | 			"", | ||||||
| 		}, | 		}, | ||||||
| 		{ | 		{ | ||||||
| 			api.Volume{ | 			api.Volume{ | ||||||
| @@ -55,8 +57,9 @@ func TestCreateVolumes(t *testing.T) { | |||||||
| 			}, | 			}, | ||||||
| 			path.Join(tempDir, "/my-id/volumes/empty/empty-dir"), | 			path.Join(tempDir, "/my-id/volumes/empty/empty-dir"), | ||||||
| 			"my-id", | 			"my-id", | ||||||
|  | 			"empty", | ||||||
| 		}, | 		}, | ||||||
| 		{api.Volume{}, "", ""}, | 		{api.Volume{}, "", "", ""}, | ||||||
| 		{ | 		{ | ||||||
| 			api.Volume{ | 			api.Volume{ | ||||||
| 				Name:   "empty-dir", | 				Name:   "empty-dir", | ||||||
| @@ -64,13 +67,14 @@ func TestCreateVolumes(t *testing.T) { | |||||||
| 			}, | 			}, | ||||||
| 			"", | 			"", | ||||||
| 			"", | 			"", | ||||||
|  | 			"", | ||||||
| 		}, | 		}, | ||||||
| 	} | 	} | ||||||
| 	for _, createVolumesTest := range createVolumesTests { | 	for _, createVolumesTest := range createVolumesTests { | ||||||
| 		tt := createVolumesTest | 		tt := createVolumesTest | ||||||
| 		v, err := CreateVolume(&tt.volume, tt.podID, tempDir) | 		vb, err := CreateVolumeBuilder(&tt.volume, tt.podID, tempDir) | ||||||
| 		if tt.volume.Source == nil { | 		if tt.volume.Source == nil { | ||||||
| 			if v != nil { | 			if vb != nil { | ||||||
| 				t.Errorf("Expected volume to be nil") | 				t.Errorf("Expected volume to be nil") | ||||||
| 			} | 			} | ||||||
| 			continue | 			continue | ||||||
| @@ -84,17 +88,56 @@ func TestCreateVolumes(t *testing.T) { | |||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			t.Errorf("Unexpected error: %v", err) | 			t.Errorf("Unexpected error: %v", err) | ||||||
| 		} | 		} | ||||||
| 		err = v.SetUp() | 		err = vb.SetUp() | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			t.Errorf("Unexpected error: %v", err) | 			t.Errorf("Unexpected error: %v", err) | ||||||
| 		} | 		} | ||||||
| 		path := v.GetPath() | 		path := vb.GetPath() | ||||||
| 		if path != tt.path { | 		if path != tt.path { | ||||||
| 			t.Errorf("Unexpected bind path. Expected %v, got %v", tt.path, path) | 			t.Errorf("Unexpected bind path. Expected %v, got %v", tt.path, path) | ||||||
| 		} | 		} | ||||||
| 		err = v.TearDown() | 		vc, err := CreateVolumeCleaner(tt.kind, tt.volume.Name, tt.podID, tempDir) | ||||||
|  | 		if tt.kind == "" { | ||||||
|  | 			if err != ErrUnsupportedVolumeType { | ||||||
|  | 				t.Errorf("Unexpected error: %v", err) | ||||||
|  | 			} | ||||||
|  | 			continue | ||||||
|  | 		} | ||||||
|  | 		err = vc.TearDown() | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			t.Errorf("Unexpected error: %v", err) | 			t.Errorf("Unexpected error: %v", err) | ||||||
| 		} | 		} | ||||||
|  | 		if _, err := os.Stat(path); !os.IsNotExist(err) { | ||||||
|  | 			t.Errorf("TearDown() failed, original volume path not properly removed: %v", path) | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func TestGetActiveVolumes(t *testing.T) { | ||||||
|  | 	tempDir, err := ioutil.TempDir("", "CreateVolumes") | ||||||
|  | 	if err != nil { | ||||||
|  | 		t.Errorf("Unexpected error: %v", err) | ||||||
|  | 	} | ||||||
|  | 	defer os.RemoveAll(tempDir) | ||||||
|  | 	getActiveVolumesTests := []struct { | ||||||
|  | 		name       string | ||||||
|  | 		podID      string | ||||||
|  | 		kind       string | ||||||
|  | 		identifier string | ||||||
|  | 	}{ | ||||||
|  | 		{"fakeName", "fakeID", "empty", "fakeID/fakeName"}, | ||||||
|  | 		{"fakeName2", "fakeID2", "empty", "fakeID2/fakeName2"}, | ||||||
|  | 	} | ||||||
|  | 	expectedIdentifiers := []string{} | ||||||
|  | 	for _, test := range getActiveVolumesTests { | ||||||
|  | 		volumeDir := path.Join(tempDir, test.podID, "volumes", test.kind, test.name) | ||||||
|  | 		os.MkdirAll(volumeDir, 0750) | ||||||
|  | 		expectedIdentifiers = append(expectedIdentifiers, test.identifier) | ||||||
|  | 	} | ||||||
|  | 	volumeMap := GetCurrentVolumes(tempDir) | ||||||
|  | 	for _, name := range expectedIdentifiers { | ||||||
|  | 		if _, ok := volumeMap[name]; !ok { | ||||||
|  | 			t.Errorf("Expected volume map entry not found: %v", name) | ||||||
|  | 		} | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user
	 Tim Hockin
					Tim Hockin