Merge pull request #114418 from xuzhenglun/master
Reserve Nodeport Ranges For Dynamic And Static Port Allocation
This commit is contained in:
commit
5bb7326c36
@ -712,6 +712,13 @@ const (
|
||||
// Subdivide the ClusterIP range for dynamic and static IP allocation.
|
||||
ServiceIPStaticSubrange featuregate.Feature = "ServiceIPStaticSubrange"
|
||||
|
||||
// owner: @xuzhenglun
|
||||
// kep: http://kep.k8s.io/3682
|
||||
// alpha: v1.27
|
||||
//
|
||||
// Subdivide the NodePort range for dynamic and static port allocation.
|
||||
ServiceNodePortStaticSubrange featuregate.Feature = "ServiceNodePortStaticSubrange"
|
||||
|
||||
// owner: @derekwaynecarr
|
||||
// alpha: v1.20
|
||||
// beta: v1.22
|
||||
@ -1024,6 +1031,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
|
||||
|
||||
ServiceInternalTrafficPolicy: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.28
|
||||
|
||||
ServiceNodePortStaticSubrange: {Default: false, PreRelease: featuregate.Alpha},
|
||||
|
||||
SizeMemoryBackedVolumes: {Default: true, PreRelease: featuregate.Beta},
|
||||
|
||||
StatefulSetAutoDeletePVC: {Default: false, PreRelease: featuregate.Alpha},
|
||||
|
@ -237,8 +237,8 @@ func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(apiResourceConfigSource
|
||||
}
|
||||
|
||||
var serviceNodePortRegistry rangeallocation.RangeRegistry
|
||||
serviceNodePortAllocator, err := portallocator.New(c.ServiceNodePortRange, func(max int, rangeSpec string) (allocator.Interface, error) {
|
||||
mem := allocator.NewAllocationMap(max, rangeSpec)
|
||||
serviceNodePortAllocator, err := portallocator.New(c.ServiceNodePortRange, func(max int, rangeSpec string, offset int) (allocator.Interface, error) {
|
||||
mem := allocator.NewAllocationMapWithOffset(max, rangeSpec, offset)
|
||||
// TODO etcdallocator package to return a storage interface via the storageFactory
|
||||
etcd, err := serviceallocator.NewEtcd(mem, "/ranges/servicenodeports", serviceStorageConfig.ForResource(api.Resource("servicenodeportallocations")))
|
||||
if err != nil {
|
||||
@ -250,6 +250,7 @@ func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(apiResourceConfigSource
|
||||
if err != nil {
|
||||
return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, fmt.Errorf("cannot create cluster port allocator: %v", err)
|
||||
}
|
||||
serviceNodePortAllocator.EnableMetrics()
|
||||
restStorage.ServiceNodePortAllocator = serviceNodePortRegistry
|
||||
|
||||
controllerStorage, err := controllerstore.NewStorage(restOptionsGetter)
|
||||
|
@ -21,10 +21,11 @@ import (
|
||||
"fmt"
|
||||
|
||||
"k8s.io/apimachinery/pkg/util/net"
|
||||
api "k8s.io/kubernetes/pkg/apis/core"
|
||||
"k8s.io/kubernetes/pkg/registry/core/service/allocator"
|
||||
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
"k8s.io/klog/v2"
|
||||
api "k8s.io/kubernetes/pkg/apis/core"
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
"k8s.io/kubernetes/pkg/registry/core/service/allocator"
|
||||
)
|
||||
|
||||
// Interface manages the allocation of ports out of a range. Interface
|
||||
@ -36,6 +37,7 @@ type Interface interface {
|
||||
ForEach(func(int))
|
||||
Has(int) bool
|
||||
Destroy()
|
||||
EnableMetrics()
|
||||
}
|
||||
|
||||
var (
|
||||
@ -56,28 +58,42 @@ type PortAllocator struct {
|
||||
portRange net.PortRange
|
||||
|
||||
alloc allocator.Interface
|
||||
|
||||
// metrics is a metrics recorder that can be disabled
|
||||
metrics metricsRecorderInterface
|
||||
}
|
||||
|
||||
// PortAllocator implements Interface and Snapshottable
|
||||
var _ Interface = &PortAllocator{}
|
||||
|
||||
// New creates a PortAllocator over a net.PortRange, calling allocatorFactory to construct the backing store.
|
||||
func New(pr net.PortRange, allocatorFactory allocator.AllocatorFactory) (*PortAllocator, error) {
|
||||
func New(pr net.PortRange, allocatorFactory allocator.AllocatorWithOffsetFactory) (*PortAllocator, error) {
|
||||
max := pr.Size
|
||||
rangeSpec := pr.String()
|
||||
|
||||
a := &PortAllocator{
|
||||
portRange: pr,
|
||||
metrics: &emptyMetricsRecorder{},
|
||||
}
|
||||
|
||||
var offset = 0
|
||||
if utilfeature.DefaultFeatureGate.Enabled(features.ServiceNodePortStaticSubrange) {
|
||||
offset = calculateRangeOffset(pr)
|
||||
}
|
||||
|
||||
var err error
|
||||
a.alloc, err = allocatorFactory(max, rangeSpec)
|
||||
a.alloc, err = allocatorFactory(max, rangeSpec, offset)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return a, err
|
||||
}
|
||||
|
||||
// NewInMemory creates an in-memory allocator.
|
||||
func NewInMemory(pr net.PortRange) (*PortAllocator, error) {
|
||||
return New(pr, func(max int, rangeSpec string) (allocator.Interface, error) {
|
||||
return allocator.NewAllocationMap(max, rangeSpec), nil
|
||||
return New(pr, func(max int, rangeSpec string, offset int) (allocator.Interface, error) {
|
||||
return allocator.NewAllocationMapWithOffset(max, rangeSpec, offset), nil
|
||||
})
|
||||
}
|
||||
|
||||
@ -114,6 +130,9 @@ func (r *PortAllocator) Used() int {
|
||||
func (r *PortAllocator) Allocate(port int) error {
|
||||
ok, offset := r.contains(port)
|
||||
if !ok {
|
||||
// update metrics
|
||||
r.metrics.incrementAllocationErrors("static")
|
||||
|
||||
// include valid port range in error
|
||||
validPorts := r.portRange.String()
|
||||
return &ErrNotInRange{validPorts}
|
||||
@ -121,11 +140,21 @@ func (r *PortAllocator) Allocate(port int) error {
|
||||
|
||||
allocated, err := r.alloc.Allocate(offset)
|
||||
if err != nil {
|
||||
// update metrics
|
||||
r.metrics.incrementAllocationErrors("static")
|
||||
return err
|
||||
}
|
||||
if !allocated {
|
||||
// update metrics
|
||||
r.metrics.incrementAllocationErrors("static")
|
||||
return ErrAllocated
|
||||
}
|
||||
|
||||
// update metrics
|
||||
r.metrics.incrementAllocations("static")
|
||||
r.metrics.setAllocated(r.Used())
|
||||
r.metrics.setAvailable(r.Free())
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -134,11 +163,19 @@ func (r *PortAllocator) Allocate(port int) error {
|
||||
func (r *PortAllocator) AllocateNext() (int, error) {
|
||||
offset, ok, err := r.alloc.AllocateNext()
|
||||
if err != nil {
|
||||
r.metrics.incrementAllocationErrors("dynamic")
|
||||
return 0, err
|
||||
}
|
||||
if !ok {
|
||||
r.metrics.incrementAllocationErrors("dynamic")
|
||||
return 0, ErrFull
|
||||
}
|
||||
|
||||
// update metrics
|
||||
r.metrics.incrementAllocations("dynamic")
|
||||
r.metrics.setAllocated(r.Used())
|
||||
r.metrics.setAvailable(r.Free())
|
||||
|
||||
return r.portRange.Base + offset, nil
|
||||
}
|
||||
|
||||
@ -159,7 +196,13 @@ func (r *PortAllocator) Release(port int) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
return r.alloc.Release(offset)
|
||||
err := r.alloc.Release(offset)
|
||||
if err == nil {
|
||||
// update metrics
|
||||
r.metrics.setAllocated(r.Used())
|
||||
r.metrics.setAvailable(r.Free())
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// Has returns true if the provided port is already allocated and a call
|
||||
@ -213,3 +256,37 @@ func (r *PortAllocator) contains(port int) (bool, int) {
|
||||
func (r *PortAllocator) Destroy() {
|
||||
r.alloc.Destroy()
|
||||
}
|
||||
|
||||
// EnableMetrics enables metrics recording.
|
||||
func (r *PortAllocator) EnableMetrics() {
|
||||
registerMetrics()
|
||||
r.metrics = &metricsRecorder{}
|
||||
}
|
||||
|
||||
// calculateRangeOffset estimates the offset used on the range for statically allocation based on
|
||||
// the following formula `min(max($min, rangeSize/$step), $max)`, described as ~never less than
|
||||
// $min or more than $max, with a graduated step function between them~. The function returns 0
|
||||
// if any of the parameters is invalid.
|
||||
func calculateRangeOffset(pr net.PortRange) int {
|
||||
// default values for min(max($min, rangeSize/$step), $max)
|
||||
const (
|
||||
min = 16
|
||||
max = 128
|
||||
step = 32
|
||||
)
|
||||
|
||||
rangeSize := pr.Size
|
||||
// offset should always be smaller than the range size
|
||||
if rangeSize <= min {
|
||||
return 0
|
||||
}
|
||||
|
||||
offset := rangeSize / step
|
||||
if offset < min {
|
||||
return min
|
||||
}
|
||||
if offset > max {
|
||||
return max
|
||||
}
|
||||
return int(offset)
|
||||
}
|
||||
|
@ -17,13 +17,16 @@ limitations under the License.
|
||||
package portallocator
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"strconv"
|
||||
"testing"
|
||||
|
||||
"k8s.io/apimachinery/pkg/util/net"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
featuregatetesting "k8s.io/component-base/featuregate/testing"
|
||||
"k8s.io/component-base/metrics/testutil"
|
||||
api "k8s.io/kubernetes/pkg/apis/core"
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
)
|
||||
|
||||
func TestAllocate(t *testing.T) {
|
||||
@ -118,6 +121,58 @@ func TestAllocate(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestAllocateReserved(t *testing.T) {
|
||||
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ServiceNodePortStaticSubrange, true)()
|
||||
|
||||
pr, err := net.ParsePortRange("30000-30128")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
r, err := NewInMemory(*pr)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
// allocate all ports on the dynamic block
|
||||
// dynamic block size is min(max(16,128/32),128) = 16
|
||||
dynamicOffset := calculateRangeOffset(*pr)
|
||||
dynamicBlockSize := pr.Size - dynamicOffset
|
||||
for i := 0; i < dynamicBlockSize; i++ {
|
||||
if _, err := r.AllocateNext(); err != nil {
|
||||
t.Errorf("Unexpected error trying to allocate: %v", err)
|
||||
}
|
||||
}
|
||||
for i := dynamicOffset; i < pr.Size; i++ {
|
||||
port := i + pr.Base
|
||||
if !r.Has(port) {
|
||||
t.Errorf("Port %d expected to be allocated", port)
|
||||
}
|
||||
}
|
||||
if f := r.Free(); f != dynamicOffset {
|
||||
t.Errorf("expected %d free ports, got %d", dynamicOffset, f)
|
||||
}
|
||||
// allocate all ports on the static block
|
||||
for i := 0; i < dynamicOffset; i++ {
|
||||
port := i + pr.Base
|
||||
if err := r.Allocate(port); err != nil {
|
||||
t.Errorf("Unexpected error trying to allocate Port %d: %v", port, err)
|
||||
}
|
||||
}
|
||||
if f := r.Free(); f != 0 {
|
||||
t.Errorf("expected free equal to 0 got: %d", f)
|
||||
}
|
||||
// release one port in the allocated block and another a new one randomly
|
||||
if err := r.Release(30053); err != nil {
|
||||
t.Fatalf("Unexpected error trying to release port 30053: %v", err)
|
||||
}
|
||||
if _, err := r.AllocateNext(); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if f := r.Free(); f != 0 {
|
||||
t.Errorf("expected free equal to 0 got: %d", f)
|
||||
}
|
||||
}
|
||||
|
||||
func TestForEach(t *testing.T) {
|
||||
pr, err := net.ParsePortRange("10000-10200")
|
||||
if err != nil {
|
||||
@ -262,3 +317,358 @@ func TestNewFromSnapshot(t *testing.T) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func Test_calculateRangeOffset(t *testing.T) {
|
||||
type args struct {
|
||||
pr net.PortRange
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
args args
|
||||
want int
|
||||
}{
|
||||
{
|
||||
name: "default node port range",
|
||||
args: args{
|
||||
pr: net.PortRange{
|
||||
Base: 30000,
|
||||
Size: 2768,
|
||||
},
|
||||
},
|
||||
want: 86,
|
||||
},
|
||||
{
|
||||
name: "very small node port range",
|
||||
args: args{
|
||||
pr: net.PortRange{
|
||||
Base: 30000,
|
||||
Size: 10,
|
||||
},
|
||||
},
|
||||
want: 0,
|
||||
},
|
||||
{
|
||||
name: "small node port range (lower boundary)",
|
||||
args: args{
|
||||
pr: net.PortRange{
|
||||
Base: 30000,
|
||||
Size: 16,
|
||||
},
|
||||
},
|
||||
want: 0,
|
||||
},
|
||||
{
|
||||
name: "small node port range",
|
||||
args: args{
|
||||
pr: net.PortRange{
|
||||
Base: 30000,
|
||||
Size: 128,
|
||||
},
|
||||
},
|
||||
want: 16,
|
||||
},
|
||||
{
|
||||
name: "medium node port range",
|
||||
args: args{
|
||||
pr: net.PortRange{
|
||||
Base: 30000,
|
||||
Size: 2048,
|
||||
},
|
||||
},
|
||||
want: 64,
|
||||
},
|
||||
{
|
||||
name: "large node port range (upper boundary)",
|
||||
args: args{
|
||||
pr: net.PortRange{
|
||||
Base: 30000,
|
||||
Size: 4096,
|
||||
},
|
||||
},
|
||||
want: 128,
|
||||
},
|
||||
{
|
||||
name: "large node port range",
|
||||
args: args{
|
||||
pr: net.PortRange{
|
||||
Base: 30000,
|
||||
Size: 8192,
|
||||
},
|
||||
},
|
||||
want: 128,
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
if got := calculateRangeOffset(tt.args.pr); got != tt.want {
|
||||
t.Errorf("calculateRangeOffset() = %v, want %v", got, tt.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestNodePortMetrics(t *testing.T) {
|
||||
clearMetrics()
|
||||
// create node port allocator
|
||||
portRange := "30000-32767"
|
||||
pr, err := net.ParsePortRange(portRange)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
a, err := NewInMemory(*pr)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error creating nodeport allocator: %v", err)
|
||||
}
|
||||
a.EnableMetrics()
|
||||
|
||||
// Check initial state
|
||||
em := testMetrics{
|
||||
free: 0,
|
||||
used: 0,
|
||||
allocated: 0,
|
||||
errors: 0,
|
||||
}
|
||||
expectMetrics(t, em)
|
||||
|
||||
// allocate 2 ports
|
||||
found := sets.NewInt()
|
||||
for i := 0; i < 2; i++ {
|
||||
port, err := a.AllocateNext()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if found.Has(port) {
|
||||
t.Fatalf("already reserved: %d", port)
|
||||
}
|
||||
found.Insert(port)
|
||||
}
|
||||
|
||||
em = testMetrics{
|
||||
free: 2768 - 2,
|
||||
used: 2,
|
||||
allocated: 2,
|
||||
errors: 0,
|
||||
}
|
||||
expectMetrics(t, em)
|
||||
|
||||
// try to allocate the same ports
|
||||
for s := range found {
|
||||
if !a.Has(s) {
|
||||
t.Fatalf("missing: %d", s)
|
||||
}
|
||||
if err := a.Allocate(s); err != ErrAllocated {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
em = testMetrics{
|
||||
free: 2768 - 2,
|
||||
used: 2,
|
||||
allocated: 2,
|
||||
errors: 2,
|
||||
}
|
||||
expectMetrics(t, em)
|
||||
|
||||
// release the ports allocated
|
||||
for s := range found {
|
||||
if !a.Has(s) {
|
||||
t.Fatalf("missing: %d", s)
|
||||
}
|
||||
if err := a.Release(s); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
em = testMetrics{
|
||||
free: 2768,
|
||||
used: 0,
|
||||
allocated: 2,
|
||||
errors: 2,
|
||||
}
|
||||
expectMetrics(t, em)
|
||||
|
||||
// allocate 3000 ports for each allocator
|
||||
// the full range and 232 more (2768 + 232 = 3000)
|
||||
for i := 0; i < 3000; i++ {
|
||||
a.AllocateNext()
|
||||
}
|
||||
em = testMetrics{
|
||||
free: 0,
|
||||
used: 2768,
|
||||
allocated: 2768 + 2, // this is a counter, we already had 2 allocations and we did 2768 more
|
||||
errors: 232 + 2, // this is a counter, we already had 2 errors and we did 232 more
|
||||
}
|
||||
expectMetrics(t, em)
|
||||
}
|
||||
|
||||
func TestNodePortAllocatedMetrics(t *testing.T) {
|
||||
clearMetrics()
|
||||
|
||||
// create NodePort allocator
|
||||
portRange := "30000-32767"
|
||||
pr, err := net.ParsePortRange(portRange)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
a, err := NewInMemory(*pr)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error creating nodeport allocator: %v", err)
|
||||
}
|
||||
a.EnableMetrics()
|
||||
|
||||
em := testMetrics{
|
||||
free: 0,
|
||||
used: 0,
|
||||
allocated: 0,
|
||||
errors: 0,
|
||||
}
|
||||
expectMetrics(t, em)
|
||||
|
||||
// allocate 2 dynamic port
|
||||
found := sets.NewInt()
|
||||
for i := 0; i < 2; i++ {
|
||||
port, err := a.AllocateNext()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if found.Has(port) {
|
||||
t.Fatalf("already reserved: %d", port)
|
||||
}
|
||||
found.Insert(port)
|
||||
}
|
||||
|
||||
dynamicAllocated, err := testutil.GetCounterMetricValue(nodePortAllocations.WithLabelValues("dynamic"))
|
||||
if err != nil {
|
||||
t.Errorf("failed to get %s value, err: %v", nodePortAllocations.Name, err)
|
||||
}
|
||||
if dynamicAllocated != 2 {
|
||||
t.Fatalf("Expected 2 received %f", dynamicAllocated)
|
||||
}
|
||||
|
||||
// try to allocate the same ports
|
||||
for s := range found {
|
||||
if !a.Has(s) {
|
||||
t.Fatalf("missing: %d", s)
|
||||
}
|
||||
if err := a.Allocate(s); err != ErrAllocated {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
staticErrors, err := testutil.GetCounterMetricValue(nodePortAllocationErrors.WithLabelValues("static"))
|
||||
if err != nil {
|
||||
t.Errorf("failed to get %s value, err: %v", nodePortAllocationErrors.Name, err)
|
||||
}
|
||||
if staticErrors != 2 {
|
||||
t.Fatalf("Expected 2 received %f", staticErrors)
|
||||
}
|
||||
}
|
||||
|
||||
func TestMetricsDisabled(t *testing.T) {
|
||||
clearMetrics()
|
||||
|
||||
// create NodePort allocator
|
||||
portRange := "30000-32766"
|
||||
pr, err := net.ParsePortRange(portRange)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
a, err := NewInMemory(*pr)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error creating nodeport allocator: %v", err)
|
||||
}
|
||||
a.EnableMetrics()
|
||||
|
||||
// create metrics disabled allocator with same port range
|
||||
// this metrics should be ignored
|
||||
b, err := NewInMemory(*pr)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error creating nodeport allocator: %v", err)
|
||||
}
|
||||
|
||||
// Check initial state
|
||||
em := testMetrics{
|
||||
free: 0,
|
||||
used: 0,
|
||||
allocated: 0,
|
||||
errors: 0,
|
||||
}
|
||||
expectMetrics(t, em)
|
||||
|
||||
// allocate in metrics enabled allocator
|
||||
for i := 0; i < 100; i++ {
|
||||
_, err := a.AllocateNext()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
em = testMetrics{
|
||||
free: 2767 - 100,
|
||||
used: 100,
|
||||
allocated: 100,
|
||||
errors: 0,
|
||||
}
|
||||
expectMetrics(t, em)
|
||||
|
||||
// allocate in metrics disabled allocator
|
||||
for i := 0; i < 200; i++ {
|
||||
_, err := b.AllocateNext()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
// the metrics should not be changed
|
||||
expectMetrics(t, em)
|
||||
}
|
||||
|
||||
// Metrics helpers
|
||||
func clearMetrics() {
|
||||
nodePortAllocated.Set(0)
|
||||
nodePortAvailable.Set(0)
|
||||
nodePortAllocations.Reset()
|
||||
nodePortAllocationErrors.Reset()
|
||||
}
|
||||
|
||||
type testMetrics struct {
|
||||
free float64
|
||||
used float64
|
||||
allocated float64
|
||||
errors float64
|
||||
}
|
||||
|
||||
func expectMetrics(t *testing.T, em testMetrics) {
|
||||
var m testMetrics
|
||||
var err error
|
||||
m.free, err = testutil.GetGaugeMetricValue(nodePortAvailable)
|
||||
if err != nil {
|
||||
t.Errorf("failed to get %s value, err: %v", nodePortAvailable.Name, err)
|
||||
}
|
||||
m.used, err = testutil.GetGaugeMetricValue(nodePortAllocated)
|
||||
if err != nil {
|
||||
t.Errorf("failed to get %s value, err: %v", nodePortAllocated.Name, err)
|
||||
}
|
||||
staticAllocated, err := testutil.GetCounterMetricValue(nodePortAllocations.WithLabelValues("static"))
|
||||
if err != nil {
|
||||
t.Errorf("failed to get %s value, err: %v", nodePortAllocations.Name, err)
|
||||
}
|
||||
staticErrors, err := testutil.GetCounterMetricValue(nodePortAllocationErrors.WithLabelValues("static"))
|
||||
if err != nil {
|
||||
t.Errorf("failed to get %s value, err: %v", nodePortAllocationErrors.Name, err)
|
||||
}
|
||||
dynamicAllocated, err := testutil.GetCounterMetricValue(nodePortAllocations.WithLabelValues("dynamic"))
|
||||
if err != nil {
|
||||
t.Errorf("failed to get %s value, err: %v", nodePortAllocations.Name, err)
|
||||
}
|
||||
dynamicErrors, err := testutil.GetCounterMetricValue(nodePortAllocationErrors.WithLabelValues("dynamic"))
|
||||
if err != nil {
|
||||
t.Errorf("failed to get %s value, err: %v", nodePortAllocationErrors.Name, err)
|
||||
}
|
||||
|
||||
m.allocated = staticAllocated + dynamicAllocated
|
||||
m.errors = staticErrors + dynamicErrors
|
||||
|
||||
if m != em {
|
||||
t.Fatalf("metrics error: expected %v, received %v", em, m)
|
||||
}
|
||||
}
|
||||
|
120
pkg/registry/core/service/portallocator/metrics.go
Normal file
120
pkg/registry/core/service/portallocator/metrics.go
Normal file
@ -0,0 +1,120 @@
|
||||
/*
|
||||
Copyright 2022 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package portallocator
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"k8s.io/component-base/metrics"
|
||||
"k8s.io/component-base/metrics/legacyregistry"
|
||||
)
|
||||
|
||||
const (
|
||||
namespace = "kube_apiserver"
|
||||
subsystem = "nodeport_allocator"
|
||||
)
|
||||
|
||||
var (
|
||||
// nodePortAllocated indicates the amount of ports allocated by NodePort Service.
|
||||
nodePortAllocated = metrics.NewGauge(
|
||||
&metrics.GaugeOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: subsystem,
|
||||
Name: "allocated_ports",
|
||||
Help: "Gauge measuring the number of allocated NodePorts for Services",
|
||||
StabilityLevel: metrics.ALPHA,
|
||||
},
|
||||
)
|
||||
// nodePortAvailable indicates the amount of ports available by NodePort Service.
|
||||
nodePortAvailable = metrics.NewGauge(
|
||||
&metrics.GaugeOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: subsystem,
|
||||
Name: "available_ports",
|
||||
Help: "Gauge measuring the number of available NodePorts for Services",
|
||||
StabilityLevel: metrics.ALPHA,
|
||||
},
|
||||
)
|
||||
// nodePortAllocation counts the total number of ports allocation and allocation mode: static or dynamic.
|
||||
nodePortAllocations = metrics.NewCounterVec(
|
||||
&metrics.CounterOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: subsystem,
|
||||
Name: "allocation_total",
|
||||
Help: "Number of NodePort allocations",
|
||||
StabilityLevel: metrics.ALPHA,
|
||||
},
|
||||
[]string{"scope"},
|
||||
)
|
||||
// nodePortAllocationErrors counts the number of error trying to allocate a nodePort and allocation mode: static or dynamic.
|
||||
nodePortAllocationErrors = metrics.NewCounterVec(
|
||||
&metrics.CounterOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: subsystem,
|
||||
Name: "allocation_errors_total",
|
||||
Help: "Number of errors trying to allocate NodePort",
|
||||
StabilityLevel: metrics.ALPHA,
|
||||
},
|
||||
[]string{"scope"},
|
||||
)
|
||||
)
|
||||
|
||||
var registerMetricsOnce sync.Once
|
||||
|
||||
func registerMetrics() {
|
||||
registerMetricsOnce.Do(func() {
|
||||
legacyregistry.MustRegister(nodePortAllocated)
|
||||
legacyregistry.MustRegister(nodePortAvailable)
|
||||
legacyregistry.MustRegister(nodePortAllocations)
|
||||
legacyregistry.MustRegister(nodePortAllocationErrors)
|
||||
})
|
||||
}
|
||||
|
||||
// metricsRecorderInterface is the interface to record metrics.
|
||||
type metricsRecorderInterface interface {
|
||||
setAllocated(allocated int)
|
||||
setAvailable(available int)
|
||||
incrementAllocations(scope string)
|
||||
incrementAllocationErrors(scope string)
|
||||
}
|
||||
|
||||
// metricsRecorder implements metricsRecorderInterface.
|
||||
type metricsRecorder struct{}
|
||||
|
||||
func (m *metricsRecorder) setAllocated(allocated int) {
|
||||
nodePortAllocated.Set(float64(allocated))
|
||||
}
|
||||
|
||||
func (m *metricsRecorder) setAvailable(available int) {
|
||||
nodePortAvailable.Set(float64(available))
|
||||
}
|
||||
|
||||
func (m *metricsRecorder) incrementAllocations(scope string) {
|
||||
nodePortAllocations.WithLabelValues(scope).Inc()
|
||||
}
|
||||
|
||||
func (m *metricsRecorder) incrementAllocationErrors(scope string) {
|
||||
nodePortAllocationErrors.WithLabelValues(scope).Inc()
|
||||
}
|
||||
|
||||
// emptyMetricsRecorder is a null object implements metricsRecorderInterface.
|
||||
type emptyMetricsRecorder struct{}
|
||||
|
||||
func (*emptyMetricsRecorder) setAllocated(allocated int) {}
|
||||
func (*emptyMetricsRecorder) setAvailable(available int) {}
|
||||
func (*emptyMetricsRecorder) incrementAllocations(scope string) {}
|
||||
func (*emptyMetricsRecorder) incrementAllocationErrors(scope string) {}
|
@ -27,8 +27,11 @@ import (
|
||||
"k8s.io/apiserver/pkg/storage"
|
||||
etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing"
|
||||
"k8s.io/apiserver/pkg/storage/storagebackend/factory"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
featuregatetesting "k8s.io/component-base/featuregate/testing"
|
||||
api "k8s.io/kubernetes/pkg/apis/core"
|
||||
_ "k8s.io/kubernetes/pkg/apis/core/install"
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
"k8s.io/kubernetes/pkg/registry/core/service/allocator"
|
||||
allocatorstore "k8s.io/kubernetes/pkg/registry/core/service/allocator/storage"
|
||||
"k8s.io/kubernetes/pkg/registry/core/service/portallocator"
|
||||
@ -46,8 +49,8 @@ func newStorage(t *testing.T) (*etcd3testing.EtcdTestServer, portallocator.Inter
|
||||
serviceNodePortRange := utilnet.PortRange{Base: basePortRange, Size: sizePortRange}
|
||||
configForAllocations := etcdStorage.ForResource(api.Resource("servicenodeportallocations"))
|
||||
var backing allocator.Interface
|
||||
storage, err := portallocator.New(serviceNodePortRange, func(max int, rangeSpec string) (allocator.Interface, error) {
|
||||
mem := allocator.NewAllocationMap(max, rangeSpec)
|
||||
storage, err := portallocator.New(serviceNodePortRange, func(max int, rangeSpec string, offset int) (allocator.Interface, error) {
|
||||
mem := allocator.NewAllocationMapWithOffset(max, rangeSpec, offset)
|
||||
backing = mem
|
||||
etcd, err := allocatorstore.NewEtcd(mem, "/ranges/servicenodeports", configForAllocations)
|
||||
if err != nil {
|
||||
@ -183,3 +186,79 @@ func TestReallocate(t *testing.T) {
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestAllocateReserved(t *testing.T) {
|
||||
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ServiceNodePortStaticSubrange, true)()
|
||||
|
||||
_, storage, _, si, destroyFunc := newStorage(t)
|
||||
defer destroyFunc()
|
||||
if err := si.Create(context.TODO(), key(), validNewRangeAllocation(), nil, 0); err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
// allocate all ports on the dynamic block
|
||||
// default range size is 2768, and dynamic block size is min(max(16,2768/32),128) = 86
|
||||
dynamicOffset := 86
|
||||
dynamicBlockSize := sizePortRange - dynamicOffset
|
||||
for i := 0; i < dynamicBlockSize; i++ {
|
||||
if _, err := storage.AllocateNext(); err != nil {
|
||||
t.Errorf("Unexpected error trying to allocate: %v", err)
|
||||
}
|
||||
}
|
||||
for i := dynamicOffset; i < sizePortRange; i++ {
|
||||
port := i + basePortRange
|
||||
if !storage.Has(port) {
|
||||
t.Errorf("Port %d expected to be allocated", port)
|
||||
}
|
||||
}
|
||||
|
||||
// allocate all ports on the static block
|
||||
for i := 0; i < dynamicOffset; i++ {
|
||||
port := i + basePortRange
|
||||
if err := storage.Allocate(port); err != nil {
|
||||
t.Errorf("Unexpected error trying to allocate Port %d: %v", port, err)
|
||||
}
|
||||
}
|
||||
if _, err := storage.AllocateNext(); err == nil {
|
||||
t.Error("Allocator expected to be full")
|
||||
}
|
||||
// release one port in the allocated block and another a new one randomly
|
||||
if err := storage.Release(basePortRange + 53); err != nil {
|
||||
t.Fatalf("Unexpected error trying to release port 30053: %v", err)
|
||||
}
|
||||
if _, err := storage.AllocateNext(); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if _, err := storage.AllocateNext(); err == nil {
|
||||
t.Error("Allocator expected to be full")
|
||||
}
|
||||
}
|
||||
|
||||
func TestAllocateReservedDynamicBlockExhausted(t *testing.T) {
|
||||
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ServiceNodePortStaticSubrange, true)()
|
||||
|
||||
_, storage, _, si, destroyFunc := newStorage(t)
|
||||
defer destroyFunc()
|
||||
if err := si.Create(context.TODO(), key(), validNewRangeAllocation(), nil, 0); err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
// allocate all ports both on the dynamic and reserved blocks
|
||||
// once the dynamic block has been exhausted
|
||||
// the dynamic allocator will use the reserved block
|
||||
for i := 0; i < sizePortRange; i++ {
|
||||
if _, err := storage.AllocateNext(); err != nil {
|
||||
t.Errorf("Unexpected error trying to allocate: %v", err)
|
||||
}
|
||||
}
|
||||
for i := 0; i < sizePortRange; i++ {
|
||||
port := i + basePortRange
|
||||
if !storage.Has(port) {
|
||||
t.Errorf("Port %d expected to be allocated", port)
|
||||
}
|
||||
}
|
||||
|
||||
if _, err := storage.AllocateNext(); err == nil {
|
||||
t.Error("Allocator expected to be full")
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user