Ubernetes Lite: Label volumes with zone information
When volumes are labeled, they will only be scheduled onto nodes in the same zone.
This commit is contained in:
		@@ -29,6 +29,7 @@ import (
 | 
				
			|||||||
	"time"
 | 
						"time"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	"k8s.io/kubernetes/pkg/api"
 | 
						"k8s.io/kubernetes/pkg/api"
 | 
				
			||||||
 | 
						"k8s.io/kubernetes/pkg/api/unversioned"
 | 
				
			||||||
	"k8s.io/kubernetes/pkg/cloudprovider"
 | 
						"k8s.io/kubernetes/pkg/cloudprovider"
 | 
				
			||||||
	utilerrors "k8s.io/kubernetes/pkg/util/errors"
 | 
						utilerrors "k8s.io/kubernetes/pkg/util/errors"
 | 
				
			||||||
	"k8s.io/kubernetes/pkg/util/sets"
 | 
						"k8s.io/kubernetes/pkg/util/sets"
 | 
				
			||||||
@@ -1644,6 +1645,33 @@ func (gce *GCECloud) DeleteDisk(diskToDelete string) error {
 | 
				
			|||||||
	return gce.waitForZoneOp(deleteOp, disk.Zone)
 | 
						return gce.waitForZoneOp(deleteOp, disk.Zone)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// Builds the labels that should be automatically added to a PersistentVolume backed by a GCE PD
 | 
				
			||||||
 | 
					// Specifically, this builds FailureDomain (zone) and Region labels.
 | 
				
			||||||
 | 
					// The PersistentVolumeLabel admission controller calls this and adds the labels when a PV is created.
 | 
				
			||||||
 | 
					func (gce *GCECloud) GetAutoLabelsForPD(name string) (map[string]string, error) {
 | 
				
			||||||
 | 
						disk, err := gce.getDiskByNameUnknownZone(name)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return nil, err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						zone := disk.Zone
 | 
				
			||||||
 | 
						region, err := GetGCERegion(zone)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return nil, err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						if zone == "" || region == "" {
 | 
				
			||||||
 | 
							// Unexpected, but sanity-check
 | 
				
			||||||
 | 
							return nil, fmt.Errorf("PD did not have zone/region information: %q", disk.Name)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						labels := make(map[string]string)
 | 
				
			||||||
 | 
						labels[unversioned.LabelZoneFailureDomain] = zone
 | 
				
			||||||
 | 
						labels[unversioned.LabelZoneRegion] = region
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						return labels, nil
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (gce *GCECloud) AttachDisk(diskName, instanceID string, readOnly bool) error {
 | 
					func (gce *GCECloud) AttachDisk(diskName, instanceID string, readOnly bool) error {
 | 
				
			||||||
	instance, err := gce.getInstanceByName(instanceID)
 | 
						instance, err := gce.getInstanceByName(instanceID)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -26,6 +26,7 @@ import (
 | 
				
			|||||||
	client "k8s.io/kubernetes/pkg/client/unversioned"
 | 
						client "k8s.io/kubernetes/pkg/client/unversioned"
 | 
				
			||||||
	"k8s.io/kubernetes/pkg/cloudprovider"
 | 
						"k8s.io/kubernetes/pkg/cloudprovider"
 | 
				
			||||||
	"k8s.io/kubernetes/pkg/cloudprovider/providers/aws"
 | 
						"k8s.io/kubernetes/pkg/cloudprovider/providers/aws"
 | 
				
			||||||
 | 
						"k8s.io/kubernetes/pkg/cloudprovider/providers/gce"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func init() {
 | 
					func init() {
 | 
				
			||||||
@@ -42,6 +43,7 @@ type persistentVolumeLabel struct {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	mutex            sync.Mutex
 | 
						mutex            sync.Mutex
 | 
				
			||||||
	ebsVolumes       aws.Volumes
 | 
						ebsVolumes       aws.Volumes
 | 
				
			||||||
 | 
						gceCloudProvider *gce.GCECloud
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// NewPersistentVolumeLabel returns an admission.Interface implementation which adds labels to PersistentVolume CREATE requests,
 | 
					// NewPersistentVolumeLabel returns an admission.Interface implementation which adds labels to PersistentVolume CREATE requests,
 | 
				
			||||||
@@ -75,6 +77,13 @@ func (l *persistentVolumeLabel) Admit(a admission.Attributes) (err error) {
 | 
				
			|||||||
		}
 | 
							}
 | 
				
			||||||
		volumeLabels = labels
 | 
							volumeLabels = labels
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
						if volume.Spec.GCEPersistentDisk != nil {
 | 
				
			||||||
 | 
							labels, err := l.findGCEPDLabels(volume)
 | 
				
			||||||
 | 
							if err != nil {
 | 
				
			||||||
 | 
								return admission.NewForbidden(a, fmt.Errorf("error querying GCE PD volume %s: %v", volume.Spec.GCEPersistentDisk.PDName, err))
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							volumeLabels = labels
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if len(volumeLabels) != 0 {
 | 
						if len(volumeLabels) != 0 {
 | 
				
			||||||
		if volume.Labels == nil {
 | 
							if volume.Labels == nil {
 | 
				
			||||||
@@ -129,3 +138,40 @@ func (l *persistentVolumeLabel) getEBSVolumes() (aws.Volumes, error) {
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
	return l.ebsVolumes, nil
 | 
						return l.ebsVolumes, nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (l *persistentVolumeLabel) findGCEPDLabels(volume *api.PersistentVolume) (map[string]string, error) {
 | 
				
			||||||
 | 
						provider, err := l.getGCECloudProvider()
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return nil, err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						if provider == nil {
 | 
				
			||||||
 | 
							return nil, fmt.Errorf("unable to build GCE cloud provider for PD")
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						labels, err := provider.GetAutoLabelsForPD(volume.Spec.GCEPersistentDisk.PDName)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return nil, err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						return labels, err
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// getGCECloudProvider returns the GCE cloud provider, for use for querying volume labels
 | 
				
			||||||
 | 
					func (l *persistentVolumeLabel) getGCECloudProvider() (*gce.GCECloud, error) {
 | 
				
			||||||
 | 
						l.mutex.Lock()
 | 
				
			||||||
 | 
						defer l.mutex.Unlock()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						if l.gceCloudProvider == nil {
 | 
				
			||||||
 | 
							cloudProvider, err := cloudprovider.GetCloudProvider("gce", nil)
 | 
				
			||||||
 | 
							if err != nil || cloudProvider == nil {
 | 
				
			||||||
 | 
								return nil, err
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							gceCloudProvider, ok := cloudProvider.(*gce.GCECloud)
 | 
				
			||||||
 | 
							if !ok {
 | 
				
			||||||
 | 
								// GetCloudProvider has gone very wrong
 | 
				
			||||||
 | 
								return nil, fmt.Errorf("error retrieving GCE cloud provider")
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							l.gceCloudProvider = gceCloudProvider
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return l.gceCloudProvider, nil
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user