Allow configurable Kubelet net image for isolated networks
Public access to the DockerHub is not guaranteed in all environments, add a flag to the kubelet that allows it to use a different image (like one on a private registry) as well as only pull the first time the image is needed. Fixes #1545
This commit is contained in:
		| @@ -48,21 +48,22 @@ import ( | ||||
| const defaultRootDir = "/var/lib/kubelet" | ||||
|  | ||||
| var ( | ||||
| 	config             = flag.String("config", "", "Path to the config file or directory of files") | ||||
| 	syncFrequency      = flag.Duration("sync_frequency", 10*time.Second, "Max period between synchronizing running containers and config") | ||||
| 	fileCheckFrequency = flag.Duration("file_check_frequency", 20*time.Second, "Duration between checking config files for new data") | ||||
| 	httpCheckFrequency = flag.Duration("http_check_frequency", 20*time.Second, "Duration between checking http for new data") | ||||
| 	manifestURL        = flag.String("manifest_url", "", "URL for accessing the container manifest") | ||||
| 	enableServer       = flag.Bool("enable_server", true, "Enable the info server") | ||||
| 	address            = flag.String("address", "127.0.0.1", "The address for the info server to serve on (set to 0.0.0.0 or \"\" for all interfaces)") | ||||
| 	port               = flag.Uint("port", master.KubeletPort, "The port for the info server to serve on") | ||||
| 	hostnameOverride   = flag.String("hostname_override", "", "If non-empty, will use this string as identification instead of the actual hostname.") | ||||
| 	dockerEndpoint     = flag.String("docker_endpoint", "", "If non-empty, use this for the docker endpoint to communicate with") | ||||
| 	etcdServerList     util.StringList | ||||
| 	rootDirectory      = flag.String("root_dir", defaultRootDir, "Directory path for managing kubelet files (volume mounts,etc).") | ||||
| 	allowPrivileged    = flag.Bool("allow_privileged", false, "If true, allow containers to request privileged mode. [default=false]") | ||||
| 	registryPullQPS    = flag.Float64("registry_qps", 0.0, "If > 0, limit registry pull QPS to this value.  If 0, unlimited. [default=0.0]") | ||||
| 	registryBurst      = flag.Int("registry_burst", 10, "Maximum size of a bursty pulls, temporarily allows pulls to burst to this number, while still not exceeding registry_qps.  Only used if --registry_qps > 0") | ||||
| 	config                = flag.String("config", "", "Path to the config file or directory of files") | ||||
| 	syncFrequency         = flag.Duration("sync_frequency", 10*time.Second, "Max period between synchronizing running containers and config") | ||||
| 	fileCheckFrequency    = flag.Duration("file_check_frequency", 20*time.Second, "Duration between checking config files for new data") | ||||
| 	httpCheckFrequency    = flag.Duration("http_check_frequency", 20*time.Second, "Duration between checking http for new data") | ||||
| 	manifestURL           = flag.String("manifest_url", "", "URL for accessing the container manifest") | ||||
| 	enableServer          = flag.Bool("enable_server", true, "Enable the info server") | ||||
| 	address               = flag.String("address", "127.0.0.1", "The address for the info server to serve on (set to 0.0.0.0 or \"\" for all interfaces)") | ||||
| 	port                  = flag.Uint("port", master.KubeletPort, "The port for the info server to serve on") | ||||
| 	hostnameOverride      = flag.String("hostname_override", "", "If non-empty, will use this string as identification instead of the actual hostname.") | ||||
| 	networkContainerImage = flag.String("network_container_image", kubelet.NetworkContainerImage, "The image that network containers in each pod will use.") | ||||
| 	dockerEndpoint        = flag.String("docker_endpoint", "", "If non-empty, use this for the docker endpoint to communicate with") | ||||
| 	etcdServerList        util.StringList | ||||
| 	rootDirectory         = flag.String("root_dir", defaultRootDir, "Directory path for managing kubelet files (volume mounts,etc).") | ||||
| 	allowPrivileged       = flag.Bool("allow_privileged", false, "If true, allow containers to request privileged mode. [default=false]") | ||||
| 	registryPullQPS       = flag.Float64("registry_qps", 0.0, "If > 0, limit registry pull QPS to this value.  If 0, unlimited. [default=0.0]") | ||||
| 	registryBurst         = flag.Int("registry_burst", 10, "Maximum size of a bursty pulls, temporarily allows pulls to burst to this number, while still not exceeding registry_qps.  Only used if --registry_qps > 0") | ||||
| ) | ||||
|  | ||||
| func init() { | ||||
| @@ -159,6 +160,7 @@ func main() { | ||||
| 		cadvisorClient, | ||||
| 		etcdClient, | ||||
| 		*rootDirectory, | ||||
| 		*networkContainerImage, | ||||
| 		*syncFrequency, | ||||
| 		float32(*registryPullQPS), | ||||
| 		*registryBurst) | ||||
|   | ||||
| @@ -81,7 +81,7 @@ func (f *FakeDockerClient) CreateContainer(c docker.CreateContainerOptions) (*do | ||||
| 	// This is not a very good fake. We'll just add this container's name to the list. | ||||
| 	// Docker likes to add a '/', so copy that behavior. | ||||
| 	name := "/" + c.Name | ||||
| 	f.ContainerList = append(f.ContainerList, docker.APIContainers{ID: name, Names: []string{name}}) | ||||
| 	f.ContainerList = append(f.ContainerList, docker.APIContainers{ID: name, Names: []string{name}, Image: c.Config.Image}) | ||||
| 	return &docker.Container{ID: name}, nil | ||||
| } | ||||
|  | ||||
| @@ -138,6 +138,7 @@ func (f *FakeDockerClient) InspectImage(name string) (*docker.Image, error) { | ||||
| type FakeDockerPuller struct { | ||||
| 	sync.Mutex | ||||
|  | ||||
| 	HasImages    []string | ||||
| 	ImagesPulled []string | ||||
|  | ||||
| 	// Every pull will return the first error here, and then reslice | ||||
| @@ -159,5 +160,15 @@ func (f *FakeDockerPuller) Pull(image string) (err error) { | ||||
| } | ||||
|  | ||||
| func (f *FakeDockerPuller) IsImagePresent(name string) (bool, error) { | ||||
| 	return true, nil | ||||
| 	f.Lock() | ||||
| 	defer f.Unlock() | ||||
| 	if f.HasImages == nil { | ||||
| 		return true, nil | ||||
| 	} | ||||
| 	for _, s := range f.HasImages { | ||||
| 		if s == name { | ||||
| 			return true, nil | ||||
| 		} | ||||
| 	} | ||||
| 	return false, nil | ||||
| } | ||||
|   | ||||
| @@ -67,21 +67,23 @@ func NewMainKubelet( | ||||
| 	cc CadvisorInterface, | ||||
| 	ec tools.EtcdClient, | ||||
| 	rd string, | ||||
| 	ni string, | ||||
| 	ri time.Duration, | ||||
| 	pullQPS float32, | ||||
| 	pullBurst int) *Kubelet { | ||||
| 	return &Kubelet{ | ||||
| 		hostname:       hn, | ||||
| 		dockerClient:   dc, | ||||
| 		cadvisorClient: cc, | ||||
| 		etcdClient:     ec, | ||||
| 		rootDirectory:  rd, | ||||
| 		resyncInterval: ri, | ||||
| 		podWorkers:     newPodWorkers(), | ||||
| 		runner:         dockertools.NewDockerContainerCommandRunner(), | ||||
| 		httpClient:     &http.Client{}, | ||||
| 		pullQPS:        pullQPS, | ||||
| 		pullBurst:      pullBurst, | ||||
| 		hostname:              hn, | ||||
| 		dockerClient:          dc, | ||||
| 		cadvisorClient:        cc, | ||||
| 		etcdClient:            ec, | ||||
| 		rootDirectory:         rd, | ||||
| 		resyncInterval:        ri, | ||||
| 		networkContainerImage: ni, | ||||
| 		podWorkers:            newPodWorkers(), | ||||
| 		runner:                dockertools.NewDockerContainerCommandRunner(), | ||||
| 		httpClient:            &http.Client{}, | ||||
| 		pullQPS:               pullQPS, | ||||
| 		pullBurst:             pullBurst, | ||||
| 	} | ||||
| } | ||||
|  | ||||
| @@ -89,11 +91,12 @@ func NewMainKubelet( | ||||
| // TODO: add more integration tests, and expand parameter list as needed. | ||||
| func NewIntegrationTestKubelet(hn string, dc dockertools.DockerInterface) *Kubelet { | ||||
| 	return &Kubelet{ | ||||
| 		hostname:       hn, | ||||
| 		dockerClient:   dc, | ||||
| 		dockerPuller:   &dockertools.FakeDockerPuller{}, | ||||
| 		resyncInterval: 3 * time.Second, | ||||
| 		podWorkers:     newPodWorkers(), | ||||
| 		hostname:              hn, | ||||
| 		dockerClient:          dc, | ||||
| 		dockerPuller:          &dockertools.FakeDockerPuller{}, | ||||
| 		networkContainerImage: NetworkContainerImage, | ||||
| 		resyncInterval:        3 * time.Second, | ||||
| 		podWorkers:            newPodWorkers(), | ||||
| 	} | ||||
| } | ||||
|  | ||||
| @@ -103,11 +106,12 @@ type httpGetInterface interface { | ||||
|  | ||||
| // Kubelet is the main kubelet implementation. | ||||
| type Kubelet struct { | ||||
| 	hostname       string | ||||
| 	dockerClient   dockertools.DockerInterface | ||||
| 	rootDirectory  string | ||||
| 	podWorkers     podWorkers | ||||
| 	resyncInterval time.Duration | ||||
| 	hostname              string | ||||
| 	dockerClient          dockertools.DockerInterface | ||||
| 	rootDirectory         string | ||||
| 	networkContainerImage string | ||||
| 	podWorkers            podWorkers | ||||
| 	resyncInterval        time.Duration | ||||
|  | ||||
| 	// Optional, no events will be sent without it | ||||
| 	etcdClient tools.EtcdClient | ||||
| @@ -368,7 +372,7 @@ func (kl *Kubelet) killContainerByID(ID, name string) error { | ||||
|  | ||||
| const ( | ||||
| 	networkContainerName  = "net" | ||||
| 	networkContainerImage = "kubernetes/pause:latest" | ||||
| 	NetworkContainerImage = "kubernetes/pause:latest" | ||||
| ) | ||||
|  | ||||
| // createNetworkContainer starts the network container for a pod. Returns the docker container ID of the newly created container. | ||||
| @@ -381,12 +385,19 @@ func (kl *Kubelet) createNetworkContainer(pod *Pod) (dockertools.DockerID, error | ||||
| 	} | ||||
| 	container := &api.Container{ | ||||
| 		Name:  networkContainerName, | ||||
| 		Image: networkContainerImage, | ||||
| 		Image: kl.networkContainerImage, | ||||
| 		Ports: ports, | ||||
| 	} | ||||
| 	if err := kl.dockerPuller.Pull(networkContainerImage); err != nil { | ||||
| 	// TODO: make this a TTL based pull (if image older than X policy, pull) | ||||
| 	ok, err := kl.dockerPuller.IsImagePresent(container.Image) | ||||
| 	if err != nil { | ||||
| 		return "", err | ||||
| 	} | ||||
| 	if !ok { | ||||
| 		if err := kl.dockerPuller.Pull(container.Image); err != nil { | ||||
| 			return "", err | ||||
| 		} | ||||
| 	} | ||||
| 	return kl.runContainer(pod, container, nil, "") | ||||
| } | ||||
|  | ||||
|   | ||||
| @@ -22,6 +22,7 @@ import ( | ||||
| 	"reflect" | ||||
| 	"regexp" | ||||
| 	"strconv" | ||||
| 	"strings" | ||||
| 	"sync" | ||||
| 	"testing" | ||||
| 	"time" | ||||
| @@ -206,6 +207,7 @@ func matchString(t *testing.T, pattern, str string) bool { | ||||
|  | ||||
| func TestSyncPodsCreatesNetAndContainer(t *testing.T) { | ||||
| 	kubelet, _, fakeDocker := newTestKubelet(t) | ||||
| 	kubelet.networkContainerImage = "custom_image_name" | ||||
| 	fakeDocker.ContainerList = []docker.APIContainers{} | ||||
| 	err := kubelet.SyncPods([]Pod{ | ||||
| 		{ | ||||
| @@ -228,6 +230,57 @@ func TestSyncPodsCreatesNetAndContainer(t *testing.T) { | ||||
| 		"list", "list", "create", "start", "list", "inspect", "list", "create", "start"}) | ||||
|  | ||||
| 	fakeDocker.Lock() | ||||
|  | ||||
| 	found := false | ||||
| 	for _, c := range fakeDocker.ContainerList { | ||||
| 		if c.Image == "custom_image_name" && strings.HasPrefix(c.Names[0], "/k8s_net") { | ||||
| 			found = true | ||||
| 		} | ||||
| 	} | ||||
| 	if !found { | ||||
| 		t.Errorf("Custom net container not found: %v", fakeDocker.ContainerList) | ||||
| 	} | ||||
|  | ||||
| 	if len(fakeDocker.Created) != 2 || | ||||
| 		!matchString(t, "k8s_net\\.[a-f0-9]+_foo.test_", fakeDocker.Created[0]) || | ||||
| 		!matchString(t, "k8s_bar\\.[a-f0-9]+_foo.test_", fakeDocker.Created[1]) { | ||||
| 		t.Errorf("Unexpected containers created %v", fakeDocker.Created) | ||||
| 	} | ||||
| 	fakeDocker.Unlock() | ||||
| } | ||||
|  | ||||
| func TestSyncPodsCreatesNetAndContainerPullsImage(t *testing.T) { | ||||
| 	kubelet, _, fakeDocker := newTestKubelet(t) | ||||
| 	puller := kubelet.dockerPuller.(*dockertools.FakeDockerPuller) | ||||
| 	puller.HasImages = []string{} | ||||
| 	kubelet.networkContainerImage = "custom_image_name" | ||||
| 	fakeDocker.ContainerList = []docker.APIContainers{} | ||||
| 	err := kubelet.SyncPods([]Pod{ | ||||
| 		{ | ||||
| 			Name:      "foo", | ||||
| 			Namespace: "test", | ||||
| 			Manifest: api.ContainerManifest{ | ||||
| 				ID: "foo", | ||||
| 				Containers: []api.Container{ | ||||
| 					{Name: "bar"}, | ||||
| 				}, | ||||
| 			}, | ||||
| 		}, | ||||
| 	}) | ||||
| 	if err != nil { | ||||
| 		t.Errorf("unexpected error: %v", err) | ||||
| 	} | ||||
| 	kubelet.drainWorkers() | ||||
|  | ||||
| 	verifyCalls(t, fakeDocker, []string{ | ||||
| 		"list", "list", "create", "start", "list", "inspect", "list", "create", "start"}) | ||||
|  | ||||
| 	fakeDocker.Lock() | ||||
|  | ||||
| 	if !reflect.DeepEqual(puller.ImagesPulled, []string{"custom_image_name", ""}) { | ||||
| 		t.Errorf("Unexpected pulled containers: %v", puller.ImagesPulled) | ||||
| 	} | ||||
|  | ||||
| 	if len(fakeDocker.Created) != 2 || | ||||
| 		!matchString(t, "k8s_net\\.[a-f0-9]+_foo.test_", fakeDocker.Created[0]) || | ||||
| 		!matchString(t, "k8s_bar\\.[a-f0-9]+_foo.test_", fakeDocker.Created[1]) { | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 Clayton Coleman
					Clayton Coleman