Merge pull request #7799 from cjcullen/test_pull_5246
Fix sync problems in #5246
This commit is contained in:
@@ -37,52 +37,12 @@ MASTER_NAME="${INSTANCE_PREFIX}-master"
|
|||||||
MASTER_TAG="${INSTANCE_PREFIX}-master"
|
MASTER_TAG="${INSTANCE_PREFIX}-master"
|
||||||
MINION_TAG="${INSTANCE_PREFIX}-minion"
|
MINION_TAG="${INSTANCE_PREFIX}-minion"
|
||||||
MASTER_IP_RANGE="${MASTER_IP_RANGE:-10.246.0.0/24}"
|
MASTER_IP_RANGE="${MASTER_IP_RANGE:-10.246.0.0/24}"
|
||||||
|
|
||||||
# Compute IP addresses for nodes.
|
|
||||||
function increment_ipv4 {
|
|
||||||
local ip_base=$1
|
|
||||||
local incr_amount=$2
|
|
||||||
local -a ip_components
|
|
||||||
local ip_regex="([0-9]+).([0-9]+).([0-9]+).([0-9]+)"
|
|
||||||
[[ $ip_base =~ $ip_regex ]]
|
|
||||||
ip_components=("${BASH_REMATCH[1]}" "${BASH_REMATCH[2]}" "${BASH_REMATCH[3]}" "${BASH_REMATCH[4]}")
|
|
||||||
ip_dec=0
|
|
||||||
local comp
|
|
||||||
for comp in "${ip_components[@]}"; do
|
|
||||||
ip_dec=$((ip_dec<<8))
|
|
||||||
ip_dec=$((ip_dec + $comp))
|
|
||||||
done
|
|
||||||
|
|
||||||
ip_dec=$((ip_dec + $incr_amount))
|
|
||||||
|
|
||||||
ip_components=()
|
|
||||||
local i
|
|
||||||
for ((i=0; i < 4; i++)); do
|
|
||||||
comp=$((ip_dec & 0xFF))
|
|
||||||
ip_components+=($comp)
|
|
||||||
ip_dec=$((ip_dec>>8))
|
|
||||||
done
|
|
||||||
echo "${ip_components[3]}.${ip_components[2]}.${ip_components[1]}.${ip_components[0]}"
|
|
||||||
}
|
|
||||||
|
|
||||||
node_count="${NUM_MINIONS}"
|
|
||||||
next_node="${KUBE_GCE_CLUSTER_CLASS_B:-10.244}.0.0"
|
|
||||||
node_subnet_size=24
|
|
||||||
node_subnet_count=$((2 ** (32-$node_subnet_size)))
|
|
||||||
subnets=()
|
|
||||||
|
|
||||||
for ((node_num=0; node_num<node_count; node_num++)); do
|
|
||||||
subnets+=("$next_node"/"${node_subnet_size}")
|
|
||||||
next_node=$(increment_ipv4 $next_node $node_subnet_count)
|
|
||||||
done
|
|
||||||
|
|
||||||
CLUSTER_IP_RANGE="${KUBE_GCE_CLUSTER_CLASS_B:-10.244}.0.0/16"
|
CLUSTER_IP_RANGE="${KUBE_GCE_CLUSTER_CLASS_B:-10.244}.0.0/16"
|
||||||
MINION_IP_RANGES=($(eval echo "${subnets[@]}"))
|
|
||||||
|
|
||||||
MINION_SCOPES=("storage-ro" "compute-rw" "https://www.googleapis.com/auth/monitoring" "https://www.googleapis.com/auth/logging.write")
|
MINION_SCOPES=("storage-ro" "compute-rw" "https://www.googleapis.com/auth/monitoring" "https://www.googleapis.com/auth/logging.write")
|
||||||
# Increase the sleep interval value if concerned about API rate limits. 3, in seconds, is the default.
|
# Increase the sleep interval value if concerned about API rate limits. 3, in seconds, is the default.
|
||||||
POLL_SLEEP_INTERVAL=3
|
POLL_SLEEP_INTERVAL=3
|
||||||
PORTAL_NET="10.0.0.0/16"
|
PORTAL_NET="10.0.0.0/16"
|
||||||
|
ALLOCATE_NODE_CIDRS=true
|
||||||
|
|
||||||
# When set to true, Docker Cache is enabled by default as part of the cluster bring up.
|
# When set to true, Docker Cache is enabled by default as part of the cluster bring up.
|
||||||
ENABLE_DOCKER_REGISTRY_CACHE=true
|
ENABLE_DOCKER_REGISTRY_CACHE=true
|
||||||
|
@@ -37,7 +37,6 @@ MASTER_NAME="${INSTANCE_PREFIX}-master"
|
|||||||
MASTER_TAG="${INSTANCE_PREFIX}-master"
|
MASTER_TAG="${INSTANCE_PREFIX}-master"
|
||||||
MINION_TAG="${INSTANCE_PREFIX}-minion"
|
MINION_TAG="${INSTANCE_PREFIX}-minion"
|
||||||
CLUSTER_IP_RANGE="${KUBE_GCE_CLUSTER_CLASS_B:-10.245}.0.0/16"
|
CLUSTER_IP_RANGE="${KUBE_GCE_CLUSTER_CLASS_B:-10.245}.0.0/16"
|
||||||
MINION_IP_RANGES=($(eval echo "${KUBE_GCE_CLUSTER_CLASS_B:-10.245}.{1..${NUM_MINIONS}}.0/24"))
|
|
||||||
MASTER_IP_RANGE="${MASTER_IP_RANGE:-10.246.0.0/24}"
|
MASTER_IP_RANGE="${MASTER_IP_RANGE:-10.246.0.0/24}"
|
||||||
MINION_SCOPES=("storage-ro" "compute-rw" "https://www.googleapis.com/auth/logging.write" "https://www.googleapis.com/auth/monitoring")
|
MINION_SCOPES=("storage-ro" "compute-rw" "https://www.googleapis.com/auth/logging.write" "https://www.googleapis.com/auth/monitoring")
|
||||||
# Increase the sleep interval value if concerned about API rate limits. 3, in seconds, is the default.
|
# Increase the sleep interval value if concerned about API rate limits. 3, in seconds, is the default.
|
||||||
|
@@ -234,6 +234,8 @@ function create-salt-pillar() {
|
|||||||
cat <<EOF >/srv/salt-overlay/pillar/cluster-params.sls
|
cat <<EOF >/srv/salt-overlay/pillar/cluster-params.sls
|
||||||
instance_prefix: '$(echo "$INSTANCE_PREFIX" | sed -e "s/'/''/g")'
|
instance_prefix: '$(echo "$INSTANCE_PREFIX" | sed -e "s/'/''/g")'
|
||||||
node_instance_prefix: '$(echo "$NODE_INSTANCE_PREFIX" | sed -e "s/'/''/g")'
|
node_instance_prefix: '$(echo "$NODE_INSTANCE_PREFIX" | sed -e "s/'/''/g")'
|
||||||
|
cluster_class_b: '$(echo "$KUBE_GCE_CLUSTER_CLASS_B" | sed -e "s/'/''/g")'
|
||||||
|
allocate_node_cidrs: '$(echo "$ALLOCATE_NODE_CIDRS" | sed -e "s/'/''/g")'
|
||||||
portal_net: '$(echo "$PORTAL_NET" | sed -e "s/'/''/g")'
|
portal_net: '$(echo "$PORTAL_NET" | sed -e "s/'/''/g")'
|
||||||
enable_cluster_monitoring: '$(echo "$ENABLE_CLUSTER_MONITORING" | sed -e "s/'/''/g")'
|
enable_cluster_monitoring: '$(echo "$ENABLE_CLUSTER_MONITORING" | sed -e "s/'/''/g")'
|
||||||
enable_node_monitoring: '$(echo "$ENABLE_NODE_MONITORING" | sed -e "s/'/''/g")'
|
enable_node_monitoring: '$(echo "$ENABLE_NODE_MONITORING" | sed -e "s/'/''/g")'
|
||||||
|
@@ -28,9 +28,11 @@ function build-kube-env {
|
|||||||
ENV_TIMESTAMP: $(yaml-quote $(date -u +%Y-%m-%dT%T%z))
|
ENV_TIMESTAMP: $(yaml-quote $(date -u +%Y-%m-%dT%T%z))
|
||||||
INSTANCE_PREFIX: $(yaml-quote ${INSTANCE_PREFIX})
|
INSTANCE_PREFIX: $(yaml-quote ${INSTANCE_PREFIX})
|
||||||
NODE_INSTANCE_PREFIX: $(yaml-quote ${NODE_INSTANCE_PREFIX})
|
NODE_INSTANCE_PREFIX: $(yaml-quote ${NODE_INSTANCE_PREFIX})
|
||||||
|
KUBE_GCE_CLUSTER_CLASS_B: $(yaml-quote ${KUBE_GCE_CLUSTER_CLASS_B:-10.244})
|
||||||
SERVER_BINARY_TAR_URL: $(yaml-quote ${SERVER_BINARY_TAR_URL})
|
SERVER_BINARY_TAR_URL: $(yaml-quote ${SERVER_BINARY_TAR_URL})
|
||||||
SALT_TAR_URL: $(yaml-quote ${SALT_TAR_URL})
|
SALT_TAR_URL: $(yaml-quote ${SALT_TAR_URL})
|
||||||
PORTAL_NET: $(yaml-quote ${PORTAL_NET})
|
PORTAL_NET: $(yaml-quote ${PORTAL_NET})
|
||||||
|
ALLOCATE_NODE_CIDRS: $(yaml-quote ${ALLOCATE_NODE_CIDRS:-false})
|
||||||
ENABLE_CLUSTER_MONITORING: $(yaml-quote ${ENABLE_CLUSTER_MONITORING:-false})
|
ENABLE_CLUSTER_MONITORING: $(yaml-quote ${ENABLE_CLUSTER_MONITORING:-false})
|
||||||
ENABLE_NODE_MONITORING: $(yaml-quote ${ENABLE_NODE_MONITORING:-false})
|
ENABLE_NODE_MONITORING: $(yaml-quote ${ENABLE_NODE_MONITORING:-false})
|
||||||
ENABLE_CLUSTER_LOGGING: $(yaml-quote ${ENABLE_CLUSTER_LOGGING:-false})
|
ENABLE_CLUSTER_LOGGING: $(yaml-quote ${ENABLE_CLUSTER_LOGGING:-false})
|
||||||
|
@@ -26,9 +26,11 @@ function build-kube-env {
|
|||||||
ENV_TIMESTAMP: $(yaml-quote $(date -u +%Y-%m-%dT%T%z))
|
ENV_TIMESTAMP: $(yaml-quote $(date -u +%Y-%m-%dT%T%z))
|
||||||
INSTANCE_PREFIX: $(yaml-quote ${INSTANCE_PREFIX})
|
INSTANCE_PREFIX: $(yaml-quote ${INSTANCE_PREFIX})
|
||||||
NODE_INSTANCE_PREFIX: $(yaml-quote ${NODE_INSTANCE_PREFIX})
|
NODE_INSTANCE_PREFIX: $(yaml-quote ${NODE_INSTANCE_PREFIX})
|
||||||
|
KUBE_GCE_CLUSTER_CLASS_B: $(yaml-quote ${KUBE_GCE_CLUSTER_CLASS_B:-10.244})
|
||||||
SERVER_BINARY_TAR_URL: $(yaml-quote ${SERVER_BINARY_TAR_URL})
|
SERVER_BINARY_TAR_URL: $(yaml-quote ${SERVER_BINARY_TAR_URL})
|
||||||
SALT_TAR_URL: $(yaml-quote ${SALT_TAR_URL})
|
SALT_TAR_URL: $(yaml-quote ${SALT_TAR_URL})
|
||||||
PORTAL_NET: $(yaml-quote ${PORTAL_NET})
|
PORTAL_NET: $(yaml-quote ${PORTAL_NET})
|
||||||
|
ALLOCATE_NODE_CIDRS: $(yaml-quote ${ALLOCATE_NODE_CIDRS:-false})
|
||||||
ENABLE_CLUSTER_MONITORING: $(yaml-quote ${ENABLE_CLUSTER_MONITORING:-false})
|
ENABLE_CLUSTER_MONITORING: $(yaml-quote ${ENABLE_CLUSTER_MONITORING:-false})
|
||||||
ENABLE_NODE_MONITORING: $(yaml-quote ${ENABLE_NODE_MONITORING:-false})
|
ENABLE_NODE_MONITORING: $(yaml-quote ${ENABLE_NODE_MONITORING:-false})
|
||||||
ENABLE_CLUSTER_LOGGING: $(yaml-quote ${ENABLE_CLUSTER_LOGGING:-false})
|
ENABLE_CLUSTER_LOGGING: $(yaml-quote ${ENABLE_CLUSTER_LOGGING:-false})
|
||||||
|
@@ -32,6 +32,8 @@ fi
|
|||||||
|
|
||||||
NODE_INSTANCE_PREFIX="${INSTANCE_PREFIX}-minion"
|
NODE_INSTANCE_PREFIX="${INSTANCE_PREFIX}-minion"
|
||||||
|
|
||||||
|
ALLOCATE_NODE_CIDRS=true
|
||||||
|
|
||||||
KUBE_PROMPT_FOR_UPDATE=y
|
KUBE_PROMPT_FOR_UPDATE=y
|
||||||
KUBE_SKIP_UPDATE=${KUBE_SKIP_UPDATE-"n"}
|
KUBE_SKIP_UPDATE=${KUBE_SKIP_UPDATE-"n"}
|
||||||
|
|
||||||
@@ -336,31 +338,6 @@ function create-firewall-rule {
|
|||||||
done
|
done
|
||||||
}
|
}
|
||||||
|
|
||||||
# Robustly try to create a route.
|
|
||||||
# $1: The name of the route.
|
|
||||||
# $2: IP range.
|
|
||||||
function create-route {
|
|
||||||
detect-project
|
|
||||||
local attempt=0
|
|
||||||
while true; do
|
|
||||||
if ! gcloud compute routes create "$1" \
|
|
||||||
--project "${PROJECT}" \
|
|
||||||
--destination-range "$2" \
|
|
||||||
--network "${NETWORK}" \
|
|
||||||
--next-hop-instance "$1" \
|
|
||||||
--next-hop-instance-zone "${ZONE}"; then
|
|
||||||
if (( attempt > 5 )); then
|
|
||||||
echo -e "${color_red}Failed to create route $1 ${color_norm}"
|
|
||||||
exit 2
|
|
||||||
fi
|
|
||||||
echo -e "${color_yellow}Attempt $(($attempt+1)) failed to create route $1. Retrying.${color_norm}"
|
|
||||||
attempt=$(($attempt+1))
|
|
||||||
else
|
|
||||||
break
|
|
||||||
fi
|
|
||||||
done
|
|
||||||
}
|
|
||||||
|
|
||||||
# Robustly try to create an instance template.
|
# Robustly try to create an instance template.
|
||||||
# $1: The name of the instance template.
|
# $1: The name of the instance template.
|
||||||
# $2: The scopes flag.
|
# $2: The scopes flag.
|
||||||
@@ -569,23 +546,6 @@ function kube-up {
|
|||||||
# to gcloud's deficiency.
|
# to gcloud's deficiency.
|
||||||
wait-for-minions-to-run
|
wait-for-minions-to-run
|
||||||
detect-minion-names
|
detect-minion-names
|
||||||
|
|
||||||
# Create the routes and set IP ranges to instance metadata, 5 instances at a time.
|
|
||||||
for (( i=0; i<${#MINION_NAMES[@]}; i++)); do
|
|
||||||
create-route "${MINION_NAMES[$i]}" "${MINION_IP_RANGES[$i]}" &
|
|
||||||
add-instance-metadata "${MINION_NAMES[$i]}" "node-ip-range=${MINION_IP_RANGES[$i]}" &
|
|
||||||
|
|
||||||
if [ $i -ne 0 ] && [ $((i%5)) -eq 0 ]; then
|
|
||||||
echo Waiting for a batch of routes at $i...
|
|
||||||
wait-for-jobs
|
|
||||||
fi
|
|
||||||
|
|
||||||
done
|
|
||||||
create-route "${MASTER_NAME}" "${MASTER_IP_RANGE}"
|
|
||||||
|
|
||||||
# Wait for last batch of jobs.
|
|
||||||
wait-for-jobs
|
|
||||||
|
|
||||||
detect-master
|
detect-master
|
||||||
|
|
||||||
echo "Waiting for cluster initialization."
|
echo "Waiting for cluster initialization."
|
||||||
|
@@ -1,5 +1,6 @@
|
|||||||
{% set machines = ""-%}
|
{% set machines = ""-%}
|
||||||
{% set cluster_name = "" -%}
|
{% set cluster_name = "" -%}
|
||||||
|
{% set cluster_class_b = "" -%}
|
||||||
{% set minion_regexp = "--minion_regexp=.*" -%}
|
{% set minion_regexp = "--minion_regexp=.*" -%}
|
||||||
{% set sync_nodes = "--sync_nodes=true" -%}
|
{% set sync_nodes = "--sync_nodes=true" -%}
|
||||||
|
|
||||||
@@ -9,6 +10,12 @@
|
|||||||
{% if pillar['instance_prefix'] is defined -%}
|
{% if pillar['instance_prefix'] is defined -%}
|
||||||
{% set cluster_name = "--cluster_name=" + pillar['instance_prefix'] -%}
|
{% set cluster_name = "--cluster_name=" + pillar['instance_prefix'] -%}
|
||||||
{% endif -%}
|
{% endif -%}
|
||||||
|
{% if pillar['cluster_class_b'] is defined -%}
|
||||||
|
{% set cluster_class_b = "--cluster-class-b=" + pillar['cluster_class_b'] -%}
|
||||||
|
{% endif -%}
|
||||||
|
{% if pillar['allocate_node_cidrs'] is defined -%}
|
||||||
|
{% set allocate_node_cidrs = "--allocate-node-cidrs=" + pillar['allocate_node_cidrs'] -%}
|
||||||
|
{% endif -%}
|
||||||
|
|
||||||
{% set cloud_provider = "" -%}
|
{% set cloud_provider = "" -%}
|
||||||
{% set cloud_config = "" -%}
|
{% set cloud_config = "" -%}
|
||||||
@@ -47,7 +54,7 @@
|
|||||||
{% endif -%}
|
{% endif -%}
|
||||||
{% endif -%}
|
{% endif -%}
|
||||||
|
|
||||||
{% set params = "--master=127.0.0.1:8080" + " " + machines + " " + cluster_name + " " + minion_regexp + " " + cloud_provider + " " + sync_nodes + " " + cloud_config + " " + pillar['log_level'] -%}
|
{% set params = "--master=127.0.0.1:8080" + " " + machines + " " + cluster_name + " " + cluster_class_b + " " + allocate_node_cidrs + " " + minion_regexp + " " + cloud_provider + " " + sync_nodes + " " + cloud_config + " " + pillar['log_level'] -%}
|
||||||
|
|
||||||
{
|
{
|
||||||
"apiVersion": "v1beta3",
|
"apiVersion": "v1beta3",
|
||||||
|
@@ -225,7 +225,7 @@ func startComponents(firstManifestURL, secondManifestURL, apiVersion string) (st
|
|||||||
api.ResourceName(api.ResourceMemory): resource.MustParse("10G"),
|
api.ResourceName(api.ResourceMemory): resource.MustParse("10G"),
|
||||||
}}
|
}}
|
||||||
|
|
||||||
nodeController := nodecontroller.NewNodeController(nil, "", machineList, nodeResources, cl, 10, 5*time.Minute, util.NewFakeRateLimiter(), 40*time.Second, 60*time.Second, 5*time.Second, "")
|
nodeController := nodecontroller.NewNodeController(nil, "", machineList, nodeResources, cl, 10, 5*time.Minute, util.NewFakeRateLimiter(), 40*time.Second, 60*time.Second, 5*time.Second, "", "", false)
|
||||||
nodeController.Run(5*time.Second, true)
|
nodeController.Run(5*time.Second, true)
|
||||||
cadvisorInterface := new(cadvisor.Fake)
|
cadvisorInterface := new(cadvisor.Fake)
|
||||||
|
|
||||||
|
@@ -79,6 +79,8 @@ type CMServer struct {
|
|||||||
NodeMemory resource.Quantity
|
NodeMemory resource.Quantity
|
||||||
|
|
||||||
ClusterName string
|
ClusterName string
|
||||||
|
ClusterClassB string
|
||||||
|
AllocateNodeCIDRs bool
|
||||||
EnableProfiling bool
|
EnableProfiling bool
|
||||||
|
|
||||||
Master string
|
Master string
|
||||||
@@ -145,6 +147,8 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet) {
|
|||||||
fs.Var(resource.NewQuantityFlagValue(&s.NodeMemory), "node-memory", "The amount of memory (in bytes) provisioned on each node")
|
fs.Var(resource.NewQuantityFlagValue(&s.NodeMemory), "node-memory", "The amount of memory (in bytes) provisioned on each node")
|
||||||
fs.StringVar(&s.ClusterName, "cluster-name", s.ClusterName, "The instance prefix for the cluster")
|
fs.StringVar(&s.ClusterName, "cluster-name", s.ClusterName, "The instance prefix for the cluster")
|
||||||
fs.BoolVar(&s.EnableProfiling, "profiling", true, "Enable profiling via web interface host:port/debug/pprof/")
|
fs.BoolVar(&s.EnableProfiling, "profiling", true, "Enable profiling via web interface host:port/debug/pprof/")
|
||||||
|
fs.StringVar(&s.ClusterClassB, "cluster-class-b", "10.244", "Class B network address for Pods in cluster.")
|
||||||
|
fs.BoolVar(&s.AllocateNodeCIDRs, "allocate-node-cidrs", false, "Should CIDRs for Pods be allocated and set on the cloud provider.")
|
||||||
fs.StringVar(&s.Master, "master", s.Master, "The address of the Kubernetes API server (overrides any value in kubeconfig)")
|
fs.StringVar(&s.Master, "master", s.Master, "The address of the Kubernetes API server (overrides any value in kubeconfig)")
|
||||||
fs.StringVar(&s.Kubeconfig, "kubeconfig", s.Kubeconfig, "Path to kubeconfig file with authorization and master location information.")
|
fs.StringVar(&s.Kubeconfig, "kubeconfig", s.Kubeconfig, "Path to kubeconfig file with authorization and master location information.")
|
||||||
}
|
}
|
||||||
@@ -226,7 +230,7 @@ func (s *CMServer) Run(_ []string) error {
|
|||||||
|
|
||||||
nodeController := nodecontroller.NewNodeController(cloud, s.MinionRegexp, s.MachineList, nodeResources,
|
nodeController := nodecontroller.NewNodeController(cloud, s.MinionRegexp, s.MachineList, nodeResources,
|
||||||
kubeClient, s.RegisterRetryCount, s.PodEvictionTimeout, util.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst),
|
kubeClient, s.RegisterRetryCount, s.PodEvictionTimeout, util.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst),
|
||||||
s.NodeMonitorGracePeriod, s.NodeStartupGracePeriod, s.NodeMonitorPeriod, s.ClusterName)
|
s.NodeMonitorGracePeriod, s.NodeStartupGracePeriod, s.NodeMonitorPeriod, s.ClusterName, s.ClusterClassB, s.AllocateNodeCIDRs)
|
||||||
nodeController.Run(s.NodeSyncPeriod, s.SyncNodeList)
|
nodeController.Run(s.NodeSyncPeriod, s.SyncNodeList)
|
||||||
|
|
||||||
serviceController := servicecontroller.New(cloud, kubeClient, s.ClusterName)
|
serviceController := servicecontroller.New(cloud, kubeClient, s.ClusterName)
|
||||||
|
@@ -132,7 +132,7 @@ func runControllerManager(machineList []string, cl *client.Client, nodeMilliCPU,
|
|||||||
|
|
||||||
const nodeSyncPeriod = 10 * time.Second
|
const nodeSyncPeriod = 10 * time.Second
|
||||||
nodeController := nodecontroller.NewNodeController(
|
nodeController := nodecontroller.NewNodeController(
|
||||||
nil, "", machineList, nodeResources, cl, 10, 5*time.Minute, util.NewTokenBucketRateLimiter(*deletingPodsQps, *deletingPodsBurst), 40*time.Second, 60*time.Second, 5*time.Second, "")
|
nil, "", machineList, nodeResources, cl, 10, 5*time.Minute, util.NewTokenBucketRateLimiter(*deletingPodsQps, *deletingPodsBurst), 40*time.Second, 60*time.Second, 5*time.Second, "", "", false)
|
||||||
nodeController.Run(nodeSyncPeriod, true)
|
nodeController.Run(nodeSyncPeriod, true)
|
||||||
|
|
||||||
serviceController := servicecontroller.New(nil, cl, "kubernetes")
|
serviceController := servicecontroller.New(nil, cl, "kubernetes")
|
||||||
|
@@ -973,3 +973,11 @@ func (aws *AWSCloud) DeleteVolume(volumeName string) error {
|
|||||||
}
|
}
|
||||||
return awsDisk.delete()
|
return awsDisk.delete()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (v *AWSCloud) Configure(name string, spec *api.NodeSpec) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (v *AWSCloud) Release(name string) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
@@ -80,6 +80,10 @@ type Instances interface {
|
|||||||
List(filter string) ([]string, error)
|
List(filter string) ([]string, error)
|
||||||
// GetNodeResources gets the resources for a particular node
|
// GetNodeResources gets the resources for a particular node
|
||||||
GetNodeResources(name string) (*api.NodeResources, error)
|
GetNodeResources(name string) (*api.NodeResources, error)
|
||||||
|
// Configure the specified instance using the spec
|
||||||
|
Configure(name string, spec *api.NodeSpec) error
|
||||||
|
// Delete all the configuration related to the instance, including other cloud resources
|
||||||
|
Release(name string) error
|
||||||
}
|
}
|
||||||
|
|
||||||
// Zone represents the location of a particular machine.
|
// Zone represents the location of a particular machine.
|
||||||
|
@@ -159,3 +159,13 @@ func (f *FakeCloud) GetNodeResources(name string) (*api.NodeResources, error) {
|
|||||||
f.addCall("get-node-resources")
|
f.addCall("get-node-resources")
|
||||||
return f.NodeResources, f.Err
|
return f.NodeResources, f.Err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (f *FakeCloud) Configure(name string, spec *api.NodeSpec) error {
|
||||||
|
f.addCall("configure")
|
||||||
|
return f.Err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *FakeCloud) Release(name string) error {
|
||||||
|
f.addCall("release")
|
||||||
|
return f.Err
|
||||||
|
}
|
||||||
|
@@ -17,6 +17,7 @@ limitations under the License.
|
|||||||
package gce_cloud
|
package gce_cloud
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
@@ -42,6 +43,10 @@ import (
|
|||||||
"google.golang.org/cloud/compute/metadata"
|
"google.golang.org/cloud/compute/metadata"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var ErrMetadataConflict = errors.New("Metadata already set at the same key")
|
||||||
|
|
||||||
|
const podCIDRMetadataKey string = "node-ip-range"
|
||||||
|
|
||||||
// GCECloud is an implementation of Interface, TCPLoadBalancer and Instances for Google Compute Engine.
|
// GCECloud is an implementation of Interface, TCPLoadBalancer and Instances for Google Compute Engine.
|
||||||
type GCECloud struct {
|
type GCECloud struct {
|
||||||
service *compute.Service
|
service *compute.Service
|
||||||
@@ -50,6 +55,9 @@ type GCECloud struct {
|
|||||||
zone string
|
zone string
|
||||||
instanceID string
|
instanceID string
|
||||||
|
|
||||||
|
// We assume here that nodes and master are in the same network. TODO(cjcullen) Fix it.
|
||||||
|
networkName string
|
||||||
|
|
||||||
// Used for accessing the metadata server
|
// Used for accessing the metadata server
|
||||||
metadataAccess func(string) (string, error)
|
metadataAccess func(string) (string, error)
|
||||||
}
|
}
|
||||||
@@ -113,6 +121,18 @@ func getInstanceID() (string, error) {
|
|||||||
return parts[0], nil
|
return parts[0], nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func getNetworkName() (string, error) {
|
||||||
|
result, err := metadata.Get("instance/network-interfaces/0/network")
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
parts := strings.Split(result, "/")
|
||||||
|
if len(parts) != 4 {
|
||||||
|
return "", fmt.Errorf("unexpected response: %s", result)
|
||||||
|
}
|
||||||
|
return parts[3], nil
|
||||||
|
}
|
||||||
|
|
||||||
// newGCECloud creates a new instance of GCECloud.
|
// newGCECloud creates a new instance of GCECloud.
|
||||||
func newGCECloud(config io.Reader) (*GCECloud, error) {
|
func newGCECloud(config io.Reader) (*GCECloud, error) {
|
||||||
projectID, zone, err := getProjectAndZone()
|
projectID, zone, err := getProjectAndZone()
|
||||||
@@ -126,6 +146,10 @@ func newGCECloud(config io.Reader) (*GCECloud, error) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
networkName, err := getNetworkName()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
tokenSource := google.ComputeTokenSource("")
|
tokenSource := google.ComputeTokenSource("")
|
||||||
if config != nil {
|
if config != nil {
|
||||||
var cfg Config
|
var cfg Config
|
||||||
@@ -152,6 +176,7 @@ func newGCECloud(config io.Reader) (*GCECloud, error) {
|
|||||||
projectID: projectID,
|
projectID: projectID,
|
||||||
zone: zone,
|
zone: zone,
|
||||||
instanceID: instanceID,
|
instanceID: instanceID,
|
||||||
|
networkName: networkName,
|
||||||
metadataAccess: getMetadata,
|
metadataAccess: getMetadata,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
@@ -217,12 +242,13 @@ func (gce *GCECloud) targetPoolURL(name, region string) string {
|
|||||||
return fmt.Sprintf("https://www.googleapis.com/compute/v1/projects/%s/regions/%s/targetPools/%s", gce.projectID, region, name)
|
return fmt.Sprintf("https://www.googleapis.com/compute/v1/projects/%s/regions/%s/targetPools/%s", gce.projectID, region, name)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (gce *GCECloud) waitForRegionOp(op *compute.Operation, region string) error {
|
func waitForOp(op *compute.Operation, getOperation func() (*compute.Operation, error)) error {
|
||||||
pollOp := op
|
pollOp := op
|
||||||
for pollOp.Status != "DONE" {
|
for pollOp.Status != "DONE" {
|
||||||
var err error
|
var err error
|
||||||
|
// TODO: add some backoff here.
|
||||||
time.Sleep(time.Second)
|
time.Sleep(time.Second)
|
||||||
pollOp, err = gce.service.RegionOperations.Get(gce.projectID, region, op.Name).Do()
|
pollOp, err = getOperation()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@@ -234,6 +260,25 @@ func (gce *GCECloud) waitForRegionOp(op *compute.Operation, region string) error
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func (gce *GCECloud) waitForGlobalOp(op *compute.Operation) error {
|
||||||
|
return waitForOp(op, func() (*compute.Operation, error) {
|
||||||
|
return gce.service.GlobalOperations.Get(gce.projectID, op.Name).Do()
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (gce *GCECloud) waitForRegionOp(op *compute.Operation, region string) error {
|
||||||
|
return waitForOp(op, func() (*compute.Operation, error) {
|
||||||
|
return gce.service.RegionOperations.Get(gce.projectID, region, op.Name).Do()
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (gce *GCECloud) waitForZoneOp(op *compute.Operation) error {
|
||||||
|
return waitForOp(op, func() (*compute.Operation, error) {
|
||||||
|
return gce.service.ZoneOperations.Get(gce.projectID, gce.zone, op.Name).Do()
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetTCPLoadBalancer is an implementation of TCPLoadBalancer.GetTCPLoadBalancer
|
// GetTCPLoadBalancer is an implementation of TCPLoadBalancer.GetTCPLoadBalancer
|
||||||
@@ -506,6 +551,67 @@ func (gce *GCECloud) GetNodeResources(name string) (*api.NodeResources, error) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func getMetadataValue(metadata *compute.Metadata, key string) (string, bool) {
|
||||||
|
for _, item := range metadata.Items {
|
||||||
|
if item.Key == key {
|
||||||
|
return item.Value, true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return "", false
|
||||||
|
}
|
||||||
|
|
||||||
|
func (gce *GCECloud) Configure(name string, spec *api.NodeSpec) error {
|
||||||
|
instanceName := canonicalizeInstanceName(name)
|
||||||
|
instance, err := gce.service.Instances.Get(gce.projectID, gce.zone, instanceName).Do()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if currentValue, ok := getMetadataValue(instance.Metadata, podCIDRMetadataKey); ok {
|
||||||
|
if currentValue == spec.PodCIDR {
|
||||||
|
// IP range already set to proper value.
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return ErrMetadataConflict
|
||||||
|
}
|
||||||
|
// We are setting the metadata, so they can be picked-up by the configure-vm.sh script to start docker with the given CIDR for Pods.
|
||||||
|
instance.Metadata.Items = append(instance.Metadata.Items,
|
||||||
|
&compute.MetadataItems{
|
||||||
|
Key: podCIDRMetadataKey,
|
||||||
|
Value: spec.PodCIDR,
|
||||||
|
})
|
||||||
|
setMetadataCall := gce.service.Instances.SetMetadata(gce.projectID, gce.zone, instanceName, instance.Metadata)
|
||||||
|
setMetadataOp, err := setMetadataCall.Do()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
err = gce.waitForZoneOp(setMetadataOp)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
insertCall := gce.service.Routes.Insert(gce.projectID, &compute.Route{
|
||||||
|
Name: instanceName,
|
||||||
|
DestRange: spec.PodCIDR,
|
||||||
|
NextHopInstance: fmt.Sprintf("zones/%s/instances/%s", gce.zone, instanceName),
|
||||||
|
Network: fmt.Sprintf("global/networks/%s", gce.networkName),
|
||||||
|
Priority: 1000,
|
||||||
|
})
|
||||||
|
insertOp, err := insertCall.Do()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return gce.waitForGlobalOp(insertOp)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (gce *GCECloud) Release(name string) error {
|
||||||
|
instanceName := canonicalizeInstanceName(name)
|
||||||
|
deleteCall := gce.service.Routes.Delete(gce.projectID, instanceName)
|
||||||
|
deleteOp, err := deleteCall.Do()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return gce.waitForGlobalOp(deleteOp)
|
||||||
|
}
|
||||||
|
|
||||||
func (gce *GCECloud) GetZone() (cloudprovider.Zone, error) {
|
func (gce *GCECloud) GetZone() (cloudprovider.Zone, error) {
|
||||||
region, err := getGceRegion(gce.zone)
|
region, err := getGceRegion(gce.zone)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@@ -21,6 +21,8 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"net"
|
"net"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||||
@@ -87,6 +89,8 @@ type NodeController struct {
|
|||||||
// TODO: Change node status monitor to watch based.
|
// TODO: Change node status monitor to watch based.
|
||||||
nodeMonitorPeriod time.Duration
|
nodeMonitorPeriod time.Duration
|
||||||
clusterName string
|
clusterName string
|
||||||
|
clusterClassB string
|
||||||
|
allocateNodeCIDRs bool
|
||||||
// Method for easy mocking in unittest.
|
// Method for easy mocking in unittest.
|
||||||
lookupIP func(host string) ([]net.IP, error)
|
lookupIP func(host string) ([]net.IP, error)
|
||||||
now func() util.Time
|
now func() util.Time
|
||||||
@@ -105,7 +109,9 @@ func NewNodeController(
|
|||||||
nodeMonitorGracePeriod time.Duration,
|
nodeMonitorGracePeriod time.Duration,
|
||||||
nodeStartupGracePeriod time.Duration,
|
nodeStartupGracePeriod time.Duration,
|
||||||
nodeMonitorPeriod time.Duration,
|
nodeMonitorPeriod time.Duration,
|
||||||
clusterName string) *NodeController {
|
clusterName string,
|
||||||
|
clusterClassB string,
|
||||||
|
allocateNodeCIDRs bool) *NodeController {
|
||||||
eventBroadcaster := record.NewBroadcaster()
|
eventBroadcaster := record.NewBroadcaster()
|
||||||
recorder := eventBroadcaster.NewRecorder(api.EventSource{Component: "controllermanager"})
|
recorder := eventBroadcaster.NewRecorder(api.EventSource{Component: "controllermanager"})
|
||||||
if kubeClient != nil {
|
if kubeClient != nil {
|
||||||
@@ -131,6 +137,63 @@ func NewNodeController(
|
|||||||
lookupIP: net.LookupIP,
|
lookupIP: net.LookupIP,
|
||||||
now: util.Now,
|
now: util.Now,
|
||||||
clusterName: clusterName,
|
clusterName: clusterName,
|
||||||
|
clusterClassB: clusterClassB,
|
||||||
|
allocateNodeCIDRs: allocateNodeCIDRs,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Generates num pod CIDRs that could be assigned to nodes.
|
||||||
|
func (nc *NodeController) generateCIDRs(num int) util.StringSet {
|
||||||
|
res := util.NewStringSet()
|
||||||
|
for i := 0; i < num; i++ {
|
||||||
|
// TODO: Make the CIDRs configurable.
|
||||||
|
res.Insert(fmt.Sprintf("%v.%v.0/24", nc.clusterClassB, i))
|
||||||
|
}
|
||||||
|
return res
|
||||||
|
}
|
||||||
|
|
||||||
|
// For each node from newNodes, finds its current spec in registeredNodes.
|
||||||
|
// If it is not there, it gets a new valid CIDR assigned.
|
||||||
|
func (nc *NodeController) reconcilePodCIDRs(newNodes, registeredNodes *api.NodeList) *api.NodeList {
|
||||||
|
registeredCIDRs := make(map[string]string)
|
||||||
|
availableCIDRs := nc.generateCIDRs(len(newNodes.Items) + len(registeredNodes.Items))
|
||||||
|
for _, node := range registeredNodes.Items {
|
||||||
|
registeredCIDRs[node.Name] = node.Spec.PodCIDR
|
||||||
|
availableCIDRs.Delete(node.Spec.PodCIDR)
|
||||||
|
}
|
||||||
|
for i, node := range newNodes.Items {
|
||||||
|
podCIDR, registered := registeredCIDRs[node.Name]
|
||||||
|
if !registered {
|
||||||
|
podCIDR, _ = availableCIDRs.PopAny()
|
||||||
|
}
|
||||||
|
newNodes.Items[i].Spec.PodCIDR = podCIDR
|
||||||
|
}
|
||||||
|
return newNodes
|
||||||
|
}
|
||||||
|
|
||||||
|
func (nc *NodeController) configureNodeCIDR(node *api.Node) {
|
||||||
|
instances, ok := nc.cloud.Instances()
|
||||||
|
if !ok {
|
||||||
|
glog.Errorf("Error configuring node %s: CloudProvider does not support Instances()", node.Name)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
err := instances.Configure(node.Name, &node.Spec)
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("Error configuring node %s: %s", node.Name, err)
|
||||||
|
// The newly assigned CIDR was not properly configured, so don't save it in the API server.
|
||||||
|
node.Spec.PodCIDR = ""
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (nc *NodeController) unassignNodeCIDR(nodeName string) {
|
||||||
|
instances, ok := nc.cloud.Instances()
|
||||||
|
if !ok {
|
||||||
|
glog.Errorf("Error deconfiguring node %s: CloudProvider does not support Instances()", nodeName)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
err := instances.Release(nodeName)
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("Error deconfiguring node %s: %s", nodeName, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -164,6 +227,9 @@ func (nc *NodeController) Run(period time.Duration, syncNodeList bool) {
|
|||||||
if nodes, err = nc.populateAddresses(nodes); err != nil {
|
if nodes, err = nc.populateAddresses(nodes); err != nil {
|
||||||
glog.Errorf("Error getting nodes ips: %v", err)
|
glog.Errorf("Error getting nodes ips: %v", err)
|
||||||
}
|
}
|
||||||
|
if nc.isRunningCloudProvider() && nc.allocateNodeCIDRs {
|
||||||
|
nc.reconcilePodCIDRs(nodes, &api.NodeList{})
|
||||||
|
}
|
||||||
if err := nc.registerNodes(nodes, nc.registerRetryCount, period); err != nil {
|
if err := nc.registerNodes(nodes, nc.registerRetryCount, period); err != nil {
|
||||||
glog.Errorf("Error registering node list %+v: %v", nodes, err)
|
glog.Errorf("Error registering node list %+v: %v", nodes, err)
|
||||||
}
|
}
|
||||||
@@ -190,29 +256,37 @@ func (nc *NodeController) registerNodes(nodes *api.NodeList, retryCount int, ret
|
|||||||
if len(nodes.Items) == 0 {
|
if len(nodes.Items) == 0 {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
registered := util.NewStringSet()
|
|
||||||
nodes = nc.canonicalizeName(nodes)
|
nodes = nc.canonicalizeName(nodes)
|
||||||
|
toRegister := util.NewStringSet()
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
var successfullyRegistered int32 = 0
|
||||||
|
for i := range nodes.Items {
|
||||||
|
node := &nodes.Items[i]
|
||||||
|
if !toRegister.Has(node.Name) {
|
||||||
|
wg.Add(1)
|
||||||
|
toRegister.Insert(node.Name)
|
||||||
|
go func(n *api.Node) {
|
||||||
|
defer wg.Done()
|
||||||
for i := 0; i < retryCount; i++ {
|
for i := 0; i < retryCount; i++ {
|
||||||
for _, node := range nodes.Items {
|
if nc.isRunningCloudProvider() && nc.allocateNodeCIDRs {
|
||||||
if registered.Has(node.Name) {
|
nc.configureNodeCIDR(n)
|
||||||
continue
|
|
||||||
}
|
}
|
||||||
_, err := nc.kubeClient.Nodes().Create(&node)
|
_, err := nc.kubeClient.Nodes().Create(n)
|
||||||
if err == nil || apierrors.IsAlreadyExists(err) {
|
if err == nil || apierrors.IsAlreadyExists(err) {
|
||||||
registered.Insert(node.Name)
|
glog.Infof("Registered node in registry: %v", n.Name)
|
||||||
glog.Infof("Registered node in registry: %s", node.Name)
|
atomic.AddInt32(&successfullyRegistered, 1)
|
||||||
|
return
|
||||||
} else {
|
} else {
|
||||||
glog.Errorf("Error registering node %s, retrying: %s", node.Name, err)
|
glog.Errorf("Error registering node %v (retries left: %v): %v", n.Name, retryCount-i-1, err)
|
||||||
}
|
|
||||||
if registered.Len() == len(nodes.Items) {
|
|
||||||
glog.Infof("Successfully registered all nodes")
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
time.Sleep(retryInterval)
|
time.Sleep(retryInterval)
|
||||||
}
|
}
|
||||||
if registered.Len() != len(nodes.Items) {
|
glog.Errorf("Unable to register node %v", n.Name)
|
||||||
|
}(node)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
if int32(toRegister.Len()) != atomic.LoadInt32(&successfullyRegistered) {
|
||||||
return ErrRegistration
|
return ErrRegistration
|
||||||
} else {
|
} else {
|
||||||
return nil
|
return nil
|
||||||
@@ -230,43 +304,69 @@ func (nc *NodeController) syncCloudNodes() error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
nodeMap := make(map[string]*api.Node)
|
nodeMap := make(map[string]*api.Node)
|
||||||
|
nodeMapLock := sync.Mutex{}
|
||||||
for i := range nodes.Items {
|
for i := range nodes.Items {
|
||||||
node := nodes.Items[i]
|
node := nodes.Items[i]
|
||||||
|
nodeMapLock.Lock()
|
||||||
nodeMap[node.Name] = &node
|
nodeMap[node.Name] = &node
|
||||||
|
nodeMapLock.Unlock()
|
||||||
}
|
}
|
||||||
|
if nc.allocateNodeCIDRs {
|
||||||
|
nc.reconcilePodCIDRs(matches, nodes)
|
||||||
|
}
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
wg.Add(len(matches.Items))
|
||||||
// Create nodes which have been created in cloud, but not in kubernetes cluster
|
// Create nodes which have been created in cloud, but not in kubernetes cluster
|
||||||
// Skip nodes if we hit an error while trying to get their addresses.
|
// Skip nodes if we hit an error while trying to get their addresses.
|
||||||
for _, node := range matches.Items {
|
for i := range matches.Items {
|
||||||
if _, ok := nodeMap[node.Name]; !ok {
|
go func(node *api.Node) {
|
||||||
|
defer wg.Done()
|
||||||
|
nodeMapLock.Lock()
|
||||||
|
_, ok := nodeMap[node.Name]
|
||||||
|
nodeMapLock.Unlock()
|
||||||
|
if !ok {
|
||||||
glog.V(3).Infof("Querying addresses for new node: %s", node.Name)
|
glog.V(3).Infof("Querying addresses for new node: %s", node.Name)
|
||||||
nodeList := &api.NodeList{}
|
nodeList := &api.NodeList{}
|
||||||
nodeList.Items = []api.Node{node}
|
nodeList.Items = []api.Node{*node}
|
||||||
_, err = nc.populateAddresses(nodeList)
|
_, err = nc.populateAddresses(nodeList)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Errorf("Error fetching addresses for new node %s: %v", node.Name, err)
|
glog.Errorf("Error fetching addresses for new node %s: %v", node.Name, err)
|
||||||
continue
|
return
|
||||||
}
|
}
|
||||||
node.Status.Addresses = nodeList.Items[0].Status.Addresses
|
node.Status.Addresses = nodeList.Items[0].Status.Addresses
|
||||||
|
if nc.allocateNodeCIDRs {
|
||||||
|
nc.configureNodeCIDR(node)
|
||||||
|
}
|
||||||
glog.Infof("Create node in registry: %s", node.Name)
|
glog.Infof("Create node in registry: %s", node.Name)
|
||||||
_, err = nc.kubeClient.Nodes().Create(&node)
|
_, err = nc.kubeClient.Nodes().Create(node)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Errorf("Create node %s error: %v", node.Name, err)
|
glog.Errorf("Create node %s error: %v", node.Name, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
nodeMapLock.Lock()
|
||||||
delete(nodeMap, node.Name)
|
delete(nodeMap, node.Name)
|
||||||
|
nodeMapLock.Unlock()
|
||||||
|
}(&matches.Items[i])
|
||||||
}
|
}
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
wg.Add(len(nodeMap))
|
||||||
// Delete nodes which have been deleted from cloud, but not from kubernetes cluster.
|
// Delete nodes which have been deleted from cloud, but not from kubernetes cluster.
|
||||||
for nodeID := range nodeMap {
|
for nodeID := range nodeMap {
|
||||||
|
go func(nodeID string) {
|
||||||
|
defer wg.Done()
|
||||||
|
if nc.allocateNodeCIDRs {
|
||||||
|
nc.unassignNodeCIDR(nodeID)
|
||||||
|
}
|
||||||
glog.Infof("Delete node from registry: %s", nodeID)
|
glog.Infof("Delete node from registry: %s", nodeID)
|
||||||
err = nc.kubeClient.Nodes().Delete(nodeID)
|
err = nc.kubeClient.Nodes().Delete(nodeID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Errorf("Delete node %s error: %v", nodeID, err)
|
glog.Errorf("Delete node %s error: %v", nodeID, err)
|
||||||
}
|
}
|
||||||
nc.deletePods(nodeID)
|
nc.deletePods(nodeID)
|
||||||
|
}(nodeID)
|
||||||
}
|
}
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@@ -21,6 +21,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"reflect"
|
"reflect"
|
||||||
"sort"
|
"sort"
|
||||||
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@@ -59,6 +60,9 @@ type FakeNodeHandler struct {
|
|||||||
UpdatedNodes []*api.Node
|
UpdatedNodes []*api.Node
|
||||||
UpdatedNodeStatuses []*api.Node
|
UpdatedNodeStatuses []*api.Node
|
||||||
RequestCount int
|
RequestCount int
|
||||||
|
|
||||||
|
// Synchronization
|
||||||
|
createLock sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *FakeNodeHandler) Nodes() client.NodeInterface {
|
func (c *FakeNodeHandler) Nodes() client.NodeInterface {
|
||||||
@@ -66,7 +70,11 @@ func (c *FakeNodeHandler) Nodes() client.NodeInterface {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (m *FakeNodeHandler) Create(node *api.Node) (*api.Node, error) {
|
func (m *FakeNodeHandler) Create(node *api.Node) (*api.Node, error) {
|
||||||
defer func() { m.RequestCount++ }()
|
m.createLock.Lock()
|
||||||
|
defer func() {
|
||||||
|
m.RequestCount++
|
||||||
|
m.createLock.Unlock()
|
||||||
|
}()
|
||||||
for _, n := range m.Existing {
|
for _, n := range m.Existing {
|
||||||
if n.Name == node.Name {
|
if n.Name == node.Name {
|
||||||
return nil, apierrors.NewAlreadyExists("Minion", node.Name)
|
return nil, apierrors.NewAlreadyExists("Minion", node.Name)
|
||||||
@@ -238,7 +246,7 @@ func TestRegisterNodes(t *testing.T) {
|
|||||||
nodes.Items = append(nodes.Items, *newNode(machine))
|
nodes.Items = append(nodes.Items, *newNode(machine))
|
||||||
}
|
}
|
||||||
nodeController := NewNodeController(nil, "", item.machines, &api.NodeResources{}, item.fakeNodeHandler, 10, time.Minute,
|
nodeController := NewNodeController(nil, "", item.machines, &api.NodeResources{}, item.fakeNodeHandler, 10, time.Minute,
|
||||||
util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, "")
|
util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, "", "", false)
|
||||||
err := nodeController.registerNodes(&nodes, item.retryCount, time.Millisecond)
|
err := nodeController.registerNodes(&nodes, item.retryCount, time.Millisecond)
|
||||||
if !item.expectedFail && err != nil {
|
if !item.expectedFail && err != nil {
|
||||||
t.Errorf("unexpected error: %v", err)
|
t.Errorf("unexpected error: %v", err)
|
||||||
@@ -324,7 +332,7 @@ func TestCreateGetStaticNodesWithSpec(t *testing.T) {
|
|||||||
}
|
}
|
||||||
for _, item := range table {
|
for _, item := range table {
|
||||||
nodeController := NewNodeController(nil, "", item.machines, &resources, nil, 10, time.Minute,
|
nodeController := NewNodeController(nil, "", item.machines, &resources, nil, 10, time.Minute,
|
||||||
util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, "")
|
util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, "", "", false)
|
||||||
nodes, err := nodeController.getStaticNodesWithSpec()
|
nodes, err := nodeController.getStaticNodesWithSpec()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("unexpected error: %v", err)
|
t.Errorf("unexpected error: %v", err)
|
||||||
@@ -386,7 +394,7 @@ func TestCreateGetCloudNodesWithSpec(t *testing.T) {
|
|||||||
|
|
||||||
for _, item := range table {
|
for _, item := range table {
|
||||||
nodeController := NewNodeController(item.fakeCloud, ".*", nil, &api.NodeResources{}, nil, 10, time.Minute,
|
nodeController := NewNodeController(item.fakeCloud, ".*", nil, &api.NodeResources{}, nil, 10, time.Minute,
|
||||||
util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, "")
|
util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, "", "", false)
|
||||||
nodes, err := nodeController.getCloudNodesWithSpec()
|
nodes, err := nodeController.getCloudNodesWithSpec()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("unexpected error: %v", err)
|
t.Errorf("unexpected error: %v", err)
|
||||||
@@ -496,7 +504,7 @@ func TestSyncCloudNodes(t *testing.T) {
|
|||||||
item.fakeNodeHandler.Fake = testclient.NewSimpleFake()
|
item.fakeNodeHandler.Fake = testclient.NewSimpleFake()
|
||||||
}
|
}
|
||||||
nodeController := NewNodeController(item.fakeCloud, item.matchRE, nil, &api.NodeResources{}, item.fakeNodeHandler, 10, time.Minute,
|
nodeController := NewNodeController(item.fakeCloud, item.matchRE, nil, &api.NodeResources{}, item.fakeNodeHandler, 10, time.Minute,
|
||||||
util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, "")
|
util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, "", "", false)
|
||||||
if err := nodeController.syncCloudNodes(); err != nil {
|
if err := nodeController.syncCloudNodes(); err != nil {
|
||||||
t.Errorf("unexpected error: %v", err)
|
t.Errorf("unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
@@ -580,7 +588,7 @@ func TestSyncCloudNodesEvictPods(t *testing.T) {
|
|||||||
item.fakeNodeHandler.Fake = testclient.NewSimpleFake()
|
item.fakeNodeHandler.Fake = testclient.NewSimpleFake()
|
||||||
}
|
}
|
||||||
nodeController := NewNodeController(item.fakeCloud, item.matchRE, nil, &api.NodeResources{}, item.fakeNodeHandler, 10, time.Minute,
|
nodeController := NewNodeController(item.fakeCloud, item.matchRE, nil, &api.NodeResources{}, item.fakeNodeHandler, 10, time.Minute,
|
||||||
util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, "")
|
util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, "", "", false)
|
||||||
if err := nodeController.syncCloudNodes(); err != nil {
|
if err := nodeController.syncCloudNodes(); err != nil {
|
||||||
t.Errorf("unexpected error: %v", err)
|
t.Errorf("unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
@@ -620,7 +628,7 @@ func TestPopulateNodeAddresses(t *testing.T) {
|
|||||||
|
|
||||||
for _, item := range table {
|
for _, item := range table {
|
||||||
nodeController := NewNodeController(item.fakeCloud, ".*", nil, nil, nil, 10, time.Minute,
|
nodeController := NewNodeController(item.fakeCloud, ".*", nil, nil, nil, 10, time.Minute,
|
||||||
util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, "")
|
util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, "", "", false)
|
||||||
result, err := nodeController.populateAddresses(item.nodes)
|
result, err := nodeController.populateAddresses(item.nodes)
|
||||||
// In case of IP querying error, we should continue.
|
// In case of IP querying error, we should continue.
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -820,7 +828,7 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
|
|||||||
for _, item := range table {
|
for _, item := range table {
|
||||||
nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, 10,
|
nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, 10,
|
||||||
evictionTimeout, util.NewFakeRateLimiter(), testNodeMonitorGracePeriod,
|
evictionTimeout, util.NewFakeRateLimiter(), testNodeMonitorGracePeriod,
|
||||||
testNodeStartupGracePeriod, testNodeMonitorPeriod, "")
|
testNodeStartupGracePeriod, testNodeMonitorPeriod, "", "", false)
|
||||||
nodeController.now = func() util.Time { return fakeNow }
|
nodeController.now = func() util.Time { return fakeNow }
|
||||||
if err := nodeController.monitorNodeStatus(); err != nil {
|
if err := nodeController.monitorNodeStatus(); err != nil {
|
||||||
t.Errorf("unexpected error: %v", err)
|
t.Errorf("unexpected error: %v", err)
|
||||||
@@ -1022,7 +1030,7 @@ func TestMonitorNodeStatusUpdateStatus(t *testing.T) {
|
|||||||
|
|
||||||
for _, item := range table {
|
for _, item := range table {
|
||||||
nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, 10, 5*time.Minute, util.NewFakeRateLimiter(),
|
nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, 10, 5*time.Minute, util.NewFakeRateLimiter(),
|
||||||
testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, "")
|
testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, "", "", false)
|
||||||
nodeController.now = func() util.Time { return fakeNow }
|
nodeController.now = func() util.Time { return fakeNow }
|
||||||
if err := nodeController.monitorNodeStatus(); err != nil {
|
if err := nodeController.monitorNodeStatus(); err != nil {
|
||||||
t.Errorf("unexpected error: %v", err)
|
t.Errorf("unexpected error: %v", err)
|
||||||
|
@@ -389,6 +389,14 @@ func (i *Instances) GetNodeResources(name string) (*api.NodeResources, error) {
|
|||||||
return rsrc, nil
|
return rsrc, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (i *Instances) Configure(name string, spec *api.NodeSpec) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *Instances) Release(name string) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (os *OpenStack) Clusters() (cloudprovider.Clusters, bool) {
|
func (os *OpenStack) Clusters() (cloudprovider.Clusters, bool) {
|
||||||
return nil, false
|
return nil, false
|
||||||
}
|
}
|
||||||
|
@@ -250,3 +250,11 @@ func (v *OVirtCloud) List(filter string) ([]string, error) {
|
|||||||
func (v *OVirtCloud) GetNodeResources(name string) (*api.NodeResources, error) {
|
func (v *OVirtCloud) GetNodeResources(name string) (*api.NodeResources, error) {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (v *OVirtCloud) Configure(name string, spec *api.NodeSpec) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (v *OVirtCloud) Release(name string) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
@@ -395,6 +395,14 @@ func (i *Instances) GetNodeResources(name string) (*api.NodeResources, error) {
|
|||||||
return rsrc, nil
|
return rsrc, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (i *Instances) Configure(name string, spec *api.NodeSpec) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *Instances) Release(name string) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (os *Rackspace) Clusters() (cloudprovider.Clusters, bool) {
|
func (os *Rackspace) Clusters() (cloudprovider.Clusters, bool) {
|
||||||
return nil, false
|
return nil, false
|
||||||
}
|
}
|
||||||
|
@@ -239,3 +239,11 @@ func (v *VagrantCloud) List(filter string) ([]string, error) {
|
|||||||
func (v *VagrantCloud) GetNodeResources(name string) (*api.NodeResources, error) {
|
func (v *VagrantCloud) GetNodeResources(name string) (*api.NodeResources, error) {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (v *VagrantCloud) Configure(name string, spec *api.NodeSpec) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (v *VagrantCloud) Release(name string) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
@@ -139,6 +139,15 @@ func (s StringSet) List() []string {
|
|||||||
return res
|
return res
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Returns a single element from the set.
|
||||||
|
func (s StringSet) PopAny() (string, bool) {
|
||||||
|
for key := range s {
|
||||||
|
s.Delete(key)
|
||||||
|
return key, true
|
||||||
|
}
|
||||||
|
return "", false
|
||||||
|
}
|
||||||
|
|
||||||
// Len returns the size of the set.
|
// Len returns the size of the set.
|
||||||
func (s StringSet) Len() int {
|
func (s StringSet) Len() int {
|
||||||
return len(s)
|
return len(s)
|
||||||
|
Reference in New Issue
Block a user