Kubelet: a step towards to better encapsulation of docker functions
We want to stop leaking more docker details into kubelet, and we also want to consolidate some of the existing docker interfaces/structs. This change creates DockerManager as the new home of some functions in dockertools/docker.go. It also absorbs containerRunner. In addition, GetDockerPodStatus is renamed to GetPodStatus with the entire pod passed to it so that it is simialr to the what is defined in the container Runtime interface. Eventually, DockerManager should implement the container Runtime interface, and integrate DockerCache with a flag to turn on/off caching. Code in kubelet.go should not be using docker client directly.
This commit is contained in:
@@ -19,11 +19,9 @@ package dockertools
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"errors"
|
||||
"fmt"
|
||||
"hash/adler32"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"math/rand"
|
||||
"os"
|
||||
"os/exec"
|
||||
@@ -447,286 +445,6 @@ func (c DockerContainers) FindContainersByPod(podUID types.UID, podFullName stri
|
||||
return containers
|
||||
}
|
||||
|
||||
// GetKubeletDockerContainers takes client and boolean whether to list all container or just the running ones.
|
||||
// Returns a map of docker containers that we manage. The map key is the docker container ID
|
||||
func GetKubeletDockerContainers(client DockerInterface, allContainers bool) (DockerContainers, error) {
|
||||
result := make(DockerContainers)
|
||||
containers, err := client.ListContainers(docker.ListContainersOptions{All: allContainers})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for i := range containers {
|
||||
container := &containers[i]
|
||||
if len(container.Names) == 0 {
|
||||
continue
|
||||
}
|
||||
// Skip containers that we didn't create to allow users to manually
|
||||
// spin up their own containers if they want.
|
||||
// TODO(dchen1107): Remove the old separator "--" by end of Oct
|
||||
if !strings.HasPrefix(container.Names[0], "/"+containerNamePrefix+"_") &&
|
||||
!strings.HasPrefix(container.Names[0], "/"+containerNamePrefix+"--") {
|
||||
glog.V(3).Infof("Docker Container: %s is not managed by kubelet.", container.Names[0])
|
||||
continue
|
||||
}
|
||||
result[DockerID(container.ID)] = container
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// GetRecentDockerContainersWithNameAndUUID returns a list of dead docker containers which matches the name
|
||||
// and uid given.
|
||||
func GetRecentDockerContainersWithNameAndUUID(client DockerInterface, podFullName string, uid types.UID, containerName string) ([]*docker.Container, error) {
|
||||
var result []*docker.Container
|
||||
containers, err := client.ListContainers(docker.ListContainersOptions{All: true})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for _, dockerContainer := range containers {
|
||||
if len(dockerContainer.Names) == 0 {
|
||||
continue
|
||||
}
|
||||
dockerName, _, err := ParseDockerName(dockerContainer.Names[0])
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
if dockerName.PodFullName != podFullName {
|
||||
continue
|
||||
}
|
||||
if uid != "" && dockerName.PodUID != uid {
|
||||
continue
|
||||
}
|
||||
if dockerName.ContainerName != containerName {
|
||||
continue
|
||||
}
|
||||
inspectResult, _ := client.InspectContainer(dockerContainer.ID)
|
||||
if inspectResult != nil && !inspectResult.State.Running && !inspectResult.State.Paused {
|
||||
result = append(result, inspectResult)
|
||||
}
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// GetKubeletDockerContainerLogs returns logs of specific container
|
||||
// By default the function will return snapshot of the container log
|
||||
// Log streaming is possible if 'follow' param is set to true
|
||||
// Log tailing is possible when number of tailed lines are set and only if 'follow' is false
|
||||
// TODO: Make 'RawTerminal' option flagable.
|
||||
func GetKubeletDockerContainerLogs(client DockerInterface, containerID, tail string, follow bool, stdout, stderr io.Writer) (err error) {
|
||||
opts := docker.LogsOptions{
|
||||
Container: containerID,
|
||||
Stdout: true,
|
||||
Stderr: true,
|
||||
OutputStream: stdout,
|
||||
ErrorStream: stderr,
|
||||
Timestamps: true,
|
||||
RawTerminal: false,
|
||||
Follow: follow,
|
||||
}
|
||||
|
||||
if !follow {
|
||||
opts.Tail = tail
|
||||
}
|
||||
|
||||
err = client.Logs(opts)
|
||||
return
|
||||
}
|
||||
|
||||
var (
|
||||
// ErrNoContainersInPod is returned when there are no containers for a given pod
|
||||
ErrNoContainersInPod = errors.New("no containers exist for this pod")
|
||||
|
||||
// ErrNoPodInfraContainerInPod is returned when there is no pod infra container for a given pod
|
||||
ErrNoPodInfraContainerInPod = errors.New("No pod infra container exists for this pod")
|
||||
|
||||
// ErrContainerCannotRun is returned when a container is created, but cannot run properly
|
||||
ErrContainerCannotRun = errors.New("Container cannot run")
|
||||
)
|
||||
|
||||
// Internal information kept for containers from inspection
|
||||
type containerStatusResult struct {
|
||||
status api.ContainerStatus
|
||||
ip string
|
||||
err error
|
||||
}
|
||||
|
||||
func inspectContainer(client DockerInterface, dockerID, containerName, tPath string) *containerStatusResult {
|
||||
result := containerStatusResult{api.ContainerStatus{}, "", nil}
|
||||
|
||||
inspectResult, err := client.InspectContainer(dockerID)
|
||||
|
||||
if err != nil {
|
||||
result.err = err
|
||||
return &result
|
||||
}
|
||||
if inspectResult == nil {
|
||||
// Why did we not get an error?
|
||||
return &result
|
||||
}
|
||||
|
||||
glog.V(3).Infof("Container inspect result: %+v", *inspectResult)
|
||||
result.status = api.ContainerStatus{
|
||||
Name: containerName,
|
||||
Image: inspectResult.Config.Image,
|
||||
ImageID: DockerPrefix + inspectResult.Image,
|
||||
ContainerID: DockerPrefix + dockerID,
|
||||
}
|
||||
|
||||
waiting := true
|
||||
if inspectResult.State.Running {
|
||||
result.status.State.Running = &api.ContainerStateRunning{
|
||||
StartedAt: util.NewTime(inspectResult.State.StartedAt),
|
||||
}
|
||||
if containerName == PodInfraContainerName && inspectResult.NetworkSettings != nil {
|
||||
result.ip = inspectResult.NetworkSettings.IPAddress
|
||||
}
|
||||
waiting = false
|
||||
} else if !inspectResult.State.FinishedAt.IsZero() {
|
||||
reason := ""
|
||||
// Note: An application might handle OOMKilled gracefully.
|
||||
// In that case, the container is oom killed, but the exit
|
||||
// code could be 0.
|
||||
if inspectResult.State.OOMKilled {
|
||||
reason = "OOM Killed"
|
||||
} else {
|
||||
reason = inspectResult.State.Error
|
||||
}
|
||||
result.status.State.Termination = &api.ContainerStateTerminated{
|
||||
ExitCode: inspectResult.State.ExitCode,
|
||||
Reason: reason,
|
||||
StartedAt: util.NewTime(inspectResult.State.StartedAt),
|
||||
FinishedAt: util.NewTime(inspectResult.State.FinishedAt),
|
||||
}
|
||||
if tPath != "" {
|
||||
path, found := inspectResult.Volumes[tPath]
|
||||
if found {
|
||||
data, err := ioutil.ReadFile(path)
|
||||
if err != nil {
|
||||
glog.Errorf("Error on reading termination-log %s: %v", path, err)
|
||||
} else {
|
||||
result.status.State.Termination.Message = string(data)
|
||||
}
|
||||
}
|
||||
}
|
||||
waiting = false
|
||||
}
|
||||
|
||||
if waiting {
|
||||
// TODO(dchen1107): Separate issue docker/docker#8294 was filed
|
||||
// TODO(dchen1107): Need to figure out why we are still waiting
|
||||
// Check any issue to run container
|
||||
result.status.State.Waiting = &api.ContainerStateWaiting{
|
||||
Reason: ErrContainerCannotRun.Error(),
|
||||
}
|
||||
}
|
||||
|
||||
return &result
|
||||
}
|
||||
|
||||
// GetDockerPodStatus returns docker related status for all containers in the pod/manifest and
|
||||
// infrastructure container
|
||||
func GetDockerPodStatus(client DockerInterface, manifest api.PodSpec, podFullName string, uid types.UID) (*api.PodStatus, error) {
|
||||
var podStatus api.PodStatus
|
||||
statuses := make(map[string]api.ContainerStatus)
|
||||
|
||||
expectedContainers := make(map[string]api.Container)
|
||||
for _, container := range manifest.Containers {
|
||||
expectedContainers[container.Name] = container
|
||||
}
|
||||
expectedContainers[PodInfraContainerName] = api.Container{}
|
||||
|
||||
containers, err := client.ListContainers(docker.ListContainersOptions{All: true})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for _, value := range containers {
|
||||
if len(value.Names) == 0 {
|
||||
continue
|
||||
}
|
||||
dockerName, _, err := ParseDockerName(value.Names[0])
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
if dockerName.PodFullName != podFullName {
|
||||
continue
|
||||
}
|
||||
if uid != "" && dockerName.PodUID != uid {
|
||||
continue
|
||||
}
|
||||
dockerContainerName := dockerName.ContainerName
|
||||
c, found := expectedContainers[dockerContainerName]
|
||||
terminationMessagePath := ""
|
||||
if !found {
|
||||
// TODO(dchen1107): should figure out why not continue here
|
||||
// continue
|
||||
} else {
|
||||
terminationMessagePath = c.TerminationMessagePath
|
||||
}
|
||||
// We assume docker return us a list of containers in time order
|
||||
if containerStatus, found := statuses[dockerContainerName]; found {
|
||||
containerStatus.RestartCount += 1
|
||||
statuses[dockerContainerName] = containerStatus
|
||||
continue
|
||||
}
|
||||
|
||||
result := inspectContainer(client, value.ID, dockerContainerName, terminationMessagePath)
|
||||
if result.err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Add user container information
|
||||
if dockerContainerName == PodInfraContainerName &&
|
||||
result.status.State.Running != nil {
|
||||
// Found network container
|
||||
podStatus.PodIP = result.ip
|
||||
} else {
|
||||
statuses[dockerContainerName] = result.status
|
||||
}
|
||||
}
|
||||
|
||||
if len(statuses) == 0 && podStatus.PodIP == "" {
|
||||
return nil, ErrNoContainersInPod
|
||||
}
|
||||
|
||||
// Not all containers expected are created, check if there are
|
||||
// image related issues
|
||||
if len(statuses) < len(manifest.Containers) {
|
||||
var containerStatus api.ContainerStatus
|
||||
for _, container := range manifest.Containers {
|
||||
if _, found := statuses[container.Name]; found {
|
||||
continue
|
||||
}
|
||||
|
||||
image := container.Image
|
||||
// Check image is ready on the node or not
|
||||
// TODO(dchen1107): docker/docker/issues/8365 to figure out if the image exists
|
||||
_, err := client.InspectImage(image)
|
||||
if err == nil {
|
||||
containerStatus.State.Waiting = &api.ContainerStateWaiting{
|
||||
Reason: fmt.Sprintf("Image: %s is ready, container is creating", image),
|
||||
}
|
||||
} else if err == docker.ErrNoSuchImage {
|
||||
containerStatus.State.Waiting = &api.ContainerStateWaiting{
|
||||
Reason: fmt.Sprintf("Image: %s is not ready on the node", image),
|
||||
}
|
||||
} else {
|
||||
containerStatus.State.Waiting = &api.ContainerStateWaiting{
|
||||
Reason: err.Error(),
|
||||
}
|
||||
}
|
||||
|
||||
statuses[container.Name] = containerStatus
|
||||
}
|
||||
}
|
||||
|
||||
podStatus.ContainerStatuses = make([]api.ContainerStatus, 0)
|
||||
for _, status := range statuses {
|
||||
podStatus.ContainerStatuses = append(podStatus.ContainerStatuses, status)
|
||||
}
|
||||
|
||||
return &podStatus, nil
|
||||
}
|
||||
|
||||
const containerNamePrefix = "k8s"
|
||||
|
||||
func HashContainer(container *api.Container) uint64 {
|
||||
@@ -781,23 +499,6 @@ func ParseDockerName(name string) (dockerName *KubeletContainerName, hash uint64
|
||||
return &KubeletContainerName{podFullName, podUID, containerName}, hash, nil
|
||||
}
|
||||
|
||||
func GetRunningContainers(client DockerInterface, ids []string) ([]*docker.Container, error) {
|
||||
result := []*docker.Container{}
|
||||
if client == nil {
|
||||
return nil, fmt.Errorf("unexpected nil docker client.")
|
||||
}
|
||||
for ix := range ids {
|
||||
status, err := client.InspectContainer(ids[ix])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if status != nil && status.State.Running {
|
||||
result = append(result, status)
|
||||
}
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// Get a docker endpoint, either from the string passed in, or $DOCKER_HOST environment variables
|
||||
func getDockerEndpoint(dockerEndpoint string) string {
|
||||
var endpoint string
|
||||
@@ -833,6 +534,47 @@ type ContainerCommandRunner interface {
|
||||
PortForward(podInfraContainerID string, port uint16, stream io.ReadWriteCloser) error
|
||||
}
|
||||
|
||||
func milliCPUToShares(milliCPU int64) int64 {
|
||||
if milliCPU == 0 {
|
||||
// zero milliCPU means unset. Use kernel default.
|
||||
return 0
|
||||
}
|
||||
// Conceptually (milliCPU / milliCPUToCPU) * sharesPerCPU, but factored to improve rounding.
|
||||
shares := (milliCPU * sharesPerCPU) / milliCPUToCPU
|
||||
if shares < minShares {
|
||||
return minShares
|
||||
}
|
||||
return shares
|
||||
}
|
||||
|
||||
// GetKubeletDockerContainers lists all container or just the running ones.
|
||||
// Returns a map of docker containers that we manage, keyed by container ID.
|
||||
// TODO: Move this function with dockerCache to DockerManager.
|
||||
func GetKubeletDockerContainers(client DockerInterface, allContainers bool) (DockerContainers, error) {
|
||||
result := make(DockerContainers)
|
||||
containers, err := client.ListContainers(docker.ListContainersOptions{All: allContainers})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for i := range containers {
|
||||
container := &containers[i]
|
||||
if len(container.Names) == 0 {
|
||||
continue
|
||||
}
|
||||
// Skip containers that we didn't create to allow users to manually
|
||||
// spin up their own containers if they want.
|
||||
// TODO(dchen1107): Remove the old separator "--" by end of Oct
|
||||
if !strings.HasPrefix(container.Names[0], "/"+containerNamePrefix+"_") &&
|
||||
!strings.HasPrefix(container.Names[0], "/"+containerNamePrefix+"--") {
|
||||
glog.V(3).Infof("Docker Container: %s is not managed by kubelet.", container.Names[0])
|
||||
continue
|
||||
}
|
||||
result[DockerID(container.ID)] = container
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// TODO: Move this function with dockerCache to DockerManager.
|
||||
func GetPods(client DockerInterface, all bool) ([]*kubecontainer.Pod, error) {
|
||||
pods := make(map[types.UID]*kubecontainer.Pod)
|
||||
var result []*kubecontainer.Pod
|
||||
@@ -881,64 +623,3 @@ func GetPods(client DockerInterface, all bool) ([]*kubecontainer.Pod, error) {
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func milliCPUToShares(milliCPU int64) int64 {
|
||||
if milliCPU == 0 {
|
||||
// zero milliCPU means unset. Use kernel default.
|
||||
return 0
|
||||
}
|
||||
// Conceptually (milliCPU / milliCPUToCPU) * sharesPerCPU, but factored to improve rounding.
|
||||
shares := (milliCPU * sharesPerCPU) / milliCPUToCPU
|
||||
if shares < minShares {
|
||||
return minShares
|
||||
}
|
||||
return shares
|
||||
}
|
||||
|
||||
func makePortsAndBindings(container *api.Container) (map[docker.Port]struct{}, map[docker.Port][]docker.PortBinding) {
|
||||
exposedPorts := map[docker.Port]struct{}{}
|
||||
portBindings := map[docker.Port][]docker.PortBinding{}
|
||||
for _, port := range container.Ports {
|
||||
exteriorPort := port.HostPort
|
||||
if exteriorPort == 0 {
|
||||
// No need to do port binding when HostPort is not specified
|
||||
continue
|
||||
}
|
||||
interiorPort := port.ContainerPort
|
||||
// Some of this port stuff is under-documented voodoo.
|
||||
// See http://stackoverflow.com/questions/20428302/binding-a-port-to-a-host-interface-using-the-rest-api
|
||||
var protocol string
|
||||
switch strings.ToUpper(string(port.Protocol)) {
|
||||
case "UDP":
|
||||
protocol = "/udp"
|
||||
case "TCP":
|
||||
protocol = "/tcp"
|
||||
default:
|
||||
glog.Warningf("Unknown protocol %q: defaulting to TCP", port.Protocol)
|
||||
protocol = "/tcp"
|
||||
}
|
||||
dockerPort := docker.Port(strconv.Itoa(interiorPort) + protocol)
|
||||
exposedPorts[dockerPort] = struct{}{}
|
||||
portBindings[dockerPort] = []docker.PortBinding{
|
||||
{
|
||||
HostPort: strconv.Itoa(exteriorPort),
|
||||
HostIP: port.HostIP,
|
||||
},
|
||||
}
|
||||
}
|
||||
return exposedPorts, portBindings
|
||||
}
|
||||
|
||||
func makeCapabilites(capAdd []api.CapabilityType, capDrop []api.CapabilityType) ([]string, []string) {
|
||||
var (
|
||||
addCaps []string
|
||||
dropCaps []string
|
||||
)
|
||||
for _, cap := range capAdd {
|
||||
addCaps = append(addCaps, string(cap))
|
||||
}
|
||||
for _, cap := range capDrop {
|
||||
dropCaps = append(dropCaps, string(cap))
|
||||
}
|
||||
return addCaps, dropCaps
|
||||
}
|
||||
|
Reference in New Issue
Block a user