Merge pull request #114301 from harshanarayana/kubelet/log-rotate-improvements
kubelet: enable configurable rotation duration and parallel rotate
This commit is contained in:
		
							
								
								
									
										13
									
								
								pkg/generated/openapi/zz_generated.openapi.go
									
									
									
										generated
									
									
									
								
							
							
						
						
									
										13
									
								
								pkg/generated/openapi/zz_generated.openapi.go
									
									
									
										generated
									
									
									
								
							@@ -58809,6 +58809,19 @@ func schema_k8sio_kubelet_config_v1beta1_KubeletConfiguration(ref common.Referen
 | 
				
			|||||||
							Format:      "int32",
 | 
												Format:      "int32",
 | 
				
			||||||
						},
 | 
											},
 | 
				
			||||||
					},
 | 
										},
 | 
				
			||||||
 | 
										"containerLogMaxWorkers": {
 | 
				
			||||||
 | 
											SchemaProps: spec.SchemaProps{
 | 
				
			||||||
 | 
												Description: "ContainerLogMaxWorkers specifies the maximum number of concurrent workers to spawn for performing the log rotate operations. Set this count to 1 for disabling the concurrent log rotation workflows Default: 1",
 | 
				
			||||||
 | 
												Type:        []string{"integer"},
 | 
				
			||||||
 | 
												Format:      "int32",
 | 
				
			||||||
 | 
											},
 | 
				
			||||||
 | 
										},
 | 
				
			||||||
 | 
										"containerLogMonitorInterval": {
 | 
				
			||||||
 | 
											SchemaProps: spec.SchemaProps{
 | 
				
			||||||
 | 
												Description: "ContainerLogMonitorInterval specifies the duration at which the container logs are monitored for performing the log rotate operation. This defaults to 10 * time.Seconds. But can be customized to a smaller value based on the log generation rate and the size required to be rotated against Default: 10s",
 | 
				
			||||||
 | 
												Ref:         ref("k8s.io/apimachinery/pkg/apis/meta/v1.Duration"),
 | 
				
			||||||
 | 
											},
 | 
				
			||||||
 | 
										},
 | 
				
			||||||
					"configMapAndSecretChangeDetectionStrategy": {
 | 
										"configMapAndSecretChangeDetectionStrategy": {
 | 
				
			||||||
						SchemaProps: spec.SchemaProps{
 | 
											SchemaProps: spec.SchemaProps{
 | 
				
			||||||
							Description: "configMapAndSecretChangeDetectionStrategy is a mode in which ConfigMap and Secret managers are running. Valid values include:\n\n- `Get`: kubelet fetches necessary objects directly from the API server; - `Cache`: kubelet uses TTL cache for object fetched from the API server; - `Watch`: kubelet uses watches to observe changes to objects that are in its interest.\n\nDefault: \"Watch\"",
 | 
												Description: "configMapAndSecretChangeDetectionStrategy is a mode in which ConfigMap and Secret managers are running. Valid values include:\n\n- `Get`: kubelet fetches necessary objects directly from the API server; - `Cache`: kubelet uses TTL cache for object fetched from the API server; - `Watch`: kubelet uses watches to observe changes to objects that are in its interest.\n\nDefault: \"Watch\"",
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -106,6 +106,8 @@ func Funcs(codecs runtimeserializer.CodecFactory) []interface{} {
 | 
				
			|||||||
			obj.StaticPodURLHeader = make(map[string][]string)
 | 
								obj.StaticPodURLHeader = make(map[string][]string)
 | 
				
			||||||
			obj.ContainerLogMaxFiles = 5
 | 
								obj.ContainerLogMaxFiles = 5
 | 
				
			||||||
			obj.ContainerLogMaxSize = "10Mi"
 | 
								obj.ContainerLogMaxSize = "10Mi"
 | 
				
			||||||
 | 
								obj.ContainerLogMaxWorkers = 1
 | 
				
			||||||
 | 
								obj.ContainerLogMonitorInterval = metav1.Duration{Duration: 10 * time.Second}
 | 
				
			||||||
			obj.ConfigMapAndSecretChangeDetectionStrategy = "Watch"
 | 
								obj.ConfigMapAndSecretChangeDetectionStrategy = "Watch"
 | 
				
			||||||
			obj.AllowedUnsafeSysctls = []string{}
 | 
								obj.AllowedUnsafeSysctls = []string{}
 | 
				
			||||||
			obj.VolumePluginDir = kubeletconfigv1beta1.DefaultVolumePluginDir
 | 
								obj.VolumePluginDir = kubeletconfigv1beta1.DefaultVolumePluginDir
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -185,6 +185,8 @@ var (
 | 
				
			|||||||
		"ConfigMapAndSecretChangeDetectionStrategy",
 | 
							"ConfigMapAndSecretChangeDetectionStrategy",
 | 
				
			||||||
		"ContainerLogMaxFiles",
 | 
							"ContainerLogMaxFiles",
 | 
				
			||||||
		"ContainerLogMaxSize",
 | 
							"ContainerLogMaxSize",
 | 
				
			||||||
 | 
							"ContainerLogMaxWorkers",
 | 
				
			||||||
 | 
							"ContainerLogMonitorInterval",
 | 
				
			||||||
		"ContentType",
 | 
							"ContentType",
 | 
				
			||||||
		"EnableContentionProfiling",
 | 
							"EnableContentionProfiling",
 | 
				
			||||||
		"EnableControllerAttachDetach",
 | 
							"EnableControllerAttachDetach",
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -17,6 +17,8 @@ cgroupsPerQOS: true
 | 
				
			|||||||
configMapAndSecretChangeDetectionStrategy: Watch
 | 
					configMapAndSecretChangeDetectionStrategy: Watch
 | 
				
			||||||
containerLogMaxFiles: 5
 | 
					containerLogMaxFiles: 5
 | 
				
			||||||
containerLogMaxSize: 10Mi
 | 
					containerLogMaxSize: 10Mi
 | 
				
			||||||
 | 
					containerLogMaxWorkers: 1
 | 
				
			||||||
 | 
					containerLogMonitorInterval: 10s
 | 
				
			||||||
containerRuntimeEndpoint: unix:///run/containerd/containerd.sock
 | 
					containerRuntimeEndpoint: unix:///run/containerd/containerd.sock
 | 
				
			||||||
contentType: application/vnd.kubernetes.protobuf
 | 
					contentType: application/vnd.kubernetes.protobuf
 | 
				
			||||||
cpuCFSQuota: true
 | 
					cpuCFSQuota: true
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -17,6 +17,8 @@ cgroupsPerQOS: true
 | 
				
			|||||||
configMapAndSecretChangeDetectionStrategy: Watch
 | 
					configMapAndSecretChangeDetectionStrategy: Watch
 | 
				
			||||||
containerLogMaxFiles: 5
 | 
					containerLogMaxFiles: 5
 | 
				
			||||||
containerLogMaxSize: 10Mi
 | 
					containerLogMaxSize: 10Mi
 | 
				
			||||||
 | 
					containerLogMaxWorkers: 1
 | 
				
			||||||
 | 
					containerLogMonitorInterval: 10s
 | 
				
			||||||
containerRuntimeEndpoint: unix:///run/containerd/containerd.sock
 | 
					containerRuntimeEndpoint: unix:///run/containerd/containerd.sock
 | 
				
			||||||
contentType: application/vnd.kubernetes.protobuf
 | 
					contentType: application/vnd.kubernetes.protobuf
 | 
				
			||||||
cpuCFSQuota: true
 | 
					cpuCFSQuota: true
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -347,6 +347,11 @@ type KubeletConfiguration struct {
 | 
				
			|||||||
	ContainerLogMaxSize string
 | 
						ContainerLogMaxSize string
 | 
				
			||||||
	// Maximum number of container log files that can be present for a container.
 | 
						// Maximum number of container log files that can be present for a container.
 | 
				
			||||||
	ContainerLogMaxFiles int32
 | 
						ContainerLogMaxFiles int32
 | 
				
			||||||
 | 
						// Maximum number of concurrent log rotation workers to spawn for processing the log rotation
 | 
				
			||||||
 | 
						// requests
 | 
				
			||||||
 | 
						ContainerLogMaxWorkers int32
 | 
				
			||||||
 | 
						// Interval at which the container logs are monitored for rotation
 | 
				
			||||||
 | 
						ContainerLogMonitorInterval metav1.Duration
 | 
				
			||||||
	// ConfigMapAndSecretChangeDetectionStrategy is a mode in which config map and secret managers are running.
 | 
						// ConfigMapAndSecretChangeDetectionStrategy is a mode in which config map and secret managers are running.
 | 
				
			||||||
	ConfigMapAndSecretChangeDetectionStrategy ResourceChangeDetectionStrategy
 | 
						ConfigMapAndSecretChangeDetectionStrategy ResourceChangeDetectionStrategy
 | 
				
			||||||
	// A comma separated allowlist of unsafe sysctls or sysctl patterns (ending in `*`).
 | 
						// A comma separated allowlist of unsafe sysctls or sysctl patterns (ending in `*`).
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -239,6 +239,12 @@ func SetDefaults_KubeletConfiguration(obj *kubeletconfigv1beta1.KubeletConfigura
 | 
				
			|||||||
	if obj.ContainerLogMaxFiles == nil {
 | 
						if obj.ContainerLogMaxFiles == nil {
 | 
				
			||||||
		obj.ContainerLogMaxFiles = utilpointer.Int32(5)
 | 
							obj.ContainerLogMaxFiles = utilpointer.Int32(5)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
						if obj.ContainerLogMaxWorkers == nil {
 | 
				
			||||||
 | 
							obj.ContainerLogMaxWorkers = utilpointer.Int32(1)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						if obj.ContainerLogMonitorInterval == nil {
 | 
				
			||||||
 | 
							obj.ContainerLogMonitorInterval = &metav1.Duration{Duration: 10 * time.Second}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
	if obj.ConfigMapAndSecretChangeDetectionStrategy == "" {
 | 
						if obj.ConfigMapAndSecretChangeDetectionStrategy == "" {
 | 
				
			||||||
		obj.ConfigMapAndSecretChangeDetectionStrategy = kubeletconfigv1beta1.WatchChangeDetectionStrategy
 | 
							obj.ConfigMapAndSecretChangeDetectionStrategy = kubeletconfigv1beta1.WatchChangeDetectionStrategy
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -112,6 +112,8 @@ func TestSetDefaultsKubeletConfiguration(t *testing.T) {
 | 
				
			|||||||
				FailSwapOn:                                utilpointer.Bool(true),
 | 
									FailSwapOn:                                utilpointer.Bool(true),
 | 
				
			||||||
				ContainerLogMaxSize:                       "10Mi",
 | 
									ContainerLogMaxSize:                       "10Mi",
 | 
				
			||||||
				ContainerLogMaxFiles:                      utilpointer.Int32(5),
 | 
									ContainerLogMaxFiles:                      utilpointer.Int32(5),
 | 
				
			||||||
 | 
									ContainerLogMaxWorkers:                    utilpointer.Int32(1),
 | 
				
			||||||
 | 
									ContainerLogMonitorInterval:               &metav1.Duration{Duration: 10 * time.Second},
 | 
				
			||||||
				ConfigMapAndSecretChangeDetectionStrategy: v1beta1.WatchChangeDetectionStrategy,
 | 
									ConfigMapAndSecretChangeDetectionStrategy: v1beta1.WatchChangeDetectionStrategy,
 | 
				
			||||||
				EnforceNodeAllocatable:                    DefaultNodeAllocatableEnforcement,
 | 
									EnforceNodeAllocatable:                    DefaultNodeAllocatableEnforcement,
 | 
				
			||||||
				VolumePluginDir:                           DefaultVolumePluginDir,
 | 
									VolumePluginDir:                           DefaultVolumePluginDir,
 | 
				
			||||||
@@ -227,6 +229,8 @@ func TestSetDefaultsKubeletConfiguration(t *testing.T) {
 | 
				
			|||||||
				MemorySwap:                       v1beta1.MemorySwapConfiguration{SwapBehavior: ""},
 | 
									MemorySwap:                       v1beta1.MemorySwapConfiguration{SwapBehavior: ""},
 | 
				
			||||||
				ContainerLogMaxSize:              "",
 | 
									ContainerLogMaxSize:              "",
 | 
				
			||||||
				ContainerLogMaxFiles:             utilpointer.Int32(0),
 | 
									ContainerLogMaxFiles:             utilpointer.Int32(0),
 | 
				
			||||||
 | 
									ContainerLogMaxWorkers:           utilpointer.Int32(1),
 | 
				
			||||||
 | 
									ContainerLogMonitorInterval:      &metav1.Duration{Duration: 10 * time.Second},
 | 
				
			||||||
				ConfigMapAndSecretChangeDetectionStrategy: v1beta1.WatchChangeDetectionStrategy,
 | 
									ConfigMapAndSecretChangeDetectionStrategy: v1beta1.WatchChangeDetectionStrategy,
 | 
				
			||||||
				SystemReserved:              map[string]string{},
 | 
									SystemReserved:              map[string]string{},
 | 
				
			||||||
				KubeReserved:                map[string]string{},
 | 
									KubeReserved:                map[string]string{},
 | 
				
			||||||
@@ -333,6 +337,8 @@ func TestSetDefaultsKubeletConfiguration(t *testing.T) {
 | 
				
			|||||||
				MemorySwap:                                v1beta1.MemorySwapConfiguration{SwapBehavior: ""},
 | 
									MemorySwap:                                v1beta1.MemorySwapConfiguration{SwapBehavior: ""},
 | 
				
			||||||
				ContainerLogMaxSize:                       "10Mi",
 | 
									ContainerLogMaxSize:                       "10Mi",
 | 
				
			||||||
				ContainerLogMaxFiles:                      utilpointer.Int32(0),
 | 
									ContainerLogMaxFiles:                      utilpointer.Int32(0),
 | 
				
			||||||
 | 
									ContainerLogMaxWorkers:                    utilpointer.Int32(1),
 | 
				
			||||||
 | 
									ContainerLogMonitorInterval:               &metav1.Duration{Duration: 10 * time.Second},
 | 
				
			||||||
				ConfigMapAndSecretChangeDetectionStrategy: v1beta1.WatchChangeDetectionStrategy,
 | 
									ConfigMapAndSecretChangeDetectionStrategy: v1beta1.WatchChangeDetectionStrategy,
 | 
				
			||||||
				SystemReserved:                            map[string]string{},
 | 
									SystemReserved:                            map[string]string{},
 | 
				
			||||||
				KubeReserved:                              map[string]string{},
 | 
									KubeReserved:                              map[string]string{},
 | 
				
			||||||
@@ -465,6 +471,8 @@ func TestSetDefaultsKubeletConfiguration(t *testing.T) {
 | 
				
			|||||||
				MemorySwap:                                v1beta1.MemorySwapConfiguration{SwapBehavior: "UnlimitedSwap"},
 | 
									MemorySwap:                                v1beta1.MemorySwapConfiguration{SwapBehavior: "UnlimitedSwap"},
 | 
				
			||||||
				ContainerLogMaxSize:                       "1Mi",
 | 
									ContainerLogMaxSize:                       "1Mi",
 | 
				
			||||||
				ContainerLogMaxFiles:                      utilpointer.Int32(1),
 | 
									ContainerLogMaxFiles:                      utilpointer.Int32(1),
 | 
				
			||||||
 | 
									ContainerLogMaxWorkers:                    utilpointer.Int32(1),
 | 
				
			||||||
 | 
									ContainerLogMonitorInterval:               &metav1.Duration{Duration: 10 * time.Second},
 | 
				
			||||||
				ConfigMapAndSecretChangeDetectionStrategy: v1beta1.TTLCacheChangeDetectionStrategy,
 | 
									ConfigMapAndSecretChangeDetectionStrategy: v1beta1.TTLCacheChangeDetectionStrategy,
 | 
				
			||||||
				SystemReserved: map[string]string{
 | 
									SystemReserved: map[string]string{
 | 
				
			||||||
					"memory": "1Gi",
 | 
										"memory": "1Gi",
 | 
				
			||||||
@@ -611,6 +619,8 @@ func TestSetDefaultsKubeletConfiguration(t *testing.T) {
 | 
				
			|||||||
				MemorySwap:                                v1beta1.MemorySwapConfiguration{SwapBehavior: "UnlimitedSwap"},
 | 
									MemorySwap:                                v1beta1.MemorySwapConfiguration{SwapBehavior: "UnlimitedSwap"},
 | 
				
			||||||
				ContainerLogMaxSize:                       "1Mi",
 | 
									ContainerLogMaxSize:                       "1Mi",
 | 
				
			||||||
				ContainerLogMaxFiles:                      utilpointer.Int32(1),
 | 
									ContainerLogMaxFiles:                      utilpointer.Int32(1),
 | 
				
			||||||
 | 
									ContainerLogMaxWorkers:                    utilpointer.Int32(1),
 | 
				
			||||||
 | 
									ContainerLogMonitorInterval:               &metav1.Duration{Duration: 10 * time.Second},
 | 
				
			||||||
				ConfigMapAndSecretChangeDetectionStrategy: v1beta1.TTLCacheChangeDetectionStrategy,
 | 
									ConfigMapAndSecretChangeDetectionStrategy: v1beta1.TTLCacheChangeDetectionStrategy,
 | 
				
			||||||
				SystemReserved: map[string]string{
 | 
									SystemReserved: map[string]string{
 | 
				
			||||||
					"memory": "1Gi",
 | 
										"memory": "1Gi",
 | 
				
			||||||
@@ -720,7 +730,9 @@ func TestSetDefaultsKubeletConfiguration(t *testing.T) {
 | 
				
			|||||||
				IPTablesDropBit:                           utilpointer.Int32Ptr(DefaultIPTablesDropBit),
 | 
									IPTablesDropBit:                           utilpointer.Int32Ptr(DefaultIPTablesDropBit),
 | 
				
			||||||
				FailSwapOn:                                utilpointer.Bool(true),
 | 
									FailSwapOn:                                utilpointer.Bool(true),
 | 
				
			||||||
				ContainerLogMaxSize:                       "10Mi",
 | 
									ContainerLogMaxSize:                       "10Mi",
 | 
				
			||||||
				ContainerLogMaxFiles:                      utilpointer.Int32Ptr(5),
 | 
									ContainerLogMaxFiles:                      utilpointer.Int32(5),
 | 
				
			||||||
 | 
									ContainerLogMaxWorkers:                    utilpointer.Int32(1),
 | 
				
			||||||
 | 
									ContainerLogMonitorInterval:               &metav1.Duration{Duration: 10 * time.Second},
 | 
				
			||||||
				ConfigMapAndSecretChangeDetectionStrategy: v1beta1.WatchChangeDetectionStrategy,
 | 
									ConfigMapAndSecretChangeDetectionStrategy: v1beta1.WatchChangeDetectionStrategy,
 | 
				
			||||||
				EnforceNodeAllocatable:                    DefaultNodeAllocatableEnforcement,
 | 
									EnforceNodeAllocatable:                    DefaultNodeAllocatableEnforcement,
 | 
				
			||||||
				VolumePluginDir:                           DefaultVolumePluginDir,
 | 
									VolumePluginDir:                           DefaultVolumePluginDir,
 | 
				
			||||||
@@ -809,7 +821,9 @@ func TestSetDefaultsKubeletConfiguration(t *testing.T) {
 | 
				
			|||||||
				IPTablesDropBit:                           utilpointer.Int32Ptr(DefaultIPTablesDropBit),
 | 
									IPTablesDropBit:                           utilpointer.Int32Ptr(DefaultIPTablesDropBit),
 | 
				
			||||||
				FailSwapOn:                                utilpointer.Bool(true),
 | 
									FailSwapOn:                                utilpointer.Bool(true),
 | 
				
			||||||
				ContainerLogMaxSize:                       "10Mi",
 | 
									ContainerLogMaxSize:                       "10Mi",
 | 
				
			||||||
				ContainerLogMaxFiles:                      utilpointer.Int32Ptr(5),
 | 
									ContainerLogMaxFiles:                      utilpointer.Int32(5),
 | 
				
			||||||
 | 
									ContainerLogMaxWorkers:                    utilpointer.Int32(1),
 | 
				
			||||||
 | 
									ContainerLogMonitorInterval:               &metav1.Duration{Duration: 10 * time.Second},
 | 
				
			||||||
				ConfigMapAndSecretChangeDetectionStrategy: v1beta1.WatchChangeDetectionStrategy,
 | 
									ConfigMapAndSecretChangeDetectionStrategy: v1beta1.WatchChangeDetectionStrategy,
 | 
				
			||||||
				EnforceNodeAllocatable:                    DefaultNodeAllocatableEnforcement,
 | 
									EnforceNodeAllocatable:                    DefaultNodeAllocatableEnforcement,
 | 
				
			||||||
				VolumePluginDir:                           DefaultVolumePluginDir,
 | 
									VolumePluginDir:                           DefaultVolumePluginDir,
 | 
				
			||||||
@@ -899,6 +913,8 @@ func TestSetDefaultsKubeletConfiguration(t *testing.T) {
 | 
				
			|||||||
				FailSwapOn:                                utilpointer.Bool(true),
 | 
									FailSwapOn:                                utilpointer.Bool(true),
 | 
				
			||||||
				ContainerLogMaxSize:                       "10Mi",
 | 
									ContainerLogMaxSize:                       "10Mi",
 | 
				
			||||||
				ContainerLogMaxFiles:                      utilpointer.Int32(5),
 | 
									ContainerLogMaxFiles:                      utilpointer.Int32(5),
 | 
				
			||||||
 | 
									ContainerLogMaxWorkers:                    utilpointer.Int32(1),
 | 
				
			||||||
 | 
									ContainerLogMonitorInterval:               &metav1.Duration{Duration: 10 * time.Second},
 | 
				
			||||||
				ConfigMapAndSecretChangeDetectionStrategy: v1beta1.WatchChangeDetectionStrategy,
 | 
									ConfigMapAndSecretChangeDetectionStrategy: v1beta1.WatchChangeDetectionStrategy,
 | 
				
			||||||
				EnforceNodeAllocatable:                    DefaultNodeAllocatableEnforcement,
 | 
									EnforceNodeAllocatable:                    DefaultNodeAllocatableEnforcement,
 | 
				
			||||||
				VolumePluginDir:                           DefaultVolumePluginDir,
 | 
									VolumePluginDir:                           DefaultVolumePluginDir,
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -476,6 +476,12 @@ func autoConvert_v1beta1_KubeletConfiguration_To_config_KubeletConfiguration(in
 | 
				
			|||||||
	if err := v1.Convert_Pointer_int32_To_int32(&in.ContainerLogMaxFiles, &out.ContainerLogMaxFiles, s); err != nil {
 | 
						if err := v1.Convert_Pointer_int32_To_int32(&in.ContainerLogMaxFiles, &out.ContainerLogMaxFiles, s); err != nil {
 | 
				
			||||||
		return err
 | 
							return err
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
						if err := v1.Convert_Pointer_int32_To_int32(&in.ContainerLogMaxWorkers, &out.ContainerLogMaxWorkers, s); err != nil {
 | 
				
			||||||
 | 
							return err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						if err := v1.Convert_Pointer_v1_Duration_To_v1_Duration(&in.ContainerLogMonitorInterval, &out.ContainerLogMonitorInterval, s); err != nil {
 | 
				
			||||||
 | 
							return err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
	out.ConfigMapAndSecretChangeDetectionStrategy = config.ResourceChangeDetectionStrategy(in.ConfigMapAndSecretChangeDetectionStrategy)
 | 
						out.ConfigMapAndSecretChangeDetectionStrategy = config.ResourceChangeDetectionStrategy(in.ConfigMapAndSecretChangeDetectionStrategy)
 | 
				
			||||||
	out.SystemReserved = *(*map[string]string)(unsafe.Pointer(&in.SystemReserved))
 | 
						out.SystemReserved = *(*map[string]string)(unsafe.Pointer(&in.SystemReserved))
 | 
				
			||||||
	out.KubeReserved = *(*map[string]string)(unsafe.Pointer(&in.KubeReserved))
 | 
						out.KubeReserved = *(*map[string]string)(unsafe.Pointer(&in.KubeReserved))
 | 
				
			||||||
@@ -664,6 +670,12 @@ func autoConvert_config_KubeletConfiguration_To_v1beta1_KubeletConfiguration(in
 | 
				
			|||||||
	if err := v1.Convert_int32_To_Pointer_int32(&in.ContainerLogMaxFiles, &out.ContainerLogMaxFiles, s); err != nil {
 | 
						if err := v1.Convert_int32_To_Pointer_int32(&in.ContainerLogMaxFiles, &out.ContainerLogMaxFiles, s); err != nil {
 | 
				
			||||||
		return err
 | 
							return err
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
						if err := v1.Convert_int32_To_Pointer_int32(&in.ContainerLogMaxWorkers, &out.ContainerLogMaxWorkers, s); err != nil {
 | 
				
			||||||
 | 
							return err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						if err := v1.Convert_v1_Duration_To_Pointer_v1_Duration(&in.ContainerLogMonitorInterval, &out.ContainerLogMonitorInterval, s); err != nil {
 | 
				
			||||||
 | 
							return err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
	out.ConfigMapAndSecretChangeDetectionStrategy = v1beta1.ResourceChangeDetectionStrategy(in.ConfigMapAndSecretChangeDetectionStrategy)
 | 
						out.ConfigMapAndSecretChangeDetectionStrategy = v1beta1.ResourceChangeDetectionStrategy(in.ConfigMapAndSecretChangeDetectionStrategy)
 | 
				
			||||||
	out.AllowedUnsafeSysctls = *(*[]string)(unsafe.Pointer(&in.AllowedUnsafeSysctls))
 | 
						out.AllowedUnsafeSysctls = *(*[]string)(unsafe.Pointer(&in.AllowedUnsafeSysctls))
 | 
				
			||||||
	out.KernelMemcgNotification = in.KernelMemcgNotification
 | 
						out.KernelMemcgNotification = in.KernelMemcgNotification
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -279,5 +279,12 @@ func ValidateKubeletConfiguration(kc *kubeletconfig.KubeletConfiguration, featur
 | 
				
			|||||||
			fmt.Errorf("invalid configuration: enableSystemLogHandler is required for enableSystemLogQuery"))
 | 
								fmt.Errorf("invalid configuration: enableSystemLogHandler is required for enableSystemLogQuery"))
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						if kc.ContainerLogMaxWorkers < 1 {
 | 
				
			||||||
 | 
							allErrors = append(allErrors, fmt.Errorf("invalid configuration: containerLogMaxWorkers must be greater than or equal to 1"))
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						if kc.ContainerLogMonitorInterval.Duration.Seconds() < 3 {
 | 
				
			||||||
 | 
							allErrors = append(allErrors, fmt.Errorf("invalid configuration: containerLogMonitorInterval must be a positive time duration greater than or equal to 3s"))
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
	return utilerrors.NewAggregate(allErrors)
 | 
						return utilerrors.NewAggregate(allErrors)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -75,6 +75,8 @@ var (
 | 
				
			|||||||
			Format: "text",
 | 
								Format: "text",
 | 
				
			||||||
		},
 | 
							},
 | 
				
			||||||
		ContainerRuntimeEndpoint:    "unix:///run/containerd/containerd.sock",
 | 
							ContainerRuntimeEndpoint:    "unix:///run/containerd/containerd.sock",
 | 
				
			||||||
 | 
							ContainerLogMaxWorkers:      1,
 | 
				
			||||||
 | 
							ContainerLogMonitorInterval: metav1.Duration{Duration: 10 * time.Second},
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -545,6 +547,27 @@ func TestValidateKubeletConfiguration(t *testing.T) {
 | 
				
			|||||||
			return conf
 | 
								return conf
 | 
				
			||||||
		},
 | 
							},
 | 
				
			||||||
		errMsg: "invalid configuration: imageMaximumGCAge 1ns must be greater than imageMinimumGCAge 2ns",
 | 
							errMsg: "invalid configuration: imageMaximumGCAge 1ns must be greater than imageMinimumGCAge 2ns",
 | 
				
			||||||
 | 
						}, {
 | 
				
			||||||
 | 
							name: "containerLogMaxWorkers must be greater than or equal to 1",
 | 
				
			||||||
 | 
							configure: func(conf *kubeletconfig.KubeletConfiguration) *kubeletconfig.KubeletConfiguration {
 | 
				
			||||||
 | 
								conf.ContainerLogMaxWorkers = 0
 | 
				
			||||||
 | 
								return conf
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							errMsg: "invalid configuration: containerLogMaxWorkers must be greater than or equal to 1",
 | 
				
			||||||
 | 
						}, {
 | 
				
			||||||
 | 
							name: "containerLogMonitorInterval must be a positive time duration",
 | 
				
			||||||
 | 
							configure: func(conf *kubeletconfig.KubeletConfiguration) *kubeletconfig.KubeletConfiguration {
 | 
				
			||||||
 | 
								conf.ContainerLogMonitorInterval = metav1.Duration{Duration: -1 * time.Second}
 | 
				
			||||||
 | 
								return conf
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							errMsg: "invalid configuration: containerLogMonitorInterval must be a positive time duration greater than or equal to 3s",
 | 
				
			||||||
 | 
						}, {
 | 
				
			||||||
 | 
							name: "containerLogMonitorInterval must be at least 3s or higher",
 | 
				
			||||||
 | 
							configure: func(conf *kubeletconfig.KubeletConfiguration) *kubeletconfig.KubeletConfiguration {
 | 
				
			||||||
 | 
								conf.ContainerLogMonitorInterval = metav1.Duration{Duration: 2 * time.Second}
 | 
				
			||||||
 | 
								return conf
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							errMsg: "invalid configuration: containerLogMonitorInterval must be a positive time duration greater than or equal to 3s",
 | 
				
			||||||
	}}
 | 
						}}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	for _, tc := range cases {
 | 
						for _, tc := range cases {
 | 
				
			||||||
 
 | 
				
			|||||||
							
								
								
									
										1
									
								
								pkg/kubelet/apis/config/zz_generated.deepcopy.go
									
									
									
										generated
									
									
									
								
							
							
						
						
									
										1
									
								
								pkg/kubelet/apis/config/zz_generated.deepcopy.go
									
									
									
										generated
									
									
									
								
							@@ -270,6 +270,7 @@ func (in *KubeletConfiguration) DeepCopyInto(out *KubeletConfiguration) {
 | 
				
			|||||||
		}
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	out.MemorySwap = in.MemorySwap
 | 
						out.MemorySwap = in.MemorySwap
 | 
				
			||||||
 | 
						out.ContainerLogMonitorInterval = in.ContainerLogMonitorInterval
 | 
				
			||||||
	if in.AllowedUnsafeSysctls != nil {
 | 
						if in.AllowedUnsafeSysctls != nil {
 | 
				
			||||||
		in, out := &in.AllowedUnsafeSysctls, &out.AllowedUnsafeSysctls
 | 
							in, out := &in.AllowedUnsafeSysctls, &out.AllowedUnsafeSysctls
 | 
				
			||||||
		*out = make([]string, len(*in))
 | 
							*out = make([]string, len(*in))
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -623,6 +623,8 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
 | 
				
			|||||||
		kubeDeps.OSInterface,
 | 
							kubeDeps.OSInterface,
 | 
				
			||||||
		kubeCfg.ContainerLogMaxSize,
 | 
							kubeCfg.ContainerLogMaxSize,
 | 
				
			||||||
		int(kubeCfg.ContainerLogMaxFiles),
 | 
							int(kubeCfg.ContainerLogMaxFiles),
 | 
				
			||||||
 | 
							int(kubeCfg.ContainerLogMaxWorkers),
 | 
				
			||||||
 | 
							kubeCfg.ContainerLogMonitorInterval,
 | 
				
			||||||
	)
 | 
						)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		return nil, fmt.Errorf("failed to initialize container log manager: %v", err)
 | 
							return nil, fmt.Errorf("failed to initialize container log manager: %v", err)
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -94,7 +94,7 @@ func (f *fakePodPullingTimeRecorder) RecordImageFinishedPulling(podUID types.UID
 | 
				
			|||||||
func newFakeKubeRuntimeManager(runtimeService internalapi.RuntimeService, imageService internalapi.ImageManagerService, machineInfo *cadvisorapi.MachineInfo, osInterface kubecontainer.OSInterface, runtimeHelper kubecontainer.RuntimeHelper, keyring credentialprovider.DockerKeyring, tracer trace.Tracer) (*kubeGenericRuntimeManager, error) {
 | 
					func newFakeKubeRuntimeManager(runtimeService internalapi.RuntimeService, imageService internalapi.ImageManagerService, machineInfo *cadvisorapi.MachineInfo, osInterface kubecontainer.OSInterface, runtimeHelper kubecontainer.RuntimeHelper, keyring credentialprovider.DockerKeyring, tracer trace.Tracer) (*kubeGenericRuntimeManager, error) {
 | 
				
			||||||
	ctx := context.Background()
 | 
						ctx := context.Background()
 | 
				
			||||||
	recorder := &record.FakeRecorder{}
 | 
						recorder := &record.FakeRecorder{}
 | 
				
			||||||
	logManager, err := logs.NewContainerLogManager(runtimeService, osInterface, "1", 2)
 | 
						logManager, err := logs.NewContainerLogManager(runtimeService, osInterface, "1", 2, 10, metav1.Duration{Duration: 10 * time.Second})
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		return nil, err
 | 
							return nil, err
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -26,11 +26,12 @@ import (
 | 
				
			|||||||
	"sort"
 | 
						"sort"
 | 
				
			||||||
	"strings"
 | 
						"strings"
 | 
				
			||||||
	"sync"
 | 
						"sync"
 | 
				
			||||||
	"time"
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						"k8s.io/client-go/util/workqueue"
 | 
				
			||||||
	"k8s.io/klog/v2"
 | 
						"k8s.io/klog/v2"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	"k8s.io/apimachinery/pkg/api/resource"
 | 
						"k8s.io/apimachinery/pkg/api/resource"
 | 
				
			||||||
 | 
						metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | 
				
			||||||
	"k8s.io/apimachinery/pkg/util/wait"
 | 
						"k8s.io/apimachinery/pkg/util/wait"
 | 
				
			||||||
	internalapi "k8s.io/cri-api/pkg/apis"
 | 
						internalapi "k8s.io/cri-api/pkg/apis"
 | 
				
			||||||
	runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
 | 
						runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
 | 
				
			||||||
@@ -39,9 +40,6 @@ import (
 | 
				
			|||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
const (
 | 
					const (
 | 
				
			||||||
	// logMonitorPeriod is the period container log manager monitors
 | 
					 | 
				
			||||||
	// container logs and performs log rotation.
 | 
					 | 
				
			||||||
	logMonitorPeriod = 10 * time.Second
 | 
					 | 
				
			||||||
	// timestampFormat is format of the timestamp suffix for rotated log.
 | 
						// timestampFormat is format of the timestamp suffix for rotated log.
 | 
				
			||||||
	// See https://golang.org/pkg/time/#Time.Format.
 | 
						// See https://golang.org/pkg/time/#Time.Format.
 | 
				
			||||||
	timestampFormat = "20060102-150405"
 | 
						timestampFormat = "20060102-150405"
 | 
				
			||||||
@@ -148,10 +146,13 @@ type containerLogManager struct {
 | 
				
			|||||||
	policy           LogRotatePolicy
 | 
						policy           LogRotatePolicy
 | 
				
			||||||
	clock            clock.Clock
 | 
						clock            clock.Clock
 | 
				
			||||||
	mutex            sync.Mutex
 | 
						mutex            sync.Mutex
 | 
				
			||||||
 | 
						queue            workqueue.RateLimitingInterface
 | 
				
			||||||
 | 
						maxWorkers       int
 | 
				
			||||||
 | 
						monitoringPeriod metav1.Duration
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// NewContainerLogManager creates a new container log manager.
 | 
					// NewContainerLogManager creates a new container log manager.
 | 
				
			||||||
func NewContainerLogManager(runtimeService internalapi.RuntimeService, osInterface kubecontainer.OSInterface, maxSize string, maxFiles int) (ContainerLogManager, error) {
 | 
					func NewContainerLogManager(runtimeService internalapi.RuntimeService, osInterface kubecontainer.OSInterface, maxSize string, maxFiles int, maxWorkers int, monitorInterval metav1.Duration) (ContainerLogManager, error) {
 | 
				
			||||||
	if maxFiles <= 1 {
 | 
						if maxFiles <= 1 {
 | 
				
			||||||
		return nil, fmt.Errorf("invalid MaxFiles %d, must be > 1", maxFiles)
 | 
							return nil, fmt.Errorf("invalid MaxFiles %d, must be > 1", maxFiles)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
@@ -173,18 +174,26 @@ func NewContainerLogManager(runtimeService internalapi.RuntimeService, osInterfa
 | 
				
			|||||||
		},
 | 
							},
 | 
				
			||||||
		clock:            clock.RealClock{},
 | 
							clock:            clock.RealClock{},
 | 
				
			||||||
		mutex:            sync.Mutex{},
 | 
							mutex:            sync.Mutex{},
 | 
				
			||||||
 | 
							maxWorkers:       maxWorkers,
 | 
				
			||||||
 | 
							queue:            workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "kubelet_log_rotate_manager"),
 | 
				
			||||||
 | 
							monitoringPeriod: monitorInterval,
 | 
				
			||||||
	}, nil
 | 
						}, nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// Start the container log manager.
 | 
					// Start the container log manager.
 | 
				
			||||||
func (c *containerLogManager) Start() {
 | 
					func (c *containerLogManager) Start() {
 | 
				
			||||||
	ctx := context.Background()
 | 
						ctx := context.Background()
 | 
				
			||||||
 | 
						klog.InfoS("Initializing container log rotate workers", "workers", c.maxWorkers, "monitorPeriod", c.monitoringPeriod)
 | 
				
			||||||
 | 
						for i := 0; i < c.maxWorkers; i++ {
 | 
				
			||||||
 | 
							worker := i + 1
 | 
				
			||||||
 | 
							go c.processQueueItems(ctx, worker)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
	// Start a goroutine periodically does container log rotation.
 | 
						// Start a goroutine periodically does container log rotation.
 | 
				
			||||||
	go wait.Forever(func() {
 | 
						go wait.Forever(func() {
 | 
				
			||||||
		if err := c.rotateLogs(ctx); err != nil {
 | 
							if err := c.rotateLogs(ctx); err != nil {
 | 
				
			||||||
			klog.ErrorS(err, "Failed to rotate container logs")
 | 
								klog.ErrorS(err, "Failed to rotate container logs")
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	}, logMonitorPeriod)
 | 
						}, c.monitoringPeriod.Duration)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// Clean removes all logs of specified container (including rotated one).
 | 
					// Clean removes all logs of specified container (including rotated one).
 | 
				
			||||||
@@ -213,63 +222,89 @@ func (c *containerLogManager) Clean(ctx context.Context, containerID string) err
 | 
				
			|||||||
	return nil
 | 
						return nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (c *containerLogManager) processQueueItems(ctx context.Context, worker int) {
 | 
				
			||||||
 | 
						klog.V(4).InfoS("Starting container log rotation worker", "workerID", worker)
 | 
				
			||||||
 | 
						for c.processContainer(ctx, worker) {
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						klog.V(4).InfoS("Terminating container log rotation worker", "workerID", worker)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (c *containerLogManager) rotateLogs(ctx context.Context) error {
 | 
					func (c *containerLogManager) rotateLogs(ctx context.Context) error {
 | 
				
			||||||
	c.mutex.Lock()
 | 
						c.mutex.Lock()
 | 
				
			||||||
	defer c.mutex.Unlock()
 | 
						defer c.mutex.Unlock()
 | 
				
			||||||
 | 
						klog.V(4).InfoS("Starting container log rotation sequence")
 | 
				
			||||||
	// TODO(#59998): Use kubelet pod cache.
 | 
						// TODO(#59998): Use kubelet pod cache.
 | 
				
			||||||
	containers, err := c.runtimeService.ListContainers(ctx, &runtimeapi.ContainerFilter{})
 | 
						containers, err := c.runtimeService.ListContainers(ctx, &runtimeapi.ContainerFilter{})
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		return fmt.Errorf("failed to list containers: %v", err)
 | 
							return fmt.Errorf("failed to list containers: %v", err)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	// NOTE(random-liu): Figure out whether we need to rotate container logs in parallel.
 | 
					 | 
				
			||||||
	for _, container := range containers {
 | 
						for _, container := range containers {
 | 
				
			||||||
		// Only rotate logs for running containers. Non-running containers won't
 | 
							// Only rotate logs for running containers. Non-running containers won't
 | 
				
			||||||
		// generate new output, it doesn't make sense to keep an empty latest log.
 | 
							// generate new output, it doesn't make sense to keep an empty latest log.
 | 
				
			||||||
		if container.GetState() != runtimeapi.ContainerState_CONTAINER_RUNNING {
 | 
							if container.GetState() != runtimeapi.ContainerState_CONTAINER_RUNNING {
 | 
				
			||||||
			continue
 | 
								continue
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		id := container.GetId()
 | 
							// Doing this to avoid additional overhead with logging of label like arguments that can prove costly
 | 
				
			||||||
		// Note that we should not block log rotate for an error of a single container.
 | 
							if v := klog.V(4); v.Enabled() {
 | 
				
			||||||
 | 
								klog.V(4).InfoS("Adding new entry to the queue for processing", "id", container.GetId(), "name", container.Metadata.GetName(), "labels", container.GetLabels())
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							c.queue.Add(container.GetId())
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return nil
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (c *containerLogManager) processContainer(ctx context.Context, worker int) (ok bool) {
 | 
				
			||||||
 | 
						key, quit := c.queue.Get()
 | 
				
			||||||
 | 
						if quit {
 | 
				
			||||||
 | 
							return false
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						defer func() {
 | 
				
			||||||
 | 
							c.queue.Done(key)
 | 
				
			||||||
 | 
							c.queue.Forget(key)
 | 
				
			||||||
 | 
						}()
 | 
				
			||||||
 | 
						// Always default the return to true to keep the processing of Queue ongoing
 | 
				
			||||||
 | 
						ok = true
 | 
				
			||||||
 | 
						id := key.(string)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	resp, err := c.runtimeService.ContainerStatus(ctx, id, false)
 | 
						resp, err := c.runtimeService.ContainerStatus(ctx, id, false)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
			klog.ErrorS(err, "Failed to get container status", "containerID", id)
 | 
							klog.ErrorS(err, "Failed to get container status", "worker", worker, "containerID", id)
 | 
				
			||||||
			continue
 | 
							return
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	if resp.GetStatus() == nil {
 | 
						if resp.GetStatus() == nil {
 | 
				
			||||||
			klog.ErrorS(err, "Container status is nil", "containerID", id)
 | 
							klog.ErrorS(err, "Container status is nil", "worker", worker, "containerID", id)
 | 
				
			||||||
			continue
 | 
							return
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	path := resp.GetStatus().GetLogPath()
 | 
						path := resp.GetStatus().GetLogPath()
 | 
				
			||||||
	info, err := c.osInterface.Stat(path)
 | 
						info, err := c.osInterface.Stat(path)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		if !os.IsNotExist(err) {
 | 
							if !os.IsNotExist(err) {
 | 
				
			||||||
				klog.ErrorS(err, "Failed to stat container log", "path", path)
 | 
								klog.ErrorS(err, "Failed to stat container log", "worker", worker, "containerID", id, "path", path)
 | 
				
			||||||
				continue
 | 
								return
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
			// In rotateLatestLog, there are several cases that we may
 | 
					
 | 
				
			||||||
			// lose original container log after ReopenContainerLog fails.
 | 
							if err = c.runtimeService.ReopenContainerLog(ctx, id); err != nil {
 | 
				
			||||||
			// We try to recover it by reopening container log.
 | 
								klog.ErrorS(err, "Container log doesn't exist, reopen container log failed", "worker", worker, "containerID", id, "path", path)
 | 
				
			||||||
			if err := c.runtimeService.ReopenContainerLog(ctx, id); err != nil {
 | 
								return
 | 
				
			||||||
				klog.ErrorS(err, "Container log doesn't exist, reopen container log failed", "containerID", id, "path", path)
 | 
					 | 
				
			||||||
				continue
 | 
					 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
			// The container log should be recovered.
 | 
					
 | 
				
			||||||
		info, err = c.osInterface.Stat(path)
 | 
							info, err = c.osInterface.Stat(path)
 | 
				
			||||||
		if err != nil {
 | 
							if err != nil {
 | 
				
			||||||
				klog.ErrorS(err, "Failed to stat container log after reopen", "path", path)
 | 
								klog.ErrorS(err, "Failed to stat container log after reopen", "worker", worker, "containerID", id, "path", path)
 | 
				
			||||||
				continue
 | 
								return
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	if info.Size() < c.policy.MaxSize {
 | 
						if info.Size() < c.policy.MaxSize {
 | 
				
			||||||
			continue
 | 
							klog.V(7).InfoS("log file doesn't need to be rotated", "worker", worker, "containerID", id, "path", path, "currentSize", info.Size(), "maxSize", c.policy.MaxSize)
 | 
				
			||||||
 | 
							return
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
		// Perform log rotation.
 | 
					
 | 
				
			||||||
	if err := c.rotateLog(ctx, id, path); err != nil {
 | 
						if err := c.rotateLog(ctx, id, path); err != nil {
 | 
				
			||||||
			klog.ErrorS(err, "Failed to rotate log for container", "path", path, "containerID", id)
 | 
							klog.ErrorS(err, "Failed to rotate log for container", "worker", worker, "containerID", id, "path", path, "currentSize", info.Size(), "maxSize", c.policy.MaxSize)
 | 
				
			||||||
			continue
 | 
							return
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	}
 | 
						return
 | 
				
			||||||
	return nil
 | 
					 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (c *containerLogManager) rotateLog(ctx context.Context, id, log string) error {
 | 
					func (c *containerLogManager) rotateLog(ctx context.Context, id, log string) error {
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -23,11 +23,15 @@ import (
 | 
				
			|||||||
	"io"
 | 
						"io"
 | 
				
			||||||
	"os"
 | 
						"os"
 | 
				
			||||||
	"path/filepath"
 | 
						"path/filepath"
 | 
				
			||||||
 | 
						"sync"
 | 
				
			||||||
	"testing"
 | 
						"testing"
 | 
				
			||||||
	"time"
 | 
						"time"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	"github.com/stretchr/testify/assert"
 | 
						"github.com/stretchr/testify/assert"
 | 
				
			||||||
	"github.com/stretchr/testify/require"
 | 
						"github.com/stretchr/testify/require"
 | 
				
			||||||
 | 
						v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | 
				
			||||||
 | 
						"k8s.io/apimachinery/pkg/util/wait"
 | 
				
			||||||
 | 
						"k8s.io/client-go/util/workqueue"
 | 
				
			||||||
	"k8s.io/kubernetes/pkg/kubelet/container"
 | 
						"k8s.io/kubernetes/pkg/kubelet/container"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
 | 
						runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
 | 
				
			||||||
@@ -94,6 +98,10 @@ func TestRotateLogs(t *testing.T) {
 | 
				
			|||||||
		},
 | 
							},
 | 
				
			||||||
		osInterface:      container.RealOS{},
 | 
							osInterface:      container.RealOS{},
 | 
				
			||||||
		clock:            testingclock.NewFakeClock(now),
 | 
							clock:            testingclock.NewFakeClock(now),
 | 
				
			||||||
 | 
							mutex:            sync.Mutex{},
 | 
				
			||||||
 | 
							queue:            workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "kubelet_log_rotate_manager"),
 | 
				
			||||||
 | 
							maxWorkers:       10,
 | 
				
			||||||
 | 
							monitoringPeriod: v1.Duration{Duration: 10 * time.Second},
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	testLogs := []string{
 | 
						testLogs := []string{
 | 
				
			||||||
		"test-log-1",
 | 
							"test-log-1",
 | 
				
			||||||
@@ -149,8 +157,16 @@ func TestRotateLogs(t *testing.T) {
 | 
				
			|||||||
		},
 | 
							},
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	f.SetFakeContainers(testContainers)
 | 
						f.SetFakeContainers(testContainers)
 | 
				
			||||||
 | 
						go c.processQueueItems(ctx, 1)
 | 
				
			||||||
	require.NoError(t, c.rotateLogs(ctx))
 | 
						require.NoError(t, c.rotateLogs(ctx))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						pollTimeoutCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
 | 
				
			||||||
 | 
						defer cancel()
 | 
				
			||||||
 | 
						err = wait.PollUntilContextCancel(pollTimeoutCtx, 20*time.Millisecond, false, func(ctx context.Context) (done bool, err error) {
 | 
				
			||||||
 | 
							return c.queue.Len() == 0, nil
 | 
				
			||||||
 | 
						})
 | 
				
			||||||
 | 
						require.NoError(t, err)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	timestamp := now.Format(timestampFormat)
 | 
						timestamp := now.Format(timestampFormat)
 | 
				
			||||||
	logs, err := os.ReadDir(dir)
 | 
						logs, err := os.ReadDir(dir)
 | 
				
			||||||
	require.NoError(t, err)
 | 
						require.NoError(t, err)
 | 
				
			||||||
@@ -160,6 +176,7 @@ func TestRotateLogs(t *testing.T) {
 | 
				
			|||||||
	assert.Equal(t, testLogs[4]+compressSuffix, logs[2].Name())
 | 
						assert.Equal(t, testLogs[4]+compressSuffix, logs[2].Name())
 | 
				
			||||||
	assert.Equal(t, testLogs[2]+"."+timestamp, logs[3].Name())
 | 
						assert.Equal(t, testLogs[2]+"."+timestamp, logs[3].Name())
 | 
				
			||||||
	assert.Equal(t, testLogs[3], logs[4].Name())
 | 
						assert.Equal(t, testLogs[3], logs[4].Name())
 | 
				
			||||||
 | 
						c.queue.ShutDown()
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func TestClean(t *testing.T) {
 | 
					func TestClean(t *testing.T) {
 | 
				
			||||||
@@ -182,6 +199,10 @@ func TestClean(t *testing.T) {
 | 
				
			|||||||
		},
 | 
							},
 | 
				
			||||||
		osInterface:      container.RealOS{},
 | 
							osInterface:      container.RealOS{},
 | 
				
			||||||
		clock:            testingclock.NewFakeClock(now),
 | 
							clock:            testingclock.NewFakeClock(now),
 | 
				
			||||||
 | 
							mutex:            sync.Mutex{},
 | 
				
			||||||
 | 
							queue:            workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "kubelet_log_rotate_manager"),
 | 
				
			||||||
 | 
							maxWorkers:       10,
 | 
				
			||||||
 | 
							monitoringPeriod: v1.Duration{Duration: 10 * time.Second},
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	testLogs := []string{
 | 
						testLogs := []string{
 | 
				
			||||||
		"test-log-1",
 | 
							"test-log-1",
 | 
				
			||||||
@@ -387,6 +408,10 @@ func TestRotateLatestLog(t *testing.T) {
 | 
				
			|||||||
			policy:           LogRotatePolicy{MaxFiles: test.maxFiles},
 | 
								policy:           LogRotatePolicy{MaxFiles: test.maxFiles},
 | 
				
			||||||
			osInterface:      container.RealOS{},
 | 
								osInterface:      container.RealOS{},
 | 
				
			||||||
			clock:            testingclock.NewFakeClock(now),
 | 
								clock:            testingclock.NewFakeClock(now),
 | 
				
			||||||
 | 
								mutex:            sync.Mutex{},
 | 
				
			||||||
 | 
								queue:            workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "kubelet_log_rotate_manager"),
 | 
				
			||||||
 | 
								maxWorkers:       10,
 | 
				
			||||||
 | 
								monitoringPeriod: v1.Duration{Duration: 10 * time.Second},
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		if test.runtimeError != nil {
 | 
							if test.runtimeError != nil {
 | 
				
			||||||
			f.InjectError("ReopenContainerLog", test.runtimeError)
 | 
								f.InjectError("ReopenContainerLog", test.runtimeError)
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -595,6 +595,21 @@ type KubeletConfiguration struct {
 | 
				
			|||||||
	// Default: 5
 | 
						// Default: 5
 | 
				
			||||||
	// +optional
 | 
						// +optional
 | 
				
			||||||
	ContainerLogMaxFiles *int32 `json:"containerLogMaxFiles,omitempty"`
 | 
						ContainerLogMaxFiles *int32 `json:"containerLogMaxFiles,omitempty"`
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// ContainerLogMaxWorkers specifies the maximum number of concurrent workers to spawn
 | 
				
			||||||
 | 
						// for performing the log rotate operations. Set this count to 1 for disabling the
 | 
				
			||||||
 | 
						// concurrent log rotation workflows
 | 
				
			||||||
 | 
						// Default: 1
 | 
				
			||||||
 | 
						// +optional
 | 
				
			||||||
 | 
						ContainerLogMaxWorkers *int32 `json:"containerLogMaxWorkers,omitempty"`
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// ContainerLogMonitorInterval specifies the duration at which the container logs are monitored
 | 
				
			||||||
 | 
						// for performing the log rotate operation. This defaults to 10 * time.Seconds. But can be
 | 
				
			||||||
 | 
						// customized to a smaller value based on the log generation rate and the size required to be
 | 
				
			||||||
 | 
						// rotated against
 | 
				
			||||||
 | 
						// Default: 10s
 | 
				
			||||||
 | 
						// +optional
 | 
				
			||||||
 | 
						ContainerLogMonitorInterval *metav1.Duration `json:"containerLogMonitorInterval,omitempty"`
 | 
				
			||||||
	// configMapAndSecretChangeDetectionStrategy is a mode in which ConfigMap and Secret
 | 
						// configMapAndSecretChangeDetectionStrategy is a mode in which ConfigMap and Secret
 | 
				
			||||||
	// managers are running. Valid values include:
 | 
						// managers are running. Valid values include:
 | 
				
			||||||
	//
 | 
						//
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -384,6 +384,16 @@ func (in *KubeletConfiguration) DeepCopyInto(out *KubeletConfiguration) {
 | 
				
			|||||||
		*out = new(int32)
 | 
							*out = new(int32)
 | 
				
			||||||
		**out = **in
 | 
							**out = **in
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
						if in.ContainerLogMaxWorkers != nil {
 | 
				
			||||||
 | 
							in, out := &in.ContainerLogMaxWorkers, &out.ContainerLogMaxWorkers
 | 
				
			||||||
 | 
							*out = new(int32)
 | 
				
			||||||
 | 
							**out = **in
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						if in.ContainerLogMonitorInterval != nil {
 | 
				
			||||||
 | 
							in, out := &in.ContainerLogMonitorInterval, &out.ContainerLogMonitorInterval
 | 
				
			||||||
 | 
							*out = new(v1.Duration)
 | 
				
			||||||
 | 
							**out = **in
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
	if in.SystemReserved != nil {
 | 
						if in.SystemReserved != nil {
 | 
				
			||||||
		in, out := &in.SystemReserved, &out.SystemReserved
 | 
							in, out := &in.SystemReserved, &out.SystemReserved
 | 
				
			||||||
		*out = make(map[string]string, len(*in))
 | 
							*out = make(map[string]string, len(*in))
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -36,6 +36,8 @@ import (
 | 
				
			|||||||
const (
 | 
					const (
 | 
				
			||||||
	testContainerLogMaxFiles        = 3
 | 
						testContainerLogMaxFiles        = 3
 | 
				
			||||||
	testContainerLogMaxSize         = "40Ki"
 | 
						testContainerLogMaxSize         = "40Ki"
 | 
				
			||||||
 | 
						testContainerLogMaxWorkers      = 2
 | 
				
			||||||
 | 
						testContainerLogMonitorInterval = 2 * time.Second
 | 
				
			||||||
	rotationPollInterval            = 5 * time.Second
 | 
						rotationPollInterval            = 5 * time.Second
 | 
				
			||||||
	rotationEventuallyTimeout       = 3 * time.Minute
 | 
						rotationEventuallyTimeout       = 3 * time.Minute
 | 
				
			||||||
	rotationConsistentlyTimeout     = 2 * time.Minute
 | 
						rotationConsistentlyTimeout     = 2 * time.Minute
 | 
				
			||||||
@@ -106,3 +108,86 @@ var _ = SIGDescribe("ContainerLogRotation", framework.WithSlow(), framework.With
 | 
				
			|||||||
		})
 | 
							})
 | 
				
			||||||
	})
 | 
						})
 | 
				
			||||||
})
 | 
					})
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					var _ = SIGDescribe("ContainerLogRotationWithMultipleWorkers", framework.WithSlow(), framework.WithSerial(), framework.WithDisruptive(), func() {
 | 
				
			||||||
 | 
						f := framework.NewDefaultFramework("container-log-rotation-test-multi-worker")
 | 
				
			||||||
 | 
						f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged
 | 
				
			||||||
 | 
						ginkgo.Context("when a container generates a lot of logs", func() {
 | 
				
			||||||
 | 
							tempSetCurrentKubeletConfig(f, func(ctx context.Context, initialConfig *kubeletconfig.KubeletConfiguration) {
 | 
				
			||||||
 | 
								initialConfig.ContainerLogMaxFiles = testContainerLogMaxFiles
 | 
				
			||||||
 | 
								initialConfig.ContainerLogMaxSize = testContainerLogMaxSize
 | 
				
			||||||
 | 
								initialConfig.ContainerLogMaxWorkers = testContainerLogMaxWorkers
 | 
				
			||||||
 | 
								initialConfig.ContainerLogMonitorInterval = metav1.Duration{Duration: testContainerLogMonitorInterval}
 | 
				
			||||||
 | 
							})
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							var logRotationPods []*v1.Pod
 | 
				
			||||||
 | 
							ginkgo.BeforeEach(func(ctx context.Context) {
 | 
				
			||||||
 | 
								ginkgo.By("create log container 1")
 | 
				
			||||||
 | 
								for _, name := range []string{"test-container-log-rotation", "test-container-log-rotation-1"} {
 | 
				
			||||||
 | 
									pod := &v1.Pod{
 | 
				
			||||||
 | 
										ObjectMeta: metav1.ObjectMeta{
 | 
				
			||||||
 | 
											Name: name,
 | 
				
			||||||
 | 
										},
 | 
				
			||||||
 | 
										Spec: v1.PodSpec{
 | 
				
			||||||
 | 
											RestartPolicy: v1.RestartPolicyNever,
 | 
				
			||||||
 | 
											Containers: []v1.Container{
 | 
				
			||||||
 | 
												{
 | 
				
			||||||
 | 
													Name:  "log-container",
 | 
				
			||||||
 | 
													Image: busyboxImage,
 | 
				
			||||||
 | 
													Command: []string{
 | 
				
			||||||
 | 
														"sh",
 | 
				
			||||||
 | 
														"-c",
 | 
				
			||||||
 | 
														// ~12Kb/s. Exceeding 40Kb in 4 seconds. Log rotation period is 10 seconds.
 | 
				
			||||||
 | 
														"while true; do echo hello world; sleep 0.001; done;",
 | 
				
			||||||
 | 
													},
 | 
				
			||||||
 | 
												},
 | 
				
			||||||
 | 
											},
 | 
				
			||||||
 | 
										},
 | 
				
			||||||
 | 
									}
 | 
				
			||||||
 | 
									logRotationPod := e2epod.NewPodClient(f).CreateSync(ctx, pod)
 | 
				
			||||||
 | 
									logRotationPods = append(logRotationPods, logRotationPod)
 | 
				
			||||||
 | 
									ginkgo.DeferCleanup(e2epod.NewPodClient(f).DeleteSync, logRotationPod.Name, metav1.DeleteOptions{}, time.Minute)
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
							})
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							ginkgo.It("should be rotated and limited to a fixed amount of files", func(ctx context.Context) {
 | 
				
			||||||
 | 
								ginkgo.By("get container log path")
 | 
				
			||||||
 | 
								var logPaths []string
 | 
				
			||||||
 | 
								for _, pod := range logRotationPods {
 | 
				
			||||||
 | 
									gomega.Expect(pod.Status.ContainerStatuses).To(gomega.HaveLen(1), "log rotation pod should have one container")
 | 
				
			||||||
 | 
									id := kubecontainer.ParseContainerID(pod.Status.ContainerStatuses[0].ContainerID).ID
 | 
				
			||||||
 | 
									r, _, err := getCRIClient()
 | 
				
			||||||
 | 
									framework.ExpectNoError(err, "should connect to CRI and obtain runtime service clients and image service client")
 | 
				
			||||||
 | 
									resp, err := r.ContainerStatus(context.Background(), id, false)
 | 
				
			||||||
 | 
									framework.ExpectNoError(err)
 | 
				
			||||||
 | 
									logPaths = append(logPaths, resp.GetStatus().GetLogPath())
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
								ginkgo.By("wait for container log being rotated to max file limit")
 | 
				
			||||||
 | 
								gomega.Eventually(ctx, func() (int, error) {
 | 
				
			||||||
 | 
									var logFiles []string
 | 
				
			||||||
 | 
									for _, logPath := range logPaths {
 | 
				
			||||||
 | 
										logs, err := kubelogs.GetAllLogs(logPath)
 | 
				
			||||||
 | 
										if err != nil {
 | 
				
			||||||
 | 
											return 0, err
 | 
				
			||||||
 | 
										}
 | 
				
			||||||
 | 
										logFiles = append(logFiles, logs...)
 | 
				
			||||||
 | 
									}
 | 
				
			||||||
 | 
									return len(logFiles), nil
 | 
				
			||||||
 | 
								}, rotationEventuallyTimeout, rotationPollInterval).Should(gomega.Equal(testContainerLogMaxFiles*2), "should eventually rotate to max file limit")
 | 
				
			||||||
 | 
								ginkgo.By("make sure container log number won't exceed max file limit")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
								gomega.Consistently(ctx, func() (int, error) {
 | 
				
			||||||
 | 
									var logFiles []string
 | 
				
			||||||
 | 
									for _, logPath := range logPaths {
 | 
				
			||||||
 | 
										logs, err := kubelogs.GetAllLogs(logPath)
 | 
				
			||||||
 | 
										if err != nil {
 | 
				
			||||||
 | 
											return 0, err
 | 
				
			||||||
 | 
										}
 | 
				
			||||||
 | 
										logFiles = append(logFiles, logs...)
 | 
				
			||||||
 | 
									}
 | 
				
			||||||
 | 
									return len(logFiles), nil
 | 
				
			||||||
 | 
								}, rotationConsistentlyTimeout, rotationPollInterval).Should(gomega.BeNumerically("<=", testContainerLogMaxFiles*2), "should never exceed max file limit")
 | 
				
			||||||
 | 
							})
 | 
				
			||||||
 | 
						})
 | 
				
			||||||
 | 
					})
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user