Remove Node Controller's ability to pull status from Kubelet

Author: gmarek
Date:   2015-03-27 15:09:51 +01:00
Commit: 72182735b9 (parent 2719194154)
12 changed files with 62 additions and 586 deletions
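
At a glance (summarized from the hunks below, not wording taken from the commit): the node controller's probe path is removed, so the master no longer calls into each kubelet for /healthz or node info; it only consumes the status that kubelets push, and the ready condition's timestamp moves from LastProbeTime to LastHeartbeatTime. The visible entry-point change:

    // before: the controller could probe kubelets itself when syncNodeStatus was set
    func (nc *NodeController) Run(period time.Duration, syncNodeList, syncNodeStatus bool)

    // after: only the push/heartbeat path remains
    func (nc *NodeController) Run(period time.Duration, syncNodeList bool)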


@@ -21,7 +21,6 @@ import (
     "fmt"
     "net"
     "strings"
-    "sync"
     "time"

     "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
@@ -29,7 +28,6 @@ import (
     "github.com/GoogleCloudPlatform/kubernetes/pkg/client"
     "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider"
     "github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
-    "github.com/GoogleCloudPlatform/kubernetes/pkg/probe"
     "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
     "github.com/golang/glog"
 )
@@ -132,12 +130,9 @@ func NewNodeController(
 //    node addresses.
 // 2. SyncCloudNodes() is called periodically (if enabled) to sync instances from cloudprovider.
 //    Node created here will only have specs.
-// 3. Depending on how k8s is configured, there are two ways of syncing the node status:
-//    3.1 SyncProbedNodeStatus() is called periodically to trigger master to probe kubelet,
-//        and incorporate the resulting node status.
-//    3.2 MonitorNodeStatus() is called periodically to incorporate the results of node status
-//        pushed from kubelet to master.
-func (nc *NodeController) Run(period time.Duration, syncNodeList, syncNodeStatus bool) {
+// 3. MonitorNodeStatus() is called periodically to incorporate the results of node status
+//    pushed from kubelet to master.
+func (nc *NodeController) Run(period time.Duration, syncNodeList bool) {
     // Register intial set of nodes with their status set.
     var nodes *api.NodeList
     var err error
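
For orientation, a hedged sketch of how a caller might now drive Run; the controller value and the period below are hypothetical stand-ins (the real wiring lives in kube-controller-manager, outside this excerpt):

    // nc is assumed to come from NewNodeController(...); its arguments are not part of this diff.
    // There is no syncNodeStatus argument any more: node status arrives only via kubelet pushes.
    nc.Run(10*time.Second, true) // illustrative period; true keeps SyncCloudNodes running
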
@@ -170,20 +165,12 @@ func (nc *NodeController) Run(period time.Duration, syncNodeList, syncNodeStatus
         }, period)
     }
-    // Start syncing or monitoring node status.
-    if syncNodeStatus {
-        go util.Forever(func() {
-            if err := nc.SyncProbedNodeStatus(); err != nil {
-                glog.Errorf("Error syncing status: %v", err)
-            }
-        }, period)
-    } else {
-        go util.Forever(func() {
-            if err := nc.MonitorNodeStatus(); err != nil {
-                glog.Errorf("Error monitoring node status: %v", err)
-            }
-        }, nc.nodeMonitorPeriod)
-    }
+    // Start monitoring node status.
+    go util.Forever(func() {
+        if err = nc.MonitorNodeStatus(); err != nil {
+            glog.Errorf("Error monitoring node status: %v", err)
+        }
+    }, nc.nodeMonitorPeriod)
 }

 // RegisterNodes registers the given list of nodes, it keeps retrying for `retryCount` times.
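
The startup above leans on util.Forever to keep MonitorNodeStatus running on nc.nodeMonitorPeriod. A standalone sketch of that pattern, with made-up names and durations (it does not use the repository's util package):

    package main

    import (
        "log"
        "time"
    )

    // forever mimics the shape of util.Forever: call f, sleep for period, repeat.
    func forever(f func(), period time.Duration) {
        for {
            f()
            time.Sleep(period)
        }
    }

    func main() {
        nodeMonitorPeriod := 5 * time.Second // stand-in for nc.nodeMonitorPeriod
        go forever(func() {
            // In the controller this body would be nc.MonitorNodeStatus(), which only
            // inspects status already pushed by kubelets; it never probes them.
            log.Println("monitor node status tick")
        }, nodeMonitorPeriod)
        time.Sleep(12 * time.Second) // let a couple of ticks fire, then exit
    }
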
@@ -272,121 +259,6 @@ func (nc *NodeController) SyncCloudNodes() error {
     return nil
 }
-
-// SyncProbedNodeStatus synchronizes cluster nodes status to master server.
-func (nc *NodeController) SyncProbedNodeStatus() error {
-    nodes, err := nc.kubeClient.Nodes().List()
-    if err != nil {
-        return err
-    }
-    nodes, err = nc.PopulateNodesStatus(nodes)
-    if err != nil {
-        return err
-    }
-    for _, node := range nodes.Items {
-        // We used to skip updating node when node status doesn't change, this is no longer
-        // useful after we introduce per-probe status field, e.g. 'LastProbeTime', which will
-        // differ in every call of the sync loop.
-        glog.V(2).Infof("updating node %v", node.Name)
-        _, err = nc.kubeClient.Nodes().Update(&node)
-        if err != nil {
-            glog.Errorf("error updating node %s: %v", node.Name, err)
-        }
-    }
-    return nil
-}
-
-// PopulateNodesStatus populates node status for given list of nodes.
-func (nc *NodeController) PopulateNodesStatus(nodes *api.NodeList) (*api.NodeList, error) {
-    var wg sync.WaitGroup
-    wg.Add(len(nodes.Items))
-    for i := range nodes.Items {
-        go func(node *api.Node) {
-            node.Status.Conditions = nc.DoCheck(node)
-            if err := nc.populateNodeInfo(node); err != nil {
-                glog.Errorf("Can't collect information for node %s: %v", node.Name, err)
-            }
-            wg.Done()
-        }(&nodes.Items[i])
-    }
-    wg.Wait()
-    return nc.PopulateAddresses(nodes)
-}
-
-// populateNodeInfo gets node info from kubelet and update the node.
-func (nc *NodeController) populateNodeInfo(node *api.Node) error {
-    nodeInfo, err := nc.kubeletClient.GetNodeInfo(node.Name)
-    if err != nil {
-        return err
-    }
-    for key, value := range nodeInfo.Capacity {
-        node.Status.Capacity[key] = value
-    }
-    node.Status.NodeInfo = nodeInfo.NodeSystemInfo
-    return nil
-}
-
-// DoCheck performs various condition checks for given node.
-func (nc *NodeController) DoCheck(node *api.Node) []api.NodeCondition {
-    var conditions []api.NodeCondition
-    // Check Condition: NodeReady. TODO: More node conditions.
-    oldReadyCondition := nc.getCondition(&node.Status, api.NodeReady)
-    newReadyCondition := nc.checkNodeReady(node)
-    nc.updateLastTransitionTime(oldReadyCondition, newReadyCondition)
-    if newReadyCondition.Status != api.ConditionTrue {
-        // Node is not ready for this probe, we need to check if pods need to be deleted.
-        if newReadyCondition.LastProbeTime.After(newReadyCondition.LastTransitionTime.Add(nc.podEvictionTimeout)) {
-            // As long as the node fails, we call delete pods to delete all pods. Node controller sync
-            // is not a closed loop process, there is no feedback from other components regarding pod
-            // status. Keep listing pods to sanity check if pods are all deleted makes more sense.
-            nc.deletePods(node.Name)
-        }
-    }
-    conditions = append(conditions, *newReadyCondition)
-    return conditions
-}
-
-// updateLastTransitionTime updates LastTransitionTime for the newCondition based on oldCondition.
-func (nc *NodeController) updateLastTransitionTime(oldCondition, newCondition *api.NodeCondition) {
-    if oldCondition != nil && oldCondition.Status == newCondition.Status {
-        // If node status doesn't change, transition time is same as last time.
-        newCondition.LastTransitionTime = oldCondition.LastTransitionTime
-    } else {
-        // Set transition time to Now() if node status changes or `oldCondition` is nil, which
-        // happens only when the node is checked for the first time.
-        newCondition.LastTransitionTime = nc.now()
-    }
-}
-
-// checkNodeReady checks raw node ready condition, without transition timestamp set.
-func (nc *NodeController) checkNodeReady(node *api.Node) *api.NodeCondition {
-    switch status, err := nc.kubeletClient.HealthCheck(node.Name); {
-    case err != nil:
-        glog.V(2).Infof("NodeController: node %s health check error: %v", node.Name, err)
-        return &api.NodeCondition{
-            Type:          api.NodeReady,
-            Status:        api.ConditionUnknown,
-            Reason:        fmt.Sprintf("Node health check error: %v", err),
-            LastProbeTime: nc.now(),
-        }
-    case status == probe.Failure:
-        return &api.NodeCondition{
-            Type:          api.NodeReady,
-            Status:        api.ConditionFalse,
-            Reason:        fmt.Sprintf("Node health check failed: kubelet /healthz endpoint returns not ok"),
-            LastProbeTime: nc.now(),
-        }
-    default:
-        return &api.NodeCondition{
-            Type:          api.NodeReady,
-            Status:        api.ConditionTrue,
-            Reason:        fmt.Sprintf("Node health check succeeded: kubelet /healthz endpoint returns ok"),
-            LastProbeTime: nc.now(),
-        }
-    }
-}

 // PopulateAddresses queries Address for given list of nodes.
 func (nc *NodeController) PopulateAddresses(nodes *api.NodeList) (*api.NodeList, error) {
     if nc.isRunningCloudProvider() {
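
Condensed, for orientation only (a loose paraphrase of the functions deleted above, not code from the repository), one probe cycle amounted to roughly:

    for _, node := range nodes.Items {
        node.Status.Conditions = nc.DoCheck(&node) // probe the kubelet's /healthz, evict pods after podEvictionTimeout
        nc.populateNodeInfo(&node)                 // pull capacity and system info from the kubelet
        nc.kubeClient.Nodes().Update(&node)        // write the probed status back to the API server
    }

With this commit that direction reverses: the kubelet posts its own status, and the controller's MonitorNodeStatus only checks whether those posts have gone stale.
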
@@ -440,7 +312,7 @@ func (nc *NodeController) tryUpdateNodeStatus(node *api.Node) (time.Duration, ap
         lastReadyCondition = api.NodeCondition{
             Type:               api.NodeReady,
             Status:             api.ConditionUnknown,
-            LastProbeTime:      node.CreationTimestamp,
+            LastHeartbeatTime:  node.CreationTimestamp,
             LastTransitionTime: node.CreationTimestamp,
         }
         gracePeriod = nc.nodeStartupGracePeriod
@@ -497,7 +369,7 @@ func (nc *NodeController) tryUpdateNodeStatus(node *api.Node) (time.Duration, ap
             readyTransitionTimestamp: nc.now(),
         }
         nc.nodeStatusMap[node.Name] = savedNodeStatus
-    } else if savedCondition != nil && observedCondition != nil && savedCondition.LastProbeTime != observedCondition.LastProbeTime {
+    } else if savedCondition != nil && observedCondition != nil && savedCondition.LastHeartbeatTime != observedCondition.LastHeartbeatTime {
         var transitionTime util.Time
         // If ReadyCondition changed since the last time we checked, we update the transition timestamp to "now",
         // otherwise we leave it as it is.
@@ -526,7 +398,7 @@ func (nc *NodeController) tryUpdateNodeStatus(node *api.Node) (time.Duration, ap
                 Type:               api.NodeReady,
                 Status:             api.ConditionUnknown,
                 Reason:             fmt.Sprintf("Kubelet never posted node status."),
-                LastProbeTime:      node.CreationTimestamp,
+                LastHeartbeatTime:  node.CreationTimestamp,
                 LastTransitionTime: nc.now(),
             })
         } else {
@@ -536,7 +408,7 @@ func (nc *NodeController) tryUpdateNodeStatus(node *api.Node) (time.Duration, ap
             readyCondition.Status = api.ConditionUnknown
             readyCondition.Reason = fmt.Sprintf("Kubelet stopped posting node status.")
             // LastProbeTime is the last time we heard from kubelet.
-            readyCondition.LastProbeTime = lastReadyCondition.LastProbeTime
+            readyCondition.LastHeartbeatTime = lastReadyCondition.LastHeartbeatTime
             readyCondition.LastTransitionTime = nc.now()
         }
     }
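
The rename matters because, without a probe path, staleness of the pushed status is the only readiness signal left. A standalone sketch of that check, with illustrative names and durations (not code from the repository):

    package main

    import (
        "fmt"
        "time"
    )

    func main() {
        gracePeriod := 40 * time.Second                   // stand-in for the controller's monitor grace period
        lastHeartbeat := time.Now().Add(-2 * time.Minute) // pretend the kubelet last posted status two minutes ago

        // Mirrors the "Kubelet stopped posting node status." branch above: with the probe path gone,
        // the controller cannot re-check the kubelet itself; it can only flip NodeReady to Unknown.
        if time.Since(lastHeartbeat) > gracePeriod {
            fmt.Println("NodeReady -> Unknown: kubelet stopped posting node status")
        }
    }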