diff --git a/cmd/integration/integration.go b/cmd/integration/integration.go index 90fe295aab3..f4665a069d6 100644 --- a/cmd/integration/integration.go +++ b/cmd/integration/integration.go @@ -191,13 +191,13 @@ func startComponents(manifestURL string) (apiServerURL string) { // Kubelet (localhost) testRootDir := makeTempDirOrDie("kubelet_integ_1.") glog.Infof("Using %s as root dir for kubelet #1", testRootDir) - standalone.SimpleRunKubelet(cl, etcdClient, &fakeDocker1, machineList[0], testRootDir, manifestURL, "127.0.0.1", 10250, api.NamespaceDefault) + standalone.SimpleRunKubelet(cl, nil, &fakeDocker1, machineList[0], testRootDir, manifestURL, "127.0.0.1", 10250, api.NamespaceDefault) // Kubelet (machine) // Create a second kubelet so that the guestbook example's two redis slaves both // have a place they can schedule. testRootDir = makeTempDirOrDie("kubelet_integ_2.") glog.Infof("Using %s as root dir for kubelet #2", testRootDir) - standalone.SimpleRunKubelet(cl, etcdClient, &fakeDocker2, machineList[1], testRootDir, "", "127.0.0.1", 10251, api.NamespaceDefault) + standalone.SimpleRunKubelet(cl, nil, &fakeDocker2, machineList[1], testRootDir, "", "127.0.0.1", 10251, api.NamespaceDefault) return apiServer.URL } diff --git a/cmd/kubelet/kubelet.go b/cmd/kubelet/kubelet.go index 722e2a28e19..dfabdaaf3d5 100644 --- a/cmd/kubelet/kubelet.go +++ b/cmd/kubelet/kubelet.go @@ -57,7 +57,7 @@ var ( allowPrivileged = flag.Bool("allow_privileged", false, "If true, allow containers to request privileged mode. [default=false]") registryPullQPS = flag.Float64("registry_qps", 0.0, "If > 0, limit registry pull QPS to this value. If 0, unlimited. [default=0.0]") registryBurst = flag.Int("registry_burst", 10, "Maximum size of a bursty pulls, temporarily allows pulls to burst to this number, while still not exceeding registry_qps. Only used if --registry_qps > 0") - runonce = flag.Bool("runonce", false, "If true, exit after spawning pods from local manifests or remote urls. Exclusive with --etcd_servers and --enable-server") + runonce = flag.Bool("runonce", false, "If true, exit after spawning pods from local manifests or remote urls. Exclusive with --etcd_servers, --api_servers, and --enable-server") enableDebuggingHandlers = flag.Bool("enable_debugging_handlers", true, "Enables server endpoints for log collection and local running of containers and commands") minimumGCAge = flag.Duration("minimum_container_ttl_duration", 1*time.Minute, "Minimum age for a finished container before it is garbage collected. Examples: '300ms', '10s' or '2h45m'") maxContainerCount = flag.Int("maximum_dead_containers_per_container", 5, "Maximum number of old instances of a container to retain per container. Each container takes up some disk space. Default: 5.") @@ -73,15 +73,19 @@ var ( func init() { flag.Var(&etcdServerList, "etcd_servers", "List of etcd servers to watch (http://ip:port), comma separated. Mutually exclusive with -etcd_config") flag.Var(&address, "address", "The IP address for the info server to serve on (set to 0.0.0.0 for all interfaces)") - flag.Var(&apiServerList, "api_servers", "List of Kubernetes API servers to publish events to. (ip:port), comma separated.") + flag.Var(&apiServerList, "api_servers", "List of Kubernetes API servers for publishing events, and reading pods and services. (ip:port), comma separated.") flag.Var(&clusterDNS, "cluster_dns", "IP address for a cluster DNS server. If set, kubelet will configure all containers to use this for DNS resolution in addition to the host's DNS servers") } func setupRunOnce() { if *runonce { + // Don't use remote (etcd or apiserver) sources if len(etcdServerList) > 0 { glog.Fatalf("invalid option: --runonce and --etcd_servers are mutually exclusive") } + if len(apiServerList) > 0 { + glog.Fatalf("invalid option: --runonce and --api_servers are mutually exclusive") + } if *enableServer { glog.Infof("--runonce is set, disabling server") *enableServer = false @@ -97,6 +101,18 @@ func main() { verflag.PrintAndExitIfRequested() + // Cluster creation scripts support both kubernetes versions that 1) support kublet watching + // apiserver for pods, and 2) ones that don't. So they ca set both --etcd_servers and + // --api_servers. The current code will ignore the --etcd_servers flag, while older kubelet + // code will use the --etd_servers flag for pods, and use --api_servers for event publising. + // + // TODO(erictune): convert all cloud provider scripts and Google Container Engine to + // use only --api_servers, then delete --etcd_servers flag and the resulting dead code. + if len(etcdServerList) > 0 && len(apiServerList) > 0 { + glog.Infof("Both --etcd_servers and --api_servers are set. Not using etcd source.") + etcdServerList = util.StringList{} + } + setupRunOnce() if err := util.ApplyOomScoreAdj(*oomScoreAdj); err != nil { diff --git a/cmd/kubernetes/kubernetes.go b/cmd/kubernetes/kubernetes.go index a822ea7457b..3f243f6e439 100644 --- a/cmd/kubernetes/kubernetes.go +++ b/cmd/kubernetes/kubernetes.go @@ -54,7 +54,7 @@ func startComponents(etcdClient tools.EtcdClient, cl *client.Client, addr string standalone.RunControllerManager(machineList, cl, *nodeMilliCPU, *nodeMemory) dockerClient := util.ConnectToDockerOrDie(*dockerEndpoint) - standalone.SimpleRunKubelet(cl, etcdClient, dockerClient, machineList[0], "/tmp/kubernetes", "", "127.0.0.1", 10250, *masterServiceNamespace) + standalone.SimpleRunKubelet(cl, nil, dockerClient, machineList[0], "/tmp/kubernetes", "", "127.0.0.1", 10250, *masterServiceNamespace) } func newApiClient(addr string, port int) *client.Client { diff --git a/pkg/kubelet/config/apiserver_test.go b/pkg/kubelet/config/apiserver_test.go index c57167b0047..936ff22aaec 100644 --- a/pkg/kubelet/config/apiserver_test.go +++ b/pkg/kubelet/config/apiserver_test.go @@ -43,25 +43,31 @@ func (lw fakePodLW) Watch(resourceVersion string) (watch.Interface, error) { var _ cache.ListerWatcher = fakePodLW{} -func TestNewSourceApiserver(t *testing.T) { - podv1 := api.Pod{ +func TestNewSourceApiserver_UpdatesAndMultiplePods(t *testing.T) { + pod1v1 := api.Pod{ ObjectMeta: api.ObjectMeta{Name: "p"}, Spec: api.PodSpec{Containers: []api.Container{{Image: "image/one"}}}} - podv2 := api.Pod{ + pod1v2 := api.Pod{ ObjectMeta: api.ObjectMeta{Name: "p"}, Spec: api.PodSpec{Containers: []api.Container{{Image: "image/two"}}}} + pod2 := api.Pod{ + ObjectMeta: api.ObjectMeta{Name: "q"}, + Spec: api.PodSpec{Containers: []api.Container{{Image: "image/blah"}}}} - expectedBoundPodv1 := api.BoundPod{ + expectedBoundPod1v1 := api.BoundPod{ ObjectMeta: api.ObjectMeta{Name: "p", SelfLink: "/api/v1beta1/boundPods/p"}, Spec: api.PodSpec{Containers: []api.Container{{Image: "image/one"}}}} - expectedBoundPodv2 := api.BoundPod{ + expectedBoundPod1v2 := api.BoundPod{ ObjectMeta: api.ObjectMeta{Name: "p", SelfLink: "/api/v1beta1/boundPods/p"}, Spec: api.PodSpec{Containers: []api.Container{{Image: "image/two"}}}} + expectedBoundPod2 := api.BoundPod{ + ObjectMeta: api.ObjectMeta{Name: "q", SelfLink: "/api/v1beta1/boundPods/q"}, + Spec: api.PodSpec{Containers: []api.Container{{Image: "image/blah"}}}} // Setup fake api client. fakeWatch := watch.NewFake() lw := fakePodLW{ - listResp: &api.PodList{Items: []api.Pod{podv1}}, + listResp: &api.PodList{Items: []api.Pod{pod1v1}}, watchResp: fakeWatch, } @@ -74,23 +80,54 @@ func TestNewSourceApiserver(t *testing.T) { t.Errorf("Unable to read from channel when expected") } update := got.(kubelet.PodUpdate) - expected := CreatePodUpdate(kubelet.SET, kubelet.ApiserverSource, expectedBoundPodv1) + expected := CreatePodUpdate(kubelet.SET, kubelet.ApiserverSource, expectedBoundPod1v1) if !api.Semantic.DeepEqual(expected, update) { t.Errorf("Expected %#v; Got %#v", expected, update) } - fakeWatch.Modify(&podv2) + // Add another pod + fakeWatch.Add(&pod2) got, ok = <-ch if !ok { t.Errorf("Unable to read from channel when expected") } update = got.(kubelet.PodUpdate) - expected = CreatePodUpdate(kubelet.SET, kubelet.ApiserverSource, expectedBoundPodv2) - if !api.Semantic.DeepEqual(expected, update) { - t.Fatalf("Expected %#v, Got %#v", expected, update) + // Could be sorted either of these two ways: + expectedA := CreatePodUpdate(kubelet.SET, kubelet.ApiserverSource, expectedBoundPod1v1, expectedBoundPod2) + expectedB := CreatePodUpdate(kubelet.SET, kubelet.ApiserverSource, expectedBoundPod2, expectedBoundPod1v1) + + if !api.Semantic.DeepEqual(expectedA, update) && !api.Semantic.DeepEqual(expectedB, update) { + t.Errorf("Expected %#v or %#v, Got %#v", expectedA, expectedB, update) } - fakeWatch.Delete(&podv2) + // Modify pod1 + fakeWatch.Modify(&pod1v2) + got, ok = <-ch + if !ok { + t.Errorf("Unable to read from channel when expected") + } + update = got.(kubelet.PodUpdate) + expectedA = CreatePodUpdate(kubelet.SET, kubelet.ApiserverSource, expectedBoundPod1v2, expectedBoundPod2) + expectedB = CreatePodUpdate(kubelet.SET, kubelet.ApiserverSource, expectedBoundPod2, expectedBoundPod1v2) + + if !api.Semantic.DeepEqual(expectedA, update) && !api.Semantic.DeepEqual(expectedB, update) { + t.Errorf("Expected %#v or %#v, Got %#v", expectedA, expectedB, update) + } + + // Delete pod1 + fakeWatch.Delete(&pod1v2) + got, ok = <-ch + if !ok { + t.Errorf("Unable to read from channel when expected") + } + update = got.(kubelet.PodUpdate) + expected = CreatePodUpdate(kubelet.SET, kubelet.ApiserverSource, expectedBoundPod2) + if !api.Semantic.DeepEqual(expected, update) { + t.Errorf("Expected %#v, Got %#v", expected, update) + } + + // Delete pod2 + fakeWatch.Delete(&pod2) got, ok = <-ch if !ok { t.Errorf("Unable to read from channel when expected") diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index ccdafda93fa..a6f9635a2c1 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -100,6 +100,7 @@ func NewMainKubelet( hostname: hostname, dockerClient: dockerClient, etcdClient: etcdClient, + kubeClient: kubeClient, rootDirectory: rootDirectory, resyncInterval: resyncInterval, networkContainerImage: networkContainerImage, diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index 76d53537956..e280fd4077a 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -47,6 +47,7 @@ func init() { } func newTestKubelet(t *testing.T) (*Kubelet, *tools.FakeEtcdClient, *dockertools.FakeDockerClient) { + // TODO: get rid of fakeEtcdClient and return value. fakeEtcdClient := tools.NewFakeEtcdClient(t) fakeDocker := &dockertools.FakeDockerClient{ RemovedImages: util.StringSet{}, @@ -55,10 +56,11 @@ func newTestKubelet(t *testing.T) (*Kubelet, *tools.FakeEtcdClient, *dockertools kubelet := &Kubelet{} kubelet.dockerClient = fakeDocker kubelet.dockerPuller = &dockertools.FakeDockerPuller{} - kubelet.etcdClient = fakeEtcdClient kubelet.rootDirectory = "/tmp/kubelet" kubelet.podWorkers = newPodWorkers() kubelet.sourceReady = func(source string) bool { return true } + kubelet.masterServiceNamespace = api.NamespaceDefault + kubelet.serviceLister = testServiceLister{} return kubelet, fakeEtcdClient, fakeDocker } diff --git a/pkg/standalone/standalone.go b/pkg/standalone/standalone.go index 36da0e6deac..b3d7218dcb3 100644 --- a/pkg/standalone/standalone.go +++ b/pkg/standalone/standalone.go @@ -229,10 +229,14 @@ func makePodSourceConfig(kc *KubeletConfig) *config.PodConfig { glog.Infof("Adding manifest url: %v", kc.ManifestURL) config.NewSourceURL(kc.ManifestURL, kc.HttpCheckFrequency, cfg.Channel(kubelet.HTTPSource)) } - if !reflect.ValueOf(kc.EtcdClient).IsNil() { + if kc.EtcdClient != nil && !reflect.ValueOf(kc.EtcdClient).IsNil() { glog.Infof("Watching for etcd configs at %v", kc.EtcdClient.GetCluster()) config.NewSourceEtcd(config.EtcdKeyForHost(kc.Hostname), kc.EtcdClient, cfg.Channel(kubelet.EtcdSource)) } + if kc.KubeClient != nil && !reflect.ValueOf(kc.KubeClient).IsNil() { + glog.Infof("Watching apiserver") + config.NewSourceApiserver(kc.KubeClient, kc.Hostname, cfg.Channel(kubelet.ApiserverSource)) + } return cfg }