add --concurrent-ephemeralvolume-syncs flag for kube-controller-manager
This commit is contained in:
@@ -401,8 +401,7 @@ func startEphemeralVolumeController(ctx ControllerContext) (http.Handler, bool,
|
||||
if err != nil {
|
||||
return nil, true, fmt.Errorf("failed to start ephemeral volume controller: %v", err)
|
||||
}
|
||||
// TODO (before beta at the latest): make this configurable similar to the EndpointController
|
||||
go ephemeralController.Run(1 /* int(ctx.ComponentConfig.EphemeralController.ConcurrentEphemeralVolumeSyncs) */, ctx.Stop)
|
||||
go ephemeralController.Run(int(ctx.ComponentConfig.EphemeralVolumeController.ConcurrentEphemeralVolumeSyncs), ctx.Stop)
|
||||
return nil, true, nil
|
||||
}
|
||||
return nil, false, nil
|
||||
|
@@ -0,0 +1,63 @@
|
||||
/*
|
||||
Copyright 2021 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package options
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/spf13/pflag"
|
||||
|
||||
ephemeralvolumeconfig "k8s.io/kubernetes/pkg/controller/volume/ephemeral/config"
|
||||
)
|
||||
|
||||
// EphemeralVolumeControllerOptions holds the EphemeralVolumeController options.
|
||||
type EphemeralVolumeControllerOptions struct {
|
||||
*ephemeralvolumeconfig.EphemeralVolumeControllerConfiguration
|
||||
}
|
||||
|
||||
// AddFlags adds flags related to EphemeralVolumeController for controller manager to the specified FlagSet.
|
||||
func (o *EphemeralVolumeControllerOptions) AddFlags(fs *pflag.FlagSet) {
|
||||
if o == nil {
|
||||
return
|
||||
}
|
||||
|
||||
fs.Int32Var(&o.ConcurrentEphemeralVolumeSyncs, "concurrent-ephemeralvolume-syncs", o.ConcurrentEphemeralVolumeSyncs, "The number of ephemeral volume syncing operations that will be done concurrently. Larger number = faster ephemeral volume updating, but more CPU (and network) load")
|
||||
}
|
||||
|
||||
// ApplyTo fills up EphemeralVolumeController config with options.
|
||||
func (o *EphemeralVolumeControllerOptions) ApplyTo(cfg *ephemeralvolumeconfig.EphemeralVolumeControllerConfiguration) error {
|
||||
if o == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
cfg.ConcurrentEphemeralVolumeSyncs = o.ConcurrentEphemeralVolumeSyncs
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Validate checks validation of EphemeralVolumeControllerOptions.
|
||||
func (o *EphemeralVolumeControllerOptions) Validate() []error {
|
||||
if o == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
errs := []error{}
|
||||
if o.ConcurrentEphemeralVolumeSyncs < 1 {
|
||||
errs = append(errs, fmt.Errorf("concurrent-ephemeralvolume-syncs must be greater than 0, but got %d", o.ConcurrentEphemeralVolumeSyncs))
|
||||
}
|
||||
return errs
|
||||
}
|
@@ -70,6 +70,7 @@ type KubeControllerManagerOptions struct {
|
||||
EndpointController *EndpointControllerOptions
|
||||
EndpointSliceController *EndpointSliceControllerOptions
|
||||
EndpointSliceMirroringController *EndpointSliceMirroringControllerOptions
|
||||
EphemeralVolumeController *EphemeralVolumeControllerOptions
|
||||
GarbageCollectorController *GarbageCollectorControllerOptions
|
||||
HPAController *HPAControllerOptions
|
||||
JobController *JobControllerOptions
|
||||
@@ -136,6 +137,9 @@ func NewKubeControllerManagerOptions() (*KubeControllerManagerOptions, error) {
|
||||
EndpointSliceMirroringController: &EndpointSliceMirroringControllerOptions{
|
||||
&componentConfig.EndpointSliceMirroringController,
|
||||
},
|
||||
EphemeralVolumeController: &EphemeralVolumeControllerOptions{
|
||||
&componentConfig.EphemeralVolumeController,
|
||||
},
|
||||
GarbageCollectorController: &GarbageCollectorControllerOptions{
|
||||
&componentConfig.GarbageCollectorController,
|
||||
},
|
||||
@@ -252,6 +256,7 @@ func (s *KubeControllerManagerOptions) Flags(allControllers []string, disabledBy
|
||||
s.EndpointController.AddFlags(fss.FlagSet("endpoint controller"))
|
||||
s.EndpointSliceController.AddFlags(fss.FlagSet("endpointslice controller"))
|
||||
s.EndpointSliceMirroringController.AddFlags(fss.FlagSet("endpointslicemirroring controller"))
|
||||
s.EphemeralVolumeController.AddFlags(fss.FlagSet("ephemeralvolume controller"))
|
||||
s.GarbageCollectorController.AddFlags(fss.FlagSet("garbagecollector controller"))
|
||||
s.HPAController.AddFlags(fss.FlagSet("horizontalpodautoscaling controller"))
|
||||
s.JobController.AddFlags(fss.FlagSet("job controller"))
|
||||
@@ -312,6 +317,9 @@ func (s *KubeControllerManagerOptions) ApplyTo(c *kubecontrollerconfig.Config) e
|
||||
if err := s.EndpointSliceMirroringController.ApplyTo(&c.ComponentConfig.EndpointSliceMirroringController); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := s.EphemeralVolumeController.ApplyTo(&c.ComponentConfig.EphemeralVolumeController); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := s.GarbageCollectorController.ApplyTo(&c.ComponentConfig.GarbageCollectorController); err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -386,6 +394,7 @@ func (s *KubeControllerManagerOptions) Validate(allControllers []string, disable
|
||||
errs = append(errs, s.EndpointController.Validate()...)
|
||||
errs = append(errs, s.EndpointSliceController.Validate()...)
|
||||
errs = append(errs, s.EndpointSliceMirroringController.Validate()...)
|
||||
errs = append(errs, s.EphemeralVolumeController.Validate()...)
|
||||
errs = append(errs, s.GarbageCollectorController.Validate()...)
|
||||
errs = append(errs, s.HPAController.Validate()...)
|
||||
errs = append(errs, s.JobController.Validate()...)
|
||||
|
@@ -59,6 +59,7 @@ import (
|
||||
statefulsetconfig "k8s.io/kubernetes/pkg/controller/statefulset/config"
|
||||
ttlafterfinishedconfig "k8s.io/kubernetes/pkg/controller/ttlafterfinished/config"
|
||||
attachdetachconfig "k8s.io/kubernetes/pkg/controller/volume/attachdetach/config"
|
||||
ephemeralvolumeconfig "k8s.io/kubernetes/pkg/controller/volume/ephemeral/config"
|
||||
persistentvolumeconfig "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/config"
|
||||
)
|
||||
|
||||
@@ -83,6 +84,7 @@ var args = []string{
|
||||
"--concurrent-deployment-syncs=10",
|
||||
"--concurrent-statefulset-syncs=15",
|
||||
"--concurrent-endpoint-syncs=10",
|
||||
"--concurrent-ephemeralvolume-syncs=10",
|
||||
"--concurrent-service-endpoint-syncs=10",
|
||||
"--concurrent-gc-syncs=30",
|
||||
"--concurrent-namespace-syncs=20",
|
||||
@@ -288,6 +290,11 @@ func TestAddFlags(t *testing.T) {
|
||||
MirroringMaxEndpointsPerSubset: 1000,
|
||||
},
|
||||
},
|
||||
EphemeralVolumeController: &EphemeralVolumeControllerOptions{
|
||||
&ephemeralvolumeconfig.EphemeralVolumeControllerConfiguration{
|
||||
ConcurrentEphemeralVolumeSyncs: 10,
|
||||
},
|
||||
},
|
||||
GarbageCollectorController: &GarbageCollectorControllerOptions{
|
||||
&garbagecollectorconfig.GarbageCollectorControllerConfiguration{
|
||||
ConcurrentGCSyncs: 30,
|
||||
@@ -545,6 +552,9 @@ func TestApplyTo(t *testing.T) {
|
||||
MirroringConcurrentServiceEndpointSyncs: 2,
|
||||
MirroringMaxEndpointsPerSubset: 1000,
|
||||
},
|
||||
EphemeralVolumeController: ephemeralvolumeconfig.EphemeralVolumeControllerConfiguration{
|
||||
ConcurrentEphemeralVolumeSyncs: 10,
|
||||
},
|
||||
GarbageCollectorController: garbagecollectorconfig.GarbageCollectorControllerConfiguration{
|
||||
ConcurrentGCSyncs: 30,
|
||||
GCIgnoredResources: []garbagecollectorconfig.GroupResource{
|
||||
|
Reference in New Issue
Block a user