
Automatic merge from submit-queue CRI: Add kuberuntime container logs Based on https://github.com/kubernetes/kubernetes/pull/34858. The first 2 commits are from #34858. And the last 2 commits are new. This PR added kuberuntime container logs support and add unit test for it. I've tested all the functions manually, and I'll send another PR to write a node e2e test for container log. **_Notice: current implementation doesn't support log rotation**_, which means that: - It will not retrieve logs in rotated log file. - If log rotation happens when following the log: - If the rotation is using create mode, we'll still follow the old file. - If the rotation is using copytruncate, we'll be reading at the original position and get nothing. To solve these issues, kubelet needs to rotate the log itself, or at least kubelet should be able to control the the behavior of log rotator. These are doable but out of the scope of 1.5 and will be addressed in future release. @yujuhong @feiskyer @yifan-gu /cc @kubernetes/sig-node
219 lines
7.6 KiB
Go
219 lines
7.6 KiB
Go
/*
|
|
Copyright 2016 The Kubernetes Authors.
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package kuberuntime
|
|
|
|
import (
|
|
"fmt"
|
|
"path/filepath"
|
|
"strconv"
|
|
|
|
"github.com/golang/glog"
|
|
"k8s.io/kubernetes/pkg/api"
|
|
runtimeApi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime"
|
|
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
|
|
"k8s.io/kubernetes/pkg/types"
|
|
)
|
|
|
|
const (
|
|
// Taken from lmctfy https://github.com/google/lmctfy/blob/master/lmctfy/controllers/cpu_controller.cc
|
|
minShares = 2
|
|
sharesPerCPU = 1024
|
|
milliCPUToCPU = 1000
|
|
|
|
// 100000 is equivalent to 100ms
|
|
quotaPeriod = 100 * minQuotaPeriod
|
|
minQuotaPeriod = 1000
|
|
)
|
|
|
|
var (
|
|
// The default dns opt strings
|
|
defaultDNSOptions = []string{"ndots:5"}
|
|
)
|
|
|
|
type podsByID []*kubecontainer.Pod
|
|
|
|
func (b podsByID) Len() int { return len(b) }
|
|
func (b podsByID) Swap(i, j int) { b[i], b[j] = b[j], b[i] }
|
|
func (b podsByID) Less(i, j int) bool { return b[i].ID < b[j].ID }
|
|
|
|
type containersByID []*kubecontainer.Container
|
|
|
|
func (b containersByID) Len() int { return len(b) }
|
|
func (b containersByID) Swap(i, j int) { b[i], b[j] = b[j], b[i] }
|
|
func (b containersByID) Less(i, j int) bool { return b[i].ID.ID < b[j].ID.ID }
|
|
|
|
// Newest first.
|
|
type podSandboxByCreated []*runtimeApi.PodSandbox
|
|
|
|
func (p podSandboxByCreated) Len() int { return len(p) }
|
|
func (p podSandboxByCreated) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
|
|
func (p podSandboxByCreated) Less(i, j int) bool { return p[i].GetCreatedAt() > p[j].GetCreatedAt() }
|
|
|
|
type containerStatusByCreated []*kubecontainer.ContainerStatus
|
|
|
|
func (c containerStatusByCreated) Len() int { return len(c) }
|
|
func (c containerStatusByCreated) Swap(i, j int) { c[i], c[j] = c[j], c[i] }
|
|
func (c containerStatusByCreated) Less(i, j int) bool { return c[i].CreatedAt.After(c[j].CreatedAt) }
|
|
|
|
// toKubeContainerState converts runtimeApi.ContainerState to kubecontainer.ContainerState.
|
|
func toKubeContainerState(state runtimeApi.ContainerState) kubecontainer.ContainerState {
|
|
switch state {
|
|
case runtimeApi.ContainerState_CONTAINER_CREATED:
|
|
return kubecontainer.ContainerStateCreated
|
|
case runtimeApi.ContainerState_CONTAINER_RUNNING:
|
|
return kubecontainer.ContainerStateRunning
|
|
case runtimeApi.ContainerState_CONTAINER_EXITED:
|
|
return kubecontainer.ContainerStateExited
|
|
case runtimeApi.ContainerState_CONTAINER_UNKNOWN:
|
|
return kubecontainer.ContainerStateUnknown
|
|
}
|
|
|
|
return kubecontainer.ContainerStateUnknown
|
|
}
|
|
|
|
// toRuntimeProtocol converts api.Protocol to runtimeApi.Protocol.
|
|
func toRuntimeProtocol(protocol api.Protocol) runtimeApi.Protocol {
|
|
switch protocol {
|
|
case api.ProtocolTCP:
|
|
return runtimeApi.Protocol_TCP
|
|
case api.ProtocolUDP:
|
|
return runtimeApi.Protocol_UDP
|
|
}
|
|
|
|
glog.Warningf("Unknown protocol %q: defaulting to TCP", protocol)
|
|
return runtimeApi.Protocol_TCP
|
|
}
|
|
|
|
// toKubeContainer converts runtimeApi.Container to kubecontainer.Container.
|
|
func (m *kubeGenericRuntimeManager) toKubeContainer(c *runtimeApi.Container) (*kubecontainer.Container, error) {
|
|
if c == nil || c.Id == nil || c.Image == nil || c.State == nil {
|
|
return nil, fmt.Errorf("unable to convert a nil pointer to a runtime container")
|
|
}
|
|
|
|
labeledInfo := getContainerInfoFromLabels(c.Labels)
|
|
annotatedInfo := getContainerInfoFromAnnotations(c.Annotations)
|
|
return &kubecontainer.Container{
|
|
ID: kubecontainer.ContainerID{Type: m.runtimeName, ID: c.GetId()},
|
|
Name: labeledInfo.ContainerName,
|
|
Image: c.Image.GetImage(),
|
|
Hash: annotatedInfo.Hash,
|
|
State: toKubeContainerState(c.GetState()),
|
|
}, nil
|
|
}
|
|
|
|
// sandboxToKubeContainer converts runtimeApi.PodSandbox to kubecontainer.Container.
|
|
// This is only needed because we need to return sandboxes as if they were
|
|
// kubecontainer.Containers to avoid substantial changes to PLEG.
|
|
// TODO: Remove this once it becomes obsolete.
|
|
func (m *kubeGenericRuntimeManager) sandboxToKubeContainer(s *runtimeApi.PodSandbox) (*kubecontainer.Container, error) {
|
|
if s == nil || s.Id == nil || s.State == nil {
|
|
return nil, fmt.Errorf("unable to convert a nil pointer to a runtime container")
|
|
}
|
|
|
|
return &kubecontainer.Container{
|
|
ID: kubecontainer.ContainerID{Type: m.runtimeName, ID: s.GetId()},
|
|
State: kubecontainer.SandboxToContainerState(s.GetState()),
|
|
}, nil
|
|
}
|
|
|
|
// getContainerSpec gets the container spec by containerName.
|
|
func getContainerSpec(pod *api.Pod, containerName string) *api.Container {
|
|
for i, c := range pod.Spec.Containers {
|
|
if containerName == c.Name {
|
|
return &pod.Spec.Containers[i]
|
|
}
|
|
}
|
|
for i, c := range pod.Spec.InitContainers {
|
|
if containerName == c.Name {
|
|
return &pod.Spec.InitContainers[i]
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// isContainerFailed returns true if container has exited and exitcode is not zero.
|
|
func isContainerFailed(status *kubecontainer.ContainerStatus) bool {
|
|
if status.State == kubecontainer.ContainerStateExited && status.ExitCode != 0 {
|
|
return true
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
// milliCPUToShares converts milliCPU to CPU shares
|
|
func milliCPUToShares(milliCPU int64) int64 {
|
|
if milliCPU == 0 {
|
|
// Return 2 here to really match kernel default for zero milliCPU.
|
|
return minShares
|
|
}
|
|
// Conceptually (milliCPU / milliCPUToCPU) * sharesPerCPU, but factored to improve rounding.
|
|
shares := (milliCPU * sharesPerCPU) / milliCPUToCPU
|
|
if shares < minShares {
|
|
return minShares
|
|
}
|
|
return shares
|
|
}
|
|
|
|
// milliCPUToQuota converts milliCPU to CFS quota and period values
|
|
func milliCPUToQuota(milliCPU int64) (quota int64, period int64) {
|
|
// CFS quota is measured in two values:
|
|
// - cfs_period_us=100ms (the amount of time to measure usage across)
|
|
// - cfs_quota=20ms (the amount of cpu time allowed to be used across a period)
|
|
// so in the above example, you are limited to 20% of a single CPU
|
|
// for multi-cpu environments, you just scale equivalent amounts
|
|
if milliCPU == 0 {
|
|
return
|
|
}
|
|
|
|
// we set the period to 100ms by default
|
|
period = quotaPeriod
|
|
|
|
// we then convert your milliCPU to a value normalized over a period
|
|
quota = (milliCPU * quotaPeriod) / milliCPUToCPU
|
|
|
|
// quota needs to be a minimum of 1ms.
|
|
if quota < minQuotaPeriod {
|
|
quota = minQuotaPeriod
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
// getStableKey generates a key (string) to uniquely identify a
|
|
// (pod, container) tuple. The key should include the content of the
|
|
// container, so that any change to the container generates a new key.
|
|
func getStableKey(pod *api.Pod, container *api.Container) string {
|
|
hash := strconv.FormatUint(kubecontainer.HashContainer(container), 16)
|
|
return fmt.Sprintf("%s_%s_%s_%s_%s", pod.Name, pod.Namespace, string(pod.UID), container.Name, hash)
|
|
}
|
|
|
|
// buildContainerLogsPath builds log path for container relative to pod logs directory.
|
|
func buildContainerLogsPath(containerName string, restartCount int) string {
|
|
return fmt.Sprintf("%s_%d.log", containerName, restartCount)
|
|
}
|
|
|
|
// buildFullContainerLogsPath builds absolute log path for container.
|
|
func buildFullContainerLogsPath(podUID types.UID, containerName string, restartCount int) string {
|
|
return filepath.Join(buildPodLogsDirectory(podUID), buildContainerLogsPath(containerName, restartCount))
|
|
}
|
|
|
|
// buildPodLogsDirectory builds absolute log directory path for a pod sandbox.
|
|
func buildPodLogsDirectory(podUID types.UID) string {
|
|
return filepath.Join(podLogsRootDirectory, string(podUID))
|
|
}
|