Merge pull request #34273 from wlan0/master
Automatic merge from submit-queue (batch tested with PRs 39093, 34273) start breaking up controller manager into two pieces This PR addresses: https://github.com/kubernetes/features/issues/88 This commit starts breaking the controller manager into two pieces, namely, 1. cloudprovider dependent piece 2. coudprovider agnostic piece the controller manager has the following control loops - - nodeController - volumeController - routeController - serviceController - replicationController - endpointController - resourceQuotaController - namespaceController - deploymentController etc.. among the above controller loops, - nodeController - volumeController - routeController - serviceController are cloud provider dependent. As kubernetes has evolved tremendously, it has become difficult for different cloudproviders (currently 8), to make changes and iterate quickly. Moreover, the cloudproviders are constrained by the kubernetes build/release lifecycle. This commit is the first step in moving towards a kubernetes code base where cloud providers specific code will move out of the core repository, and will be maintained by the cloud providers themselves. I have added a new cloud provider called "external", which signals the controller-manager that cloud provider specific loops are being run by another controller. I have added these changes in such a way that the existing cloud providers are not affected. This change is completely backwards compatible, and does not require any changes to the way kubernetes is run today. Finally, along with the controller-manager, the kubelet also has cloud-provider specific code, and that will be addressed in a different commit/issue. @alena1108 @ibuildthecloud @thockin @dchen1107 **Special notes for your reviewer**: @thockin - Im making this **WIP** PR to ensure that I don't stray too far from everyone's view of how we should make this change. As you can see, only one controller, namely `nodecontroller` can be disabled with the `--cloudprovider=external` flag at the moment. I'm working on cleaning up the `rancher-controller-manger` that I wrote to test this. Secondly, I'd like to use this PR to address cloudprovider specific code in kubelet and api-server. **Kubelet** Kubelet uses provider specific code for node registration and for checking node-status. I thought of two ways to divide the kubelet: - We could start a cloud provider specific kubelet on each host as a part of kubernetes, and this cloud-specific-kubelet does node registration and node-status checks. - Create a kubelet plugin for each provider, which will be started by kubelet as a long running service. This plugin can be packaged as a binary. I'm leaning towards the first option. That way, kubelet does not have to manage another process, and we can offload the process management of the cloud-provider-specific-kubelet to something like systemd. @dchen1107 @thockin what do you think? **Kube-apiserver** Kube-apiserver uses provider specific code for distributing ssh keys to all the nodes of a cluster. Do you have any suggestions about how to address this? **Release note**: ``` release-note ```
This commit is contained in:
28
cmd/cloud-controller-manager/BUILD
Normal file
28
cmd/cloud-controller-manager/BUILD
Normal file
@@ -0,0 +1,28 @@
|
||||
package(default_visibility = ["//visibility:public"])
|
||||
|
||||
licenses(["notice"])
|
||||
|
||||
load(
|
||||
"@io_bazel_rules_go//go:def.bzl",
|
||||
"go_binary",
|
||||
)
|
||||
|
||||
go_binary(
|
||||
name = "cloud-controller-manager",
|
||||
srcs = ["controller-manager.go"],
|
||||
tags = ["automanaged"],
|
||||
deps = [
|
||||
"//cmd/cloud-controller-manager/app:go_default_library",
|
||||
"//cmd/cloud-controller-manager/app/options:go_default_library",
|
||||
"//pkg/client/metrics/prometheus:go_default_library",
|
||||
"//pkg/cloudprovider:go_default_library",
|
||||
"//pkg/cloudprovider/providers:go_default_library",
|
||||
"//pkg/healthz:go_default_library",
|
||||
"//pkg/util/flag:go_default_library",
|
||||
"//pkg/util/logs:go_default_library",
|
||||
"//pkg/version/prometheus:go_default_library",
|
||||
"//pkg/version/verflag:go_default_library",
|
||||
"//vendor:github.com/golang/glog",
|
||||
"//vendor:github.com/spf13/pflag",
|
||||
],
|
||||
)
|
38
cmd/cloud-controller-manager/app/BUILD
Normal file
38
cmd/cloud-controller-manager/app/BUILD
Normal file
@@ -0,0 +1,38 @@
|
||||
package(default_visibility = ["//visibility:public"])
|
||||
|
||||
licenses(["notice"])
|
||||
|
||||
load(
|
||||
"@io_bazel_rules_go//go:def.bzl",
|
||||
"go_library",
|
||||
)
|
||||
|
||||
go_library(
|
||||
name = "go_default_library",
|
||||
srcs = ["controllermanager.go"],
|
||||
tags = ["automanaged"],
|
||||
deps = [
|
||||
"//cmd/cloud-controller-manager/app/options:go_default_library",
|
||||
"//pkg/api/v1:go_default_library",
|
||||
"//pkg/client/clientset_generated/clientset:go_default_library",
|
||||
"//pkg/client/clientset_generated/clientset/typed/core/v1:go_default_library",
|
||||
"//pkg/client/leaderelection:go_default_library",
|
||||
"//pkg/client/leaderelection/resourcelock:go_default_library",
|
||||
"//pkg/client/record:go_default_library",
|
||||
"//pkg/client/restclient:go_default_library",
|
||||
"//pkg/client/unversioned/clientcmd:go_default_library",
|
||||
"//pkg/cloudprovider:go_default_library",
|
||||
"//pkg/controller:go_default_library",
|
||||
"//pkg/controller/cloud:go_default_library",
|
||||
"//pkg/controller/informers:go_default_library",
|
||||
"//pkg/controller/route:go_default_library",
|
||||
"//pkg/controller/service:go_default_library",
|
||||
"//pkg/healthz:go_default_library",
|
||||
"//pkg/util/configz:go_default_library",
|
||||
"//pkg/util/wait:go_default_library",
|
||||
"//vendor:github.com/golang/glog",
|
||||
"//vendor:github.com/prometheus/client_golang/prometheus",
|
||||
"//vendor:github.com/spf13/cobra",
|
||||
"//vendor:github.com/spf13/pflag",
|
||||
],
|
||||
)
|
251
cmd/cloud-controller-manager/app/controllermanager.go
Normal file
251
cmd/cloud-controller-manager/app/controllermanager.go
Normal file
@@ -0,0 +1,251 @@
|
||||
/*
|
||||
Copyright 2016 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package app
|
||||
|
||||
import (
|
||||
"math/rand"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/http/pprof"
|
||||
"os"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"k8s.io/kubernetes/cmd/cloud-controller-manager/app/options"
|
||||
"k8s.io/kubernetes/pkg/api/v1"
|
||||
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
|
||||
v1core "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/typed/core/v1"
|
||||
"k8s.io/kubernetes/pkg/client/leaderelection"
|
||||
"k8s.io/kubernetes/pkg/client/leaderelection/resourcelock"
|
||||
"k8s.io/kubernetes/pkg/client/record"
|
||||
"k8s.io/kubernetes/pkg/client/restclient"
|
||||
"k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
|
||||
"k8s.io/kubernetes/pkg/cloudprovider"
|
||||
"k8s.io/kubernetes/pkg/controller"
|
||||
nodecontroller "k8s.io/kubernetes/pkg/controller/cloud"
|
||||
"k8s.io/kubernetes/pkg/controller/informers"
|
||||
routecontroller "k8s.io/kubernetes/pkg/controller/route"
|
||||
servicecontroller "k8s.io/kubernetes/pkg/controller/service"
|
||||
"k8s.io/kubernetes/pkg/healthz"
|
||||
"k8s.io/kubernetes/pkg/util/configz"
|
||||
"k8s.io/kubernetes/pkg/util/wait"
|
||||
|
||||
"github.com/golang/glog"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/spf13/cobra"
|
||||
"github.com/spf13/pflag"
|
||||
)
|
||||
|
||||
const (
|
||||
// Jitter used when starting controller managers
|
||||
ControllerStartJitter = 1.0
|
||||
)
|
||||
|
||||
// NewCloudControllerManagerCommand creates a *cobra.Command object with default parameters
|
||||
func NewCloudControllerManagerCommand() *cobra.Command {
|
||||
s := options.NewCloudControllerManagerServer()
|
||||
s.AddFlags(pflag.CommandLine)
|
||||
cmd := &cobra.Command{
|
||||
Use: "cloud-controller-manager",
|
||||
Long: `The Cloud controller manager is a daemon that embeds
|
||||
the cloud specific control loops shipped with Kubernetes.`,
|
||||
Run: func(cmd *cobra.Command, args []string) {
|
||||
},
|
||||
}
|
||||
|
||||
return cmd
|
||||
}
|
||||
|
||||
// ResyncPeriod computes the time interval a shared informer waits before resyncing with the api server
|
||||
func ResyncPeriod(s *options.CloudControllerManagerServer) func() time.Duration {
|
||||
return func() time.Duration {
|
||||
factor := rand.Float64() + 1
|
||||
return time.Duration(float64(s.MinResyncPeriod.Nanoseconds()) * factor)
|
||||
}
|
||||
}
|
||||
|
||||
// Run runs the ExternalCMServer. This should never exit.
|
||||
func Run(s *options.CloudControllerManagerServer, cloud cloudprovider.Interface) error {
|
||||
if c, err := configz.New("componentconfig"); err == nil {
|
||||
c.Set(s.KubeControllerManagerConfiguration)
|
||||
} else {
|
||||
glog.Errorf("unable to register configz: %s", err)
|
||||
}
|
||||
kubeconfig, err := clientcmd.BuildConfigFromFlags(s.Master, s.Kubeconfig)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Set the ContentType of the requests from kube client
|
||||
kubeconfig.ContentConfig.ContentType = s.ContentType
|
||||
// Override kubeconfig qps/burst settings from flags
|
||||
kubeconfig.QPS = s.KubeAPIQPS
|
||||
kubeconfig.Burst = int(s.KubeAPIBurst)
|
||||
kubeClient, err := clientset.NewForConfig(restclient.AddUserAgent(kubeconfig, "cloud-controller-manager"))
|
||||
if err != nil {
|
||||
glog.Fatalf("Invalid API configuration: %v", err)
|
||||
}
|
||||
leaderElectionClient := clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "leader-election"))
|
||||
|
||||
// Start the external controller manager server
|
||||
go func() {
|
||||
mux := http.NewServeMux()
|
||||
healthz.InstallHandler(mux)
|
||||
if s.EnableProfiling {
|
||||
mux.HandleFunc("/debug/pprof/", pprof.Index)
|
||||
mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
|
||||
mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
|
||||
}
|
||||
configz.InstallHandler(mux)
|
||||
mux.Handle("/metrics", prometheus.Handler())
|
||||
|
||||
server := &http.Server{
|
||||
Addr: net.JoinHostPort(s.Address, strconv.Itoa(int(s.Port))),
|
||||
Handler: mux,
|
||||
}
|
||||
glog.Fatal(server.ListenAndServe())
|
||||
}()
|
||||
|
||||
eventBroadcaster := record.NewBroadcaster()
|
||||
eventBroadcaster.StartLogging(glog.Infof)
|
||||
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.Core().Events("")})
|
||||
recorder := eventBroadcaster.NewRecorder(v1.EventSource{Component: "cloud-controller-manager"})
|
||||
|
||||
run := func(stop <-chan struct{}) {
|
||||
rootClientBuilder := controller.SimpleControllerClientBuilder{
|
||||
ClientConfig: kubeconfig,
|
||||
}
|
||||
var clientBuilder controller.ControllerClientBuilder
|
||||
if len(s.ServiceAccountKeyFile) > 0 && s.UseServiceAccountCredentials {
|
||||
clientBuilder = controller.SAControllerClientBuilder{
|
||||
ClientConfig: restclient.AnonymousClientConfig(kubeconfig),
|
||||
CoreClient: kubeClient.Core(),
|
||||
Namespace: "kube-system",
|
||||
}
|
||||
} else {
|
||||
clientBuilder = rootClientBuilder
|
||||
}
|
||||
|
||||
err := StartControllers(s, kubeconfig, rootClientBuilder, clientBuilder, stop, recorder, cloud)
|
||||
glog.Fatalf("error running controllers: %v", err)
|
||||
panic("unreachable")
|
||||
}
|
||||
|
||||
if !s.LeaderElection.LeaderElect {
|
||||
run(nil)
|
||||
panic("unreachable")
|
||||
}
|
||||
|
||||
// Identity used to distinguish between multiple cloud controller manager instances
|
||||
id, err := os.Hostname()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Lock required for leader election
|
||||
rl := resourcelock.EndpointsLock{
|
||||
EndpointsMeta: v1.ObjectMeta{
|
||||
Namespace: "kube-system",
|
||||
Name: "cloud-controller-manager",
|
||||
},
|
||||
Client: leaderElectionClient,
|
||||
LockConfig: resourcelock.ResourceLockConfig{
|
||||
Identity: id + "-external-cloud-controller",
|
||||
EventRecorder: recorder,
|
||||
},
|
||||
}
|
||||
|
||||
// Try and become the leader and start cloud controller manager loops
|
||||
leaderelection.RunOrDie(leaderelection.LeaderElectionConfig{
|
||||
Lock: &rl,
|
||||
LeaseDuration: s.LeaderElection.LeaseDuration.Duration,
|
||||
RenewDeadline: s.LeaderElection.RenewDeadline.Duration,
|
||||
RetryPeriod: s.LeaderElection.RetryPeriod.Duration,
|
||||
Callbacks: leaderelection.LeaderCallbacks{
|
||||
OnStartedLeading: run,
|
||||
OnStoppedLeading: func() {
|
||||
glog.Fatalf("leaderelection lost")
|
||||
},
|
||||
},
|
||||
})
|
||||
panic("unreachable")
|
||||
}
|
||||
|
||||
// StartControllers starts the cloud specific controller loops.
|
||||
func StartControllers(s *options.CloudControllerManagerServer, kubeconfig *restclient.Config, rootClientBuilder, clientBuilder controller.ControllerClientBuilder, stop <-chan struct{}, recorder record.EventRecorder, cloud cloudprovider.Interface) error {
|
||||
// Function to build the kube client object
|
||||
client := func(serviceAccountName string) clientset.Interface {
|
||||
return rootClientBuilder.ClientOrDie(serviceAccountName)
|
||||
}
|
||||
sharedInformers := informers.NewSharedInformerFactory(client("shared-informers"), nil, ResyncPeriod(s)())
|
||||
|
||||
_, clusterCIDR, err := net.ParseCIDR(s.ClusterCIDR)
|
||||
if err != nil {
|
||||
glog.Warningf("Unsuccessful parsing of cluster CIDR %v: %v", s.ClusterCIDR, err)
|
||||
}
|
||||
|
||||
// Start the CloudNodeController
|
||||
nodeController, err := nodecontroller.NewCloudNodeController(
|
||||
sharedInformers.Nodes(),
|
||||
client("cloud-node-controller"), cloud,
|
||||
s.NodeMonitorPeriod.Duration)
|
||||
if err != nil {
|
||||
glog.Fatalf("Failed to initialize nodecontroller: %v", err)
|
||||
}
|
||||
nodeController.Run()
|
||||
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
|
||||
|
||||
// Start the service controller
|
||||
serviceController, err := servicecontroller.New(cloud, client("service-controller"), s.ClusterName)
|
||||
if err != nil {
|
||||
glog.Errorf("Failed to start service controller: %v", err)
|
||||
} else {
|
||||
serviceController.Run(int(s.ConcurrentServiceSyncs))
|
||||
}
|
||||
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
|
||||
|
||||
// If CIDRs should be allocated for pods and set on the CloudProvider, then start the route controller
|
||||
if s.AllocateNodeCIDRs && s.ConfigureCloudRoutes {
|
||||
if routes, ok := cloud.Routes(); !ok {
|
||||
glog.Warning("configure-cloud-routes is set, but cloud provider does not support routes. Will not configure cloud provider routes.")
|
||||
} else {
|
||||
routeController := routecontroller.New(routes, client("route-controller"), s.ClusterName, clusterCIDR)
|
||||
routeController.Run(s.RouteReconciliationPeriod.Duration)
|
||||
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
|
||||
}
|
||||
} else {
|
||||
glog.Infof("Will not configure cloud provider routes for allocate-node-cidrs: %v, configure-cloud-routes: %v.", s.AllocateNodeCIDRs, s.ConfigureCloudRoutes)
|
||||
}
|
||||
|
||||
// If apiserver is not running we should wait for some time and fail only then. This is particularly
|
||||
// important when we start apiserver and controller manager at the same time.
|
||||
var versionStrings []string
|
||||
err = wait.PollImmediate(time.Second, 10*time.Second, func() (bool, error) {
|
||||
if versionStrings, err = restclient.ServerAPIVersions(kubeconfig); err == nil {
|
||||
return true, nil
|
||||
}
|
||||
glog.Errorf("Failed to get api versions from server: %v", err)
|
||||
return false, nil
|
||||
})
|
||||
if err != nil {
|
||||
glog.Fatalf("Failed to get api versions from server: %v", err)
|
||||
}
|
||||
|
||||
sharedInformers.Start(stop)
|
||||
|
||||
select {}
|
||||
}
|
22
cmd/cloud-controller-manager/app/options/BUILD
Normal file
22
cmd/cloud-controller-manager/app/options/BUILD
Normal file
@@ -0,0 +1,22 @@
|
||||
package(default_visibility = ["//visibility:public"])
|
||||
|
||||
licenses(["notice"])
|
||||
|
||||
load(
|
||||
"@io_bazel_rules_go//go:def.bzl",
|
||||
"go_library",
|
||||
)
|
||||
|
||||
go_library(
|
||||
name = "go_default_library",
|
||||
srcs = ["options.go"],
|
||||
tags = ["automanaged"],
|
||||
deps = [
|
||||
"//pkg/apis/componentconfig:go_default_library",
|
||||
"//pkg/apis/meta/v1:go_default_library",
|
||||
"//pkg/client/leaderelection:go_default_library",
|
||||
"//pkg/master/ports:go_default_library",
|
||||
"//pkg/util/config:go_default_library",
|
||||
"//vendor:github.com/spf13/pflag",
|
||||
],
|
||||
)
|
86
cmd/cloud-controller-manager/app/options/options.go
Normal file
86
cmd/cloud-controller-manager/app/options/options.go
Normal file
@@ -0,0 +1,86 @@
|
||||
/*
|
||||
Copyright 2016 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package options
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"k8s.io/kubernetes/pkg/apis/componentconfig"
|
||||
metav1 "k8s.io/kubernetes/pkg/apis/meta/v1"
|
||||
"k8s.io/kubernetes/pkg/client/leaderelection"
|
||||
"k8s.io/kubernetes/pkg/master/ports"
|
||||
"k8s.io/kubernetes/pkg/util/config"
|
||||
|
||||
"github.com/spf13/pflag"
|
||||
)
|
||||
|
||||
// CloudControllerMangerServer is the main context object for the controller manager.
|
||||
type CloudControllerManagerServer struct {
|
||||
componentconfig.KubeControllerManagerConfiguration
|
||||
|
||||
Master string
|
||||
Kubeconfig string
|
||||
}
|
||||
|
||||
// NewCloudControllerManagerServer creates a new ExternalCMServer with a default config.
|
||||
func NewCloudControllerManagerServer() *CloudControllerManagerServer {
|
||||
s := CloudControllerManagerServer{
|
||||
KubeControllerManagerConfiguration: componentconfig.KubeControllerManagerConfiguration{
|
||||
Port: ports.CloudControllerManagerPort,
|
||||
Address: "0.0.0.0",
|
||||
ConcurrentServiceSyncs: 1,
|
||||
MinResyncPeriod: metav1.Duration{Duration: 12 * time.Hour},
|
||||
NodeMonitorPeriod: metav1.Duration{Duration: 5 * time.Second},
|
||||
ClusterName: "kubernetes",
|
||||
ConfigureCloudRoutes: true,
|
||||
ContentType: "application/vnd.kubernetes.protobuf",
|
||||
KubeAPIQPS: 20.0,
|
||||
KubeAPIBurst: 30,
|
||||
LeaderElection: leaderelection.DefaultLeaderElectionConfiguration(),
|
||||
ControllerStartInterval: metav1.Duration{Duration: 0 * time.Second},
|
||||
},
|
||||
}
|
||||
s.LeaderElection.LeaderElect = true
|
||||
return &s
|
||||
}
|
||||
|
||||
// AddFlags adds flags for a specific ExternalCMServer to the specified FlagSet
|
||||
func (s *CloudControllerManagerServer) AddFlags(fs *pflag.FlagSet) {
|
||||
fs.Int32Var(&s.Port, "port", s.Port, "The port that the cloud-controller-manager's http service runs on")
|
||||
fs.Var(componentconfig.IPVar{Val: &s.Address}, "address", "The IP address to serve on (set to 0.0.0.0 for all interfaces)")
|
||||
fs.StringVar(&s.CloudProvider, "cloud-provider", s.CloudProvider, "The provider of cloud services. Empty for no provider.")
|
||||
fs.StringVar(&s.CloudConfigFile, "cloud-config", s.CloudConfigFile, "The path to the cloud provider configuration file. Empty string for no configuration file.")
|
||||
fs.DurationVar(&s.MinResyncPeriod.Duration, "min-resync-period", s.MinResyncPeriod.Duration, "The resync period in reflectors will be random between MinResyncPeriod and 2*MinResyncPeriod")
|
||||
fs.DurationVar(&s.NodeMonitorPeriod.Duration, "node-monitor-period", s.NodeMonitorPeriod.Duration,
|
||||
"The period for syncing NodeStatus in NodeController.")
|
||||
fs.StringVar(&s.ServiceAccountKeyFile, "service-account-private-key-file", s.ServiceAccountKeyFile, "Filename containing a PEM-encoded private RSA or ECDSA key used to sign service account tokens.")
|
||||
fs.BoolVar(&s.UseServiceAccountCredentials, "use-service-account-credentials", s.UseServiceAccountCredentials, "If true, use individual service account credentials for each controller.")
|
||||
fs.DurationVar(&s.RouteReconciliationPeriod.Duration, "route-reconciliation-period", s.RouteReconciliationPeriod.Duration, "The period for reconciling routes created for Nodes by cloud provider.")
|
||||
fs.BoolVar(&s.ConfigureCloudRoutes, "configure-cloud-routes", true, "Should CIDRs allocated by allocate-node-cidrs be configured on the cloud provider.")
|
||||
fs.BoolVar(&s.EnableProfiling, "profiling", true, "Enable profiling via web interface host:port/debug/pprof/")
|
||||
fs.StringVar(&s.ClusterCIDR, "cluster-cidr", s.ClusterCIDR, "CIDR Range for Pods in cluster.")
|
||||
fs.BoolVar(&s.AllocateNodeCIDRs, "allocate-node-cidrs", false, "Should CIDRs for Pods be allocated and set on the cloud provider.")
|
||||
fs.StringVar(&s.Master, "master", s.Master, "The address of the Kubernetes API server (overrides any value in kubeconfig)")
|
||||
fs.StringVar(&s.Kubeconfig, "kubeconfig", s.Kubeconfig, "Path to kubeconfig file with authorization and master location information.")
|
||||
fs.StringVar(&s.ContentType, "kube-api-content-type", s.ContentType, "Content type of requests sent to apiserver.")
|
||||
fs.Float32Var(&s.KubeAPIQPS, "kube-api-qps", s.KubeAPIQPS, "QPS to use while talking with kubernetes apiserver")
|
||||
fs.Int32Var(&s.KubeAPIBurst, "kube-api-burst", s.KubeAPIBurst, "Burst to use while talking with kubernetes apiserver")
|
||||
fs.DurationVar(&s.ControllerStartInterval.Duration, "controller-start-interval", s.ControllerStartInterval.Duration, "Interval between starting controller managers.")
|
||||
|
||||
leaderelection.BindFlags(&s.LeaderElection, fs)
|
||||
config.DefaultFeatureGate.AddFlag(fs)
|
||||
}
|
64
cmd/cloud-controller-manager/controller-manager.go
Normal file
64
cmd/cloud-controller-manager/controller-manager.go
Normal file
@@ -0,0 +1,64 @@
|
||||
/*
|
||||
Copyright 2016 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
// The external controller manager is responsible for running controller loops that
|
||||
// are cloud provider dependent. It uses the API to listen to new events on resources.
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
|
||||
"k8s.io/kubernetes/cmd/cloud-controller-manager/app"
|
||||
"k8s.io/kubernetes/cmd/cloud-controller-manager/app/options"
|
||||
_ "k8s.io/kubernetes/pkg/client/metrics/prometheus" // for client metric registration
|
||||
"k8s.io/kubernetes/pkg/cloudprovider"
|
||||
_ "k8s.io/kubernetes/pkg/cloudprovider/providers"
|
||||
"k8s.io/kubernetes/pkg/healthz"
|
||||
"k8s.io/kubernetes/pkg/util/flag"
|
||||
"k8s.io/kubernetes/pkg/util/logs"
|
||||
_ "k8s.io/kubernetes/pkg/version/prometheus" // for version metric registration
|
||||
"k8s.io/kubernetes/pkg/version/verflag"
|
||||
|
||||
"github.com/golang/glog"
|
||||
"github.com/spf13/pflag"
|
||||
)
|
||||
|
||||
func init() {
|
||||
healthz.DefaultHealthz()
|
||||
}
|
||||
|
||||
func main() {
|
||||
s := options.NewCloudControllerManagerServer()
|
||||
s.AddFlags(pflag.CommandLine)
|
||||
|
||||
flag.InitFlags()
|
||||
logs.InitLogs()
|
||||
defer logs.FlushLogs()
|
||||
|
||||
verflag.PrintAndExitIfRequested()
|
||||
|
||||
cloud, err := cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile)
|
||||
if err != nil {
|
||||
glog.Fatalf("Cloud provider could not be initialized: %v", err)
|
||||
}
|
||||
|
||||
if err := app.Run(s, cloud); err != nil {
|
||||
fmt.Fprintf(os.Stderr, "%v\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
@@ -2,6 +2,7 @@ cluster/addons/fluentd-elasticsearch/es-image
|
||||
cluster/images/etcd/attachlease
|
||||
cluster/images/etcd/rollback
|
||||
cmd/clicheck
|
||||
cmd/cloud-controller-manager
|
||||
cmd/gendocs
|
||||
cmd/genkubedocs
|
||||
cmd/genman
|
||||
|
48
pkg/controller/cloud/BUILD
Normal file
48
pkg/controller/cloud/BUILD
Normal file
@@ -0,0 +1,48 @@
|
||||
package(default_visibility = ["//visibility:public"])
|
||||
|
||||
licenses(["notice"])
|
||||
|
||||
load(
|
||||
"@io_bazel_rules_go//go:def.bzl",
|
||||
"go_library",
|
||||
"go_test",
|
||||
)
|
||||
|
||||
go_library(
|
||||
name = "go_default_library",
|
||||
srcs = ["nodecontroller.go"],
|
||||
tags = ["automanaged"],
|
||||
deps = [
|
||||
"//pkg/api/v1:go_default_library",
|
||||
"//pkg/apis/meta/v1:go_default_library",
|
||||
"//pkg/client/clientset_generated/clientset:go_default_library",
|
||||
"//pkg/client/clientset_generated/clientset/typed/core/v1:go_default_library",
|
||||
"//pkg/client/record:go_default_library",
|
||||
"//pkg/cloudprovider:go_default_library",
|
||||
"//pkg/controller/informers:go_default_library",
|
||||
"//pkg/types:go_default_library",
|
||||
"//pkg/util/runtime:go_default_library",
|
||||
"//pkg/util/wait:go_default_library",
|
||||
"//vendor:github.com/golang/glog",
|
||||
],
|
||||
)
|
||||
|
||||
go_test(
|
||||
name = "go_default_test",
|
||||
srcs = ["nodecontroller_test.go"],
|
||||
library = "go_default_library",
|
||||
tags = ["automanaged"],
|
||||
deps = [
|
||||
"//pkg/api/v1:go_default_library",
|
||||
"//pkg/apis/meta/v1:go_default_library",
|
||||
"//pkg/client/clientset_generated/clientset/fake:go_default_library",
|
||||
"//pkg/client/record:go_default_library",
|
||||
"//pkg/cloudprovider:go_default_library",
|
||||
"//pkg/cloudprovider/providers/fake:go_default_library",
|
||||
"//pkg/controller:go_default_library",
|
||||
"//pkg/controller/informers:go_default_library",
|
||||
"//pkg/controller/node/testutil:go_default_library",
|
||||
"//pkg/util/wait:go_default_library",
|
||||
"//vendor:github.com/golang/glog",
|
||||
],
|
||||
)
|
155
pkg/controller/cloud/nodecontroller.go
Normal file
155
pkg/controller/cloud/nodecontroller.go
Normal file
@@ -0,0 +1,155 @@
|
||||
/*
|
||||
Copyright 2016 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package cloud
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/golang/glog"
|
||||
|
||||
"k8s.io/kubernetes/pkg/api/v1"
|
||||
metav1 "k8s.io/kubernetes/pkg/apis/meta/v1"
|
||||
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
|
||||
v1core "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/typed/core/v1"
|
||||
"k8s.io/kubernetes/pkg/client/record"
|
||||
"k8s.io/kubernetes/pkg/cloudprovider"
|
||||
"k8s.io/kubernetes/pkg/controller/informers"
|
||||
"k8s.io/kubernetes/pkg/types"
|
||||
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
|
||||
"k8s.io/kubernetes/pkg/util/wait"
|
||||
)
|
||||
|
||||
type CloudNodeController struct {
|
||||
nodeInformer informers.NodeInformer
|
||||
kubeClient clientset.Interface
|
||||
recorder record.EventRecorder
|
||||
|
||||
cloud cloudprovider.Interface
|
||||
|
||||
// Value controlling NodeController monitoring period, i.e. how often does NodeController
|
||||
// check node status posted from kubelet. This value should be lower than nodeMonitorGracePeriod
|
||||
// set in controller-manager
|
||||
nodeMonitorPeriod time.Duration
|
||||
}
|
||||
|
||||
const (
|
||||
// nodeStatusUpdateRetry controls the number of retries of writing NodeStatus update.
|
||||
nodeStatusUpdateRetry = 5
|
||||
|
||||
// The amount of time the nodecontroller should sleep between retrying NodeStatus updates
|
||||
retrySleepTime = 20 * time.Millisecond
|
||||
)
|
||||
|
||||
// NewCloudNodeController creates a CloudNodeController object
|
||||
func NewCloudNodeController(
|
||||
nodeInformer informers.NodeInformer,
|
||||
kubeClient clientset.Interface,
|
||||
cloud cloudprovider.Interface,
|
||||
nodeMonitorPeriod time.Duration) (*CloudNodeController, error) {
|
||||
|
||||
eventBroadcaster := record.NewBroadcaster()
|
||||
recorder := eventBroadcaster.NewRecorder(v1.EventSource{Component: "cloudcontrollermanager"})
|
||||
eventBroadcaster.StartLogging(glog.Infof)
|
||||
if kubeClient != nil {
|
||||
glog.V(0).Infof("Sending events to api server.")
|
||||
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.Core().Events("")})
|
||||
} else {
|
||||
glog.V(0).Infof("No api server defined - no events will be sent to API server.")
|
||||
}
|
||||
|
||||
cnc := &CloudNodeController{
|
||||
nodeInformer: nodeInformer,
|
||||
kubeClient: kubeClient,
|
||||
recorder: recorder,
|
||||
cloud: cloud,
|
||||
nodeMonitorPeriod: nodeMonitorPeriod,
|
||||
}
|
||||
return cnc, nil
|
||||
}
|
||||
|
||||
// This controller deletes a node if kubelet is not reporting
|
||||
// and the node is gone from the cloud provider.
|
||||
func (cnc *CloudNodeController) Run() {
|
||||
go func() {
|
||||
defer utilruntime.HandleCrash()
|
||||
|
||||
go wait.Until(func() {
|
||||
nodes, err := cnc.kubeClient.Core().Nodes().List(v1.ListOptions{ResourceVersion: "0"})
|
||||
if err != nil {
|
||||
glog.Errorf("Error monitoring node status: %v", err)
|
||||
}
|
||||
|
||||
for i := range nodes.Items {
|
||||
var currentReadyCondition *v1.NodeCondition
|
||||
node := &nodes.Items[i]
|
||||
// Try to get the current node status
|
||||
// If node status is empty, then kubelet has not posted ready status yet. In this case, process next node
|
||||
for rep := 0; rep < nodeStatusUpdateRetry; rep++ {
|
||||
_, currentReadyCondition = v1.GetNodeCondition(&node.Status, v1.NodeReady)
|
||||
if currentReadyCondition != nil {
|
||||
break
|
||||
}
|
||||
name := node.Name
|
||||
node, err = cnc.kubeClient.Core().Nodes().Get(name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
glog.Errorf("Failed while getting a Node to retry updating NodeStatus. Probably Node %s was deleted.", name)
|
||||
break
|
||||
}
|
||||
time.Sleep(retrySleepTime)
|
||||
}
|
||||
if currentReadyCondition == nil {
|
||||
glog.Errorf("Update status of Node %v from CloudNodeController exceeds retry count.", node.Name)
|
||||
continue
|
||||
}
|
||||
// If the known node status says that Node is NotReady, then check if the node has been removed
|
||||
// from the cloud provider. If node cannot be found in cloudprovider, then delete the node immediately
|
||||
if currentReadyCondition != nil {
|
||||
if currentReadyCondition.Status != v1.ConditionTrue {
|
||||
instances, ok := cnc.cloud.Instances()
|
||||
if !ok {
|
||||
glog.Errorf("cloud provider does not support instances.")
|
||||
continue
|
||||
}
|
||||
// Check with the cloud provider to see if the node still exists. If it
|
||||
// doesn't, delete the node immediately.
|
||||
if _, err := instances.ExternalID(types.NodeName(node.Name)); err != nil {
|
||||
if err == cloudprovider.InstanceNotFound {
|
||||
glog.V(2).Infof("Deleting node no longer present in cloud provider: %s", node.Name)
|
||||
ref := &v1.ObjectReference{
|
||||
Kind: "Node",
|
||||
Name: node.Name,
|
||||
UID: types.UID(node.UID),
|
||||
Namespace: "",
|
||||
}
|
||||
glog.V(2).Infof("Recording %s event message for node %s", "DeletingNode", node.Name)
|
||||
cnc.recorder.Eventf(ref, v1.EventTypeNormal, fmt.Sprintf("Deleting Node %v because it's not present according to cloud provider", node.Name), "Node %s event: %s", node.Name, "DeletingNode")
|
||||
go func(nodeName string) {
|
||||
defer utilruntime.HandleCrash()
|
||||
if err := cnc.kubeClient.Core().Nodes().Delete(node.Name, nil); err != nil {
|
||||
glog.Errorf("unable to delete node %q: %v", node.Name, err)
|
||||
}
|
||||
}(node.Name)
|
||||
}
|
||||
glog.Errorf("Error getting node data from cloud: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}, cnc.nodeMonitorPeriod, wait.NeverStop)
|
||||
}()
|
||||
}
|
122
pkg/controller/cloud/nodecontroller_test.go
Normal file
122
pkg/controller/cloud/nodecontroller_test.go
Normal file
@@ -0,0 +1,122 @@
|
||||
/*
|
||||
Copyright 2016 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package cloud
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/golang/glog"
|
||||
|
||||
"k8s.io/kubernetes/pkg/api/v1"
|
||||
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake"
|
||||
|
||||
metav1 "k8s.io/kubernetes/pkg/apis/meta/v1"
|
||||
"k8s.io/kubernetes/pkg/client/record"
|
||||
"k8s.io/kubernetes/pkg/cloudprovider"
|
||||
fakecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/fake"
|
||||
"k8s.io/kubernetes/pkg/controller"
|
||||
"k8s.io/kubernetes/pkg/controller/informers"
|
||||
"k8s.io/kubernetes/pkg/controller/node/testutil"
|
||||
"k8s.io/kubernetes/pkg/util/wait"
|
||||
)
|
||||
|
||||
// This test checks that the node is deleted when kubelet stops reporting
|
||||
// and cloud provider says node is gone
|
||||
func TestNodeDeleted(t *testing.T) {
|
||||
pod0 := &v1.Pod{
|
||||
ObjectMeta: v1.ObjectMeta{
|
||||
Namespace: "default",
|
||||
Name: "pod0",
|
||||
},
|
||||
Spec: v1.PodSpec{
|
||||
NodeName: "node0",
|
||||
},
|
||||
Status: v1.PodStatus{
|
||||
Conditions: []v1.PodCondition{
|
||||
{
|
||||
Type: v1.PodReady,
|
||||
Status: v1.ConditionTrue,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
pod1 := &v1.Pod{
|
||||
ObjectMeta: v1.ObjectMeta{
|
||||
Namespace: "default",
|
||||
Name: "pod1",
|
||||
},
|
||||
Spec: v1.PodSpec{
|
||||
NodeName: "node0",
|
||||
},
|
||||
Status: v1.PodStatus{
|
||||
Conditions: []v1.PodCondition{
|
||||
{
|
||||
Type: v1.PodReady,
|
||||
Status: v1.ConditionTrue,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
fnh := &testutil.FakeNodeHandler{
|
||||
Existing: []*v1.Node{
|
||||
{
|
||||
ObjectMeta: v1.ObjectMeta{
|
||||
Name: "node0",
|
||||
CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
|
||||
},
|
||||
Status: v1.NodeStatus{
|
||||
Conditions: []v1.NodeCondition{
|
||||
{
|
||||
Type: v1.NodeReady,
|
||||
Status: v1.ConditionUnknown,
|
||||
LastHeartbeatTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
|
||||
LastTransitionTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
Clientset: fake.NewSimpleClientset(&v1.PodList{Items: []v1.Pod{*pod0, *pod1}}),
|
||||
DeleteWaitChan: make(chan struct{}),
|
||||
}
|
||||
|
||||
factory := informers.NewSharedInformerFactory(fnh, nil, controller.NoResyncPeriodFunc())
|
||||
|
||||
eventBroadcaster := record.NewBroadcaster()
|
||||
cloudNodeController := &CloudNodeController{
|
||||
kubeClient: fnh,
|
||||
nodeInformer: factory.Nodes(),
|
||||
cloud: &fakecloud.FakeCloud{Err: cloudprovider.InstanceNotFound},
|
||||
nodeMonitorPeriod: 5 * time.Second,
|
||||
recorder: eventBroadcaster.NewRecorder(v1.EventSource{Component: "controllermanager"}),
|
||||
}
|
||||
eventBroadcaster.StartLogging(glog.Infof)
|
||||
|
||||
cloudNodeController.Run()
|
||||
|
||||
select {
|
||||
case <-fnh.DeleteWaitChan:
|
||||
case <-time.After(wait.ForeverTestTimeout):
|
||||
t.Errorf("Timed out waiting %v for node to be deleted", wait.ForeverTestTimeout)
|
||||
}
|
||||
if len(fnh.DeletedNodes) != 1 || fnh.DeletedNodes[0].Name != "node0" {
|
||||
t.Errorf("Node was not deleted")
|
||||
}
|
||||
}
|
@@ -29,6 +29,9 @@ const (
|
||||
// ControllerManagerPort is the default port for the controller manager status server.
|
||||
// May be overridden by a flag at startup.
|
||||
ControllerManagerPort = 10252
|
||||
// CloudControllerManagerPort is the default port for the cloud controller manager server.
|
||||
// This value may be overriden by a flag at startup.
|
||||
CloudControllerManagerPort = 10253
|
||||
// KubeletReadOnlyPort exposes basic read-only services from the kubelet.
|
||||
// May be overridden by a flag at startup.
|
||||
// This is necessary for heapster to collect monitoring stats from the kubelet
|
||||
|
Reference in New Issue
Block a user