Refactor kubelet, standalone k8s and integration test to all use the same code.

Brendan Burns
2014-11-27 13:28:56 -08:00
parent ff1e9f4c19
commit d47b510104
5 changed files with 356 additions and 236 deletions


@@ -31,6 +31,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/controller"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/config"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/master"
"github.com/GoogleCloudPlatform/kubernetes/pkg/resources"
"github.com/GoogleCloudPlatform/kubernetes/pkg/service"
@@ -39,12 +40,9 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler"
"github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/factory"
docker "github.com/fsouza/go-dockerclient"
"github.com/golang/glog"
)
const testRootDir = "/tmp/kubelet"
type delegateHandler struct {
delegate http.Handler
}
@@ -141,20 +139,131 @@ func RunControllerManager(machineList []string, cl *client.Client, nodeMilliCPU,
controllerManager.Run(10 * time.Second)
}
// RunKubelet starts a Kubelet talking to dockerEndpoint
func RunKubelet(etcdClient tools.EtcdClient, hostname, dockerEndpoint string) {
dockerClient, err := docker.NewClient(GetDockerEndpoint(dockerEndpoint))
if err != nil {
glog.Fatal("Couldn't connect to docker.")
// SimpleRunKubelet is a simple way to start a Kubelet, given a Docker client and an etcd client.
// Under the hood it calls RunKubelet (below).
func SimpleRunKubelet(etcdClient tools.EtcdClient, dockerClient dockertools.DockerInterface, hostname, rootDir, manifestURL, address string, port uint) {
kcfg := KubeletConfig{
EtcdClient: etcdClient,
DockerClient: dockerClient,
HostnameOverride: hostname,
RootDirectory: rootDir,
ManifestURL: manifestURL,
NetworkContainerImage: kubelet.NetworkContainerImage,
Port: port,
Address: util.IP(net.ParseIP(address)),
EnableServer: true,
EnableDebuggingHandlers: true,
}
RunKubelet(&kcfg)
}
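// Illustrative caller sketch, not part of this diff: how the integration test or the
// standalone binary might invoke SimpleRunKubelet once it has etcd and Docker clients.
// The function name and all literal values below are placeholders.
func exampleSimpleRunKubelet(etcdClient tools.EtcdClient, dockerClient dockertools.DockerInterface) {
	SimpleRunKubelet(etcdClient, dockerClient,
		"127.0.0.1",    // hostname
		"/tmp/kubelet", // root directory for kubelet state
		"",             // no manifest URL
		"127.0.0.1",    // address to serve the kubelet API on
		10250)          // kubelet port
	// SimpleRunKubelet returns after starting goroutines; a real caller keeps the process alive.
}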
// RunKubelet is responsible for setting up and running a kubelet. It is used in three different applications:
//   1. Integration tests
//   2. Kubelet binary
//   3. Standalone 'kubernetes' binary
// Eventually, #2 will be replaced with instances of #3.
func RunKubelet(kcfg *KubeletConfig) {
kubelet.SetupEventSending(kcfg.AuthPath, kcfg.ApiServerList)
kubelet.SetupLogging()
kubelet.SetupCapabilities(kcfg.AllowPrivileged)
kcfg.Hostname = kubelet.GetHostname(kcfg.HostnameOverride)
if len(kcfg.RootDirectory) > 0 {
kubelet.SetupRootDirectoryOrDie(kcfg.RootDirectory)
}
// Kubelet (localhost)
os.MkdirAll(testRootDir, 0750)
cfg1 := config.NewPodConfig(config.PodConfigNotificationSnapshotAndUpdates)
config.NewSourceEtcd(config.EtcdKeyForHost(hostname), etcdClient, cfg1.Channel("etcd"))
myKubelet := kubelet.NewIntegrationTestKubelet(hostname, testRootDir, dockerClient)
go util.Forever(func() { myKubelet.Run(cfg1.Updates()) }, 0)
go util.Forever(func() {
kubelet.ListenAndServeKubeletServer(myKubelet, cfg1.Channel("http"), net.ParseIP("127.0.0.1"), 10250, true)
}, 0)
cfg := makePodSourceConfig(kcfg)
k := createAndInitKubelet(kcfg)
// process pods and exit.
if kcfg.Runonce {
if _, err := k.RunOnce(cfg.Updates()); err != nil {
glog.Errorf("--runonce failed: %v", err)
}
} else {
startKubelet(k, cfg, kcfg)
}
}
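// Hypothetical sketch, not part of this diff: with Runonce set, RunKubelet does a single
// RunOnce pass over the configured pods and returns instead of starting the serve loops.
func exampleRunOnce(kcfg *KubeletConfig) {
	kcfg.Runonce = true
	RunKubelet(kcfg)
}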
func startKubelet(k *kubelet.Kubelet, cfg *config.PodConfig, kc *KubeletConfig) {
// start the kubelet
go util.Forever(func() { k.Run(cfg.Updates()) }, 0)
// start the kubelet server
if kc.EnableServer {
go util.Forever(func() {
kubelet.ListenAndServeKubeletServer(k, cfg.Channel("http"), net.IP(kc.Address), kc.Port, kc.EnableDebuggingHandlers)
}, 0)
}
}
func makePodSourceConfig(kc *KubeletConfig) *config.PodConfig {
// source of all configuration
cfg := config.NewPodConfig(config.PodConfigNotificationSnapshotAndUpdates)
// define file config source
if kc.ConfigFile != "" {
config.NewSourceFile(kc.ConfigFile, kc.FileCheckFrequency, cfg.Channel("file"))
}
// define url config source
if kc.ManifestURL != "" {
config.NewSourceURL(kc.ManifestURL, kc.HttpCheckFrequency, cfg.Channel("http"))
}
if kc.EtcdClient != nil {
glog.Infof("Watching for etcd configs at %v", kc.EtcdClient.GetCluster())
config.NewSourceEtcd(config.EtcdKeyForHost(kc.Hostname), kc.EtcdClient, cfg.Channel("etcd"))
}
return cfg
}
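// Illustrative only, not part of this diff: the same PodConfig plumbing wired up by hand.
// The file path, URL, and check frequencies are placeholders; each source feeds its own
// named channel and the merged stream is read from cfg.Updates().
func examplePodSources() *config.PodConfig {
	cfg := config.NewPodConfig(config.PodConfigNotificationSnapshotAndUpdates)
	config.NewSourceFile("/etc/kubelet-manifests", 20*time.Second, cfg.Channel("file"))
	config.NewSourceURL("http://example.com/manifest.yaml", 20*time.Second, cfg.Channel("http"))
	return cfg
}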
type KubeletConfig struct {
EtcdClient tools.EtcdClient
DockerClient dockertools.DockerInterface
Address util.IP
AuthPath string
ApiServerList util.StringList
AllowPrivileged bool
HostnameOverride string
RootDirectory string
ConfigFile string
ManifestURL string
FileCheckFrequency time.Duration
HttpCheckFrequency time.Duration
Hostname string
NetworkContainerImage string
SyncFrequency time.Duration
RegistryPullQPS float64
RegistryBurst int
MinimumGCAge time.Duration
MaxContainerCount int
EnableServer bool
EnableDebuggingHandlers bool
Port uint
Runonce bool
}
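// Hypothetical sketch, not part of this diff: a caller such as the kubelet binary could fill
// in KubeletConfig directly and pass it to RunKubelet instead of using SimpleRunKubelet.
// All literal values are placeholders.
func exampleKubeletBinaryConfig(etcdClient tools.EtcdClient, dockerClient dockertools.DockerInterface) *KubeletConfig {
	return &KubeletConfig{
		EtcdClient:              etcdClient,
		DockerClient:            dockerClient,
		Address:                 util.IP(net.ParseIP("0.0.0.0")),
		RootDirectory:           "/var/lib/kubelet",
		ConfigFile:              "/etc/kubernetes/manifests",
		FileCheckFrequency:      20 * time.Second,
		SyncFrequency:           10 * time.Second,
		NetworkContainerImage:   kubelet.NetworkContainerImage,
		Port:                    10250,
		EnableServer:            true,
		EnableDebuggingHandlers: true,
	}
}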
func createAndInitKubelet(kc *KubeletConfig) *kubelet.Kubelet {
// TODO: block until all sources have delivered at least one update to the channel, or break the sync loop
// up into "per source" synchronizations
k := kubelet.NewMainKubelet(
kc.Hostname,
kc.DockerClient,
kc.EtcdClient,
kc.RootDirectory,
kc.NetworkContainerImage,
kc.SyncFrequency,
float32(kc.RegistryPullQPS),
kc.RegistryBurst,
kc.MinimumGCAge,
kc.MaxContainerCount)
k.BirthCry()
go kubelet.GarbageCollectLoop(k)
go kubelet.MonitorCAdvisor(k)
kubelet.InitHealthChecking(k)
return k
}