Merge pull request #6949 from roberthbailey/node-register

Modify nodes to register directly with the master.
Eric Tune
2015-05-19 11:29:43 -07:00
12 changed files with 314 additions and 876 deletions
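In outline, the kubelet gains a registerNode option: when it is set, the status-sync path first registers the node with the apiserver, retrying with an exponential backoff capped at 7 seconds, and treats an AlreadyExists response whose existing Node carries the same ExternalID as a successful earlier registration. The following is a condensed, self-contained sketch of that loop; the nodeClient interface is a local stand-in for the real kubeClient.Nodes() client, not an API introduced by this PR.

package sketch

import (
	"fmt"
	"time"
)

// nodeClient is a deliberately narrowed stand-in for the subset of the
// apiserver client that registration needs; the real kubelet calls
// kl.kubeClient.Nodes().Create/Get with *api.Node values instead.
type nodeClient interface {
	// Create registers the node; alreadyExists reports an AlreadyExists error.
	Create(name, externalID string) (alreadyExists bool, err error)
	// Get returns the ExternalID recorded for an existing node.
	Get(name string) (externalID string, err error)
}

// registerWithApiserver mirrors the retry loop added in this PR: back off
// (100ms, doubling, capped at 7s), try to create the Node, and accept an
// existing Node with a matching ExternalID as a prior registration.
func registerWithApiserver(c nodeClient, hostname, externalID string) {
	step := 100 * time.Millisecond
	for {
		time.Sleep(step)
		if step *= 2; step >= 7*time.Second {
			step = 7 * time.Second
		}
		alreadyExists, err := c.Create(hostname, externalID)
		if err == nil {
			fmt.Printf("successfully registered node %s\n", hostname)
			return
		}
		if alreadyExists {
			if existing, getErr := c.Get(hostname); getErr == nil && existing == externalID {
				fmt.Printf("node %s was previously registered\n", hostname)
				return
			}
		}
		fmt.Printf("unable to register %s: %v; retrying\n", hostname, err)
	}
}

Registering from the kubelet itself is also why the AlreadyExists/ExternalID check exists: a kubelet that restarts should adopt the Node record it created earlier rather than fail on re-registration.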

pkg/kubelet/kubelet.go

@@ -31,6 +31,7 @@ import (
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
apierrors "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/validation"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
@@ -66,11 +67,6 @@ const (
// Max amount of time to wait for the container runtime to come up.
maxWaitForContainerRuntime = 5 * time.Minute
// Initial node status update frequency and incremental frequency, for faster cluster startup.
// The update frequency will be increased linearly, until it reaches status_update_frequency.
initialNodeStatusUpdateFrequency = 100 * time.Millisecond
nodeStatusUpdateFrequencyInc = 500 * time.Millisecond
// nodeStatusUpdateRetry specifies how many times kubelet retries when posting node status failed.
nodeStatusUpdateRetry = 5
@@ -122,6 +118,7 @@ func NewMainKubelet(
pullBurst int,
containerGCPolicy ContainerGCPolicy,
sourcesReady SourcesReadyFn,
registerNode bool,
clusterDomain string,
clusterDNS net.IP,
masterServiceNamespace string,
@@ -224,6 +221,7 @@ func NewMainKubelet(
readinessManager: readinessManager,
httpClient: &http.Client{},
sourcesReady: sourcesReady,
registerNode: registerNode,
clusterDomain: clusterDomain,
clusterDNS: clusterDNS,
serviceLister: serviceLister,
@@ -373,6 +371,9 @@ type Kubelet struct {
// cAdvisor used for container information.
cadvisor cadvisor.Interface
// Set to true to have the node register itself with the apiserver.
registerNode bool
// If non-empty, use this for container DNS search.
clusterDomain string
@@ -657,26 +658,22 @@ func (kl *Kubelet) Run(updates <-chan PodUpdate) {
glog.Infof("Running in container %q", kl.resourceContainer)
}
err := kl.imageManager.Start()
if err != nil {
if err := kl.imageManager.Start(); err != nil {
kl.recorder.Eventf(kl.nodeRef, "kubeletSetupFailed", "Failed to start ImageManager %v", err)
glog.Errorf("Failed to start ImageManager, images may not be garbage collected: %v", err)
}
err = kl.cadvisor.Start()
if err != nil {
if err := kl.cadvisor.Start(); err != nil {
kl.recorder.Eventf(kl.nodeRef, "kubeletSetupFailed", "Failed to start CAdvisor %v", err)
glog.Errorf("Failed to start CAdvisor, system may not be properly monitored: %v", err)
}
err = kl.containerManager.Start()
if err != nil {
if err := kl.containerManager.Start(); err != nil {
kl.recorder.Eventf(kl.nodeRef, "kubeletSetupFailed", "Failed to start ContainerManager %v", err)
glog.Errorf("Failed to start ContainerManager, system may not be properly isolated: %v", err)
}
err = kl.oomWatcher.Start(kl.nodeRef)
if err != nil {
if err := kl.oomWatcher.Start(kl.nodeRef); err != nil {
kl.recorder.Eventf(kl.nodeRef, "kubeletSetupFailed", "Failed to start OOM watcher %v", err)
glog.Errorf("Failed to start OOM watching: %v", err)
}
@@ -688,20 +685,83 @@ func (kl *Kubelet) Run(updates <-chan PodUpdate) {
kl.syncLoop(updates, kl)
}
func (kl *Kubelet) initialNodeStatus() (*api.Node, error) {
node := &api.Node{
ObjectMeta: api.ObjectMeta{
Name: kl.hostname,
Labels: map[string]string{"kubernetes.io/hostname": kl.hostname},
},
}
if kl.cloud != nil {
instances, ok := kl.cloud.Instances()
if !ok {
return nil, fmt.Errorf("failed to get instances from cloud provider")
}
// TODO(roberthbailey): Can we do this without having credentials to talk
// to the cloud provider?
instanceID, err := instances.ExternalID(kl.hostname)
if err != nil {
return nil, fmt.Errorf("failed to get instance ID from cloud provider: %v", err)
}
node.Spec.ExternalID = instanceID
} else {
node.Spec.ExternalID = kl.hostname
}
if err := kl.setNodeStatus(node); err != nil {
return nil, err
}
return node, nil
}
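For concreteness, with no cloud provider configured (the else branch above) and an assumed hostname of "node-1", the Node handed to setNodeStatus is simply the following fragment, reusing the api types already imported in this file:

// Illustrative value only; the real name comes from kl.hostname and, when a
// cloud provider is configured, Spec.ExternalID is the provider's instance ID.
node := &api.Node{
	ObjectMeta: api.ObjectMeta{
		Name:   "node-1",
		Labels: map[string]string{"kubernetes.io/hostname": "node-1"},
	},
	Spec: api.NodeSpec{ExternalID: "node-1"},
}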
// registerWithApiserver registers the node with the cluster master.
func (kl *Kubelet) registerWithApiserver() {
step := 100 * time.Millisecond
for {
time.Sleep(step)
step = step * 2
if step >= 7*time.Second {
step = 7 * time.Second
}
node, err := kl.initialNodeStatus()
if err != nil {
glog.Errorf("Unable to construct api.Node object for kubelet: %v", err)
continue
}
glog.V(2).Infof("Attempting to register node %s", node.Name)
if _, err := kl.kubeClient.Nodes().Create(node); err != nil {
if apierrors.IsAlreadyExists(err) {
currentNode, err := kl.kubeClient.Nodes().Get(kl.hostname)
if err != nil {
glog.Errorf("error getting node %q: %v", kl.hostname, err)
continue
}
if currentNode == nil {
glog.Errorf("no node instance returned for %q", kl.hostname)
continue
}
if currentNode.Spec.ExternalID == node.Spec.ExternalID {
glog.Infof("Node %s was previously registered", node.Name)
return
}
}
glog.V(2).Infof("Unable to register %s with the apiserver: %v", node.Name, err)
continue
}
glog.Infof("Successfully registered node %s", node.Name)
return
}
}
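The wait before each registration attempt therefore doubles from 100ms and is capped at 7s: 100ms, 200ms, 400ms, 800ms, 1.6s, 3.2s, 6.4s, then 7s from the eighth attempt on. A small self-contained check of that arithmetic (not kubelet code):

package main

import (
	"fmt"
	"time"
)

func main() {
	// Reproduces the wait schedule of the loop above: the sleep before each
	// attempt doubles from 100ms and saturates at 7s.
	step := 100 * time.Millisecond
	for attempt := 1; attempt <= 10; attempt++ {
		fmt.Printf("attempt %d waits %v\n", attempt, step)
		if step *= 2; step >= 7*time.Second {
			step = 7 * time.Second
		}
	}
}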
// syncNodeStatus periodically synchronizes node status to master.
func (kl *Kubelet) syncNodeStatus() {
if kl.kubeClient == nil {
return
}
glog.Infof("Starting node status updates")
for feq := initialNodeStatusUpdateFrequency; feq < kl.nodeStatusUpdateFrequency; feq += nodeStatusUpdateFrequencyInc {
select {
case <-time.After(feq):
if err := kl.updateNodeStatus(); err != nil {
glog.Errorf("Unable to update node status: %v", err)
}
}
if kl.registerNode {
kl.registerWithApiserver()
}
glog.Infof("Starting node status updates")
for {
select {
case <-time.After(kl.nodeStatusUpdateFrequency):
@@ -1711,14 +1771,13 @@ func (kl *Kubelet) reconcileCBR0(podCIDR string) error {
// updateNodeStatus updates node status to master with retries.
func (kl *Kubelet) updateNodeStatus() error {
for i := 0; i < nodeStatusUpdateRetry; i++ {
err := kl.tryUpdateNodeStatus()
if err != nil {
glog.Errorf("error updating node status, will retry: %v", err)
if err := kl.tryUpdateNodeStatus(); err != nil {
glog.Errorf("Error updating node status, will retry: %v", err)
} else {
return nil
}
}
return fmt.Errorf("Update node status exceeds retry count")
return fmt.Errorf("update node status exceeds retry count")
}
func (kl *Kubelet) recordNodeOnlineEvent() {
@@ -1742,15 +1801,36 @@ func (kl *Kubelet) recordNodeUnschedulableEvent() {
// Maintains Node.Spec.Unschedulable value from previous run of tryUpdateNodeStatus()
var oldNodeUnschedulable bool
// tryUpdateNodeStatus tries to update node status to master. If ReconcileCBR0
// is set, this function will also confirm that cbr0 is configured correctly.
func (kl *Kubelet) tryUpdateNodeStatus() error {
node, err := kl.kubeClient.Nodes().Get(kl.hostname)
if err != nil {
return fmt.Errorf("error getting node %q: %v", kl.hostname, err)
}
if node == nil {
return fmt.Errorf("no node instance returned for %q", kl.hostname)
// setNodeStatus fills in the Status fields of the given Node, overwriting
// any fields that are currently set.
func (kl *Kubelet) setNodeStatus(node *api.Node) error {
// Set addresses for the node.
if kl.cloud != nil {
instances, ok := kl.cloud.Instances()
if !ok {
return fmt.Errorf("failed to get instances from cloud provider")
}
// TODO(roberthbailey): Can we do this without having credentials to talk
// to the cloud provider?
nodeAddresses, err := instances.NodeAddresses(kl.hostname)
if err != nil {
return fmt.Errorf("failed to get node address from cloud provider: %v", err)
}
node.Status.Addresses = nodeAddresses
} else {
addr := net.ParseIP(kl.hostname)
if addr != nil {
node.Status.Addresses = []api.NodeAddress{{Type: api.NodeLegacyHostIP, Address: addr.String()}}
} else {
addrs, err := net.LookupIP(node.Name)
if err != nil {
return fmt.Errorf("can't get ip address of node %s: %v", node.Name, err)
} else if len(addrs) == 0 {
return fmt.Errorf("no ip address for node %v", node.Name)
} else {
node.Status.Addresses = []api.NodeAddress{{Type: api.NodeLegacyHostIP, Address: addrs[0].String()}}
}
}
}
networkConfigured := true
@@ -1765,7 +1845,13 @@ func (kl *Kubelet) tryUpdateNodeStatus() error {
// cAdvisor locally, e.g. for test-cmd.sh, and in integration test.
info, err := kl.GetCachedMachineInfo()
if err != nil {
glog.Errorf("error getting machine info: %v", err)
// TODO(roberthbailey): This is required for test-cmd.sh to pass.
// See if the test should be updated instead.
node.Status.Capacity = api.ResourceList{
api.ResourceCPU: *resource.NewMilliQuantity(0, resource.DecimalSI),
api.ResourceMemory: resource.MustParse("0Gi"),
}
glog.Errorf("Error getting machine info: %v", err)
} else {
node.Status.NodeInfo.MachineID = info.MachineID
node.Status.NodeInfo.SystemUUID = info.SystemUUID
@@ -1784,7 +1870,7 @@ func (kl *Kubelet) tryUpdateNodeStatus() error {
verinfo, err := kl.cadvisor.VersionInfo()
if err != nil {
glog.Errorf("error getting version info: %v", err)
glog.Errorf("Error getting version info: %v", err)
} else {
node.Status.NodeInfo.KernelVersion = verinfo.KernelVersion
node.Status.NodeInfo.OsImage = verinfo.ContainerOsVersion
@@ -1852,7 +1938,22 @@ func (kl *Kubelet) tryUpdateNodeStatus() error {
}
oldNodeUnschedulable = node.Spec.Unschedulable
}
return nil
}
// tryUpdateNodeStatus tries to update node status to master. If ReconcileCBR0
// is set, this function will also confirm that cbr0 is configured correctly.
func (kl *Kubelet) tryUpdateNodeStatus() error {
node, err := kl.kubeClient.Nodes().Get(kl.hostname)
if err != nil {
return fmt.Errorf("error getting node %q: %v", kl.hostname, err)
}
if node == nil {
return fmt.Errorf("no node instance returned for %q", kl.hostname)
}
if err := kl.setNodeStatus(node); err != nil {
return err
}
// Update the current status on the API server
_, err = kl.kubeClient.Nodes().UpdateStatus(node)
return err
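Taken together, the status-sync path in this file now reduces to roughly the flow below. This is a condensed paraphrase in which registerNode, register and updateStatus stand in for kl.registerNode, kl.registerWithApiserver and kl.updateNodeStatus; it is not the literal kubelet code.

package sketch

import (
	"fmt"
	"time"
)

// syncNodeStatus restates the new control flow: optionally block until the
// node is registered, then update node status at a fixed frequency forever.
func syncNodeStatus(registerNode bool, frequency time.Duration, register func(), updateStatus func() error) {
	if registerNode {
		register() // returns only once the node is registered (or already was)
	}
	fmt.Println("Starting node status updates")
	for {
		time.Sleep(frequency)
		if err := updateStatus(); err != nil {
			fmt.Printf("Unable to update node status: %v\n", err)
		}
	}
}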

pkg/kubelet/kubelet_test.go

@@ -34,6 +34,7 @@ import (
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
apierrors "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource"
"github.com/GoogleCloudPlatform/kubernetes/pkg/capabilities"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
@@ -44,6 +45,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/metrics"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/network"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/version"
@@ -3239,7 +3241,7 @@ func TestUpdateNewNodeStatus(t *testing.T) {
kubelet := testKubelet.kubelet
kubeClient := testKubelet.fakeKubeClient
kubeClient.ReactFn = testclient.NewSimpleFake(&api.NodeList{Items: []api.Node{
{ObjectMeta: api.ObjectMeta{Name: "testnode"}},
{ObjectMeta: api.ObjectMeta{Name: "127.0.0.1"}},
}}).ReactFn
machineInfo := &cadvisorApi.MachineInfo{
MachineID: "123",
@@ -3257,7 +3259,7 @@ func TestUpdateNewNodeStatus(t *testing.T) {
}
mockCadvisor.On("VersionInfo").Return(versionInfo, nil)
expectedNode := &api.Node{
ObjectMeta: api.ObjectMeta{Name: "testnode"},
ObjectMeta: api.ObjectMeta{Name: "127.0.0.1"},
Spec: api.NodeSpec{},
Status: api.NodeStatus{
Conditions: []api.NodeCondition{
@@ -3284,6 +3286,7 @@ func TestUpdateNewNodeStatus(t *testing.T) {
api.ResourceMemory: *resource.NewQuantity(1024, resource.BinarySI),
api.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI),
},
Addresses: []api.NodeAddress{{Type: api.NodeLegacyHostIP, Address: "127.0.0.1"}},
},
}
@@ -3317,7 +3320,7 @@ func TestUpdateExistingNodeStatus(t *testing.T) {
kubeClient := testKubelet.fakeKubeClient
kubeClient.ReactFn = testclient.NewSimpleFake(&api.NodeList{Items: []api.Node{
{
ObjectMeta: api.ObjectMeta{Name: "testnode"},
ObjectMeta: api.ObjectMeta{Name: "127.0.0.1"},
Spec: api.NodeSpec{},
Status: api.NodeStatus{
Conditions: []api.NodeCondition{
@@ -3353,7 +3356,7 @@ func TestUpdateExistingNodeStatus(t *testing.T) {
}
mockCadvisor.On("VersionInfo").Return(versionInfo, nil)
expectedNode := &api.Node{
ObjectMeta: api.ObjectMeta{Name: "testnode"},
ObjectMeta: api.ObjectMeta{Name: "127.0.0.1"},
Spec: api.NodeSpec{},
Status: api.NodeStatus{
Conditions: []api.NodeCondition{
@@ -3380,6 +3383,7 @@ func TestUpdateExistingNodeStatus(t *testing.T) {
api.ResourceMemory: *resource.NewQuantity(1024, resource.BinarySI),
api.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI),
},
Addresses: []api.NodeAddress{{Type: api.NodeLegacyHostIP, Address: "127.0.0.1"}},
},
}
@@ -3419,7 +3423,7 @@ func TestUpdateNodeStatusWithoutContainerRuntime(t *testing.T) {
fakeDocker.VersionInfo = []string{}
kubeClient.ReactFn = testclient.NewSimpleFake(&api.NodeList{Items: []api.Node{
{ObjectMeta: api.ObjectMeta{Name: "testnode"}},
{ObjectMeta: api.ObjectMeta{Name: "127.0.0.1"}},
}}).ReactFn
mockCadvisor := testKubelet.fakeCadvisor
machineInfo := &cadvisorApi.MachineInfo{
@@ -3438,7 +3442,7 @@ func TestUpdateNodeStatusWithoutContainerRuntime(t *testing.T) {
mockCadvisor.On("VersionInfo").Return(versionInfo, nil)
expectedNode := &api.Node{
ObjectMeta: api.ObjectMeta{Name: "testnode"},
ObjectMeta: api.ObjectMeta{Name: "127.0.0.1"},
Spec: api.NodeSpec{},
Status: api.NodeStatus{
Conditions: []api.NodeCondition{
@@ -3465,6 +3469,7 @@ func TestUpdateNodeStatusWithoutContainerRuntime(t *testing.T) {
api.ResourceMemory: *resource.NewQuantity(1024, resource.BinarySI),
api.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI),
},
Addresses: []api.NodeAddress{{Type: api.NodeLegacyHostIP, Address: "127.0.0.1"}},
},
}
@@ -4402,6 +4407,62 @@ func TestFilterOutTerminatedPods(t *testing.T) {
}
}
func TestRegisterExistingNodeWithApiserver(t *testing.T) {
testKubelet := newTestKubelet(t)
kubelet := testKubelet.kubelet
kubelet.hostname = "127.0.0.1"
kubeClient := testKubelet.fakeKubeClient
kubeClient.ReactFn = func(action testclient.FakeAction) (runtime.Object, error) {
segments := strings.Split(action.Action, "-")
if len(segments) < 2 {
return nil, fmt.Errorf("unrecognized action, need two or three segments <verb>-<resource> or <verb>-<subresource>-<resource>: %s", action.Action)
}
verb := segments[0]
switch verb {
case "create":
// Return an error on create.
return &api.Node{}, &apierrors.StatusError{
ErrStatus: api.Status{Reason: api.StatusReasonAlreadyExists},
}
case "get":
// Return an existing (matching) node on get.
return &api.Node{
ObjectMeta: api.ObjectMeta{Name: "127.0.0.1"},
Spec: api.NodeSpec{ExternalID: "127.0.0.1"},
}, nil
default:
return nil, fmt.Errorf("no reaction implemented for %s", action.Action)
}
}
machineInfo := &cadvisorApi.MachineInfo{
MachineID: "123",
SystemUUID: "abc",
BootID: "1b3",
NumCores: 2,
MemoryCapacity: 1024,
}
mockCadvisor := testKubelet.fakeCadvisor
mockCadvisor.On("MachineInfo").Return(machineInfo, nil)
versionInfo := &cadvisorApi.VersionInfo{
KernelVersion: "3.16.0-0.bpo.4-amd64",
ContainerOsVersion: "Debian GNU/Linux 7 (wheezy)",
DockerVersion: "1.5.0",
}
mockCadvisor.On("VersionInfo").Return(versionInfo, nil)
done := make(chan struct{})
go func() {
kubelet.registerWithApiserver()
done <- struct{}{}
}()
select {
case <-time.After(5 * time.Second):
t.Errorf("timed out waiting for registration")
case <-done:
return
}
}
func TestMakePortMappings(t *testing.T) {
tests := []struct {
container *api.Container