Merge pull request #46456 from jingxu97/May/allocatable
Automatic merge from submit-queue Add local storage (scratch space) allocatable support This PR adds the support for allocatable local storage (scratch space). This feature is only for root file system which is shared by kubernetes componenets, users' containers and/or images. User could use --kube-reserved flag to reserve the storage for kube system components. If the allocatable storage for user's pods is used up, some pods will be evicted to free the storage resource. This feature is part of local storage capacity isolation and described in the proposal https://github.com/kubernetes/community/pull/306 **Release note**: ```release-note This feature exposes local storage capacity for the primary partitions, and supports & enforces storage reservation in Node Allocatable ```
This commit is contained in:
@@ -291,7 +291,7 @@ func (c *kubeletConfiguration) addFlags(fs *pflag.FlagSet) {
|
||||
|
||||
// Node Allocatable Flags
|
||||
fs.Var(&c.SystemReserved, "system-reserved", "A set of ResourceName=ResourceQuantity (e.g. cpu=200m,memory=500Mi) pairs that describe resources reserved for non-kubernetes components. Currently only cpu and memory are supported. See http://kubernetes.io/docs/user-guide/compute-resources for more detail. [default=none]")
|
||||
fs.Var(&c.KubeReserved, "kube-reserved", "A set of ResourceName=ResourceQuantity (e.g. cpu=200m,memory=500Mi) pairs that describe resources reserved for kubernetes system components. Currently only cpu and memory are supported. See http://kubernetes.io/docs/user-guide/compute-resources for more detail. [default=none]")
|
||||
fs.Var(&c.KubeReserved, "kube-reserved", "A set of ResourceName=ResourceQuantity (e.g. cpu=200m,memory=500Mi, storage=1Gi) pairs that describe resources reserved for kubernetes system components. Currently cpu, memory and local storage for root file system are supported. See http://kubernetes.io/docs/user-guide/compute-resources for more detail. [default=none]")
|
||||
fs.StringSliceVar(&c.EnforceNodeAllocatable, "enforce-node-allocatable", c.EnforceNodeAllocatable, "A comma separated list of levels of node allocatable enforcement to be enforced by kubelet. Acceptible options are 'pods', 'system-reserved' & 'kube-reserved'. If the latter two options are specified, '--system-reserved-cgroup' & '--kube-reserved-cgroup' must also be set respectively. See https://github.com/kubernetes/community/blob/master/contributors/design-proposals/node-allocatable.md for more details.")
|
||||
fs.StringVar(&c.SystemReservedCgroup, "system-reserved-cgroup", c.SystemReservedCgroup, "Absolute name of the top level cgroup that is used to manage non-kubernetes components for which compute resources were reserved via '--system-reserved' flag. Ex. '/system-reserved'. [default='']")
|
||||
fs.StringVar(&c.KubeReservedCgroup, "kube-reserved-cgroup", c.KubeReservedCgroup, "Absolute name of the top level cgroup that is used to manage kubernetes components for which compute resources were reserved via '--kube-reserved' flag. Ex. '/kube-reserved'. [default='']")
|
||||
|
@@ -1025,8 +1025,8 @@ func parseResourceList(m componentconfig.ConfigurationMap) (v1.ResourceList, err
|
||||
rl := make(v1.ResourceList)
|
||||
for k, v := range m {
|
||||
switch v1.ResourceName(k) {
|
||||
// Only CPU and memory resources are supported.
|
||||
case v1.ResourceCPU, v1.ResourceMemory:
|
||||
// CPU, memory and local storage resources are supported.
|
||||
case v1.ResourceCPU, v1.ResourceMemory, v1.ResourceStorage:
|
||||
q, err := resource.ParseQuantity(v)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@@ -551,7 +551,7 @@ type KubeletConfiguration struct {
|
||||
SystemReserved map[string]string `json:"systemReserved"`
|
||||
// A set of ResourceName=ResourceQuantity (e.g. cpu=200m,memory=150G) pairs
|
||||
// that describe resources reserved for kubernetes system components.
|
||||
// Currently only cpu and memory are supported. [default=none]
|
||||
// Currently cpu, memory and local storage for root file system are supported. [default=none]
|
||||
// See http://kubernetes.io/docs/user-guide/compute-resources for more detail.
|
||||
KubeReserved map[string]string `json:"kubeReserved"`
|
||||
|
||||
|
@@ -18,6 +18,7 @@ package cadvisor
|
||||
|
||||
import (
|
||||
cadvisorapi "github.com/google/cadvisor/info/v1"
|
||||
cadvisorapi2 "github.com/google/cadvisor/info/v2"
|
||||
"k8s.io/apimachinery/pkg/api/resource"
|
||||
"k8s.io/kubernetes/pkg/api/v1"
|
||||
)
|
||||
@@ -33,3 +34,21 @@ func CapacityFromMachineInfo(info *cadvisorapi.MachineInfo) v1.ResourceList {
|
||||
}
|
||||
return c
|
||||
}
|
||||
|
||||
func StorageScratchCapacityFromFsInfo(info cadvisorapi2.FsInfo) v1.ResourceList {
|
||||
c := v1.ResourceList{
|
||||
v1.ResourceStorage: *resource.NewQuantity(
|
||||
int64(info.Capacity),
|
||||
resource.BinarySI),
|
||||
}
|
||||
return c
|
||||
}
|
||||
|
||||
func StorageOverlayCapacityFromFsInfo(info cadvisorapi2.FsInfo) v1.ResourceList {
|
||||
c := v1.ResourceList{
|
||||
v1.ResourceStorageOverlay: *resource.NewQuantity(
|
||||
int64(info.Capacity),
|
||||
resource.BinarySI),
|
||||
}
|
||||
return c
|
||||
}
|
||||
|
@@ -29,6 +29,7 @@ import (
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
clientv1 "k8s.io/client-go/pkg/api/v1"
|
||||
"k8s.io/kubernetes/pkg/api/v1"
|
||||
"k8s.io/kubernetes/pkg/kubelet/cadvisor"
|
||||
"k8s.io/kubernetes/pkg/kubelet/events"
|
||||
evictionapi "k8s.io/kubernetes/pkg/kubelet/eviction/api"
|
||||
)
|
||||
@@ -180,9 +181,20 @@ func (cm *containerManagerImpl) getNodeAllocatableAbsolute() v1.ResourceList {
|
||||
|
||||
}
|
||||
|
||||
// GetNodeAllocatable returns amount of compute resource that have to be reserved on this node from scheduling.
|
||||
// GetNodeAllocatable returns amount of compute or storage resource that have to be reserved on this node from scheduling.
|
||||
func (cm *containerManagerImpl) GetNodeAllocatableReservation() v1.ResourceList {
|
||||
evictionReservation := hardEvictionReservation(cm.HardEvictionThresholds, cm.capacity)
|
||||
if _, ok := cm.capacity[v1.ResourceStorage]; !ok {
|
||||
if cm.cadvisorInterface != nil {
|
||||
if rootfs, err := cm.cadvisorInterface.RootFsInfo(); err == nil {
|
||||
for rName, rCap := range cadvisor.StorageScratchCapacityFromFsInfo(rootfs) {
|
||||
cm.capacity[rName] = rCap
|
||||
}
|
||||
} else {
|
||||
glog.Warning("Error getting rootfs info: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
result := make(v1.ResourceList)
|
||||
for k := range cm.capacity {
|
||||
value := resource.NewQuantity(0, resource.DecimalSI)
|
||||
|
@@ -38,6 +38,8 @@ const (
|
||||
SignalImageFsInodesFree Signal = "imagefs.inodesFree"
|
||||
// SignalAllocatableMemoryAvailable is amount of memory available for pod allocation (i.e. allocatable - workingSet (of pods), in bytes.
|
||||
SignalAllocatableMemoryAvailable Signal = "allocatableMemory.available"
|
||||
// SignalAllocatableNodeFsAvailable is amount of local storage available for pod allocation
|
||||
SignalAllocatableNodeFsAvailable Signal = "allocatableNodeFs.available"
|
||||
)
|
||||
|
||||
// ThresholdOperator is the operator used to express a Threshold.
|
||||
|
@@ -82,6 +82,8 @@ type managerImpl struct {
|
||||
lastObservations signalObservations
|
||||
// notifiersInitialized indicates if the threshold notifiers have been initialized (i.e. synchronize() has been called once)
|
||||
notifiersInitialized bool
|
||||
// dedicatedImageFs indicates if imagefs is on a separate device from the rootfs
|
||||
dedicatedImageFs *bool
|
||||
}
|
||||
|
||||
// ensure it implements the required interface
|
||||
@@ -106,6 +108,7 @@ func NewManager(
|
||||
nodeRef: nodeRef,
|
||||
nodeConditionsLastObservedAt: nodeConditionsObservedAt{},
|
||||
thresholdsFirstObservedAt: thresholdsObservedAt{},
|
||||
dedicatedImageFs: nil,
|
||||
}
|
||||
return manager, manager
|
||||
}
|
||||
@@ -211,21 +214,22 @@ func (m *managerImpl) synchronize(diskInfoProvider DiskInfoProvider, podFunc Act
|
||||
}
|
||||
|
||||
glog.V(3).Infof("eviction manager: synchronize housekeeping")
|
||||
|
||||
// build the ranking functions (if not yet known)
|
||||
// TODO: have a function in cadvisor that lets us know if global housekeeping has completed
|
||||
if len(m.resourceToRankFunc) == 0 || len(m.resourceToNodeReclaimFuncs) == 0 {
|
||||
// this may error if cadvisor has yet to complete housekeeping, so we will just try again in next pass.
|
||||
hasDedicatedImageFs, err := diskInfoProvider.HasDedicatedImageFs()
|
||||
if err != nil {
|
||||
if m.dedicatedImageFs == nil {
|
||||
hasImageFs, ok := diskInfoProvider.HasDedicatedImageFs()
|
||||
if ok != nil {
|
||||
return nil
|
||||
}
|
||||
m.resourceToRankFunc = buildResourceToRankFunc(hasDedicatedImageFs)
|
||||
m.resourceToNodeReclaimFuncs = buildResourceToNodeReclaimFuncs(m.imageGC, hasDedicatedImageFs)
|
||||
m.dedicatedImageFs = &hasImageFs
|
||||
m.resourceToRankFunc = buildResourceToRankFunc(hasImageFs)
|
||||
m.resourceToNodeReclaimFuncs = buildResourceToNodeReclaimFuncs(m.imageGC, hasImageFs)
|
||||
|
||||
}
|
||||
|
||||
activePods := podFunc()
|
||||
// make observations and get a function to derive pod usage stats relative to those observations.
|
||||
observations, statsFunc, err := makeSignalObservations(m.summaryProvider, nodeProvider)
|
||||
observations, statsFunc, err := makeSignalObservations(m.summaryProvider, nodeProvider, activePods, *m.dedicatedImageFs)
|
||||
if err != nil {
|
||||
glog.Errorf("eviction manager: unexpected err: %v", err)
|
||||
return nil
|
||||
@@ -336,7 +340,11 @@ func (m *managerImpl) synchronize(diskInfoProvider DiskInfoProvider, podFunc Act
|
||||
}
|
||||
|
||||
// the only candidates viable for eviction are those pods that had anything running.
|
||||
activePods := podFunc()
|
||||
if len(activePods) == 0 {
|
||||
glog.Errorf("eviction manager: eviction thresholds have been met, but no pods are active to evict")
|
||||
return nil
|
||||
}
|
||||
|
||||
// rank the running pods for eviction for the specified resource
|
||||
rank(activePods, statsFunc)
|
||||
|
||||
|
@@ -54,6 +54,8 @@ const (
|
||||
resourceNodeFs v1.ResourceName = "nodefs"
|
||||
// nodefs inodes, number. internal to this module, used to account for local node root filesystem inodes.
|
||||
resourceNodeFsInodes v1.ResourceName = "nodefsInodes"
|
||||
// container overlay storage, in bytes. internal to this module, used to account for local disk usage for container overlay.
|
||||
resourceOverlay v1.ResourceName = "overlay"
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -74,19 +76,25 @@ func init() {
|
||||
signalToNodeCondition[evictionapi.SignalNodeFsAvailable] = v1.NodeDiskPressure
|
||||
signalToNodeCondition[evictionapi.SignalImageFsInodesFree] = v1.NodeDiskPressure
|
||||
signalToNodeCondition[evictionapi.SignalNodeFsInodesFree] = v1.NodeDiskPressure
|
||||
signalToNodeCondition[evictionapi.SignalAllocatableNodeFsAvailable] = v1.NodeDiskPressure
|
||||
|
||||
// map signals to resources (and vice-versa)
|
||||
signalToResource = map[evictionapi.Signal]v1.ResourceName{}
|
||||
signalToResource[evictionapi.SignalMemoryAvailable] = v1.ResourceMemory
|
||||
signalToResource[evictionapi.SignalAllocatableMemoryAvailable] = v1.ResourceMemory
|
||||
signalToResource[evictionapi.SignalAllocatableNodeFsAvailable] = resourceNodeFs
|
||||
signalToResource[evictionapi.SignalImageFsAvailable] = resourceImageFs
|
||||
signalToResource[evictionapi.SignalImageFsInodesFree] = resourceImageFsInodes
|
||||
signalToResource[evictionapi.SignalNodeFsAvailable] = resourceNodeFs
|
||||
signalToResource[evictionapi.SignalNodeFsInodesFree] = resourceNodeFsInodes
|
||||
|
||||
resourceToSignal = map[v1.ResourceName]evictionapi.Signal{}
|
||||
for key, value := range signalToResource {
|
||||
resourceToSignal[value] = key
|
||||
}
|
||||
// Hard-code here to make sure resourceNodeFs maps to evictionapi.SignalNodeFsAvailable
|
||||
// (TODO) resourceToSignal is a map from resource name to a list of signals
|
||||
resourceToSignal[resourceNodeFs] = evictionapi.SignalNodeFsAvailable
|
||||
}
|
||||
|
||||
// validSignal returns true if the signal is supported.
|
||||
@@ -234,6 +242,16 @@ func getAllocatableThreshold(allocatableConfig []string) []evictionapi.Threshold
|
||||
Quantity: resource.NewQuantity(int64(0), resource.BinarySI),
|
||||
},
|
||||
},
|
||||
{
|
||||
Signal: evictionapi.SignalAllocatableNodeFsAvailable,
|
||||
Operator: evictionapi.OpLessThan,
|
||||
Value: evictionapi.ThresholdValue{
|
||||
Quantity: resource.NewQuantity(int64(0), resource.BinarySI),
|
||||
},
|
||||
MinReclaim: &evictionapi.ThresholdValue{
|
||||
Quantity: resource.NewQuantity(int64(0), resource.BinarySI),
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -382,10 +400,12 @@ func localVolumeNames(pod *v1.Pod) []string {
|
||||
func podDiskUsage(podStats statsapi.PodStats, pod *v1.Pod, statsToMeasure []fsStatsType) (v1.ResourceList, error) {
|
||||
disk := resource.Quantity{Format: resource.BinarySI}
|
||||
inodes := resource.Quantity{Format: resource.BinarySI}
|
||||
overlay := resource.Quantity{Format: resource.BinarySI}
|
||||
for _, container := range podStats.Containers {
|
||||
if hasFsStatsType(statsToMeasure, fsStatsRoot) {
|
||||
disk.Add(*diskUsage(container.Rootfs))
|
||||
inodes.Add(*inodeUsage(container.Rootfs))
|
||||
overlay.Add(*diskUsage(container.Rootfs))
|
||||
}
|
||||
if hasFsStatsType(statsToMeasure, fsStatsLogs) {
|
||||
disk.Add(*diskUsage(container.Logs))
|
||||
@@ -407,6 +427,7 @@ func podDiskUsage(podStats statsapi.PodStats, pod *v1.Pod, statsToMeasure []fsSt
|
||||
return v1.ResourceList{
|
||||
resourceDisk: disk,
|
||||
resourceInodes: inodes,
|
||||
resourceOverlay: overlay,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@@ -637,7 +658,7 @@ func (a byEvictionPriority) Less(i, j int) bool {
|
||||
}
|
||||
|
||||
// makeSignalObservations derives observations using the specified summary provider.
|
||||
func makeSignalObservations(summaryProvider stats.SummaryProvider, nodeProvider NodeProvider) (signalObservations, statsFunc, error) {
|
||||
func makeSignalObservations(summaryProvider stats.SummaryProvider, nodeProvider NodeProvider, pods []*v1.Pod, withImageFs bool) (signalObservations, statsFunc, error) {
|
||||
summary, err := summaryProvider.Get()
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
@@ -706,6 +727,37 @@ func makeSignalObservations(summaryProvider stats.SummaryProvider, nodeProvider
|
||||
capacity: memoryAllocatableCapacity.Copy(),
|
||||
}
|
||||
}
|
||||
|
||||
if storageScratchAllocatableCapacity, ok := node.Status.Allocatable[v1.ResourceStorage]; ok {
|
||||
storageScratchAllocatable := storageScratchAllocatableCapacity.Copy()
|
||||
for _, pod := range pods {
|
||||
podStat, ok := statsFunc(pod)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
usage, err := podDiskUsage(podStat, pod, []fsStatsType{fsStatsLogs, fsStatsLocalVolumeSource, fsStatsRoot})
|
||||
if err != nil {
|
||||
glog.Warningf("eviction manager: error getting pod disk usage %v", err)
|
||||
continue
|
||||
}
|
||||
// If there is a seperate imagefs set up for container runtimes, the scratch disk usage from nodefs should exclude the overlay usage
|
||||
if withImageFs {
|
||||
diskUsage := usage[resourceDisk]
|
||||
diskUsageP := &diskUsage
|
||||
diskUsagep := diskUsageP.Copy()
|
||||
diskUsagep.Sub(usage[resourceOverlay])
|
||||
storageScratchAllocatable.Sub(*diskUsagep)
|
||||
} else {
|
||||
storageScratchAllocatable.Sub(usage[resourceDisk])
|
||||
}
|
||||
}
|
||||
result[evictionapi.SignalAllocatableNodeFsAvailable] = signalObservation{
|
||||
available: storageScratchAllocatable,
|
||||
capacity: storageScratchAllocatableCapacity.Copy(),
|
||||
}
|
||||
}
|
||||
|
||||
return result, statsFunc, nil
|
||||
}
|
||||
|
||||
|
@@ -76,6 +76,16 @@ func TestParseThresholdConfig(t *testing.T) {
|
||||
Quantity: quantityMustParse("0"),
|
||||
},
|
||||
},
|
||||
{
|
||||
Signal: evictionapi.SignalAllocatableNodeFsAvailable,
|
||||
Operator: evictionapi.OpLessThan,
|
||||
Value: evictionapi.ThresholdValue{
|
||||
Quantity: quantityMustParse("0"),
|
||||
},
|
||||
MinReclaim: &evictionapi.ThresholdValue{
|
||||
Quantity: quantityMustParse("0"),
|
||||
},
|
||||
},
|
||||
{
|
||||
Signal: evictionapi.SignalMemoryAvailable,
|
||||
Operator: evictionapi.OpLessThan,
|
||||
@@ -777,8 +787,7 @@ func TestMakeSignalObservations(t *testing.T) {
|
||||
if res.CmpInt64(int64(allocatableMemoryCapacity)) != 0 {
|
||||
t.Errorf("Expected Threshold %v to be equal to value %v", res.Value(), allocatableMemoryCapacity)
|
||||
}
|
||||
actualObservations, statsFunc, err := makeSignalObservations(provider, nodeProvider)
|
||||
|
||||
actualObservations, statsFunc, err := makeSignalObservations(provider, nodeProvider, pods, false)
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected err: %v", err)
|
||||
}
|
||||
|
@@ -37,6 +37,7 @@ import (
|
||||
clientgoclientset "k8s.io/client-go/kubernetes"
|
||||
|
||||
cadvisorapi "github.com/google/cadvisor/info/v1"
|
||||
cadvisorapiv2 "github.com/google/cadvisor/info/v2"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/fields"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
@@ -927,6 +928,9 @@ type Kubelet struct {
|
||||
// Cached MachineInfo returned by cadvisor.
|
||||
machineInfo *cadvisorapi.MachineInfo
|
||||
|
||||
//Cached RootFsInfo returned by cadvisor
|
||||
rootfsInfo *cadvisorapiv2.FsInfo
|
||||
|
||||
// Handles certificate rotations.
|
||||
serverCertificateManager certificate.Manager
|
||||
|
||||
|
@@ -100,3 +100,15 @@ func (kl *Kubelet) GetCachedMachineInfo() (*cadvisorapi.MachineInfo, error) {
|
||||
}
|
||||
return kl.machineInfo, nil
|
||||
}
|
||||
|
||||
// GetCachedRootFsInfo assumes that the rootfs info can't change without a reboot
|
||||
func (kl *Kubelet) GetCachedRootFsInfo() (cadvisorapiv2.FsInfo, error) {
|
||||
if kl.rootfsInfo == nil {
|
||||
info, err := kl.cadvisor.RootFsInfo()
|
||||
if err != nil {
|
||||
return cadvisorapiv2.FsInfo{}, err
|
||||
}
|
||||
kl.rootfsInfo = &info
|
||||
}
|
||||
return *kl.rootfsInfo, nil
|
||||
}
|
||||
|
@@ -553,6 +553,26 @@ func (kl *Kubelet) setNodeStatusMachineInfo(node *v1.Node) {
|
||||
node.Status.NodeInfo.BootID = info.BootID
|
||||
}
|
||||
|
||||
rootfs, err := kl.GetCachedRootFsInfo()
|
||||
if err != nil {
|
||||
node.Status.Capacity[v1.ResourceStorage] = resource.MustParse("0Gi")
|
||||
} else {
|
||||
for rName, rCap := range cadvisor.StorageScratchCapacityFromFsInfo(rootfs) {
|
||||
node.Status.Capacity[rName] = rCap
|
||||
}
|
||||
}
|
||||
|
||||
if hasDedicatedImageFs, _ := kl.HasDedicatedImageFs(); hasDedicatedImageFs {
|
||||
imagesfs, err := kl.ImagesFsInfo()
|
||||
if err != nil {
|
||||
node.Status.Capacity[v1.ResourceStorageOverlay] = resource.MustParse("0Gi")
|
||||
} else {
|
||||
for rName, rCap := range cadvisor.StorageOverlayCapacityFromFsInfo(imagesfs) {
|
||||
node.Status.Capacity[rName] = rCap
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Set Allocatable.
|
||||
if node.Status.Allocatable == nil {
|
||||
node.Status.Allocatable = make(v1.ResourceList)
|
||||
|
@@ -210,11 +210,13 @@ func TestUpdateNewNodeStatus(t *testing.T) {
|
||||
Capacity: v1.ResourceList{
|
||||
v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI),
|
||||
v1.ResourceMemory: *resource.NewQuantity(10E9, resource.BinarySI),
|
||||
v1.ResourceStorage: *resource.NewQuantity(500*mb, resource.BinarySI),
|
||||
v1.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI),
|
||||
},
|
||||
Allocatable: v1.ResourceList{
|
||||
v1.ResourceCPU: *resource.NewMilliQuantity(1800, resource.DecimalSI),
|
||||
v1.ResourceMemory: *resource.NewQuantity(9900E6, resource.BinarySI),
|
||||
v1.ResourceStorage: *resource.NewQuantity(500*mb, resource.BinarySI),
|
||||
v1.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI),
|
||||
},
|
||||
Addresses: []v1.NodeAddress{
|
||||
@@ -363,11 +365,13 @@ func TestUpdateExistingNodeStatus(t *testing.T) {
|
||||
Capacity: v1.ResourceList{
|
||||
v1.ResourceCPU: *resource.NewMilliQuantity(3000, resource.DecimalSI),
|
||||
v1.ResourceMemory: *resource.NewQuantity(20E9, resource.BinarySI),
|
||||
v1.ResourceStorage: *resource.NewQuantity(500*mb, resource.BinarySI),
|
||||
v1.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI),
|
||||
},
|
||||
Allocatable: v1.ResourceList{
|
||||
v1.ResourceCPU: *resource.NewMilliQuantity(2800, resource.DecimalSI),
|
||||
v1.ResourceMemory: *resource.NewQuantity(19900E6, resource.BinarySI),
|
||||
v1.ResourceStorage: *resource.NewQuantity(500*mb, resource.BinarySI),
|
||||
v1.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI),
|
||||
},
|
||||
},
|
||||
@@ -446,11 +450,13 @@ func TestUpdateExistingNodeStatus(t *testing.T) {
|
||||
Capacity: v1.ResourceList{
|
||||
v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI),
|
||||
v1.ResourceMemory: *resource.NewQuantity(20E9, resource.BinarySI),
|
||||
v1.ResourceStorage: *resource.NewQuantity(500*mb, resource.BinarySI),
|
||||
v1.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI),
|
||||
},
|
||||
Allocatable: v1.ResourceList{
|
||||
v1.ResourceCPU: *resource.NewMilliQuantity(1800, resource.DecimalSI),
|
||||
v1.ResourceMemory: *resource.NewQuantity(19900E6, resource.BinarySI),
|
||||
v1.ResourceStorage: *resource.NewQuantity(500*mb, resource.BinarySI),
|
||||
v1.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI),
|
||||
},
|
||||
Addresses: []v1.NodeAddress{
|
||||
@@ -657,6 +663,7 @@ func TestUpdateNodeStatusWithRuntimeStateError(t *testing.T) {
|
||||
allocatable: v1.ResourceList{
|
||||
v1.ResourceCPU: *resource.NewMilliQuantity(200, resource.DecimalSI),
|
||||
v1.ResourceMemory: *resource.NewQuantity(100E6, resource.BinarySI),
|
||||
v1.ResourceStorage: *resource.NewQuantity(200*mb, resource.BinarySI),
|
||||
},
|
||||
}
|
||||
|
||||
@@ -729,11 +736,13 @@ func TestUpdateNodeStatusWithRuntimeStateError(t *testing.T) {
|
||||
Capacity: v1.ResourceList{
|
||||
v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI),
|
||||
v1.ResourceMemory: *resource.NewQuantity(10E9, resource.BinarySI),
|
||||
v1.ResourceStorage: *resource.NewQuantity(500*mb, resource.BinarySI),
|
||||
v1.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI),
|
||||
},
|
||||
Allocatable: v1.ResourceList{
|
||||
v1.ResourceCPU: *resource.NewMilliQuantity(1800, resource.DecimalSI),
|
||||
v1.ResourceMemory: *resource.NewQuantity(9900E6, resource.BinarySI),
|
||||
v1.ResourceStorage: *resource.NewQuantity(300*mb, resource.BinarySI),
|
||||
v1.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI),
|
||||
},
|
||||
Addresses: []v1.NodeAddress{
|
||||
@@ -1143,11 +1152,13 @@ func TestUpdateNewNodeStatusTooLargeReservation(t *testing.T) {
|
||||
Capacity: v1.ResourceList{
|
||||
v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI),
|
||||
v1.ResourceMemory: *resource.NewQuantity(10E9, resource.BinarySI),
|
||||
v1.ResourceStorage: *resource.NewQuantity(500*mb, resource.BinarySI),
|
||||
v1.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI),
|
||||
},
|
||||
Allocatable: v1.ResourceList{
|
||||
v1.ResourceCPU: *resource.NewMilliQuantity(0, resource.DecimalSI),
|
||||
v1.ResourceMemory: *resource.NewQuantity(10E9, resource.BinarySI),
|
||||
v1.ResourceStorage: *resource.NewQuantity(500*mb, resource.BinarySI),
|
||||
v1.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI),
|
||||
},
|
||||
},
|
||||
|
@@ -510,6 +510,8 @@ func GetResourceRequest(pod *v1.Pod) *schedulercache.Resource {
|
||||
result.MilliCPU += rQuantity.MilliValue()
|
||||
case v1.ResourceNvidiaGPU:
|
||||
result.NvidiaGPU += rQuantity.Value()
|
||||
case v1.ResourceStorageOverlay:
|
||||
result.StorageOverlay += rQuantity.Value()
|
||||
default:
|
||||
if v1helper.IsOpaqueIntResourceName(rName) {
|
||||
result.AddOpaque(rName, rQuantity.Value())
|
||||
@@ -517,6 +519,15 @@ func GetResourceRequest(pod *v1.Pod) *schedulercache.Resource {
|
||||
}
|
||||
}
|
||||
}
|
||||
// Account for storage requested by emptydir volumes
|
||||
// If the storage medium is memory, should exclude the size
|
||||
for _, vol := range pod.Spec.Volumes {
|
||||
if vol.EmptyDir != nil && vol.EmptyDir.Medium != v1.StorageMediumMemory {
|
||||
|
||||
result.StorageScratch += vol.EmptyDir.SizeLimit.Value()
|
||||
}
|
||||
}
|
||||
|
||||
// take max_resource(sum_pod, any_init_container)
|
||||
for _, container := range pod.Spec.InitContainers {
|
||||
for rName, rQuantity := range container.Resources.Requests {
|
||||
@@ -533,6 +544,10 @@ func GetResourceRequest(pod *v1.Pod) *schedulercache.Resource {
|
||||
if gpu := rQuantity.Value(); gpu > result.NvidiaGPU {
|
||||
result.NvidiaGPU = gpu
|
||||
}
|
||||
case v1.ResourceStorageOverlay:
|
||||
if overlay := rQuantity.Value(); overlay > result.StorageOverlay {
|
||||
result.StorageOverlay = overlay
|
||||
}
|
||||
default:
|
||||
if v1helper.IsOpaqueIntResourceName(rName) {
|
||||
value := rQuantity.Value()
|
||||
@@ -583,6 +598,23 @@ func PodFitsResources(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.No
|
||||
if allocatable.NvidiaGPU < podRequest.NvidiaGPU+nodeInfo.RequestedResource().NvidiaGPU {
|
||||
predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourceNvidiaGPU, podRequest.NvidiaGPU, nodeInfo.RequestedResource().NvidiaGPU, allocatable.NvidiaGPU))
|
||||
}
|
||||
|
||||
scratchSpaceRequest := podRequest.StorageScratch
|
||||
if allocatable.StorageOverlay == 0 {
|
||||
scratchSpaceRequest += podRequest.StorageOverlay
|
||||
//scratchSpaceRequest += nodeInfo.RequestedResource().StorageOverlay
|
||||
nodeScratchRequest := nodeInfo.RequestedResource().StorageOverlay + nodeInfo.RequestedResource().StorageScratch
|
||||
if allocatable.StorageScratch < scratchSpaceRequest+nodeScratchRequest {
|
||||
predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourceStorageScratch, scratchSpaceRequest, nodeScratchRequest, allocatable.StorageScratch))
|
||||
}
|
||||
|
||||
} else if allocatable.StorageScratch < scratchSpaceRequest+nodeInfo.RequestedResource().StorageScratch {
|
||||
predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourceStorageScratch, scratchSpaceRequest, nodeInfo.RequestedResource().StorageScratch, allocatable.StorageScratch))
|
||||
}
|
||||
if allocatable.StorageOverlay > 0 && allocatable.StorageOverlay < podRequest.StorageOverlay+nodeInfo.RequestedResource().StorageOverlay {
|
||||
predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourceStorageOverlay, podRequest.StorageOverlay, nodeInfo.RequestedResource().StorageOverlay, allocatable.StorageOverlay))
|
||||
}
|
||||
|
||||
for rName, rQuant := range podRequest.OpaqueIntResources {
|
||||
if allocatable.OpaqueIntResources[rName] < rQuant+nodeInfo.RequestedResource().OpaqueIntResources[rName] {
|
||||
predicateFails = append(predicateFails, NewInsufficientResourceError(rName, podRequest.OpaqueIntResources[rName], nodeInfo.RequestedResource().OpaqueIntResources[rName], allocatable.OpaqueIntResources[rName]))
|
||||
|
@@ -78,7 +78,7 @@ var (
|
||||
opaqueResourceB = v1helper.OpaqueIntResourceName("BBB")
|
||||
)
|
||||
|
||||
func makeResources(milliCPU, memory, nvidiaGPUs, pods, opaqueA int64) v1.NodeResources {
|
||||
func makeResources(milliCPU, memory, nvidiaGPUs, pods, opaqueA, storage int64) v1.NodeResources {
|
||||
return v1.NodeResources{
|
||||
Capacity: v1.ResourceList{
|
||||
v1.ResourceCPU: *resource.NewMilliQuantity(milliCPU, resource.DecimalSI),
|
||||
@@ -86,17 +86,19 @@ func makeResources(milliCPU, memory, nvidiaGPUs, pods, opaqueA int64) v1.NodeRes
|
||||
v1.ResourcePods: *resource.NewQuantity(pods, resource.DecimalSI),
|
||||
v1.ResourceNvidiaGPU: *resource.NewQuantity(nvidiaGPUs, resource.DecimalSI),
|
||||
opaqueResourceA: *resource.NewQuantity(opaqueA, resource.DecimalSI),
|
||||
v1.ResourceStorage: *resource.NewQuantity(storage, resource.BinarySI),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func makeAllocatableResources(milliCPU, memory, nvidiaGPUs, pods, opaqueA int64) v1.ResourceList {
|
||||
func makeAllocatableResources(milliCPU, memory, nvidiaGPUs, pods, opaqueA, storage int64) v1.ResourceList {
|
||||
return v1.ResourceList{
|
||||
v1.ResourceCPU: *resource.NewMilliQuantity(milliCPU, resource.DecimalSI),
|
||||
v1.ResourceMemory: *resource.NewQuantity(memory, resource.BinarySI),
|
||||
v1.ResourcePods: *resource.NewQuantity(pods, resource.DecimalSI),
|
||||
v1.ResourceNvidiaGPU: *resource.NewQuantity(nvidiaGPUs, resource.DecimalSI),
|
||||
opaqueResourceA: *resource.NewQuantity(opaqueA, resource.DecimalSI),
|
||||
v1.ResourceStorage: *resource.NewQuantity(storage, resource.BinarySI),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -114,6 +116,25 @@ func newResourcePod(usage ...schedulercache.Resource) *v1.Pod {
|
||||
}
|
||||
}
|
||||
|
||||
func addStorageLimit(pod *v1.Pod, sizeLimit int64, medium v1.StorageMedium) *v1.Pod {
|
||||
return &v1.Pod{
|
||||
Spec: v1.PodSpec{
|
||||
Containers: pod.Spec.Containers,
|
||||
Volumes: []v1.Volume{
|
||||
{
|
||||
Name: "emptyDirVolumeName",
|
||||
VolumeSource: v1.VolumeSource{
|
||||
EmptyDir: &v1.EmptyDirVolumeSource{
|
||||
SizeLimit: *resource.NewQuantity(sizeLimit, resource.BinarySI),
|
||||
Medium: medium,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func newResourceInitPod(pod *v1.Pod, usage ...schedulercache.Resource) *v1.Pod {
|
||||
pod.Spec.InitContainers = newResourcePod(usage...).Spec.Containers
|
||||
return pod
|
||||
@@ -331,7 +352,7 @@ func TestPodFitsResources(t *testing.T) {
|
||||
}
|
||||
|
||||
for _, test := range enoughPodsTests {
|
||||
node := v1.Node{Status: v1.NodeStatus{Capacity: makeResources(10, 20, 0, 32, 5).Capacity, Allocatable: makeAllocatableResources(10, 20, 0, 32, 5)}}
|
||||
node := v1.Node{Status: v1.NodeStatus{Capacity: makeResources(10, 20, 0, 32, 5, 20).Capacity, Allocatable: makeAllocatableResources(10, 20, 0, 32, 5, 20)}}
|
||||
test.nodeInfo.SetNode(&node)
|
||||
fits, reasons, err := PodFitsResources(test.pod, PredicateMetadata(test.pod, nil), test.nodeInfo)
|
||||
if err != nil {
|
||||
@@ -386,7 +407,7 @@ func TestPodFitsResources(t *testing.T) {
|
||||
},
|
||||
}
|
||||
for _, test := range notEnoughPodsTests {
|
||||
node := v1.Node{Status: v1.NodeStatus{Capacity: v1.ResourceList{}, Allocatable: makeAllocatableResources(10, 20, 0, 1, 0)}}
|
||||
node := v1.Node{Status: v1.NodeStatus{Capacity: v1.ResourceList{}, Allocatable: makeAllocatableResources(10, 20, 0, 1, 0, 0)}}
|
||||
test.nodeInfo.SetNode(&node)
|
||||
fits, reasons, err := PodFitsResources(test.pod, PredicateMetadata(test.pod, nil), test.nodeInfo)
|
||||
if err != nil {
|
||||
@@ -399,6 +420,86 @@ func TestPodFitsResources(t *testing.T) {
|
||||
t.Errorf("%s: expected: %v got %v", test.test, test.fits, fits)
|
||||
}
|
||||
}
|
||||
|
||||
storagePodsTests := []struct {
|
||||
pod *v1.Pod
|
||||
emptyDirLimit int64
|
||||
storageMedium v1.StorageMedium
|
||||
nodeInfo *schedulercache.NodeInfo
|
||||
fits bool
|
||||
test string
|
||||
reasons []algorithm.PredicateFailureReason
|
||||
}{
|
||||
{
|
||||
pod: newResourcePod(schedulercache.Resource{MilliCPU: 1, Memory: 1, StorageOverlay: 1}),
|
||||
nodeInfo: schedulercache.NewNodeInfo(
|
||||
newResourcePod(schedulercache.Resource{MilliCPU: 10, Memory: 10, StorageOverlay: 20})),
|
||||
fits: false,
|
||||
test: "due to init container scratch disk",
|
||||
reasons: []algorithm.PredicateFailureReason{
|
||||
NewInsufficientResourceError(v1.ResourceCPU, 1, 10, 10),
|
||||
NewInsufficientResourceError(v1.ResourceStorageScratch, 1, 20, 20),
|
||||
},
|
||||
},
|
||||
{
|
||||
pod: newResourcePod(schedulercache.Resource{MilliCPU: 1, Memory: 1, StorageOverlay: 10}),
|
||||
nodeInfo: schedulercache.NewNodeInfo(
|
||||
newResourcePod(schedulercache.Resource{MilliCPU: 2, Memory: 10})),
|
||||
fits: true,
|
||||
test: "pod fit",
|
||||
},
|
||||
{
|
||||
pod: newResourcePod(schedulercache.Resource{MilliCPU: 1, Memory: 1, StorageOverlay: 18}),
|
||||
nodeInfo: schedulercache.NewNodeInfo(
|
||||
newResourcePod(schedulercache.Resource{MilliCPU: 2, Memory: 2, StorageOverlay: 5})),
|
||||
fits: false,
|
||||
test: "request exceeds allocatable",
|
||||
reasons: []algorithm.PredicateFailureReason{
|
||||
NewInsufficientResourceError(v1.ResourceStorageScratch, 18, 5, 20),
|
||||
},
|
||||
},
|
||||
{
|
||||
pod: newResourcePod(schedulercache.Resource{MilliCPU: 1, Memory: 1, StorageOverlay: 10}),
|
||||
emptyDirLimit: 15,
|
||||
storageMedium: v1.StorageMediumDefault,
|
||||
nodeInfo: schedulercache.NewNodeInfo(
|
||||
newResourcePod(schedulercache.Resource{MilliCPU: 2, Memory: 2, StorageOverlay: 5})),
|
||||
fits: false,
|
||||
test: "storage scratchrequest exceeds allocatable",
|
||||
reasons: []algorithm.PredicateFailureReason{
|
||||
NewInsufficientResourceError(v1.ResourceStorageScratch, 25, 5, 20),
|
||||
},
|
||||
},
|
||||
{
|
||||
pod: newResourcePod(schedulercache.Resource{MilliCPU: 1, Memory: 1, StorageOverlay: 10}),
|
||||
emptyDirLimit: 15,
|
||||
storageMedium: v1.StorageMediumMemory,
|
||||
nodeInfo: schedulercache.NewNodeInfo(
|
||||
newResourcePod(schedulercache.Resource{MilliCPU: 2, Memory: 2, StorageOverlay: 5})),
|
||||
fits: true,
|
||||
test: "storage scratchrequest exceeds allocatable",
|
||||
reasons: []algorithm.PredicateFailureReason{
|
||||
NewInsufficientResourceError(v1.ResourceStorageScratch, 25, 5, 20),
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range storagePodsTests {
|
||||
node := v1.Node{Status: v1.NodeStatus{Capacity: makeResources(10, 20, 0, 32, 5, 20).Capacity, Allocatable: makeAllocatableResources(10, 20, 0, 32, 5, 20)}}
|
||||
test.nodeInfo.SetNode(&node)
|
||||
pod := addStorageLimit(test.pod, test.emptyDirLimit, test.storageMedium)
|
||||
fits, reasons, err := PodFitsResources(pod, PredicateMetadata(pod, nil), test.nodeInfo)
|
||||
if err != nil {
|
||||
t.Errorf("%s: unexpected error: %v", test.test, err)
|
||||
}
|
||||
if !fits && !reflect.DeepEqual(reasons, test.reasons) {
|
||||
t.Errorf("%s: unexpected failure reasons: %v, want: %v", test.test, reasons, test.reasons)
|
||||
}
|
||||
if fits != test.fits {
|
||||
t.Errorf("%s: expected: %v got %v", test.test, test.fits, fits)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestPodFitsHost(t *testing.T) {
|
||||
@@ -1845,7 +1946,7 @@ func TestRunGeneralPredicates(t *testing.T) {
|
||||
newResourcePod(schedulercache.Resource{MilliCPU: 9, Memory: 19})),
|
||||
node: &v1.Node{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "machine1"},
|
||||
Status: v1.NodeStatus{Capacity: makeResources(10, 20, 0, 32, 0).Capacity, Allocatable: makeAllocatableResources(10, 20, 0, 32, 0)},
|
||||
Status: v1.NodeStatus{Capacity: makeResources(10, 20, 0, 32, 0, 0).Capacity, Allocatable: makeAllocatableResources(10, 20, 0, 32, 0, 0)},
|
||||
},
|
||||
fits: true,
|
||||
wErr: nil,
|
||||
@@ -1857,7 +1958,7 @@ func TestRunGeneralPredicates(t *testing.T) {
|
||||
newResourcePod(schedulercache.Resource{MilliCPU: 5, Memory: 19})),
|
||||
node: &v1.Node{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "machine1"},
|
||||
Status: v1.NodeStatus{Capacity: makeResources(10, 20, 0, 32, 0).Capacity, Allocatable: makeAllocatableResources(10, 20, 0, 32, 0)},
|
||||
Status: v1.NodeStatus{Capacity: makeResources(10, 20, 0, 32, 0, 0).Capacity, Allocatable: makeAllocatableResources(10, 20, 0, 32, 0, 0)},
|
||||
},
|
||||
fits: false,
|
||||
wErr: nil,
|
||||
@@ -1871,7 +1972,7 @@ func TestRunGeneralPredicates(t *testing.T) {
|
||||
pod: &v1.Pod{},
|
||||
nodeInfo: schedulercache.NewNodeInfo(
|
||||
newResourcePod(schedulercache.Resource{MilliCPU: 9, Memory: 19})),
|
||||
node: &v1.Node{Status: v1.NodeStatus{Capacity: makeResources(10, 20, 1, 32, 0).Capacity, Allocatable: makeAllocatableResources(10, 20, 1, 32, 0)}},
|
||||
node: &v1.Node{Status: v1.NodeStatus{Capacity: makeResources(10, 20, 1, 32, 0, 0).Capacity, Allocatable: makeAllocatableResources(10, 20, 1, 32, 0, 0)}},
|
||||
fits: true,
|
||||
wErr: nil,
|
||||
test: "no resources/port/host requested always fits on GPU machine",
|
||||
@@ -1880,7 +1981,7 @@ func TestRunGeneralPredicates(t *testing.T) {
|
||||
pod: newResourcePod(schedulercache.Resource{MilliCPU: 3, Memory: 1, NvidiaGPU: 1}),
|
||||
nodeInfo: schedulercache.NewNodeInfo(
|
||||
newResourcePod(schedulercache.Resource{MilliCPU: 5, Memory: 10, NvidiaGPU: 1})),
|
||||
node: &v1.Node{Status: v1.NodeStatus{Capacity: makeResources(10, 20, 1, 32, 0).Capacity, Allocatable: makeAllocatableResources(10, 20, 1, 32, 0)}},
|
||||
node: &v1.Node{Status: v1.NodeStatus{Capacity: makeResources(10, 20, 1, 32, 0, 0).Capacity, Allocatable: makeAllocatableResources(10, 20, 1, 32, 0, 0)}},
|
||||
fits: false,
|
||||
wErr: nil,
|
||||
reasons: []algorithm.PredicateFailureReason{NewInsufficientResourceError(v1.ResourceNvidiaGPU, 1, 1, 1)},
|
||||
@@ -1890,7 +1991,7 @@ func TestRunGeneralPredicates(t *testing.T) {
|
||||
pod: newResourcePod(schedulercache.Resource{MilliCPU: 3, Memory: 1, NvidiaGPU: 1}),
|
||||
nodeInfo: schedulercache.NewNodeInfo(
|
||||
newResourcePod(schedulercache.Resource{MilliCPU: 5, Memory: 10, NvidiaGPU: 0})),
|
||||
node: &v1.Node{Status: v1.NodeStatus{Capacity: makeResources(10, 20, 1, 32, 0).Capacity, Allocatable: makeAllocatableResources(10, 20, 1, 32, 0)}},
|
||||
node: &v1.Node{Status: v1.NodeStatus{Capacity: makeResources(10, 20, 1, 32, 0, 0).Capacity, Allocatable: makeAllocatableResources(10, 20, 1, 32, 0, 0)}},
|
||||
fits: true,
|
||||
wErr: nil,
|
||||
test: "enough GPU resource",
|
||||
@@ -1904,7 +2005,7 @@ func TestRunGeneralPredicates(t *testing.T) {
|
||||
nodeInfo: schedulercache.NewNodeInfo(),
|
||||
node: &v1.Node{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "machine1"},
|
||||
Status: v1.NodeStatus{Capacity: makeResources(10, 20, 0, 32, 0).Capacity, Allocatable: makeAllocatableResources(10, 20, 0, 32, 0)},
|
||||
Status: v1.NodeStatus{Capacity: makeResources(10, 20, 0, 32, 0, 0).Capacity, Allocatable: makeAllocatableResources(10, 20, 0, 32, 0, 0)},
|
||||
},
|
||||
fits: false,
|
||||
wErr: nil,
|
||||
@@ -1916,7 +2017,7 @@ func TestRunGeneralPredicates(t *testing.T) {
|
||||
nodeInfo: schedulercache.NewNodeInfo(newPodWithPort(123)),
|
||||
node: &v1.Node{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "machine1"},
|
||||
Status: v1.NodeStatus{Capacity: makeResources(10, 20, 0, 32, 0).Capacity, Allocatable: makeAllocatableResources(10, 20, 0, 32, 0)},
|
||||
Status: v1.NodeStatus{Capacity: makeResources(10, 20, 0, 32, 0, 0).Capacity, Allocatable: makeAllocatableResources(10, 20, 0, 32, 0, 0)},
|
||||
},
|
||||
fits: false,
|
||||
wErr: nil,
|
||||
@@ -3251,7 +3352,7 @@ func TestPodSchedulesOnNodeWithMemoryPressureCondition(t *testing.T) {
|
||||
ImagePullPolicy: "Always",
|
||||
// at least one requirement -> burstable pod
|
||||
Resources: v1.ResourceRequirements{
|
||||
Requests: makeAllocatableResources(100, 100, 100, 100, 0),
|
||||
Requests: makeAllocatableResources(100, 100, 100, 100, 0, 0),
|
||||
},
|
||||
},
|
||||
},
|
||||
|
@@ -69,6 +69,8 @@ type Resource struct {
|
||||
MilliCPU int64
|
||||
Memory int64
|
||||
NvidiaGPU int64
|
||||
StorageScratch int64
|
||||
StorageOverlay int64
|
||||
OpaqueIntResources map[v1.ResourceName]int64
|
||||
}
|
||||
|
||||
@@ -77,6 +79,7 @@ func (r *Resource) ResourceList() v1.ResourceList {
|
||||
v1.ResourceCPU: *resource.NewMilliQuantity(r.MilliCPU, resource.DecimalSI),
|
||||
v1.ResourceMemory: *resource.NewQuantity(r.Memory, resource.BinarySI),
|
||||
v1.ResourceNvidiaGPU: *resource.NewQuantity(r.NvidiaGPU, resource.DecimalSI),
|
||||
v1.ResourceStorageOverlay: *resource.NewQuantity(r.StorageOverlay, resource.BinarySI),
|
||||
}
|
||||
for rName, rQuant := range r.OpaqueIntResources {
|
||||
result[rName] = *resource.NewQuantity(rQuant, resource.DecimalSI)
|
||||
@@ -89,6 +92,8 @@ func (r *Resource) Clone() *Resource {
|
||||
MilliCPU: r.MilliCPU,
|
||||
Memory: r.Memory,
|
||||
NvidiaGPU: r.NvidiaGPU,
|
||||
StorageOverlay: r.StorageOverlay,
|
||||
StorageScratch: r.StorageScratch,
|
||||
}
|
||||
if r.OpaqueIntResources != nil {
|
||||
res.OpaqueIntResources = make(map[v1.ResourceName]int64)
|
||||
@@ -262,6 +267,8 @@ func (n *NodeInfo) addPod(pod *v1.Pod) {
|
||||
n.requestedResource.MilliCPU += res.MilliCPU
|
||||
n.requestedResource.Memory += res.Memory
|
||||
n.requestedResource.NvidiaGPU += res.NvidiaGPU
|
||||
n.requestedResource.StorageOverlay += res.StorageOverlay
|
||||
n.requestedResource.StorageScratch += res.StorageScratch
|
||||
if n.requestedResource.OpaqueIntResources == nil && len(res.OpaqueIntResources) > 0 {
|
||||
n.requestedResource.OpaqueIntResources = map[v1.ResourceName]int64{}
|
||||
}
|
||||
@@ -347,6 +354,8 @@ func calculateResource(pod *v1.Pod) (res Resource, non0_cpu int64, non0_mem int6
|
||||
res.Memory += rQuant.Value()
|
||||
case v1.ResourceNvidiaGPU:
|
||||
res.NvidiaGPU += rQuant.Value()
|
||||
case v1.ResourceStorageOverlay:
|
||||
res.StorageOverlay += rQuant.Value()
|
||||
default:
|
||||
if v1helper.IsOpaqueIntResourceName(rName) {
|
||||
res.AddOpaque(rName, rQuant.Value())
|
||||
@@ -359,6 +368,15 @@ func calculateResource(pod *v1.Pod) (res Resource, non0_cpu int64, non0_mem int6
|
||||
non0_mem += non0_mem_req
|
||||
// No non-zero resources for GPUs or opaque resources.
|
||||
}
|
||||
|
||||
// Account for storage requested by emptydir volumes
|
||||
// If the storage medium is memory, should exclude the size
|
||||
for _, vol := range pod.Spec.Volumes {
|
||||
if vol.EmptyDir != nil && vol.EmptyDir.Medium != v1.StorageMediumMemory {
|
||||
res.StorageScratch += vol.EmptyDir.SizeLimit.Value()
|
||||
}
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
@@ -389,6 +407,10 @@ func (n *NodeInfo) SetNode(node *v1.Node) error {
|
||||
n.allocatableResource.NvidiaGPU = rQuant.Value()
|
||||
case v1.ResourcePods:
|
||||
n.allowedPodNumber = int(rQuant.Value())
|
||||
case v1.ResourceStorage:
|
||||
n.allocatableResource.StorageScratch = rQuant.Value()
|
||||
case v1.ResourceStorageOverlay:
|
||||
n.allocatableResource.StorageOverlay = rQuant.Value()
|
||||
default:
|
||||
if v1helper.IsOpaqueIntResourceName(rName) {
|
||||
n.allocatableResource.SetOpaque(rName, rQuant.Value())
|
||||
|
@@ -76,6 +76,7 @@ go_test(
|
||||
"inode_eviction_test.go",
|
||||
"kubelet_test.go",
|
||||
"lifecycle_hook_test.go",
|
||||
"local_storage_allocatable_eviction_test.go",
|
||||
"log_path_test.go",
|
||||
"memory_eviction_test.go",
|
||||
"mirror_pod_test.go",
|
||||
|
322
test/e2e_node/local_storage_allocatable_eviction_test.go
Normal file
322
test/e2e_node/local_storage_allocatable_eviction_test.go
Normal file
@@ -0,0 +1,322 @@
|
||||
/*
|
||||
Copyright 2017 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package e2e_node
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/kubernetes/pkg/api/v1"
|
||||
nodeutil "k8s.io/kubernetes/pkg/api/v1/node"
|
||||
"k8s.io/kubernetes/pkg/apis/componentconfig"
|
||||
"k8s.io/kubernetes/test/e2e/framework"
|
||||
|
||||
. "github.com/onsi/ginkgo"
|
||||
. "github.com/onsi/gomega"
|
||||
)
|
||||
|
||||
// Eviction Policy is described here:
|
||||
// https://github.com/kubernetes/kubernetes/blob/master/docs/proposals/kubelet-eviction.md
|
||||
|
||||
var _ = framework.KubeDescribe("LocalStorageAllocatableEviction [Slow] [Serial] [Disruptive] [Flaky]", func() {
|
||||
f := framework.NewDefaultFramework("localstorageallocatable-eviction-test")
|
||||
evictionTestTimeout := 15 * time.Minute
|
||||
testCondition := "Evict pod due to local storage allocatable violation"
|
||||
conditionType := v1.NodeDiskPressure
|
||||
var podTestSpecs []podTestSpec
|
||||
//podTestSpecsS := make([]podTestSpec, 5)
|
||||
var diskReserve uint64
|
||||
Context(fmt.Sprintf("when we run containers that should cause %s", testCondition), func() {
|
||||
|
||||
BeforeEach(func() {
|
||||
diskAvail, err := getDiskUsage()
|
||||
if err != nil {
|
||||
framework.ExpectNoError(err)
|
||||
}
|
||||
|
||||
diskReserve = uint64(0.8 * diskAvail / 1000000) // Reserve 0.8 * disk Capacity for kube-reserved scratch storage
|
||||
maxDisk := 10000000 // Set dd command to read and write up to 10MB at a time
|
||||
count := uint64(0.8 * diskAvail / float64(maxDisk))
|
||||
command := fmt.Sprintf("dd if=/dev/urandom of=dummy bs=%d count=%d; sleep 0.5; while true; do sleep 5; done", maxDisk, count)
|
||||
|
||||
podTestSpecs = []podTestSpec{
|
||||
{
|
||||
evictionPriority: 1, // This pod should be evicted before the innocent pod
|
||||
pod: v1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "container-disk-hog-pod"},
|
||||
Spec: v1.PodSpec{
|
||||
RestartPolicy: v1.RestartPolicyNever,
|
||||
Containers: []v1.Container{
|
||||
{
|
||||
Image: "gcr.io/google_containers/busybox:1.24",
|
||||
Name: "container-disk-hog-pod",
|
||||
Command: []string{"sh", "-c", command},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
|
||||
{
|
||||
evictionPriority: 0, // This pod should never be evicted
|
||||
pod: v1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "idle-pod"},
|
||||
Spec: v1.PodSpec{
|
||||
RestartPolicy: v1.RestartPolicyNever,
|
||||
Containers: []v1.Container{
|
||||
{
|
||||
Image: "gcr.io/google_containers/busybox:1.24",
|
||||
Name: "idle-pod",
|
||||
Command: []string{"sh", "-c",
|
||||
fmt.Sprintf("while true; do sleep 5; done")},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
})
|
||||
|
||||
// Set up --kube-reserved for scratch storage
|
||||
tempSetCurrentKubeletConfig(f, func(initialConfig *componentconfig.KubeletConfiguration) {
|
||||
framework.Logf("Set up --kube-reserved for local storage reserved %dMi", diskReserve)
|
||||
initialConfig.KubeReserved = componentconfig.ConfigurationMap(map[string]string{"storage": fmt.Sprintf("%dMi", diskReserve)})
|
||||
|
||||
})
|
||||
|
||||
// Place the remainder of the test within a context so that the kubelet config is set before and after the test.
|
||||
Context("With kubeconfig updated", func() {
|
||||
runLocalStorageEvictionTest(f, conditionType, testCondition, &podTestSpecs, evictionTestTimeout, hasDiskPressure)
|
||||
})
|
||||
|
||||
})
|
||||
|
||||
})
|
||||
|
||||
// Returns TRUE if the node has disk pressure, FALSE otherwise
|
||||
func hasDiskPressure(f *framework.Framework, conditionType v1.NodeConditionType, testCondition string) (bool, error) {
|
||||
localNodeStatus := getLocalNode(f).Status
|
||||
_, pressure := nodeutil.GetNodeCondition(&localNodeStatus, conditionType)
|
||||
Expect(pressure).NotTo(BeNil())
|
||||
hasPressure := pressure.Status == v1.ConditionTrue
|
||||
By(fmt.Sprintf("checking if pod has %s: %v", testCondition, hasPressure))
|
||||
|
||||
// Additional Logging relating to disk
|
||||
summary, err := getNodeSummary()
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if summary.Node.Runtime != nil && summary.Node.Runtime.ImageFs != nil && summary.Node.Runtime.ImageFs.UsedBytes != nil {
|
||||
framework.Logf("imageFsInfo.UsedBytes: %d", *summary.Node.Runtime.ImageFs.UsedBytes)
|
||||
}
|
||||
if summary.Node.Fs != nil && summary.Node.Fs.UsedBytes != nil {
|
||||
framework.Logf("rootFsInfo.UsedBytes: %d", *summary.Node.Fs.UsedBytes)
|
||||
}
|
||||
for _, pod := range summary.Pods {
|
||||
framework.Logf("Pod: %s", pod.PodRef.Name)
|
||||
for _, container := range pod.Containers {
|
||||
if container.Rootfs != nil && container.Rootfs.UsedBytes != nil {
|
||||
framework.Logf("--- summary Container: %s UsedBytes: %d", container.Name, *container.Rootfs.UsedBytes)
|
||||
}
|
||||
}
|
||||
for _, volume := range pod.VolumeStats {
|
||||
if volume.FsStats.UsedBytes != nil {
|
||||
framework.Logf("--- summary Volume: %s UsedBytes: %d", volume.Name, *volume.FsStats.UsedBytes)
|
||||
}
|
||||
}
|
||||
}
|
||||
return hasPressure, nil
|
||||
}
|
||||
|
||||
// Pass podTestSpecsP as references so that it could be set up in the first BeforeEach clause
|
||||
func runLocalStorageEvictionTest(f *framework.Framework, conditionType v1.NodeConditionType, testCondition string, podTestSpecsP *[]podTestSpec, evictionTestTimeout time.Duration,
|
||||
hasPressureCondition func(*framework.Framework, v1.NodeConditionType, string) (bool, error)) {
|
||||
BeforeEach(func() {
|
||||
|
||||
By("seting up pods to be used by tests")
|
||||
for _, spec := range *podTestSpecsP {
|
||||
By(fmt.Sprintf("creating pod with container: %s", spec.pod.Name))
|
||||
f.PodClient().CreateSync(&spec.pod)
|
||||
}
|
||||
})
|
||||
|
||||
It(fmt.Sprintf("should eventually see %s, and then evict all of the correct pods", testCondition), func() {
|
||||
Expect(podTestSpecsP).NotTo(BeNil())
|
||||
podTestSpecs := *podTestSpecsP
|
||||
|
||||
Eventually(func() error {
|
||||
hasPressure, err := hasPressureCondition(f, conditionType, testCondition)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if hasPressure {
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("Condition: %s not encountered", testCondition)
|
||||
}, evictionTestTimeout, evictionPollInterval).Should(BeNil())
|
||||
|
||||
Eventually(func() error {
|
||||
// Gather current information
|
||||
updatedPodList, err := f.ClientSet.Core().Pods(f.Namespace.Name).List(metav1.ListOptions{})
|
||||
updatedPods := updatedPodList.Items
|
||||
for _, p := range updatedPods {
|
||||
framework.Logf("fetching pod %s; phase= %v", p.Name, p.Status.Phase)
|
||||
}
|
||||
_, err = hasPressureCondition(f, conditionType, testCondition)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
By("checking eviction ordering and ensuring important pods dont fail")
|
||||
done := true
|
||||
for _, priorityPodSpec := range podTestSpecs {
|
||||
var priorityPod v1.Pod
|
||||
for _, p := range updatedPods {
|
||||
if p.Name == priorityPodSpec.pod.Name {
|
||||
priorityPod = p
|
||||
}
|
||||
}
|
||||
Expect(priorityPod).NotTo(BeNil())
|
||||
|
||||
// Check eviction ordering.
|
||||
// Note: it is alright for a priority 1 and priority 2 pod (for example) to fail in the same round
|
||||
for _, lowPriorityPodSpec := range podTestSpecs {
|
||||
var lowPriorityPod v1.Pod
|
||||
for _, p := range updatedPods {
|
||||
if p.Name == lowPriorityPodSpec.pod.Name {
|
||||
lowPriorityPod = p
|
||||
}
|
||||
}
|
||||
Expect(lowPriorityPod).NotTo(BeNil())
|
||||
if priorityPodSpec.evictionPriority < lowPriorityPodSpec.evictionPriority && lowPriorityPod.Status.Phase == v1.PodRunning {
|
||||
Expect(priorityPod.Status.Phase).NotTo(Equal(v1.PodFailed),
|
||||
fmt.Sprintf("%s pod failed before %s pod", priorityPodSpec.pod.Name, lowPriorityPodSpec.pod.Name))
|
||||
}
|
||||
}
|
||||
|
||||
// EvictionPriority 0 pods should not fail
|
||||
if priorityPodSpec.evictionPriority == 0 {
|
||||
Expect(priorityPod.Status.Phase).NotTo(Equal(v1.PodFailed),
|
||||
fmt.Sprintf("%s pod failed (and shouldn't have failed)", priorityPod.Name))
|
||||
}
|
||||
|
||||
// If a pod that is not evictionPriority 0 has not been evicted, we are not done
|
||||
if priorityPodSpec.evictionPriority != 0 && priorityPod.Status.Phase != v1.PodFailed {
|
||||
done = false
|
||||
}
|
||||
}
|
||||
if done {
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("pods that caused %s have not been evicted.", testCondition)
|
||||
}, evictionTestTimeout, evictionPollInterval).Should(BeNil())
|
||||
|
||||
// We observe pressure from the API server. The eviction manager observes pressure from the kubelet internal stats.
|
||||
// This means the eviction manager will observe pressure before we will, creating a delay between when the eviction manager
|
||||
// evicts a pod, and when we observe the pressure by querrying the API server. Add a delay here to account for this delay
|
||||
By("making sure pressure from test has surfaced before continuing")
|
||||
time.Sleep(pressureDelay)
|
||||
|
||||
By("making sure conditions eventually return to normal")
|
||||
Eventually(func() error {
|
||||
hasPressure, err := hasPressureCondition(f, conditionType, testCondition)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if hasPressure {
|
||||
return fmt.Errorf("Conditions havent returned to normal, we still have %s", testCondition)
|
||||
}
|
||||
return nil
|
||||
}, evictionTestTimeout, evictionPollInterval).Should(BeNil())
|
||||
|
||||
By("making sure conditions do not return, and that pods that shouldnt fail dont fail")
|
||||
Consistently(func() error {
|
||||
hasPressure, err := hasPressureCondition(f, conditionType, testCondition)
|
||||
if err != nil {
|
||||
// Race conditions sometimes occur when checking pressure condition due to #38710 (Docker bug)
|
||||
// Do not fail the test when this occurs, since this is expected to happen occasionally.
|
||||
framework.Logf("Failed to check pressure condition. Error: %v", err)
|
||||
return nil
|
||||
}
|
||||
if hasPressure {
|
||||
return fmt.Errorf("%s dissappeared and then reappeared", testCondition)
|
||||
}
|
||||
// Gather current information
|
||||
updatedPodList, _ := f.ClientSet.Core().Pods(f.Namespace.Name).List(metav1.ListOptions{})
|
||||
for _, priorityPodSpec := range podTestSpecs {
|
||||
// EvictionPriority 0 pods should not fail
|
||||
if priorityPodSpec.evictionPriority == 0 {
|
||||
for _, p := range updatedPodList.Items {
|
||||
if p.Name == priorityPodSpec.pod.Name && p.Status.Phase == v1.PodFailed {
|
||||
return fmt.Errorf("%s pod failed (delayed) and shouldn't have failed", p.Name)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}, postTestConditionMonitoringPeriod, evictionPollInterval).Should(BeNil())
|
||||
|
||||
By("making sure we can start a new pod after the test")
|
||||
podName := "test-admit-pod"
|
||||
f.PodClient().CreateSync(&v1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: podName,
|
||||
},
|
||||
Spec: v1.PodSpec{
|
||||
RestartPolicy: v1.RestartPolicyNever,
|
||||
Containers: []v1.Container{
|
||||
{
|
||||
Image: framework.GetPauseImageNameForHostArch(),
|
||||
Name: podName,
|
||||
},
|
||||
},
|
||||
},
|
||||
})
|
||||
})
|
||||
|
||||
AfterEach(func() {
|
||||
By("deleting pods")
|
||||
for _, spec := range *podTestSpecsP {
|
||||
By(fmt.Sprintf("deleting pod: %s", spec.pod.Name))
|
||||
f.PodClient().DeleteSync(spec.pod.Name, &metav1.DeleteOptions{}, framework.DefaultPodDeletionTimeout)
|
||||
}
|
||||
|
||||
if CurrentGinkgoTestDescription().Failed {
|
||||
if framework.TestContext.DumpLogsOnFailure {
|
||||
logPodEvents(f)
|
||||
logNodeEvents(f)
|
||||
}
|
||||
By("sleeping to allow for cleanup of test")
|
||||
time.Sleep(postTestConditionMonitoringPeriod)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func getDiskUsage() (float64, error) {
|
||||
summary, err := getNodeSummary()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
if nodeFs := summary.Node.Fs; nodeFs != nil {
|
||||
return float64(*nodeFs.AvailableBytes), nil
|
||||
}
|
||||
|
||||
return 0, fmt.Errorf("fail to get nodefs available bytes")
|
||||
|
||||
}
|
Reference in New Issue
Block a user