Change to NotReadyNetworking and use in scheduler

This commit is contained in:
Wojciech Tyczynski
2016-05-27 11:37:20 +02:00
parent 7bdf480340
commit be1b57100d
6 changed files with 151 additions and 72 deletions

View File

@@ -37,7 +37,8 @@ const (
maxConcurrentRouteCreations int = 200
// Maximum number of retries of route creations.
maxRetries int = 5
updateNodeStatusMaxRetries = 3
// Maximum number of retries of node status update.
updateNodeStatusMaxRetries int = 3
)
type RouteController struct {
@@ -86,22 +87,6 @@ func (rc *RouteController) reconcileNodeRoutes() error {
return rc.reconcile(nodeList.Items, routeList)
}
func tryUpdateNodeStatus(node *api.Node, kubeClient clientset.Interface) error {
for i := 0; i < updateNodeStatusMaxRetries; i++ {
if _, err := kubeClient.Core().Nodes().UpdateStatus(node); err == nil {
break
} else {
if i+1 < updateNodeStatusMaxRetries {
glog.Errorf("Error updating node %s - will retry: %v", node.Name, err)
} else {
glog.Errorf("Error updating node %s - wont retry: %v", node.Name, err)
return err
}
}
}
return nil
}
func (rc *RouteController) reconcile(nodes []api.Node, routes []*cloudprovider.Route) error {
// nodeCIDRs maps nodeName->nodeCIDR
nodeCIDRs := make(map[string]string)
@@ -129,6 +114,28 @@ func (rc *RouteController) reconcile(nodes []api.Node, routes []*cloudprovider.R
}
nameHint := string(node.UID)
wg.Add(1)
glog.Infof("Creating route for node %s %s with hint %s", node.Name, route.DestinationCIDR, nameHint)
go func(nodeName string, nameHint string, route *cloudprovider.Route) {
defer wg.Done()
for i := 0; i < maxRetries; i++ {
startTime := time.Now()
// Ensure that we don't have more than maxConcurrentRouteCreations
// CreateRoute calls in flight.
rateLimiter <- struct{}{}
err := rc.routes.CreateRoute(rc.clusterName, nameHint, route)
<-rateLimiter
rc.updateNetworkingCondition(nodeName, err == nil)
if err != nil {
glog.Errorf("Could not create route %s %s for node %s after %v: %v", nameHint, route.DestinationCIDR, nodeName, time.Now().Sub(startTime), err)
} else {
glog.Infof("Created route for node %s %s with hint %s after %v", nodeName, route.DestinationCIDR, nameHint, time.Now().Sub(startTime))
return
}
}
}(node.Name, nameHint, route)
} else {
rc.updateNetworkingCondition(node.Name, true)
}
nodeCIDRs[node.Name] = node.Spec.PodCIDR
}
@@ -138,12 +145,12 @@ func (rc *RouteController) reconcile(nodes []api.Node, routes []*cloudprovider.R
if nodeCIDRs[route.TargetInstance] != route.DestinationCIDR {
wg.Add(1)
// Delete the route.
glog.V(2).Infof("Deleting route %s %s", route.Name, route.DestinationCIDR)
glog.Infof("Deleting route %s %s", route.Name, route.DestinationCIDR)
go func(route *cloudprovider.Route, startTime time.Time) {
if err := rc.routes.DeleteRoute(rc.clusterName, route); err != nil {
glog.Errorf("Could not delete route %s %s after %v: %v", route.Name, route.DestinationCIDR, time.Now().Sub(startTime), err)
} else {
glog.V(2).Infof("Deleted route %s %s after %v", route.Name, route.DestinationCIDR, time.Now().Sub(startTime))
glog.Infof("Deleted route %s %s after %v", route.Name, route.DestinationCIDR, time.Now().Sub(startTime))
}
wg.Done()
@@ -155,6 +162,65 @@ func (rc *RouteController) reconcile(nodes []api.Node, routes []*cloudprovider.R
return nil
}
func updateNetworkingCondition(node *api.Node, routeCreated bool) {
_, networkingCondition := api.GetNodeCondition(&node.Status, api.NodeNetworkUnavailable)
currentTime := unversioned.Now()
if routeCreated {
if networkingCondition != nil && networkingCondition.Status != api.ConditionFalse {
networkingCondition.Status = api.ConditionFalse
networkingCondition.Reason = "RouteCreated"
networkingCondition.Message = "RouteController created a route"
networkingCondition.LastTransitionTime = currentTime
} else if networkingCondition == nil {
node.Status.Conditions = append(node.Status.Conditions, api.NodeCondition{
Type: api.NodeNetworkUnavailable,
Status: api.ConditionFalse,
Reason: "RouteCreated",
Message: "RouteController created a route",
LastTransitionTime: currentTime,
})
}
} else {
if networkingCondition != nil && networkingCondition.Status != api.ConditionTrue {
networkingCondition.Status = api.ConditionTrue
networkingCondition.Reason = "NoRouteCreated"
networkingCondition.Message = "RouteController failed to create a route"
networkingCondition.LastTransitionTime = currentTime
} else if networkingCondition == nil {
node.Status.Conditions = append(node.Status.Conditions, api.NodeCondition{
Type: api.NodeNetworkUnavailable,
Status: api.ConditionTrue,
Reason: "NoRouteCreated",
Message: "RouteController failed to create a route",
LastTransitionTime: currentTime,
})
}
}
}
func (rc *RouteController) updateNetworkingCondition(nodeName string, routeCreated bool) error {
var err error
for i := 0; i < updateNodeStatusMaxRetries; i++ {
node, err := rc.kubeClient.Core().Nodes().Get(nodeName)
if err != nil {
glog.Errorf("Error geting node: %v", err)
continue
}
updateNetworkingCondition(node, routeCreated)
// TODO: Use Patch instead once #26381 is merged.
// See kubernetes/node-problem-detector#9 for details.
if _, err = rc.kubeClient.Core().Nodes().UpdateStatus(node); err == nil {
return nil
}
if i+1 < updateNodeStatusMaxRetries {
glog.Errorf("Error updating node %s, retrying: %v", node.Name, err)
} else {
glog.Errorf("Error updating node %s: %v", node.Name, err)
}
}
return err
}
func (rc *RouteController) isResponsibleForRoute(route *cloudprovider.Route) bool {
_, cidr, err := net.ParseCIDR(route.DestinationCIDR)
if err != nil {