Merge pull request #48636 from jingxu97/July/allocatable

Automatic merge from submit-queue (batch tested with PRs 48636, 49088, 49251, 49417, 49494)

Fix issues for local storage allocatable feature

This PR fixes two issues:
1. Use the ResourceStorageScratch API instead of ResourceStorage to represent
local storage capacity.
2. In the eviction manager, retrieve node capacity and reserved resources from
the container manager instead of the node provider (kubelet). The node
provider sits behind a feature gate, so scratch storage information may not be
exposed when the gate is not enabled; the container manager, by contrast,
always has the full capacity and allocatable resource information. (A hedged
sketch of the resulting capacity-provider pattern follows this list.)
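A minimal sketch of the capacity-provider pattern described in item 2, not the PR's own code: the interface shape mirrors the CapacityProvider added to the eviction package below, while staticProvider, the allocatable helper, and the import paths are illustrative assumptions for this era of the API (where Quantity.Copy() and v1.ResourceStorageScratch still exist).

```go
package main

import (
	"fmt"

	"k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
)

// capacityProvider mirrors the interface introduced by this PR: something
// (the container manager in the real kubelet) that reports raw capacity and
// the portion reserved away from pods.
type capacityProvider interface {
	GetCapacity() v1.ResourceList
	GetNodeAllocatableReservation() v1.ResourceList
}

// staticProvider is a stand-in for the container manager, for illustration only.
type staticProvider struct {
	capacity, reservation v1.ResourceList
}

func (s staticProvider) GetCapacity() v1.ResourceList { return s.capacity }
func (s staticProvider) GetNodeAllocatableReservation() v1.ResourceList {
	return s.reservation
}

// allocatable derives capacity minus reservation for a single resource, which
// is the quantity the eviction manager ultimately compares usage against.
func allocatable(p capacityProvider, name v1.ResourceName) *resource.Quantity {
	c, ok := p.GetCapacity()[name]
	if !ok {
		return nil
	}
	avail := c.Copy()
	if reserved, ok := p.GetNodeAllocatableReservation()[name]; ok {
		avail.Sub(reserved)
	}
	return avail
}

func main() {
	p := staticProvider{
		capacity:    v1.ResourceList{v1.ResourceMemory: resource.MustParse("3Gi")},
		reservation: v1.ResourceList{v1.ResourceMemory: resource.MustParse("1Gi")},
	}
	fmt.Println(allocatable(p, v1.ResourceMemory)) // 2Gi
}
```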

This PR fixes issue #47809
Kubernetes Submit Queue 2017-07-24 19:30:33 -07:00 committed by GitHub
commit e623fed778
11 changed files with 125 additions and 101 deletions

View File

@ -944,7 +944,12 @@ func parseResourceList(m componentconfig.ConfigurationMap) (v1.ResourceList, err
if q.Sign() == -1 {
return nil, fmt.Errorf("resource quantity for %q cannot be negative: %v", k, v)
}
rl[v1.ResourceName(k)] = q
// storage specified in configuration map is mapped to ResourceStorageScratch API
if v1.ResourceName(k) == v1.ResourceStorage {
rl[v1.ResourceStorageScratch] = q
} else {
rl[v1.ResourceName(k)] = q
}
default:
return nil, fmt.Errorf("cannot reserve %q resource", k)
}

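The hunk above maps the "storage" key of the kube-reserved/system-reserved configuration map onto the scratch-storage resource. Below is a hedged, standalone sketch of that mapping, assuming a plain map[string]string in place of componentconfig.ConfigurationMap and import paths for this era of the API; it is not the kubelet function itself.

```go
package main

import (
	"fmt"

	"k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
)

// parseReserved mimics the parseResourceList change above: quantities are
// parsed as-is, but a "storage" key is stored under ResourceStorageScratch.
// The real kubelet code also rejects resource names it does not recognize.
func parseReserved(m map[string]string) (v1.ResourceList, error) {
	rl := make(v1.ResourceList)
	for k, v := range m {
		q, err := resource.ParseQuantity(v)
		if err != nil {
			return nil, fmt.Errorf("failed to parse quantity %q for %q: %v", v, k, err)
		}
		if q.Sign() == -1 {
			return nil, fmt.Errorf("resource quantity for %q cannot be negative: %v", k, v)
		}
		// "storage" in the configuration map is mapped to the scratch-storage resource.
		if v1.ResourceName(k) == v1.ResourceStorage {
			rl[v1.ResourceStorageScratch] = q
		} else {
			rl[v1.ResourceName(k)] = q
		}
	}
	return rl, nil
}

func main() {
	rl, err := parseReserved(map[string]string{"memory": "1Gi", "storage": "10Gi"})
	if err != nil {
		panic(err)
	}
	scratch := rl[v1.ResourceStorageScratch]
	fmt.Println(scratch.String()) // 10Gi: the "storage" entry lands under the scratch key
}
```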
View File

@ -217,6 +217,10 @@ func hardEvictionReservation(thresholds []evictionapi.Threshold, capacity v1.Res
memoryCapacity := capacity[v1.ResourceMemory]
value := evictionapi.GetThresholdQuantity(threshold.Value, &memoryCapacity)
ret[v1.ResourceMemory] = *value
case evictionapi.SignalNodeFsAvailable:
storageCapacity := capacity[v1.ResourceStorageScratch]
value := evictionapi.GetThresholdQuantity(threshold.Value, &storageCapacity)
ret[v1.ResourceStorageScratch] = *value
}
}
return ret

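In the hunk above, the new SignalNodeFsAvailable case turns the nodefs.available hard-eviction threshold into a scratch-storage reservation via evictionapi.GetThresholdQuantity. The sketch below is a loose, standalone stand-in for that resolution step, assuming a threshold given either as an absolute quantity or as a fraction of capacity; it is not the evictionapi code itself.

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"
)

// thresholdQuantity resolves a hard-eviction threshold against a capacity:
// an absolute quantity wins; otherwise a fraction of capacity is used.
func thresholdQuantity(abs *resource.Quantity, fraction float64, capacity resource.Quantity) *resource.Quantity {
	if abs != nil {
		return abs.Copy()
	}
	return resource.NewQuantity(int64(fraction*float64(capacity.Value())), resource.BinarySI)
}

func main() {
	scratchCapacity := resource.MustParse("100Gi")

	// nodefs.available<10%: reserve 10% of scratch capacity.
	fmt.Println(thresholdQuantity(nil, 0.10, scratchCapacity)) // 10Gi

	// nodefs.available<5Gi: reserve a fixed 5Gi regardless of capacity.
	abs := resource.MustParse("5Gi")
	fmt.Println(thresholdQuantity(&abs, 0, scratchCapacity)) // 5Gi
}
```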
View File

@ -148,11 +148,11 @@ func (m *managerImpl) Admit(attrs *lifecycle.PodAdmitAttributes) lifecycle.PodAd
}
// Start starts the control loop to observe and response to low compute resources.
func (m *managerImpl) Start(diskInfoProvider DiskInfoProvider, podFunc ActivePodsFunc, podCleanedUpFunc PodCleanedUpFunc, nodeProvider NodeProvider, monitoringInterval time.Duration) {
func (m *managerImpl) Start(diskInfoProvider DiskInfoProvider, podFunc ActivePodsFunc, podCleanedUpFunc PodCleanedUpFunc, capacityProvider CapacityProvider, monitoringInterval time.Duration) {
// start the eviction manager monitoring
go func() {
for {
if evictedPods := m.synchronize(diskInfoProvider, podFunc, nodeProvider); evictedPods != nil {
if evictedPods := m.synchronize(diskInfoProvider, podFunc, capacityProvider); evictedPods != nil {
glog.Infof("eviction manager: pods %s evicted, waiting for pod to be cleaned up", format.Pods(evictedPods))
m.waitForPodsCleanup(podCleanedUpFunc, evictedPods)
} else {
@ -211,7 +211,7 @@ func startMemoryThresholdNotifier(thresholds []evictionapi.Threshold, observatio
// synchronize is the main control loop that enforces eviction thresholds.
// Returns the pod that was killed, or nil if no pod was killed.
func (m *managerImpl) synchronize(diskInfoProvider DiskInfoProvider, podFunc ActivePodsFunc, nodeProvider NodeProvider) []*v1.Pod {
func (m *managerImpl) synchronize(diskInfoProvider DiskInfoProvider, podFunc ActivePodsFunc, capacityProvider CapacityProvider) []*v1.Pod {
// if we have nothing to do, just return
thresholds := m.config.Thresholds
if len(thresholds) == 0 {
@ -233,7 +233,7 @@ func (m *managerImpl) synchronize(diskInfoProvider DiskInfoProvider, podFunc Act
activePods := podFunc()
// make observations and get a function to derive pod usage stats relative to those observations.
observations, statsFunc, err := makeSignalObservations(m.summaryProvider, nodeProvider, activePods, *m.dedicatedImageFs)
observations, statsFunc, err := makeSignalObservations(m.summaryProvider, capacityProvider, activePods, *m.dedicatedImageFs)
if err != nil {
glog.Errorf("eviction manager: unexpected err: %v", err)
return nil
@ -248,7 +248,7 @@ func (m *managerImpl) synchronize(diskInfoProvider DiskInfoProvider, podFunc Act
err = startMemoryThresholdNotifier(m.config.Thresholds, observations, false, func(desc string) {
glog.Infof("soft memory eviction threshold crossed at %s", desc)
// TODO wait grace period for soft memory limit
m.synchronize(diskInfoProvider, podFunc, nodeProvider)
m.synchronize(diskInfoProvider, podFunc, capacityProvider)
})
if err != nil {
glog.Warningf("eviction manager: failed to create hard memory threshold notifier: %v", err)
@ -256,7 +256,7 @@ func (m *managerImpl) synchronize(diskInfoProvider DiskInfoProvider, podFunc Act
// start hard memory notification
err = startMemoryThresholdNotifier(m.config.Thresholds, observations, true, func(desc string) {
glog.Infof("hard memory eviction threshold crossed at %s", desc)
m.synchronize(diskInfoProvider, podFunc, nodeProvider)
m.synchronize(diskInfoProvider, podFunc, capacityProvider)
})
if err != nil {
glog.Warningf("eviction manager: failed to create soft memory threshold notifier: %v", err)

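The Start hunk keeps the same control-loop shape and only swaps the provider argument. Below is a rough, standalone sketch of that loop shape, with plain function values standing in for the manager's synchronize and cleanup-wait methods; the else branch is cut off in the hunk above, and sleeping for the monitoring interval there is an assumption based on the surrounding code.

```go
package main

import (
	"fmt"
	"time"
)

// startLoop sketches the shape of managerImpl.Start: run synchronize forever;
// if it evicted pods, wait for their cleanup, otherwise sleep for one
// monitoring interval before the next pass.
func startLoop(synchronize func() []string, waitForCleanup func([]string), interval time.Duration) {
	go func() {
		for {
			if evicted := synchronize(); evicted != nil {
				fmt.Printf("evicted %v, waiting for cleanup\n", evicted)
				waitForCleanup(evicted)
			} else {
				time.Sleep(interval)
			}
		}
	}()
}

func main() {
	startLoop(
		func() []string { return nil }, // no pressure observed, nothing evicted
		func(pods []string) {},         // cleanup wait is a no-op here
		10*time.Second,                 // stand-in for monitoringInterval
	)
	time.Sleep(50 * time.Millisecond) // let the goroutine run briefly
}
```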
View File

@ -59,22 +59,25 @@ func (m *mockDiskInfoProvider) HasDedicatedImageFs() (bool, error) {
return m.dedicatedImageFs, nil
}
func newMockNodeProvider(allocatableCapacity v1.ResourceList) *mockNodeProvider {
return &mockNodeProvider{
node: v1.Node{
Status: v1.NodeStatus{
Allocatable: allocatableCapacity,
},
},
func newMockCapacityProvider(capacity, reservation v1.ResourceList) *mockCapacityProvider {
return &mockCapacityProvider{
capacity: capacity,
reservation: reservation,
}
}
type mockNodeProvider struct {
node v1.Node
type mockCapacityProvider struct {
capacity v1.ResourceList
reservation v1.ResourceList
}
func (m *mockNodeProvider) GetNode() (*v1.Node, error) {
return &m.node, nil
func (m *mockCapacityProvider) GetCapacity() v1.ResourceList {
return m.capacity
}
func (m *mockCapacityProvider) GetNodeAllocatableReservation() v1.ResourceList {
return m.reservation
}
// mockDiskGC is used to simulate invoking image and container garbage collection.
@ -200,7 +203,7 @@ func TestMemoryPressure(t *testing.T) {
fakeClock := clock.NewFakeClock(time.Now())
podKiller := &mockPodKiller{}
diskInfoProvider := &mockDiskInfoProvider{dedicatedImageFs: false}
nodeProvider := newMockNodeProvider(v1.ResourceList{v1.ResourceMemory: *quantityMustParse("2Gi")})
capacityProvider := newMockCapacityProvider(v1.ResourceList{v1.ResourceMemory: *quantityMustParse("3Gi")}, v1.ResourceList{v1.ResourceMemory: *quantityMustParse("1Gi")})
imageGC := &mockDiskGC{imageBytesFreed: int64(0), err: nil}
nodeRef := &clientv1.ObjectReference{Kind: "Node", Name: "test", UID: types.UID("test"), Namespace: ""}
@ -243,7 +246,7 @@ func TestMemoryPressure(t *testing.T) {
burstablePodToAdmit, _ := podMaker("burst-admit", newResourceList("100m", "100Mi"), newResourceList("200m", "200Mi"), "0Gi")
// synchronize
manager.synchronize(diskInfoProvider, activePodsFunc, nodeProvider)
manager.synchronize(diskInfoProvider, activePodsFunc, capacityProvider)
// we should not have memory pressure
if manager.IsUnderMemoryPressure() {
@ -261,7 +264,7 @@ func TestMemoryPressure(t *testing.T) {
// induce soft threshold
fakeClock.Step(1 * time.Minute)
summaryProvider.result = summaryStatsMaker("1500Mi", podStats)
manager.synchronize(diskInfoProvider, activePodsFunc, nodeProvider)
manager.synchronize(diskInfoProvider, activePodsFunc, capacityProvider)
// we should have memory pressure
if !manager.IsUnderMemoryPressure() {
@ -276,7 +279,7 @@ func TestMemoryPressure(t *testing.T) {
// step forward in time pass the grace period
fakeClock.Step(3 * time.Minute)
summaryProvider.result = summaryStatsMaker("1500Mi", podStats)
manager.synchronize(diskInfoProvider, activePodsFunc, nodeProvider)
manager.synchronize(diskInfoProvider, activePodsFunc, capacityProvider)
// we should have memory pressure
if !manager.IsUnderMemoryPressure() {
@ -301,7 +304,7 @@ func TestMemoryPressure(t *testing.T) {
// remove memory pressure
fakeClock.Step(20 * time.Minute)
summaryProvider.result = summaryStatsMaker("3Gi", podStats)
manager.synchronize(diskInfoProvider, activePodsFunc, nodeProvider)
manager.synchronize(diskInfoProvider, activePodsFunc, capacityProvider)
// we should not have memory pressure
if manager.IsUnderMemoryPressure() {
@ -311,7 +314,7 @@ func TestMemoryPressure(t *testing.T) {
// induce memory pressure!
fakeClock.Step(1 * time.Minute)
summaryProvider.result = summaryStatsMaker("500Mi", podStats)
manager.synchronize(diskInfoProvider, activePodsFunc, nodeProvider)
manager.synchronize(diskInfoProvider, activePodsFunc, capacityProvider)
// we should have memory pressure
if !manager.IsUnderMemoryPressure() {
@ -339,7 +342,7 @@ func TestMemoryPressure(t *testing.T) {
fakeClock.Step(1 * time.Minute)
summaryProvider.result = summaryStatsMaker("2Gi", podStats)
podKiller.pod = nil // reset state
manager.synchronize(diskInfoProvider, activePodsFunc, nodeProvider)
manager.synchronize(diskInfoProvider, activePodsFunc, capacityProvider)
// we should have memory pressure (because transition period not yet met)
if !manager.IsUnderMemoryPressure() {
@ -363,7 +366,7 @@ func TestMemoryPressure(t *testing.T) {
fakeClock.Step(5 * time.Minute)
summaryProvider.result = summaryStatsMaker("2Gi", podStats)
podKiller.pod = nil // reset state
manager.synchronize(diskInfoProvider, activePodsFunc, nodeProvider)
manager.synchronize(diskInfoProvider, activePodsFunc, capacityProvider)
// we should not have memory pressure (because transition period met)
if manager.IsUnderMemoryPressure() {
@ -418,7 +421,7 @@ func TestDiskPressureNodeFs(t *testing.T) {
fakeClock := clock.NewFakeClock(time.Now())
podKiller := &mockPodKiller{}
diskInfoProvider := &mockDiskInfoProvider{dedicatedImageFs: false}
nodeProvider := newMockNodeProvider(v1.ResourceList{v1.ResourceMemory: *quantityMustParse("2Gi")})
capacityProvider := newMockCapacityProvider(v1.ResourceList{v1.ResourceMemory: *quantityMustParse("3Gi")}, v1.ResourceList{v1.ResourceMemory: *quantityMustParse("1Gi")})
diskGC := &mockDiskGC{imageBytesFreed: int64(0), err: nil}
nodeRef := &clientv1.ObjectReference{Kind: "Node", Name: "test", UID: types.UID("test"), Namespace: ""}
@ -461,7 +464,7 @@ func TestDiskPressureNodeFs(t *testing.T) {
podToAdmit, _ := podMaker("pod-to-admit", newResourceList("", ""), newResourceList("", ""), "0Gi", "0Gi", "0Gi")
// synchronize
manager.synchronize(diskInfoProvider, activePodsFunc, nodeProvider)
manager.synchronize(diskInfoProvider, activePodsFunc, capacityProvider)
// we should not have disk pressure
if manager.IsUnderDiskPressure() {
@ -476,7 +479,7 @@ func TestDiskPressureNodeFs(t *testing.T) {
// induce soft threshold
fakeClock.Step(1 * time.Minute)
summaryProvider.result = summaryStatsMaker("1.5Gi", "200Gi", podStats)
manager.synchronize(diskInfoProvider, activePodsFunc, nodeProvider)
manager.synchronize(diskInfoProvider, activePodsFunc, capacityProvider)
// we should have disk pressure
if !manager.IsUnderDiskPressure() {
@ -491,7 +494,7 @@ func TestDiskPressureNodeFs(t *testing.T) {
// step forward in time pass the grace period
fakeClock.Step(3 * time.Minute)
summaryProvider.result = summaryStatsMaker("1.5Gi", "200Gi", podStats)
manager.synchronize(diskInfoProvider, activePodsFunc, nodeProvider)
manager.synchronize(diskInfoProvider, activePodsFunc, capacityProvider)
// we should have disk pressure
if !manager.IsUnderDiskPressure() {
@ -516,7 +519,7 @@ func TestDiskPressureNodeFs(t *testing.T) {
// remove disk pressure
fakeClock.Step(20 * time.Minute)
summaryProvider.result = summaryStatsMaker("16Gi", "200Gi", podStats)
manager.synchronize(diskInfoProvider, activePodsFunc, nodeProvider)
manager.synchronize(diskInfoProvider, activePodsFunc, capacityProvider)
// we should not have disk pressure
if manager.IsUnderDiskPressure() {
@ -526,7 +529,7 @@ func TestDiskPressureNodeFs(t *testing.T) {
// induce disk pressure!
fakeClock.Step(1 * time.Minute)
summaryProvider.result = summaryStatsMaker("500Mi", "200Gi", podStats)
manager.synchronize(diskInfoProvider, activePodsFunc, nodeProvider)
manager.synchronize(diskInfoProvider, activePodsFunc, capacityProvider)
// we should have disk pressure
if !manager.IsUnderDiskPressure() {
@ -551,7 +554,7 @@ func TestDiskPressureNodeFs(t *testing.T) {
fakeClock.Step(1 * time.Minute)
summaryProvider.result = summaryStatsMaker("16Gi", "200Gi", podStats)
podKiller.pod = nil // reset state
manager.synchronize(diskInfoProvider, activePodsFunc, nodeProvider)
manager.synchronize(diskInfoProvider, activePodsFunc, capacityProvider)
// we should have disk pressure (because transition period not yet met)
if !manager.IsUnderDiskPressure() {
@ -572,7 +575,7 @@ func TestDiskPressureNodeFs(t *testing.T) {
fakeClock.Step(5 * time.Minute)
summaryProvider.result = summaryStatsMaker("16Gi", "200Gi", podStats)
podKiller.pod = nil // reset state
manager.synchronize(diskInfoProvider, activePodsFunc, nodeProvider)
manager.synchronize(diskInfoProvider, activePodsFunc, capacityProvider)
// we should not have disk pressure (because transition period met)
if manager.IsUnderDiskPressure() {
@ -617,7 +620,7 @@ func TestMinReclaim(t *testing.T) {
fakeClock := clock.NewFakeClock(time.Now())
podKiller := &mockPodKiller{}
diskInfoProvider := &mockDiskInfoProvider{dedicatedImageFs: false}
nodeProvider := newMockNodeProvider(v1.ResourceList{v1.ResourceMemory: *quantityMustParse("2Gi")})
capacityProvider := newMockCapacityProvider(v1.ResourceList{v1.ResourceMemory: *quantityMustParse("3Gi")}, v1.ResourceList{v1.ResourceMemory: *quantityMustParse("1Gi")})
diskGC := &mockDiskGC{imageBytesFreed: int64(0), err: nil}
nodeRef := &clientv1.ObjectReference{Kind: "Node", Name: "test", UID: types.UID("test"), Namespace: ""}
@ -652,7 +655,7 @@ func TestMinReclaim(t *testing.T) {
}
// synchronize
manager.synchronize(diskInfoProvider, activePodsFunc, nodeProvider)
manager.synchronize(diskInfoProvider, activePodsFunc, capacityProvider)
// we should not have memory pressure
if manager.IsUnderMemoryPressure() {
@ -662,7 +665,7 @@ func TestMinReclaim(t *testing.T) {
// induce memory pressure!
fakeClock.Step(1 * time.Minute)
summaryProvider.result = summaryStatsMaker("500Mi", podStats)
manager.synchronize(diskInfoProvider, activePodsFunc, nodeProvider)
manager.synchronize(diskInfoProvider, activePodsFunc, capacityProvider)
// we should have memory pressure
if !manager.IsUnderMemoryPressure() {
@ -682,7 +685,7 @@ func TestMinReclaim(t *testing.T) {
fakeClock.Step(1 * time.Minute)
summaryProvider.result = summaryStatsMaker("1.2Gi", podStats)
podKiller.pod = nil // reset state
manager.synchronize(diskInfoProvider, activePodsFunc, nodeProvider)
manager.synchronize(diskInfoProvider, activePodsFunc, capacityProvider)
// we should have memory pressure (because transition period not yet met)
if !manager.IsUnderMemoryPressure() {
@ -702,7 +705,7 @@ func TestMinReclaim(t *testing.T) {
fakeClock.Step(1 * time.Minute)
summaryProvider.result = summaryStatsMaker("2Gi", podStats)
podKiller.pod = nil // reset state
manager.synchronize(diskInfoProvider, activePodsFunc, nodeProvider)
manager.synchronize(diskInfoProvider, activePodsFunc, capacityProvider)
// we should have memory pressure (because transition period not yet met)
if !manager.IsUnderMemoryPressure() {
@ -718,7 +721,7 @@ func TestMinReclaim(t *testing.T) {
fakeClock.Step(5 * time.Minute)
summaryProvider.result = summaryStatsMaker("2Gi", podStats)
podKiller.pod = nil // reset state
manager.synchronize(diskInfoProvider, activePodsFunc, nodeProvider)
manager.synchronize(diskInfoProvider, activePodsFunc, capacityProvider)
// we should not have memory pressure (because transition period met)
if manager.IsUnderMemoryPressure() {
@ -757,7 +760,7 @@ func TestNodeReclaimFuncs(t *testing.T) {
fakeClock := clock.NewFakeClock(time.Now())
podKiller := &mockPodKiller{}
diskInfoProvider := &mockDiskInfoProvider{dedicatedImageFs: false}
nodeProvider := newMockNodeProvider(v1.ResourceList{v1.ResourceMemory: *quantityMustParse("2Gi")})
capacityProvider := newMockCapacityProvider(v1.ResourceList{v1.ResourceMemory: *quantityMustParse("3Gi")}, v1.ResourceList{v1.ResourceMemory: *quantityMustParse("1Gi")})
imageGcFree := resource.MustParse("700Mi")
diskGC := &mockDiskGC{imageBytesFreed: imageGcFree.Value(), err: nil}
nodeRef := &clientv1.ObjectReference{Kind: "Node", Name: "test", UID: types.UID("test"), Namespace: ""}
@ -793,7 +796,7 @@ func TestNodeReclaimFuncs(t *testing.T) {
}
// synchronize
manager.synchronize(diskInfoProvider, activePodsFunc, nodeProvider)
manager.synchronize(diskInfoProvider, activePodsFunc, capacityProvider)
// we should not have disk pressure
if manager.IsUnderDiskPressure() {
@ -803,7 +806,7 @@ func TestNodeReclaimFuncs(t *testing.T) {
// induce hard threshold
fakeClock.Step(1 * time.Minute)
summaryProvider.result = summaryStatsMaker(".9Gi", "200Gi", podStats)
manager.synchronize(diskInfoProvider, activePodsFunc, nodeProvider)
manager.synchronize(diskInfoProvider, activePodsFunc, capacityProvider)
// we should have disk pressure
if !manager.IsUnderDiskPressure() {
@ -827,7 +830,7 @@ func TestNodeReclaimFuncs(t *testing.T) {
// remove disk pressure
fakeClock.Step(20 * time.Minute)
summaryProvider.result = summaryStatsMaker("16Gi", "200Gi", podStats)
manager.synchronize(diskInfoProvider, activePodsFunc, nodeProvider)
manager.synchronize(diskInfoProvider, activePodsFunc, capacityProvider)
// we should not have disk pressure
if manager.IsUnderDiskPressure() {
@ -837,7 +840,7 @@ func TestNodeReclaimFuncs(t *testing.T) {
// induce disk pressure!
fakeClock.Step(1 * time.Minute)
summaryProvider.result = summaryStatsMaker("400Mi", "200Gi", podStats)
manager.synchronize(diskInfoProvider, activePodsFunc, nodeProvider)
manager.synchronize(diskInfoProvider, activePodsFunc, capacityProvider)
// we should have disk pressure
if !manager.IsUnderDiskPressure() {
@ -864,7 +867,7 @@ func TestNodeReclaimFuncs(t *testing.T) {
diskGC.imageGCInvoked = false // reset state
diskGC.containerGCInvoked = false // reset state
podKiller.pod = nil // reset state
manager.synchronize(diskInfoProvider, activePodsFunc, nodeProvider)
manager.synchronize(diskInfoProvider, activePodsFunc, capacityProvider)
// we should have disk pressure (because transition period not yet met)
if !manager.IsUnderDiskPressure() {
@ -887,7 +890,7 @@ func TestNodeReclaimFuncs(t *testing.T) {
diskGC.imageGCInvoked = false // reset state
diskGC.containerGCInvoked = false // reset state
podKiller.pod = nil // reset state
manager.synchronize(diskInfoProvider, activePodsFunc, nodeProvider)
manager.synchronize(diskInfoProvider, activePodsFunc, capacityProvider)
// we should not have disk pressure (because transition period met)
if manager.IsUnderDiskPressure() {
@ -955,7 +958,7 @@ func TestInodePressureNodeFsInodes(t *testing.T) {
fakeClock := clock.NewFakeClock(time.Now())
podKiller := &mockPodKiller{}
diskInfoProvider := &mockDiskInfoProvider{dedicatedImageFs: false}
nodeProvider := newMockNodeProvider(v1.ResourceList{v1.ResourceMemory: *quantityMustParse("2Gi")})
capacityProvider := newMockCapacityProvider(v1.ResourceList{v1.ResourceMemory: *quantityMustParse("3Gi")}, v1.ResourceList{v1.ResourceMemory: *quantityMustParse("1Gi")})
diskGC := &mockDiskGC{imageBytesFreed: int64(0), err: nil}
nodeRef := &clientv1.ObjectReference{Kind: "Node", Name: "test", UID: types.UID("test"), Namespace: ""}
@ -998,7 +1001,7 @@ func TestInodePressureNodeFsInodes(t *testing.T) {
podToAdmit, _ := podMaker("pod-to-admit", newResourceList("", ""), newResourceList("", ""), "0", "0", "0")
// synchronize
manager.synchronize(diskInfoProvider, activePodsFunc, nodeProvider)
manager.synchronize(diskInfoProvider, activePodsFunc, capacityProvider)
// we should not have disk pressure
if manager.IsUnderDiskPressure() {
@ -1013,7 +1016,7 @@ func TestInodePressureNodeFsInodes(t *testing.T) {
// induce soft threshold
fakeClock.Step(1 * time.Minute)
summaryProvider.result = summaryStatsMaker("1.5Mi", "4Mi", podStats)
manager.synchronize(diskInfoProvider, activePodsFunc, nodeProvider)
manager.synchronize(diskInfoProvider, activePodsFunc, capacityProvider)
// we should have disk pressure
if !manager.IsUnderDiskPressure() {
@ -1028,7 +1031,7 @@ func TestInodePressureNodeFsInodes(t *testing.T) {
// step forward in time pass the grace period
fakeClock.Step(3 * time.Minute)
summaryProvider.result = summaryStatsMaker("1.5Mi", "4Mi", podStats)
manager.synchronize(diskInfoProvider, activePodsFunc, nodeProvider)
manager.synchronize(diskInfoProvider, activePodsFunc, capacityProvider)
// we should have disk pressure
if !manager.IsUnderDiskPressure() {
@ -1053,7 +1056,7 @@ func TestInodePressureNodeFsInodes(t *testing.T) {
// remove inode pressure
fakeClock.Step(20 * time.Minute)
summaryProvider.result = summaryStatsMaker("3Mi", "4Mi", podStats)
manager.synchronize(diskInfoProvider, activePodsFunc, nodeProvider)
manager.synchronize(diskInfoProvider, activePodsFunc, capacityProvider)
// we should not have disk pressure
if manager.IsUnderDiskPressure() {
@ -1063,7 +1066,7 @@ func TestInodePressureNodeFsInodes(t *testing.T) {
// induce inode pressure!
fakeClock.Step(1 * time.Minute)
summaryProvider.result = summaryStatsMaker("0.5Mi", "4Mi", podStats)
manager.synchronize(diskInfoProvider, activePodsFunc, nodeProvider)
manager.synchronize(diskInfoProvider, activePodsFunc, capacityProvider)
// we should have disk pressure
if !manager.IsUnderDiskPressure() {
@ -1088,7 +1091,7 @@ func TestInodePressureNodeFsInodes(t *testing.T) {
fakeClock.Step(1 * time.Minute)
summaryProvider.result = summaryStatsMaker("3Mi", "4Mi", podStats)
podKiller.pod = nil // reset state
manager.synchronize(diskInfoProvider, activePodsFunc, nodeProvider)
manager.synchronize(diskInfoProvider, activePodsFunc, capacityProvider)
// we should have disk pressure (because transition period not yet met)
if !manager.IsUnderDiskPressure() {
@ -1109,7 +1112,7 @@ func TestInodePressureNodeFsInodes(t *testing.T) {
fakeClock.Step(5 * time.Minute)
summaryProvider.result = summaryStatsMaker("3Mi", "4Mi", podStats)
podKiller.pod = nil // reset state
manager.synchronize(diskInfoProvider, activePodsFunc, nodeProvider)
manager.synchronize(diskInfoProvider, activePodsFunc, capacityProvider)
// we should not have disk pressure (because transition period met)
if manager.IsUnderDiskPressure() {
@ -1157,7 +1160,7 @@ func TestCriticalPodsAreNotEvicted(t *testing.T) {
fakeClock := clock.NewFakeClock(time.Now())
podKiller := &mockPodKiller{}
diskInfoProvider := &mockDiskInfoProvider{dedicatedImageFs: false}
nodeProvider := newMockNodeProvider(v1.ResourceList{v1.ResourceMemory: *quantityMustParse("2Gi")})
capacityProvider := newMockCapacityProvider(v1.ResourceList{v1.ResourceMemory: *quantityMustParse("3Gi")}, v1.ResourceList{v1.ResourceMemory: *quantityMustParse("1Gi")})
diskGC := &mockDiskGC{imageBytesFreed: int64(0), err: nil}
nodeRef := &clientv1.ObjectReference{
Kind: "Node", Name: "test", UID: types.UID("test"), Namespace: "",
@ -1203,7 +1206,7 @@ func TestCriticalPodsAreNotEvicted(t *testing.T) {
// induce soft threshold
fakeClock.Step(1 * time.Minute)
summaryProvider.result = summaryStatsMaker("1500Mi", podStats)
manager.synchronize(diskInfoProvider, activePodsFunc, nodeProvider)
manager.synchronize(diskInfoProvider, activePodsFunc, capacityProvider)
// we should have memory pressure
if !manager.IsUnderMemoryPressure() {
@ -1218,7 +1221,7 @@ func TestCriticalPodsAreNotEvicted(t *testing.T) {
// step forward in time pass the grace period
fakeClock.Step(3 * time.Minute)
summaryProvider.result = summaryStatsMaker("1500Mi", podStats)
manager.synchronize(diskInfoProvider, activePodsFunc, nodeProvider)
manager.synchronize(diskInfoProvider, activePodsFunc, capacityProvider)
// we should have memory pressure
if !manager.IsUnderMemoryPressure() {
@ -1236,7 +1239,7 @@ func TestCriticalPodsAreNotEvicted(t *testing.T) {
// remove memory pressure
fakeClock.Step(20 * time.Minute)
summaryProvider.result = summaryStatsMaker("3Gi", podStats)
manager.synchronize(diskInfoProvider, activePodsFunc, nodeProvider)
manager.synchronize(diskInfoProvider, activePodsFunc, capacityProvider)
// we should not have memory pressure
if manager.IsUnderMemoryPressure() {
@ -1249,7 +1252,7 @@ func TestCriticalPodsAreNotEvicted(t *testing.T) {
// induce memory pressure!
fakeClock.Step(1 * time.Minute)
summaryProvider.result = summaryStatsMaker("500Mi", podStats)
manager.synchronize(diskInfoProvider, activePodsFunc, nodeProvider)
manager.synchronize(diskInfoProvider, activePodsFunc, capacityProvider)
// we should have memory pressure
if !manager.IsUnderMemoryPressure() {
@ -1290,7 +1293,7 @@ func TestAllocatableMemoryPressure(t *testing.T) {
fakeClock := clock.NewFakeClock(time.Now())
podKiller := &mockPodKiller{}
diskInfoProvider := &mockDiskInfoProvider{dedicatedImageFs: false}
nodeProvider := newMockNodeProvider(v1.ResourceList{v1.ResourceMemory: *quantityMustParse("2Gi")})
capacityProvider := newMockCapacityProvider(v1.ResourceList{v1.ResourceMemory: *quantityMustParse("3Gi")}, v1.ResourceList{v1.ResourceMemory: *quantityMustParse("1Gi")})
diskGC := &mockDiskGC{imageBytesFreed: int64(0), err: nil}
nodeRef := &clientv1.ObjectReference{Kind: "Node", Name: "test", UID: types.UID("test"), Namespace: ""}
@ -1326,7 +1329,7 @@ func TestAllocatableMemoryPressure(t *testing.T) {
burstablePodToAdmit, _ := podMaker("burst-admit", newResourceList("100m", "100Mi"), newResourceList("200m", "200Mi"), "0Gi")
// synchronize
manager.synchronize(diskInfoProvider, activePodsFunc, nodeProvider)
manager.synchronize(diskInfoProvider, activePodsFunc, capacityProvider)
// we should not have memory pressure
if manager.IsUnderMemoryPressure() {
@ -1346,7 +1349,7 @@ func TestAllocatableMemoryPressure(t *testing.T) {
pod, podStat := podMaker("guaranteed-high-2", newResourceList("100m", "1Gi"), newResourceList("100m", "1Gi"), "1Gi")
podStats[pod] = podStat
summaryProvider.result = summaryStatsMaker(constantCapacity, podStats)
manager.synchronize(diskInfoProvider, activePodsFunc, nodeProvider)
manager.synchronize(diskInfoProvider, activePodsFunc, capacityProvider)
// we should have memory pressure
if !manager.IsUnderMemoryPressure() {
@ -1382,7 +1385,7 @@ func TestAllocatableMemoryPressure(t *testing.T) {
}
summaryProvider.result = summaryStatsMaker(constantCapacity, podStats)
podKiller.pod = nil // reset state
manager.synchronize(diskInfoProvider, activePodsFunc, nodeProvider)
manager.synchronize(diskInfoProvider, activePodsFunc, capacityProvider)
// we should have memory pressure (because transition period not yet met)
if !manager.IsUnderMemoryPressure() {
@ -1406,7 +1409,7 @@ func TestAllocatableMemoryPressure(t *testing.T) {
fakeClock.Step(5 * time.Minute)
summaryProvider.result = summaryStatsMaker(constantCapacity, podStats)
podKiller.pod = nil // reset state
manager.synchronize(diskInfoProvider, activePodsFunc, nodeProvider)
manager.synchronize(diskInfoProvider, activePodsFunc, capacityProvider)
// we should not have memory pressure (because transition period met)
if manager.IsUnderMemoryPressure() {

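Throughout these tests, newMockNodeProvider with 2Gi of allocatable memory becomes newMockCapacityProvider with 3Gi of capacity and a 1Gi reservation, which works out to the same 2Gi allocatable to pods. A standalone re-sketch of the mock and that arithmetic, purely for illustration (the real mock lives in the eviction test file and is not exported; import paths are assumed for this era of the API):

```go
package main

import (
	"fmt"

	"k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
)

// mockCapacityProvider is re-sketched from the test diff above.
type mockCapacityProvider struct {
	capacity    v1.ResourceList
	reservation v1.ResourceList
}

func (m *mockCapacityProvider) GetCapacity() v1.ResourceList { return m.capacity }
func (m *mockCapacityProvider) GetNodeAllocatableReservation() v1.ResourceList {
	return m.reservation
}

func main() {
	m := &mockCapacityProvider{
		capacity:    v1.ResourceList{v1.ResourceMemory: resource.MustParse("3Gi")},
		reservation: v1.ResourceList{v1.ResourceMemory: resource.MustParse("1Gi")},
	}
	capacity := m.GetCapacity()[v1.ResourceMemory]
	reserved := m.GetNodeAllocatableReservation()[v1.ResourceMemory]
	alloc := capacity.Copy()
	alloc.Sub(reserved)
	fmt.Println(alloc) // 2Gi, matching what the old mockNodeProvider reported
}
```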
View File

@ -658,16 +658,11 @@ func (a byEvictionPriority) Less(i, j int) bool {
}
// makeSignalObservations derives observations using the specified summary provider.
func makeSignalObservations(summaryProvider stats.SummaryProvider, nodeProvider NodeProvider, pods []*v1.Pod, withImageFs bool) (signalObservations, statsFunc, error) {
func makeSignalObservations(summaryProvider stats.SummaryProvider, capacityProvider CapacityProvider, pods []*v1.Pod, withImageFs bool) (signalObservations, statsFunc, error) {
summary, err := summaryProvider.Get()
if err != nil {
return nil, nil, err
}
node, err := nodeProvider.GetNode()
if err != nil {
return nil, nil, err
}
// build the function to work against for pod stats
statsFunc := cachedStatsFunc(summary.Pods)
// build an evaluation context for current eviction signals
@ -714,8 +709,12 @@ func makeSignalObservations(summaryProvider stats.SummaryProvider, nodeProvider
}
}
}
if memoryAllocatableCapacity, ok := node.Status.Allocatable[v1.ResourceMemory]; ok {
memoryAllocatableAvailable := memoryAllocatableCapacity.Copy()
nodeCapacity := capacityProvider.GetCapacity()
allocatableReservation := capacityProvider.GetNodeAllocatableReservation()
memoryAllocatableCapacity, memoryAllocatableAvailable, exist := getResourceAllocatable(nodeCapacity, allocatableReservation, v1.ResourceMemory)
if exist {
for _, pod := range summary.Pods {
mu, err := podMemoryUsage(pod)
if err == nil {
@ -724,12 +723,12 @@ func makeSignalObservations(summaryProvider stats.SummaryProvider, nodeProvider
}
result[evictionapi.SignalAllocatableMemoryAvailable] = signalObservation{
available: memoryAllocatableAvailable,
capacity: memoryAllocatableCapacity.Copy(),
capacity: memoryAllocatableCapacity,
}
}
if storageScratchAllocatableCapacity, ok := node.Status.Allocatable[v1.ResourceStorage]; ok {
storageScratchAllocatable := storageScratchAllocatableCapacity.Copy()
storageScratchCapacity, storageScratchAllocatable, exist := getResourceAllocatable(nodeCapacity, allocatableReservation, v1.ResourceStorageScratch)
if exist {
for _, pod := range pods {
podStat, ok := statsFunc(pod)
if !ok {
@ -754,13 +753,25 @@ func makeSignalObservations(summaryProvider stats.SummaryProvider, nodeProvider
}
result[evictionapi.SignalAllocatableNodeFsAvailable] = signalObservation{
available: storageScratchAllocatable,
capacity: storageScratchAllocatableCapacity.Copy(),
capacity: storageScratchCapacity,
}
}
return result, statsFunc, nil
}
func getResourceAllocatable(capacity v1.ResourceList, reservation v1.ResourceList, resourceName v1.ResourceName) (*resource.Quantity, *resource.Quantity, bool) {
if capacity, ok := capacity[resourceName]; ok {
allocate := capacity.Copy()
if reserved, exists := reservation[resourceName]; exists {
allocate.Sub(reserved)
}
return capacity.Copy(), allocate, true
}
glog.Errorf("Could not find capacity information for resource %v", resourceName)
return nil, nil, false
}
// thresholdsMet returns the set of thresholds that were met independent of grace period
func thresholdsMet(thresholds []evictionapi.Threshold, observations signalObservations, enforceMinReclaim bool) []evictionapi.Threshold {
results := []evictionapi.Threshold{}

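getResourceAllocatable above returns the capacity, the capacity minus any reservation, and true when the resource is present, or (nil, nil, false) otherwise; the real function also logs an error via glog in the missing case. A standalone re-sketch exercising both paths, with import paths assumed for this era of the API:

```go
package main

import (
	"fmt"

	"k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
)

// getResourceAllocatable is re-sketched from the helpers.go hunk above.
func getResourceAllocatable(capacity, reservation v1.ResourceList, name v1.ResourceName) (*resource.Quantity, *resource.Quantity, bool) {
	if c, ok := capacity[name]; ok {
		allocate := c.Copy()
		if reserved, exists := reservation[name]; exists {
			allocate.Sub(reserved)
		}
		return c.Copy(), allocate, true
	}
	return nil, nil, false
}

func main() {
	capacity := v1.ResourceList{v1.ResourceStorageScratch: resource.MustParse("100Gi")}
	reservation := v1.ResourceList{v1.ResourceStorageScratch: resource.MustParse("10Gi")}

	c, a, ok := getResourceAllocatable(capacity, reservation, v1.ResourceStorageScratch)
	fmt.Println(c, a, ok) // 100Gi 90Gi true

	// A missing resource yields no observation rather than a zero quantity.
	_, _, ok = getResourceAllocatable(capacity, reservation, v1.ResourceMemory)
	fmt.Println(ok) // false
}
```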
View File

@ -782,12 +782,12 @@ func TestMakeSignalObservations(t *testing.T) {
fakeStats.Pods = append(fakeStats.Pods, newPodStats(pod, containerWorkingSetBytes))
}
res := quantityMustParse("5Gi")
nodeProvider := newMockNodeProvider(v1.ResourceList{v1.ResourceMemory: *res})
capacityProvider := newMockCapacityProvider(v1.ResourceList{v1.ResourceMemory: *quantityMustParse("5Gi")}, v1.ResourceList{v1.ResourceMemory: *quantityMustParse("0Gi")})
// Allocatable thresholds are always 100%. Verify that Threshold == Capacity.
if res.CmpInt64(int64(allocatableMemoryCapacity)) != 0 {
t.Errorf("Expected Threshold %v to be equal to value %v", res.Value(), allocatableMemoryCapacity)
}
actualObservations, statsFunc, err := makeSignalObservations(provider, nodeProvider, pods, false)
actualObservations, statsFunc, err := makeSignalObservations(provider, capacityProvider, pods, false)
if err != nil {
t.Errorf("Unexpected err: %v", err)
}

View File

@ -53,7 +53,7 @@ type Config struct {
// Manager evaluates when an eviction threshold for node stability has been met on the node.
type Manager interface {
// Start starts the control loop to monitor eviction thresholds at specified interval.
Start(diskInfoProvider DiskInfoProvider, podFunc ActivePodsFunc, podCleanedUpFunc PodCleanedUpFunc, nodeProvider NodeProvider, monitoringInterval time.Duration)
Start(diskInfoProvider DiskInfoProvider, podFunc ActivePodsFunc, podCleanedUpFunc PodCleanedUpFunc, capacityProvider CapacityProvider, monitoringInterval time.Duration)
// IsUnderMemoryPressure returns true if the node is under memory pressure.
IsUnderMemoryPressure() bool
@ -68,10 +68,12 @@ type DiskInfoProvider interface {
HasDedicatedImageFs() (bool, error)
}
// NodeProvider is responsible for providing the node api object describing this node
type NodeProvider interface {
// GetNode returns the node info for this node
GetNode() (*v1.Node, error)
// CapacityProvider is responsible for providing the resource capacity and reservation information
type CapacityProvider interface {
// GetCapacity returns the amount of compute resources tracked by container manager available on the node.
GetCapacity() v1.ResourceList
// GetNodeAllocatableReservation returns the amount of compute resources that have to be reserved from scheduling.
GetNodeAllocatableReservation() v1.ResourceList
}
// ImageGC is responsible for performing garbage collection of unused images.

View File

@ -1253,7 +1253,7 @@ func (kl *Kubelet) initializeRuntimeDependentModules() {
glog.Fatalf("Failed to start cAdvisor %v", err)
}
// eviction manager must start after cadvisor because it needs to know if the container runtime has a dedicated imagefs
kl.evictionManager.Start(kl.cadvisor, kl.GetActivePods, kl.podResourcesAreReclaimed, kl, evictionMonitoringPeriod)
kl.evictionManager.Start(kl.cadvisor, kl.GetActivePods, kl.podResourcesAreReclaimed, kl.containerManager, evictionMonitoringPeriod)
}
// Run starts the kubelet reacting to config updates

View File

@ -80,24 +80,24 @@ var (
func makeResources(milliCPU, memory, nvidiaGPUs, pods, opaqueA, storage int64) v1.NodeResources {
return v1.NodeResources{
Capacity: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(milliCPU, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(memory, resource.BinarySI),
v1.ResourcePods: *resource.NewQuantity(pods, resource.DecimalSI),
v1.ResourceNvidiaGPU: *resource.NewQuantity(nvidiaGPUs, resource.DecimalSI),
opaqueResourceA: *resource.NewQuantity(opaqueA, resource.DecimalSI),
v1.ResourceStorage: *resource.NewQuantity(storage, resource.BinarySI),
v1.ResourceCPU: *resource.NewMilliQuantity(milliCPU, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(memory, resource.BinarySI),
v1.ResourcePods: *resource.NewQuantity(pods, resource.DecimalSI),
v1.ResourceNvidiaGPU: *resource.NewQuantity(nvidiaGPUs, resource.DecimalSI),
opaqueResourceA: *resource.NewQuantity(opaqueA, resource.DecimalSI),
v1.ResourceStorageScratch: *resource.NewQuantity(storage, resource.BinarySI),
},
}
}
func makeAllocatableResources(milliCPU, memory, nvidiaGPUs, pods, opaqueA, storage int64) v1.ResourceList {
return v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(milliCPU, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(memory, resource.BinarySI),
v1.ResourcePods: *resource.NewQuantity(pods, resource.DecimalSI),
v1.ResourceNvidiaGPU: *resource.NewQuantity(nvidiaGPUs, resource.DecimalSI),
opaqueResourceA: *resource.NewQuantity(opaqueA, resource.DecimalSI),
v1.ResourceStorage: *resource.NewQuantity(storage, resource.BinarySI),
v1.ResourceCPU: *resource.NewMilliQuantity(milliCPU, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(memory, resource.BinarySI),
v1.ResourcePods: *resource.NewQuantity(pods, resource.DecimalSI),
v1.ResourceNvidiaGPU: *resource.NewQuantity(nvidiaGPUs, resource.DecimalSI),
opaqueResourceA: *resource.NewQuantity(opaqueA, resource.DecimalSI),
v1.ResourceStorageScratch: *resource.NewQuantity(storage, resource.BinarySI),
}
}

View File

@ -97,7 +97,7 @@ func (r *Resource) Add(rl v1.ResourceList) {
r.NvidiaGPU += rQuant.Value()
case v1.ResourcePods:
r.AllowedPodNumber += int(rQuant.Value())
case v1.ResourceStorage:
case v1.ResourceStorageScratch:
r.StorageScratch += rQuant.Value()
case v1.ResourceStorageOverlay:
r.StorageOverlay += rQuant.Value()
@ -116,7 +116,7 @@ func (r *Resource) ResourceList() v1.ResourceList {
v1.ResourceNvidiaGPU: *resource.NewQuantity(r.NvidiaGPU, resource.DecimalSI),
v1.ResourcePods: *resource.NewQuantity(int64(r.AllowedPodNumber), resource.BinarySI),
v1.ResourceStorageOverlay: *resource.NewQuantity(r.StorageOverlay, resource.BinarySI),
v1.ResourceStorage: *resource.NewQuantity(r.StorageScratch, resource.BinarySI),
v1.ResourceStorageScratch: *resource.NewQuantity(r.StorageScratch, resource.BinarySI),
}
for rName, rQuant := range r.OpaqueIntResources {
result[rName] = *resource.NewQuantity(rQuant, resource.DecimalSI)

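On the scheduler side, Resource.Add and Resource.ResourceList now key scratch storage by v1.ResourceStorageScratch rather than v1.ResourceStorage. Below is a hedged sketch of that round trip using a trimmed-down stand-in for the scheduler's Resource struct (the real struct also tracks CPU, memory, GPU, pod count, and opaque resources; import paths are assumed for this era of the API):

```go
package main

import (
	"fmt"

	"k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
)

// miniResource is a trimmed-down stand-in for the scheduler's Resource struct,
// keeping only the fields touched by this PR.
type miniResource struct {
	StorageScratch int64
	StorageOverlay int64
}

// Add mirrors the switch cases in the hunk above: scratch storage is now read
// from the ResourceStorageScratch key.
func (r *miniResource) Add(rl v1.ResourceList) {
	for name, q := range rl {
		switch name {
		case v1.ResourceStorageScratch:
			r.StorageScratch += q.Value()
		case v1.ResourceStorageOverlay:
			r.StorageOverlay += q.Value()
		}
	}
}

// ResourceList writes the totals back out under the same keys.
func (r *miniResource) ResourceList() v1.ResourceList {
	return v1.ResourceList{
		v1.ResourceStorageScratch: *resource.NewQuantity(r.StorageScratch, resource.BinarySI),
		v1.ResourceStorageOverlay: *resource.NewQuantity(r.StorageOverlay, resource.BinarySI),
	}
}

func main() {
	r := &miniResource{}
	r.Add(v1.ResourceList{v1.ResourceStorageScratch: resource.MustParse("50Gi")})
	got := r.ResourceList()[v1.ResourceStorageScratch]
	fmt.Println(got.String()) // 50Gi, back under the scratch-storage key
}
```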
View File

@ -52,8 +52,7 @@ var _ = framework.KubeDescribe("LocalStorageAllocatableEviction [Slow] [Serial]
diskReserve = uint64(0.8 * diskAvail / 1000000) // Reserve 0.8 * disk Capacity for kube-reserved scratch storage
maxDisk := 10000000 // Set dd command to read and write up to 10MB at a time
count := uint64(0.8 * diskAvail / float64(maxDisk))
command := fmt.Sprintf("dd if=/dev/urandom of=dummy bs=%d count=%d; sleep 0.5; while true; do sleep 5; done", maxDisk, count)
command := fmt.Sprintf("dd if=/dev/urandom of=dummy bs=%d count=%d; while true; do sleep 5; done", maxDisk, count)
podTestSpecs = []podTestSpec{
{
evictionPriority: 1, // This pod should be evicted before the innocent pod
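The e2e change above only drops the `sleep 0.5` between the dd invocation and the keep-alive loop. For context, a rough sketch of how the test sizes that command; diskAvail is a made-up value here, standing in for the available node-fs bytes the real test reads from the node's stats.

```go
package main

import "fmt"

func main() {
	// diskAvail is hypothetical; the real test reads it from node summary stats.
	diskAvail := 100e9 // assume 100 GB available on the node fs, in bytes

	diskReserve := uint64(0.8 * diskAvail / 1000000) // 80% of capacity reserved for kube-reserved scratch storage
	maxDisk := 10000000                              // dd reads and writes up to 10MB at a time
	count := uint64(0.8 * diskAvail / float64(maxDisk))

	command := fmt.Sprintf("dd if=/dev/urandom of=dummy bs=%d count=%d; while true; do sleep 5; done", maxDisk, count)
	fmt.Println(diskReserve, command)
}
```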