remove the rs&subrs logic from cluster controller

This commit is contained in:
huangyuqi
2016-05-05 20:29:44 +08:00
parent 649b2c6e20
commit 21fe26bd07
19 changed files with 553 additions and 1841 deletions

View File

@@ -0,0 +1,4 @@
assignees:
- quinton-hoole
- nikhiljindal
- madhusundancs

View File

@@ -0,0 +1,123 @@
/*
Copyright 2014 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cluster
import (
"net"
"strings"
federation_v1alpha1 "k8s.io/kubernetes/federation/apis/federation/v1alpha1"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/client/restclient"
"k8s.io/kubernetes/pkg/client/typed/discovery"
"k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
utilnet "k8s.io/kubernetes/pkg/util/net"
)
const (
UserAgentName = "Cluster-Controller"
KubeAPIQPS = 20.0
KubeAPIBurst = 30
)
type ClusterClient struct {
discoveryClient *discovery.DiscoveryClient
}
func NewClusterClientSet(c *federation_v1alpha1.Cluster) (*ClusterClient, error) {
var serverAddress string
hostIP, err := utilnet.ChooseHostInterface()
if err != nil {
return nil, err
}
for _, item := range c.Spec.ServerAddressByClientCIDRs {
_, cidrnet, err := net.ParseCIDR(item.ClientCIDR)
if err != nil {
return nil, err
}
myaddr := net.ParseIP(hostIP.String())
if cidrnet.Contains(myaddr) == true {
serverAddress = item.ServerAddress
break
}
}
var clusterClientSet = ClusterClient{}
if serverAddress != "" {
clusterConfig, err := clientcmd.BuildConfigFromFlags(serverAddress, "")
if err != nil {
return nil, err
}
clusterConfig.QPS = KubeAPIQPS
clusterConfig.Burst = KubeAPIBurst
clusterClientSet.discoveryClient = discovery.NewDiscoveryClientForConfigOrDie((restclient.AddUserAgent(clusterConfig, UserAgentName)))
if clusterClientSet.discoveryClient == nil {
return nil, nil
}
}
return &clusterClientSet, err
}
// GetClusterHealthStatus gets the kubernetes cluster health status by requesting "/healthz"
func (self *ClusterClient) GetClusterHealthStatus() *federation_v1alpha1.ClusterStatus {
clusterStatus := federation_v1alpha1.ClusterStatus{}
currentTime := unversioned.Now()
newClusterReadyCondition := federation_v1alpha1.ClusterCondition{
Type: federation_v1alpha1.ClusterReady,
Status: v1.ConditionTrue,
Reason: "ClusterReady",
Message: "/healthz responded with ok",
LastProbeTime: currentTime,
LastTransitionTime: currentTime,
}
newClusterNotReadyCondition := federation_v1alpha1.ClusterCondition{
Type: federation_v1alpha1.ClusterReady,
Status: v1.ConditionFalse,
Reason: "ClusterNotReady",
Message: "/healthz responded without ok",
LastProbeTime: currentTime,
LastTransitionTime: currentTime,
}
newNodeOfflineCondition := federation_v1alpha1.ClusterCondition{
Type: federation_v1alpha1.ClusterOffline,
Status: v1.ConditionTrue,
Reason: "ClusterNotReachable",
Message: "cluster is not reachable",
LastProbeTime: currentTime,
LastTransitionTime: currentTime,
}
newNodeNotOfflineCondition := federation_v1alpha1.ClusterCondition{
Type: federation_v1alpha1.ClusterOffline,
Status: v1.ConditionFalse,
Reason: "ClusterReachable",
Message: "cluster is reachable",
LastProbeTime: currentTime,
LastTransitionTime: currentTime,
}
body, err := self.discoveryClient.Get().AbsPath("/healthz").Do().Raw()
if err != nil {
clusterStatus.Conditions = append(clusterStatus.Conditions, newNodeOfflineCondition)
} else {
if !strings.EqualFold(string(body), "ok") {
clusterStatus.Conditions = append(clusterStatus.Conditions, newClusterNotReadyCondition, newNodeNotOfflineCondition)
} else {
clusterStatus.Conditions = append(clusterStatus.Conditions, newClusterReadyCondition)
}
}
return &clusterStatus
}

View File

@@ -0,0 +1,196 @@
/*
Copyright 2014 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cluster
import (
"strings"
"time"
"github.com/golang/glog"
"k8s.io/kubernetes/federation/apis/federation"
federation_v1alpha1 "k8s.io/kubernetes/federation/apis/federation/v1alpha1"
cluster_cache "k8s.io/kubernetes/federation/client/cache"
federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_3"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/runtime"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/watch"
)
type ClusterController struct {
knownClusterSet sets.String
// federationClient used to operate cluster
federationClient federationclientset.Interface
// clusterMonitorPeriod is the period for updating status of cluster
clusterMonitorPeriod time.Duration
// clusterClusterStatusMap is a mapping of clusterName and cluster status of last sampling
clusterClusterStatusMap map[string]federation_v1alpha1.ClusterStatus
// clusterKubeClientMap is a mapping of clusterName and restclient
clusterKubeClientMap map[string]ClusterClient
// cluster framework and store
clusterController *framework.Controller
clusterStore cluster_cache.StoreToClusterLister
}
// NewclusterController returns a new cluster controller
func NewclusterController(federationClient federationclientset.Interface, clusterMonitorPeriod time.Duration) *ClusterController {
cc := &ClusterController{
knownClusterSet: make(sets.String),
federationClient: federationClient,
clusterMonitorPeriod: clusterMonitorPeriod,
clusterClusterStatusMap: make(map[string]federation_v1alpha1.ClusterStatus),
clusterKubeClientMap: make(map[string]ClusterClient),
}
cc.clusterStore.Store, cc.clusterController = framework.NewInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return cc.federationClient.Federation().Clusters().List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return cc.federationClient.Federation().Clusters().Watch(options)
},
},
&federation.Cluster{},
controller.NoResyncPeriodFunc(),
framework.ResourceEventHandlerFuncs{
DeleteFunc: cc.delFromClusterSet,
AddFunc: cc.addToClusterSet,
},
)
return cc
}
// delFromClusterSet delete a cluster from clusterSet and
// delete the corresponding restclient from the map clusterKubeClientMap
func (cc *ClusterController) delFromClusterSet(obj interface{}) {
cluster := obj.(*federation_v1alpha1.Cluster)
cc.knownClusterSet.Delete(cluster.Name)
delete(cc.clusterKubeClientMap, cluster.Name)
}
// addToClusterSet insert the new cluster to clusterSet and create a corresponding
// restclient to map clusterKubeClientMap
func (cc *ClusterController) addToClusterSet(obj interface{}) {
cluster := obj.(*federation_v1alpha1.Cluster)
cc.knownClusterSet.Insert(cluster.Name)
// create the restclient of cluster
restClient, err := NewClusterClientSet(cluster)
if err != nil || restClient == nil {
glog.Errorf("Failed to create corresponding restclient of kubernetes cluster: %v", err)
return
}
cc.clusterKubeClientMap[cluster.Name] = *restClient
}
// Run begins watching and syncing.
func (cc *ClusterController) Run() {
defer utilruntime.HandleCrash()
go cc.clusterController.Run(wait.NeverStop)
// monitor cluster status periodically, in phase 1 we just get the health state from "/healthz"
go wait.Until(func() {
if err := cc.UpdateClusterStatus(); err != nil {
glog.Errorf("Error monitoring cluster status: %v", err)
}
}, cc.clusterMonitorPeriod, wait.NeverStop)
}
func (cc *ClusterController) GetClusterStatus(cluster *federation_v1alpha1.Cluster) (*federation_v1alpha1.ClusterStatus, error) {
// just get the status of cluster, by requesting the restapi "/healthz"
clusterClient, found := cc.clusterKubeClientMap[cluster.Name]
if !found {
glog.Infof("It's a new cluster, a cluster client will be created")
client, err := NewClusterClientSet(cluster)
if err != nil || client == nil {
glog.Infof("Failed to create cluster client, err: %v", err)
return nil, err
}
clusterClient = *client
cc.clusterKubeClientMap[cluster.Name] = clusterClient
}
clusterStatus := clusterClient.GetClusterHealthStatus()
return clusterStatus, nil
}
// UpdateClusterStatus checks cluster status and get the metrics from cluster's restapi
func (cc *ClusterController) UpdateClusterStatus() error {
clusters, err := cc.federationClient.Federation().Clusters().List(api.ListOptions{})
if err != nil {
return err
}
for _, cluster := range clusters.Items {
if !cc.knownClusterSet.Has(cluster.Name) {
glog.V(1).Infof("ClusterController observed a new cluster: %#v", cluster)
cc.knownClusterSet.Insert(cluster.Name)
}
}
// If there's a difference between lengths of known clusters and observed clusters
if len(cc.knownClusterSet) != len(clusters.Items) {
observedSet := make(sets.String)
for _, cluster := range clusters.Items {
observedSet.Insert(cluster.Name)
}
deleted := cc.knownClusterSet.Difference(observedSet)
for clusterName := range deleted {
glog.V(1).Infof("ClusterController observed a Cluster deletion: %v", clusterName)
cc.knownClusterSet.Delete(clusterName)
}
}
for _, cluster := range clusters.Items {
clusterStatusNew, err := cc.GetClusterStatus(&cluster)
if err != nil {
glog.Infof("Failed to Get the status of cluster: %v", cluster.Name)
continue
}
clusterStatusOld, found := cc.clusterClusterStatusMap[cluster.Name]
if !found {
glog.Infof("There is no status stored for cluster: %v before", cluster.Name)
} else {
hasTransition := false
for i := 0; i < len(clusterStatusNew.Conditions); i++ {
if !(strings.EqualFold(string(clusterStatusNew.Conditions[i].Type), string(clusterStatusOld.Conditions[i].Type)) &&
strings.EqualFold(string(clusterStatusNew.Conditions[i].Status), string(clusterStatusOld.Conditions[i].Status))) {
hasTransition = true
break
}
}
if !hasTransition {
for j := 0; j < len(clusterStatusNew.Conditions); j++ {
clusterStatusNew.Conditions[j].LastTransitionTime = clusterStatusOld.Conditions[j].LastTransitionTime
}
}
}
cc.clusterClusterStatusMap[cluster.Name] = *clusterStatusNew
cluster.Status = *clusterStatusNew
cluster, err := cc.federationClient.Federation().Clusters().UpdateStatus(&cluster)
if err != nil {
glog.Infof("Failed to update the status of cluster: %v ,error is : %v", cluster.Name, err)
continue
}
}
return nil
}

View File

@@ -0,0 +1,138 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cluster
import (
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"testing"
federation_v1alpha1 "k8s.io/kubernetes/federation/apis/federation/v1alpha1"
federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_3"
"k8s.io/kubernetes/pkg/api/testapi"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/client/restclient"
"k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
"k8s.io/kubernetes/pkg/util"
)
func newCluster(clusterName string, serverUrl string) *federation_v1alpha1.Cluster {
cluster := federation_v1alpha1.Cluster{
TypeMeta: unversioned.TypeMeta{APIVersion: testapi.Federation.GroupVersion().String()},
ObjectMeta: v1.ObjectMeta{
UID: util.NewUUID(),
Name: clusterName,
},
Spec: federation_v1alpha1.ClusterSpec{
ServerAddressByClientCIDRs: []federation_v1alpha1.ServerAddressByClientCIDR{
{
ClientCIDR: "0.0.0.0/0",
ServerAddress: serverUrl,
},
},
},
}
return &cluster
}
func newClusterList(cluster *federation_v1alpha1.Cluster) *federation_v1alpha1.ClusterList {
clusterList := federation_v1alpha1.ClusterList{
TypeMeta: unversioned.TypeMeta{APIVersion: testapi.Federation.GroupVersion().String()},
ListMeta: unversioned.ListMeta{
SelfLink: "foobar",
},
Items: []federation_v1alpha1.Cluster{},
}
clusterList.Items = append(clusterList.Items, *cluster)
return &clusterList
}
// init a fake http handler, simulate a federation apiserver, response the "DELETE" "PUT" "GET" "UPDATE"
// when "canBeGotten" is false, means that user can not get the cluster cluster from apiserver
func createHttptestFakeHandlerForFederation(clusterList *federation_v1alpha1.ClusterList, canBeGotten bool) *http.HandlerFunc {
fakeHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
clusterListString, _ := json.Marshal(*clusterList)
w.Header().Set("Content-Type", "application/json")
switch r.Method {
case "PUT":
fmt.Fprintln(w, string(clusterListString))
case "GET":
if canBeGotten {
fmt.Fprintln(w, string(clusterListString))
} else {
fmt.Fprintln(w, "")
}
default:
fmt.Fprintln(w, "")
}
})
return &fakeHandler
}
// init a fake http handler, simulate a cluster apiserver, response the "/healthz"
// when "canBeGotten" is false, means that user can not get response from apiserver
func createHttptestFakeHandlerForCluster(canBeGotten bool) *http.HandlerFunc {
fakeHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
switch r.Method {
case "GET":
if canBeGotten {
fmt.Fprintln(w, "ok")
} else {
w.WriteHeader(http.StatusNotFound)
}
default:
fmt.Fprintln(w, "")
}
})
return &fakeHandler
}
func TestUpdateClusterStatusOK(t *testing.T) {
clusterName := "foobarCluster"
// create dummy httpserver
testClusterServer := httptest.NewServer(createHttptestFakeHandlerForCluster(true))
defer testClusterServer.Close()
federationCluster := newCluster(clusterName, testClusterServer.URL)
federationClusterList := newClusterList(federationCluster)
testFederationServer := httptest.NewServer(createHttptestFakeHandlerForFederation(federationClusterList, true))
defer testFederationServer.Close()
restClientCfg, err := clientcmd.BuildConfigFromFlags(testFederationServer.URL, "")
if err != nil {
t.Errorf("Failed to build client config")
}
federationClientSet := federationclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, "cluster-controller"))
manager := NewclusterController(federationClientSet, 5)
err = manager.UpdateClusterStatus()
if err != nil {
t.Errorf("Failed to Update Cluster Status: %v", err)
}
clusterStatus, found := manager.clusterClusterStatusMap[clusterName]
if !found {
t.Errorf("Failed to Update Cluster Status")
} else {
if (clusterStatus.Conditions[1].Status != v1.ConditionFalse) || (clusterStatus.Conditions[1].Type != federation_v1alpha1.ClusterOffline) {
t.Errorf("Failed to Update Cluster Status")
}
}
}

View File

@@ -0,0 +1,18 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package cluster contains code for syncing cluster
package cluster

View File

@@ -0,0 +1,19 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package federation_controller contains code for controllers (like the cluster
// controller).
package federation_controller