/* Copyright 2014 Google Inc. All rights reserved. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package dockertools import ( "fmt" "hash/adler32" "io" "math/rand" "os" "strconv" "strings" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/credentialprovider" kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/leaky" "github.com/GoogleCloudPlatform/kubernetes/pkg/types" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/docker/docker/pkg/parsers" docker "github.com/fsouza/go-dockerclient" "github.com/golang/glog" ) const ( PodInfraContainerName = leaky.PodInfraContainerName DockerPrefix = "docker://" PodInfraContainerImage = "gcr.io/google_containers/pause:0.8.0" ) const ( // Taken from lmctfy https://github.com/google/lmctfy/blob/master/lmctfy/controllers/cpu_controller.cc minShares = 2 sharesPerCPU = 1024 milliCPUToCPU = 1000 ) // DockerInterface is an abstract interface for testability. It abstracts the interface of docker.Client. type DockerInterface interface { ListContainers(options docker.ListContainersOptions) ([]docker.APIContainers, error) InspectContainer(id string) (*docker.Container, error) CreateContainer(docker.CreateContainerOptions) (*docker.Container, error) StartContainer(id string, hostConfig *docker.HostConfig) error StopContainer(id string, timeout uint) error RemoveContainer(opts docker.RemoveContainerOptions) error InspectImage(image string) (*docker.Image, error) ListImages(opts docker.ListImagesOptions) ([]docker.APIImages, error) PullImage(opts docker.PullImageOptions, auth docker.AuthConfiguration) error RemoveImage(image string) error Logs(opts docker.LogsOptions) error Version() (*docker.Env, error) Info() (*docker.Env, error) CreateExec(docker.CreateExecOptions) (*docker.Exec, error) StartExec(string, docker.StartExecOptions) error } // DockerID is an ID of docker container. It is a type to make it clear when we're working with docker container Ids type DockerID string // KubeletContainerName encapsulates a pod name and a Kubernetes container name. type KubeletContainerName struct { PodFullName string PodUID types.UID ContainerName string } // DockerPuller is an abstract interface for testability. It abstracts image pull operations. type DockerPuller interface { Pull(image string) error IsImagePresent(image string) (bool, error) } // dockerPuller is the default implementation of DockerPuller. type dockerPuller struct { client DockerInterface keyring credentialprovider.DockerKeyring } type throttledDockerPuller struct { puller dockerPuller limiter util.RateLimiter } // newDockerPuller creates a new instance of the default implementation of DockerPuller. func newDockerPuller(client DockerInterface, qps float32, burst int) DockerPuller { dp := dockerPuller{ client: client, keyring: credentialprovider.NewDockerKeyring(), } if qps == 0.0 { return dp } return &throttledDockerPuller{ puller: dp, limiter: util.NewTokenBucketRateLimiter(qps, burst), } } func parseImageName(image string) (string, string) { return parsers.ParseRepositoryTag(image) } func (p dockerPuller) Pull(image string) error { repoToPull, tag := parseImageName(image) // If no tag was specified, use the default "latest". if len(tag) == 0 { tag = "latest" } opts := docker.PullImageOptions{ Repository: repoToPull, Tag: tag, } creds, ok := p.keyring.Lookup(repoToPull) if !ok { glog.V(1).Infof("Pulling image %s without credentials", image) } err := p.client.PullImage(opts, creds) // If there was no error, or we had credentials, just return the error. if err == nil || ok { return err } // Image spec: [/]/[: 1 { hash, err = strconv.ParseUint(nameParts[1], 16, 32) if err != nil { glog.Warningf("invalid container hash %q in container %q", nameParts[1], name) } } podFullName := parts[2] + "_" + parts[3] podUID := types.UID(parts[4]) return &KubeletContainerName{podFullName, podUID, containerName}, hash, nil } // Get a docker endpoint, either from the string passed in, or $DOCKER_HOST environment variables func getDockerEndpoint(dockerEndpoint string) string { var endpoint string if len(dockerEndpoint) > 0 { endpoint = dockerEndpoint } else if len(os.Getenv("DOCKER_HOST")) > 0 { endpoint = os.Getenv("DOCKER_HOST") } else { endpoint = "unix:///var/run/docker.sock" } glog.Infof("Connecting to docker on %s", endpoint) return endpoint } func ConnectToDockerOrDie(dockerEndpoint string) DockerInterface { if dockerEndpoint == "fake://" { return &FakeDockerClient{ VersionInfo: []string{"apiVersion=1.16"}, } } client, err := docker.NewClient(getDockerEndpoint(dockerEndpoint)) if err != nil { glog.Fatal("Couldn't connect to docker.") } return client } // TODO(yifan): Move this to container.Runtime. type ContainerCommandRunner interface { RunInContainer(containerID string, cmd []string) ([]byte, error) ExecInContainer(containerID string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool) error PortForward(pod *kubecontainer.Pod, port uint16, stream io.ReadWriteCloser) error } func milliCPUToShares(milliCPU int64) int64 { if milliCPU == 0 { // zero milliCPU means unset. Use kernel default. return 0 } // Conceptually (milliCPU / milliCPUToCPU) * sharesPerCPU, but factored to improve rounding. shares := (milliCPU * sharesPerCPU) / milliCPUToCPU if shares < minShares { return minShares } return shares } // GetKubeletDockerContainers lists all container or just the running ones. // Returns a map of docker containers that we manage, keyed by container ID. // TODO: Move this function with dockerCache to DockerManager. func GetKubeletDockerContainers(client DockerInterface, allContainers bool) (DockerContainers, error) { result := make(DockerContainers) containers, err := client.ListContainers(docker.ListContainersOptions{All: allContainers}) if err != nil { return nil, err } for i := range containers { container := &containers[i] if len(container.Names) == 0 { continue } // Skip containers that we didn't create to allow users to manually // spin up their own containers if they want. // TODO(dchen1107): Remove the old separator "--" by end of Oct if !strings.HasPrefix(container.Names[0], "/"+containerNamePrefix+"_") && !strings.HasPrefix(container.Names[0], "/"+containerNamePrefix+"--") { glog.V(3).Infof("Docker Container: %s is not managed by kubelet.", container.Names[0]) continue } result[DockerID(container.ID)] = container } return result, nil }