Extract kubelet node status into separate file

This commit is contained in:
Paul Morie 2016-07-20 18:08:47 -04:00
parent 180e671972
commit 249da77371
5 changed files with 1594 additions and 1517 deletions

View File

@ -21,13 +21,11 @@ import (
"fmt"
"io"
"io/ioutil"
"math"
"net"
"net/http"
"os"
"path"
"path/filepath"
goRuntime "runtime"
"sort"
"strings"
"sync"
@ -37,9 +35,7 @@ import (
"github.com/golang/glog"
cadvisorapi "github.com/google/cadvisor/info/v1"
"k8s.io/kubernetes/pkg/api"
apierrors "k8s.io/kubernetes/pkg/api/errors"
utilpod "k8s.io/kubernetes/pkg/api/pod"
"k8s.io/kubernetes/pkg/api/resource"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/api/validation"
"k8s.io/kubernetes/pkg/apis/componentconfig"
@ -83,7 +79,6 @@ import (
"k8s.io/kubernetes/pkg/util/flowcontrol"
kubeio "k8s.io/kubernetes/pkg/util/io"
"k8s.io/kubernetes/pkg/util/mount"
utilnet "k8s.io/kubernetes/pkg/util/net"
"k8s.io/kubernetes/pkg/util/oom"
"k8s.io/kubernetes/pkg/util/procfs"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
@ -93,7 +88,6 @@ import (
utilvalidation "k8s.io/kubernetes/pkg/util/validation"
"k8s.io/kubernetes/pkg/util/validation/field"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/version"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/util/volumehelper"
"k8s.io/kubernetes/pkg/watch"
@ -770,6 +764,8 @@ type Kubelet struct {
// oneTimeInitializer is used to initialize modules that are dependent on the runtime to be up.
oneTimeInitializer sync.Once
// flannelExperimentalOverlay determines whether the experimental flannel
// network overlay is active.
flannelExperimentalOverlay bool
// TODO: Flannelhelper doesn't store any state, we can instantiate it
@ -827,15 +823,6 @@ type Kubelet struct {
enableControllerAttachDetach bool
}
// dirExists returns true if the path exists and represents a directory.
func dirExists(path string) bool {
s, err := os.Stat(path)
if err != nil {
return false
}
return s.IsDir()
}
// setupDataDirs creates:
// 1. the root directory
// 2. the pods directory
@ -975,187 +962,6 @@ func (kl *Kubelet) getActivePods() []*api.Pod {
return activePods
}
// initialNodeStatus determines the initial node status, incorporating node
// labels and information from the cloud provider.
func (kl *Kubelet) initialNodeStatus() (*api.Node, error) {
node := &api.Node{
ObjectMeta: api.ObjectMeta{
Name: kl.nodeName,
Labels: map[string]string{
unversioned.LabelHostname: kl.hostname,
unversioned.LabelOS: goRuntime.GOOS,
unversioned.LabelArch: goRuntime.GOARCH,
},
},
Spec: api.NodeSpec{
Unschedulable: !kl.registerSchedulable,
},
}
// Initially, set NodeNetworkUnavailable to true.
if kl.providerRequiresNetworkingConfiguration() {
node.Status.Conditions = append(node.Status.Conditions, api.NodeCondition{
Type: api.NodeNetworkUnavailable,
Status: api.ConditionTrue,
Reason: "NoRouteCreated",
Message: "Node created without a route",
LastTransitionTime: unversioned.NewTime(kl.clock.Now()),
})
}
if kl.enableControllerAttachDetach {
if node.Annotations == nil {
node.Annotations = make(map[string]string)
}
node.Annotations[volumehelper.ControllerManagedAttachAnnotation] = "true"
}
// @question: should this be place after the call to the cloud provider? which also applies labels
for k, v := range kl.nodeLabels {
if cv, found := node.ObjectMeta.Labels[k]; found {
glog.Warningf("the node label %s=%s will overwrite default setting %s", k, v, cv)
}
node.ObjectMeta.Labels[k] = v
}
if kl.cloud != nil {
instances, ok := kl.cloud.Instances()
if !ok {
return nil, fmt.Errorf("failed to get instances from cloud provider")
}
// TODO(roberthbailey): Can we do this without having credentials to talk
// to the cloud provider?
// TODO: ExternalID is deprecated, we'll have to drop this code
externalID, err := instances.ExternalID(kl.nodeName)
if err != nil {
return nil, fmt.Errorf("failed to get external ID from cloud provider: %v", err)
}
node.Spec.ExternalID = externalID
// TODO: We can't assume that the node has credentials to talk to the
// cloudprovider from arbitrary nodes. At most, we should talk to a
// local metadata server here.
node.Spec.ProviderID, err = cloudprovider.GetInstanceProviderID(kl.cloud, kl.nodeName)
if err != nil {
return nil, err
}
instanceType, err := instances.InstanceType(kl.nodeName)
if err != nil {
return nil, err
}
if instanceType != "" {
glog.Infof("Adding node label from cloud provider: %s=%s", unversioned.LabelInstanceType, instanceType)
node.ObjectMeta.Labels[unversioned.LabelInstanceType] = instanceType
}
// If the cloud has zone information, label the node with the zone information
zones, ok := kl.cloud.Zones()
if ok {
zone, err := zones.GetZone()
if err != nil {
return nil, fmt.Errorf("failed to get zone from cloud provider: %v", err)
}
if zone.FailureDomain != "" {
glog.Infof("Adding node label from cloud provider: %s=%s", unversioned.LabelZoneFailureDomain, zone.FailureDomain)
node.ObjectMeta.Labels[unversioned.LabelZoneFailureDomain] = zone.FailureDomain
}
if zone.Region != "" {
glog.Infof("Adding node label from cloud provider: %s=%s", unversioned.LabelZoneRegion, zone.Region)
node.ObjectMeta.Labels[unversioned.LabelZoneRegion] = zone.Region
}
}
} else {
node.Spec.ExternalID = kl.hostname
if kl.autoDetectCloudProvider {
// If no cloud provider is defined - use the one detected by cadvisor
info, err := kl.GetCachedMachineInfo()
if err == nil {
kl.updateCloudProviderFromMachineInfo(node, info)
}
}
}
if err := kl.setNodeStatus(node); err != nil {
return nil, err
}
return node, nil
}
// registerWithApiserver registers the node with the cluster master. It is safe
// to call multiple times, but not concurrently (kl.registrationCompleted is
// not locked).
func (kl *Kubelet) registerWithApiserver() {
if kl.registrationCompleted {
return
}
step := 100 * time.Millisecond
for {
time.Sleep(step)
step = step * 2
if step >= 7*time.Second {
step = 7 * time.Second
}
node, err := kl.initialNodeStatus()
if err != nil {
glog.Errorf("Unable to construct api.Node object for kubelet: %v", err)
continue
}
glog.V(2).Infof("Attempting to register node %s", node.Name)
if _, err := kl.kubeClient.Core().Nodes().Create(node); err != nil {
if !apierrors.IsAlreadyExists(err) {
glog.V(2).Infof("Unable to register %s with the apiserver: %v", node.Name, err)
continue
}
currentNode, err := kl.kubeClient.Core().Nodes().Get(kl.nodeName)
if err != nil {
glog.Errorf("error getting node %q: %v", kl.nodeName, err)
continue
}
if currentNode == nil {
glog.Errorf("no node instance returned for %q", kl.nodeName)
continue
}
if currentNode.Spec.ExternalID == node.Spec.ExternalID {
glog.Infof("Node %s was previously registered", node.Name)
kl.registrationCompleted = true
return
}
glog.Errorf(
"Previously %q had externalID %q; now it is %q; will delete and recreate.",
kl.nodeName, node.Spec.ExternalID, currentNode.Spec.ExternalID,
)
if err := kl.kubeClient.Core().Nodes().Delete(node.Name, nil); err != nil {
glog.Errorf("Unable to delete old node: %v", err)
} else {
glog.Errorf("Deleted old node object %q", kl.nodeName)
}
continue
}
glog.Infof("Successfully registered node %s", node.Name)
kl.registrationCompleted = true
return
}
}
// syncNodeStatus should be called periodically from a goroutine.
// It synchronizes node status to master, registering the kubelet first if
// necessary.
func (kl *Kubelet) syncNodeStatus() {
if kl.kubeClient == nil {
return
}
if kl.registerNode {
// This will exit immediately if it doesn't need to do anything.
kl.registerWithApiserver()
}
if err := kl.updateNodeStatus(); err != nil {
glog.Errorf("Unable to update node status: %v", err)
}
}
// relabelVolumes relabels SELinux volumes to match the pod's
// SELinuxOptions specification. This is only needed if the pod uses
// hostPID or hostIPC. Otherwise relabeling is delegated to docker.
@ -1678,10 +1484,7 @@ func (kl *Kubelet) makePodDataDirs(pod *api.Pod) error {
//
// Arguments:
//
// pod - the pod to sync
// mirrorPod - the mirror pod for the pod to sync, if it is a static pod
// podStatus - the current status (TODO: always from the status manager?)
// updateType - the type of update (ADD, UPDATE, REMOVE, RECONCILE, DELETE)
// o - the SyncPodOptions for this invocation
//
// The workflow is:
// * If the pod is being created, record pod worker start latency
@ -1855,8 +1658,11 @@ func podUsesHostNetwork(pod *api.Pod) bool {
return pod.Spec.SecurityContext != nil && pod.Spec.SecurityContext.HostNetwork
}
// getPullSecretsForPod inspects the Pod and retrieves the referenced pull secrets
// TODO duplicate secrets are being retrieved multiple times and there is no cache. Creating and using a secret manager interface will make this easier to address.
// getPullSecretsForPod inspects the Pod and retrieves the referenced pull
// secrets.
// TODO: duplicate secrets are being retrieved multiple times and there
// is no cache. Creating and using a secret manager interface will make this
// easier to address.
func (kl *Kubelet) getPullSecretsForPod(pod *api.Pod) ([]api.Secret, error) {
pullSecrets := []api.Secret{}
@ -2635,94 +2441,6 @@ func (kl *Kubelet) updateRuntimeUp() {
kl.runtimeState.setRuntimeSync(kl.clock.Now())
}
// updateNodeStatus updates node status to master with retries.
func (kl *Kubelet) updateNodeStatus() error {
for i := 0; i < nodeStatusUpdateRetry; i++ {
if err := kl.tryUpdateNodeStatus(); err != nil {
glog.Errorf("Error updating node status, will retry: %v", err)
} else {
return nil
}
}
return fmt.Errorf("update node status exceeds retry count")
}
// recordNodeStatusEvent records an event of the given type with the given
// message for the node.
func (kl *Kubelet) recordNodeStatusEvent(eventtype, event string) {
glog.V(2).Infof("Recording %s event message for node %s", event, kl.nodeName)
// TODO: This requires a transaction, either both node status is updated
// and event is recorded or neither should happen, see issue #6055.
kl.recorder.Eventf(kl.nodeRef, eventtype, event, "Node %s status is now: %s", kl.nodeName, event)
}
// Set addresses for the node.
func (kl *Kubelet) setNodeAddress(node *api.Node) error {
// Set addresses for the node.
if kl.cloud != nil {
instances, ok := kl.cloud.Instances()
if !ok {
return fmt.Errorf("failed to get instances from cloud provider")
}
// TODO(roberthbailey): Can we do this without having credentials to talk
// to the cloud provider?
// TODO(justinsb): We can if CurrentNodeName() was actually CurrentNode() and returned an interface
nodeAddresses, err := instances.NodeAddresses(kl.nodeName)
if err != nil {
return fmt.Errorf("failed to get node address from cloud provider: %v", err)
}
node.Status.Addresses = nodeAddresses
} else {
if kl.nodeIP != nil {
node.Status.Addresses = []api.NodeAddress{
{Type: api.NodeLegacyHostIP, Address: kl.nodeIP.String()},
{Type: api.NodeInternalIP, Address: kl.nodeIP.String()},
}
} else if addr := net.ParseIP(kl.hostname); addr != nil {
node.Status.Addresses = []api.NodeAddress{
{Type: api.NodeLegacyHostIP, Address: addr.String()},
{Type: api.NodeInternalIP, Address: addr.String()},
}
} else {
addrs, err := net.LookupIP(node.Name)
if err != nil {
return fmt.Errorf("can't get ip address of node %s: %v", node.Name, err)
} else if len(addrs) == 0 {
return fmt.Errorf("no ip address for node %v", node.Name)
} else {
// check all ip addresses for this node.Name and try to find the first non-loopback IPv4 address.
// If no match is found, it uses the IP of the interface with gateway on it.
for _, ip := range addrs {
if ip.IsLoopback() {
continue
}
if ip.To4() != nil {
node.Status.Addresses = []api.NodeAddress{
{Type: api.NodeLegacyHostIP, Address: ip.String()},
{Type: api.NodeInternalIP, Address: ip.String()},
}
break
}
}
if len(node.Status.Addresses) == 0 {
ip, err := utilnet.ChooseHostInterface()
if err != nil {
return err
}
node.Status.Addresses = []api.NodeAddress{
{Type: api.NodeLegacyHostIP, Address: ip.String()},
{Type: api.NodeInternalIP, Address: ip.String()},
}
}
}
}
}
return nil
}
func (kl *Kubelet) updateCloudProviderFromMachineInfo(node *api.Node, info *cadvisorapi.MachineInfo) {
if info.CloudProvider != cadvisorapi.UnknownProvider &&
info.CloudProvider != cadvisorapi.Baremetal {
@ -2735,118 +2453,6 @@ func (kl *Kubelet) updateCloudProviderFromMachineInfo(node *api.Node, info *cadv
}
}
func (kl *Kubelet) setNodeStatusMachineInfo(node *api.Node) {
// TODO: Post NotReady if we cannot get MachineInfo from cAdvisor. This needs to start
// cAdvisor locally, e.g. for test-cmd.sh, and in integration test.
info, err := kl.GetCachedMachineInfo()
if err != nil {
// TODO(roberthbailey): This is required for test-cmd.sh to pass.
// See if the test should be updated instead.
node.Status.Capacity = api.ResourceList{
api.ResourceCPU: *resource.NewMilliQuantity(0, resource.DecimalSI),
api.ResourceMemory: resource.MustParse("0Gi"),
api.ResourcePods: *resource.NewQuantity(int64(kl.maxPods), resource.DecimalSI),
api.ResourceNvidiaGPU: *resource.NewQuantity(int64(kl.nvidiaGPUs), resource.DecimalSI),
}
glog.Errorf("Error getting machine info: %v", err)
} else {
node.Status.NodeInfo.MachineID = info.MachineID
node.Status.NodeInfo.SystemUUID = info.SystemUUID
node.Status.Capacity = cadvisor.CapacityFromMachineInfo(info)
if kl.podsPerCore > 0 {
node.Status.Capacity[api.ResourcePods] = *resource.NewQuantity(
int64(math.Min(float64(info.NumCores*kl.podsPerCore), float64(kl.maxPods))), resource.DecimalSI)
} else {
node.Status.Capacity[api.ResourcePods] = *resource.NewQuantity(
int64(kl.maxPods), resource.DecimalSI)
}
node.Status.Capacity[api.ResourceNvidiaGPU] = *resource.NewQuantity(
int64(kl.nvidiaGPUs), resource.DecimalSI)
if node.Status.NodeInfo.BootID != "" &&
node.Status.NodeInfo.BootID != info.BootID {
// TODO: This requires a transaction, either both node status is updated
// and event is recorded or neither should happen, see issue #6055.
kl.recorder.Eventf(kl.nodeRef, api.EventTypeWarning, events.NodeRebooted,
"Node %s has been rebooted, boot id: %s", kl.nodeName, info.BootID)
}
node.Status.NodeInfo.BootID = info.BootID
}
// Set Allocatable.
node.Status.Allocatable = make(api.ResourceList)
for k, v := range node.Status.Capacity {
value := *(v.Copy())
if kl.reservation.System != nil {
value.Sub(kl.reservation.System[k])
}
if kl.reservation.Kubernetes != nil {
value.Sub(kl.reservation.Kubernetes[k])
}
if value.Sign() < 0 {
// Negative Allocatable resources don't make sense.
value.Set(0)
}
node.Status.Allocatable[k] = value
}
}
// Set versioninfo for the node.
func (kl *Kubelet) setNodeStatusVersionInfo(node *api.Node) {
verinfo, err := kl.cadvisor.VersionInfo()
if err != nil {
glog.Errorf("Error getting version info: %v", err)
} else {
node.Status.NodeInfo.KernelVersion = verinfo.KernelVersion
node.Status.NodeInfo.OSImage = verinfo.ContainerOsVersion
runtimeVersion := "Unknown"
if runtimeVer, err := kl.containerRuntime.Version(); err == nil {
runtimeVersion = runtimeVer.String()
}
node.Status.NodeInfo.ContainerRuntimeVersion = fmt.Sprintf("%s://%s", kl.containerRuntime.Type(), runtimeVersion)
node.Status.NodeInfo.KubeletVersion = version.Get().String()
// TODO: kube-proxy might be different version from kubelet in the future
node.Status.NodeInfo.KubeProxyVersion = version.Get().String()
}
}
// Set daemonEndpoints for the node.
func (kl *Kubelet) setNodeStatusDaemonEndpoints(node *api.Node) {
node.Status.DaemonEndpoints = *kl.daemonEndpoints
}
// Set images list for the node
func (kl *Kubelet) setNodeStatusImages(node *api.Node) {
// Update image list of this node
var imagesOnNode []api.ContainerImage
containerImages, err := kl.imageManager.GetImageList()
if err != nil {
glog.Errorf("Error getting image list: %v", err)
} else {
// sort the images from max to min, and only set top N images into the node status.
sort.Sort(byImageSize(containerImages))
if maxImagesInNodeStatus < len(containerImages) {
containerImages = containerImages[0:maxImagesInNodeStatus]
}
for _, image := range containerImages {
imagesOnNode = append(imagesOnNode, api.ContainerImage{
Names: append(image.RepoTags, image.RepoDigests...),
SizeBytes: image.Size,
})
}
}
node.Status.Images = imagesOnNode
}
// Set the GOOS and GOARCH for this node
func (kl *Kubelet) setNodeStatusGoRuntime(node *api.Node) {
node.Status.NodeInfo.OperatingSystem = goRuntime.GOOS
node.Status.NodeInfo.Architecture = goRuntime.GOARCH
}
type byImageSize []kubecontainer.Image
// Sort from max to min
@ -2856,298 +2462,6 @@ func (a byImageSize) Less(i, j int) bool {
func (a byImageSize) Len() int { return len(a) }
func (a byImageSize) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
// Set status for the node.
func (kl *Kubelet) setNodeStatusInfo(node *api.Node) {
kl.setNodeStatusMachineInfo(node)
kl.setNodeStatusVersionInfo(node)
kl.setNodeStatusDaemonEndpoints(node)
kl.setNodeStatusImages(node)
kl.setNodeStatusGoRuntime(node)
}
// Set Readycondition for the node.
func (kl *Kubelet) setNodeReadyCondition(node *api.Node) {
// NOTE(aaronlevy): NodeReady condition needs to be the last in the list of node conditions.
// This is due to an issue with version skewed kubelet and master components.
// ref: https://github.com/kubernetes/kubernetes/issues/16961
currentTime := unversioned.NewTime(kl.clock.Now())
var newNodeReadyCondition api.NodeCondition
if rs := kl.runtimeState.errors(); len(rs) == 0 {
newNodeReadyCondition = api.NodeCondition{
Type: api.NodeReady,
Status: api.ConditionTrue,
Reason: "KubeletReady",
Message: "kubelet is posting ready status",
LastHeartbeatTime: currentTime,
}
} else {
newNodeReadyCondition = api.NodeCondition{
Type: api.NodeReady,
Status: api.ConditionFalse,
Reason: "KubeletNotReady",
Message: strings.Join(rs, ","),
LastHeartbeatTime: currentTime,
}
}
// Record any soft requirements that were not met in the container manager.
status := kl.containerManager.Status()
if status.SoftRequirements != nil {
newNodeReadyCondition.Message = fmt.Sprintf("%s. WARNING: %s", newNodeReadyCondition.Message, status.SoftRequirements.Error())
}
readyConditionUpdated := false
needToRecordEvent := false
for i := range node.Status.Conditions {
if node.Status.Conditions[i].Type == api.NodeReady {
if node.Status.Conditions[i].Status == newNodeReadyCondition.Status {
newNodeReadyCondition.LastTransitionTime = node.Status.Conditions[i].LastTransitionTime
} else {
newNodeReadyCondition.LastTransitionTime = currentTime
needToRecordEvent = true
}
node.Status.Conditions[i] = newNodeReadyCondition
readyConditionUpdated = true
break
}
}
if !readyConditionUpdated {
newNodeReadyCondition.LastTransitionTime = currentTime
node.Status.Conditions = append(node.Status.Conditions, newNodeReadyCondition)
}
if needToRecordEvent {
if newNodeReadyCondition.Status == api.ConditionTrue {
kl.recordNodeStatusEvent(api.EventTypeNormal, events.NodeReady)
} else {
kl.recordNodeStatusEvent(api.EventTypeNormal, events.NodeNotReady)
}
}
}
// setNodeMemoryPressureCondition for the node.
// TODO: this needs to move somewhere centralized...
func (kl *Kubelet) setNodeMemoryPressureCondition(node *api.Node) {
currentTime := unversioned.NewTime(kl.clock.Now())
var condition *api.NodeCondition
// Check if NodeMemoryPressure condition already exists and if it does, just pick it up for update.
for i := range node.Status.Conditions {
if node.Status.Conditions[i].Type == api.NodeMemoryPressure {
condition = &node.Status.Conditions[i]
}
}
newCondition := false
// If the NodeMemoryPressure condition doesn't exist, create one
if condition == nil {
condition = &api.NodeCondition{
Type: api.NodeMemoryPressure,
Status: api.ConditionUnknown,
}
// cannot be appended to node.Status.Conditions here because it gets
// copied to the slice. So if we append to the slice here none of the
// updates we make below are reflected in the slice.
newCondition = true
}
// Update the heartbeat time
condition.LastHeartbeatTime = currentTime
// Note: The conditions below take care of the case when a new NodeMemoryPressure condition is
// created and as well as the case when the condition already exists. When a new condition
// is created its status is set to api.ConditionUnknown which matches either
// condition.Status != api.ConditionTrue or
// condition.Status != api.ConditionFalse in the conditions below depending on whether
// the kubelet is under memory pressure or not.
if kl.evictionManager.IsUnderMemoryPressure() {
if condition.Status != api.ConditionTrue {
condition.Status = api.ConditionTrue
condition.Reason = "KubeletHasInsufficientMemory"
condition.Message = "kubelet has insufficient memory available"
condition.LastTransitionTime = currentTime
kl.recordNodeStatusEvent(api.EventTypeNormal, "NodeHasInsufficientMemory")
}
} else {
if condition.Status != api.ConditionFalse {
condition.Status = api.ConditionFalse
condition.Reason = "KubeletHasSufficientMemory"
condition.Message = "kubelet has sufficient memory available"
condition.LastTransitionTime = currentTime
kl.recordNodeStatusEvent(api.EventTypeNormal, "NodeHasSufficientMemory")
}
}
if newCondition {
node.Status.Conditions = append(node.Status.Conditions, *condition)
}
}
// Set OODcondition for the node.
func (kl *Kubelet) setNodeOODCondition(node *api.Node) {
currentTime := unversioned.NewTime(kl.clock.Now())
var nodeOODCondition *api.NodeCondition
// Check if NodeOutOfDisk condition already exists and if it does, just pick it up for update.
for i := range node.Status.Conditions {
if node.Status.Conditions[i].Type == api.NodeOutOfDisk {
nodeOODCondition = &node.Status.Conditions[i]
}
}
newOODCondition := false
// If the NodeOutOfDisk condition doesn't exist, create one.
if nodeOODCondition == nil {
nodeOODCondition = &api.NodeCondition{
Type: api.NodeOutOfDisk,
Status: api.ConditionUnknown,
}
// nodeOODCondition cannot be appended to node.Status.Conditions here because it gets
// copied to the slice. So if we append nodeOODCondition to the slice here none of the
// updates we make to nodeOODCondition below are reflected in the slice.
newOODCondition = true
}
// Update the heartbeat time irrespective of all the conditions.
nodeOODCondition.LastHeartbeatTime = currentTime
// Note: The conditions below take care of the case when a new NodeOutOfDisk condition is
// created and as well as the case when the condition already exists. When a new condition
// is created its status is set to api.ConditionUnknown which matches either
// nodeOODCondition.Status != api.ConditionTrue or
// nodeOODCondition.Status != api.ConditionFalse in the conditions below depending on whether
// the kubelet is out of disk or not.
if kl.isOutOfDisk() {
if nodeOODCondition.Status != api.ConditionTrue {
nodeOODCondition.Status = api.ConditionTrue
nodeOODCondition.Reason = "KubeletOutOfDisk"
nodeOODCondition.Message = "out of disk space"
nodeOODCondition.LastTransitionTime = currentTime
kl.recordNodeStatusEvent(api.EventTypeNormal, "NodeOutOfDisk")
}
} else {
if nodeOODCondition.Status != api.ConditionFalse {
// Update the out of disk condition when the condition status is unknown even if we
// are within the outOfDiskTransitionFrequency duration. We do this to set the
// condition status correctly at kubelet startup.
if nodeOODCondition.Status == api.ConditionUnknown || kl.clock.Since(nodeOODCondition.LastTransitionTime.Time) >= kl.outOfDiskTransitionFrequency {
nodeOODCondition.Status = api.ConditionFalse
nodeOODCondition.Reason = "KubeletHasSufficientDisk"
nodeOODCondition.Message = "kubelet has sufficient disk space available"
nodeOODCondition.LastTransitionTime = currentTime
kl.recordNodeStatusEvent(api.EventTypeNormal, "NodeHasSufficientDisk")
} else {
glog.Infof("Node condition status for OutOfDisk is false, but last transition time is less than %s", kl.outOfDiskTransitionFrequency)
}
}
}
if newOODCondition {
node.Status.Conditions = append(node.Status.Conditions, *nodeOODCondition)
}
}
// Maintains Node.Spec.Unschedulable value from previous run of tryUpdateNodeStatus()
var oldNodeUnschedulable bool
// record if node schedulable change.
func (kl *Kubelet) recordNodeSchedulableEvent(node *api.Node) {
if oldNodeUnschedulable != node.Spec.Unschedulable {
if node.Spec.Unschedulable {
kl.recordNodeStatusEvent(api.EventTypeNormal, events.NodeNotSchedulable)
} else {
kl.recordNodeStatusEvent(api.EventTypeNormal, events.NodeSchedulable)
}
oldNodeUnschedulable = node.Spec.Unschedulable
}
}
// Update VolumesInUse field in Node Status
func (kl *Kubelet) setNodeVolumesInUseStatus(node *api.Node) {
node.Status.VolumesInUse = kl.volumeManager.GetVolumesInUse()
}
// setNodeStatus fills in the Status fields of the given Node, overwriting
// any fields that are currently set.
// TODO(madhusudancs): Simplify the logic for setting node conditions and
// refactor the node status condtion code out to a different file.
func (kl *Kubelet) setNodeStatus(node *api.Node) error {
for _, f := range kl.setNodeStatusFuncs {
if err := f(node); err != nil {
return err
}
}
return nil
}
// defaultNodeStatusFuncs is a factory that generates the default set of setNodeStatus funcs
func (kl *Kubelet) defaultNodeStatusFuncs() []func(*api.Node) error {
// initial set of node status update handlers, can be modified by Option's
withoutError := func(f func(*api.Node)) func(*api.Node) error {
return func(n *api.Node) error {
f(n)
return nil
}
}
return []func(*api.Node) error{
kl.setNodeAddress,
withoutError(kl.setNodeStatusInfo),
withoutError(kl.setNodeOODCondition),
withoutError(kl.setNodeMemoryPressureCondition),
withoutError(kl.setNodeReadyCondition),
withoutError(kl.setNodeVolumesInUseStatus),
withoutError(kl.recordNodeSchedulableEvent),
}
}
// SetNodeStatus returns a functional Option that adds the given node status update handler to the Kubelet
func SetNodeStatus(f func(*api.Node) error) Option {
return func(k *Kubelet) {
k.setNodeStatusFuncs = append(k.setNodeStatusFuncs, f)
}
}
// tryUpdateNodeStatus tries to update node status to master. If ReconcileCBR0
// is set, this function will also confirm that cbr0 is configured correctly.
func (kl *Kubelet) tryUpdateNodeStatus() error {
node, err := kl.kubeClient.Core().Nodes().Get(kl.nodeName)
if err != nil {
return fmt.Errorf("error getting node %q: %v", kl.nodeName, err)
}
if node == nil {
return fmt.Errorf("no node instance returned for %q", kl.nodeName)
}
// Flannel is the authoritative source of pod CIDR, if it's running.
// This is a short term compromise till we get flannel working in
// reservation mode.
if kl.flannelExperimentalOverlay {
flannelPodCIDR := kl.runtimeState.podCIDR()
if node.Spec.PodCIDR != flannelPodCIDR {
node.Spec.PodCIDR = flannelPodCIDR
glog.Infof("Updating podcidr to %v", node.Spec.PodCIDR)
if updatedNode, err := kl.kubeClient.Core().Nodes().Update(node); err != nil {
glog.Warningf("Failed to update podCIDR: %v", err)
} else {
// Update the node resourceVersion so the status update doesn't fail.
node = updatedNode
}
}
} else if kl.reconcileCIDR {
kl.updatePodCIDR(node.Spec.PodCIDR)
}
if err := kl.setNodeStatus(node); err != nil {
return err
}
// Update the current status on the API server
updatedNode, err := kl.kubeClient.Core().Nodes().UpdateStatus(node)
if err == nil {
kl.volumeManager.MarkVolumesAsReportedInUse(
updatedNode.Status.VolumesInUse)
}
return err
}
// GetPhase returns the phase of a pod given its container info.
// This func is exported to simplify integration with 3rd party kubelet
// integrations like kubernetes-mesos.

View File

@ -0,0 +1,715 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kubelet
import (
"fmt"
"math"
"net"
goRuntime "runtime"
"sort"
"strings"
"time"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
apierrors "k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/resource"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/kubelet/cadvisor"
"k8s.io/kubernetes/pkg/kubelet/events"
utilnet "k8s.io/kubernetes/pkg/util/net"
"k8s.io/kubernetes/pkg/version"
"k8s.io/kubernetes/pkg/volume/util/volumehelper"
)
// registerWithApiserver registers the node with the cluster master. It is safe
// to call multiple times, but not concurrently (kl.registrationCompleted is
// not locked).
func (kl *Kubelet) registerWithApiserver() {
if kl.registrationCompleted {
return
}
step := 100 * time.Millisecond
for {
time.Sleep(step)
step = step * 2
if step >= 7*time.Second {
step = 7 * time.Second
}
node, err := kl.initialNodeStatus()
if err != nil {
glog.Errorf("Unable to construct api.Node object for kubelet: %v", err)
continue
}
glog.V(2).Infof("Attempting to register node %s", node.Name)
if _, err := kl.kubeClient.Core().Nodes().Create(node); err != nil {
if !apierrors.IsAlreadyExists(err) {
glog.V(2).Infof("Unable to register %s with the apiserver: %v", node.Name, err)
continue
}
currentNode, err := kl.kubeClient.Core().Nodes().Get(kl.nodeName)
if err != nil {
glog.Errorf("error getting node %q: %v", kl.nodeName, err)
continue
}
if currentNode == nil {
glog.Errorf("no node instance returned for %q", kl.nodeName)
continue
}
if currentNode.Spec.ExternalID == node.Spec.ExternalID {
glog.Infof("Node %s was previously registered", node.Name)
kl.registrationCompleted = true
return
}
glog.Errorf(
"Previously %q had externalID %q; now it is %q; will delete and recreate.",
kl.nodeName, node.Spec.ExternalID, currentNode.Spec.ExternalID,
)
if err := kl.kubeClient.Core().Nodes().Delete(node.Name, nil); err != nil {
glog.Errorf("Unable to delete old node: %v", err)
} else {
glog.Errorf("Deleted old node object %q", kl.nodeName)
}
continue
}
glog.Infof("Successfully registered node %s", node.Name)
kl.registrationCompleted = true
return
}
}
// initialNodeStatus determines the initial node status, incorporating node
// labels and information from the cloud provider.
func (kl *Kubelet) initialNodeStatus() (*api.Node, error) {
node := &api.Node{
ObjectMeta: api.ObjectMeta{
Name: kl.nodeName,
Labels: map[string]string{
unversioned.LabelHostname: kl.hostname,
unversioned.LabelOS: goRuntime.GOOS,
unversioned.LabelArch: goRuntime.GOARCH,
},
},
Spec: api.NodeSpec{
Unschedulable: !kl.registerSchedulable,
},
}
// Initially, set NodeNetworkUnavailable to true.
if kl.providerRequiresNetworkingConfiguration() {
node.Status.Conditions = append(node.Status.Conditions, api.NodeCondition{
Type: api.NodeNetworkUnavailable,
Status: api.ConditionTrue,
Reason: "NoRouteCreated",
Message: "Node created without a route",
LastTransitionTime: unversioned.NewTime(kl.clock.Now()),
})
}
if kl.enableControllerAttachDetach {
if node.Annotations == nil {
node.Annotations = make(map[string]string)
}
node.Annotations[volumehelper.ControllerManagedAttachAnnotation] = "true"
}
// @question: should this be place after the call to the cloud provider? which also applies labels
for k, v := range kl.nodeLabels {
if cv, found := node.ObjectMeta.Labels[k]; found {
glog.Warningf("the node label %s=%s will overwrite default setting %s", k, v, cv)
}
node.ObjectMeta.Labels[k] = v
}
if kl.cloud != nil {
instances, ok := kl.cloud.Instances()
if !ok {
return nil, fmt.Errorf("failed to get instances from cloud provider")
}
// TODO(roberthbailey): Can we do this without having credentials to talk
// to the cloud provider?
// TODO: ExternalID is deprecated, we'll have to drop this code
externalID, err := instances.ExternalID(kl.nodeName)
if err != nil {
return nil, fmt.Errorf("failed to get external ID from cloud provider: %v", err)
}
node.Spec.ExternalID = externalID
// TODO: We can't assume that the node has credentials to talk to the
// cloudprovider from arbitrary nodes. At most, we should talk to a
// local metadata server here.
node.Spec.ProviderID, err = cloudprovider.GetInstanceProviderID(kl.cloud, kl.nodeName)
if err != nil {
return nil, err
}
instanceType, err := instances.InstanceType(kl.nodeName)
if err != nil {
return nil, err
}
if instanceType != "" {
glog.Infof("Adding node label from cloud provider: %s=%s", unversioned.LabelInstanceType, instanceType)
node.ObjectMeta.Labels[unversioned.LabelInstanceType] = instanceType
}
// If the cloud has zone information, label the node with the zone information
zones, ok := kl.cloud.Zones()
if ok {
zone, err := zones.GetZone()
if err != nil {
return nil, fmt.Errorf("failed to get zone from cloud provider: %v", err)
}
if zone.FailureDomain != "" {
glog.Infof("Adding node label from cloud provider: %s=%s", unversioned.LabelZoneFailureDomain, zone.FailureDomain)
node.ObjectMeta.Labels[unversioned.LabelZoneFailureDomain] = zone.FailureDomain
}
if zone.Region != "" {
glog.Infof("Adding node label from cloud provider: %s=%s", unversioned.LabelZoneRegion, zone.Region)
node.ObjectMeta.Labels[unversioned.LabelZoneRegion] = zone.Region
}
}
} else {
node.Spec.ExternalID = kl.hostname
if kl.autoDetectCloudProvider {
// If no cloud provider is defined - use the one detected by cadvisor
info, err := kl.GetCachedMachineInfo()
if err == nil {
kl.updateCloudProviderFromMachineInfo(node, info)
}
}
}
if err := kl.setNodeStatus(node); err != nil {
return nil, err
}
return node, nil
}
// syncNodeStatus should be called periodically from a goroutine.
// It synchronizes node status to master, registering the kubelet first if
// necessary.
func (kl *Kubelet) syncNodeStatus() {
if kl.kubeClient == nil {
return
}
if kl.registerNode {
// This will exit immediately if it doesn't need to do anything.
kl.registerWithApiserver()
}
if err := kl.updateNodeStatus(); err != nil {
glog.Errorf("Unable to update node status: %v", err)
}
}
// updateNodeStatus updates node status to master with retries.
func (kl *Kubelet) updateNodeStatus() error {
for i := 0; i < nodeStatusUpdateRetry; i++ {
if err := kl.tryUpdateNodeStatus(); err != nil {
glog.Errorf("Error updating node status, will retry: %v", err)
} else {
return nil
}
}
return fmt.Errorf("update node status exceeds retry count")
}
// tryUpdateNodeStatus tries to update node status to master. If ReconcileCBR0
// is set, this function will also confirm that cbr0 is configured correctly.
func (kl *Kubelet) tryUpdateNodeStatus() error {
node, err := kl.kubeClient.Core().Nodes().Get(kl.nodeName)
if err != nil {
return fmt.Errorf("error getting node %q: %v", kl.nodeName, err)
}
if node == nil {
return fmt.Errorf("no node instance returned for %q", kl.nodeName)
}
// Flannel is the authoritative source of pod CIDR, if it's running.
// This is a short term compromise till we get flannel working in
// reservation mode.
if kl.flannelExperimentalOverlay {
flannelPodCIDR := kl.runtimeState.podCIDR()
if node.Spec.PodCIDR != flannelPodCIDR {
node.Spec.PodCIDR = flannelPodCIDR
glog.Infof("Updating podcidr to %v", node.Spec.PodCIDR)
if updatedNode, err := kl.kubeClient.Core().Nodes().Update(node); err != nil {
glog.Warningf("Failed to update podCIDR: %v", err)
} else {
// Update the node resourceVersion so the status update doesn't fail.
node = updatedNode
}
}
} else if kl.reconcileCIDR {
kl.updatePodCIDR(node.Spec.PodCIDR)
}
if err := kl.setNodeStatus(node); err != nil {
return err
}
// Update the current status on the API server
updatedNode, err := kl.kubeClient.Core().Nodes().UpdateStatus(node)
if err == nil {
kl.volumeManager.MarkVolumesAsReportedInUse(
updatedNode.Status.VolumesInUse)
}
return err
}
// recordNodeStatusEvent records an event of the given type with the given
// message for the node.
func (kl *Kubelet) recordNodeStatusEvent(eventtype, event string) {
glog.V(2).Infof("Recording %s event message for node %s", event, kl.nodeName)
// TODO: This requires a transaction, either both node status is updated
// and event is recorded or neither should happen, see issue #6055.
kl.recorder.Eventf(kl.nodeRef, eventtype, event, "Node %s status is now: %s", kl.nodeName, event)
}
// Set addresses for the node.
func (kl *Kubelet) setNodeAddress(node *api.Node) error {
// Set addresses for the node.
if kl.cloud != nil {
instances, ok := kl.cloud.Instances()
if !ok {
return fmt.Errorf("failed to get instances from cloud provider")
}
// TODO(roberthbailey): Can we do this without having credentials to talk
// to the cloud provider?
// TODO(justinsb): We can if CurrentNodeName() was actually CurrentNode() and returned an interface
nodeAddresses, err := instances.NodeAddresses(kl.nodeName)
if err != nil {
return fmt.Errorf("failed to get node address from cloud provider: %v", err)
}
node.Status.Addresses = nodeAddresses
} else {
if kl.nodeIP != nil {
node.Status.Addresses = []api.NodeAddress{
{Type: api.NodeLegacyHostIP, Address: kl.nodeIP.String()},
{Type: api.NodeInternalIP, Address: kl.nodeIP.String()},
}
} else if addr := net.ParseIP(kl.hostname); addr != nil {
node.Status.Addresses = []api.NodeAddress{
{Type: api.NodeLegacyHostIP, Address: addr.String()},
{Type: api.NodeInternalIP, Address: addr.String()},
}
} else {
addrs, err := net.LookupIP(node.Name)
if err != nil {
return fmt.Errorf("can't get ip address of node %s: %v", node.Name, err)
} else if len(addrs) == 0 {
return fmt.Errorf("no ip address for node %v", node.Name)
} else {
// check all ip addresses for this node.Name and try to find the first non-loopback IPv4 address.
// If no match is found, it uses the IP of the interface with gateway on it.
for _, ip := range addrs {
if ip.IsLoopback() {
continue
}
if ip.To4() != nil {
node.Status.Addresses = []api.NodeAddress{
{Type: api.NodeLegacyHostIP, Address: ip.String()},
{Type: api.NodeInternalIP, Address: ip.String()},
}
break
}
}
if len(node.Status.Addresses) == 0 {
ip, err := utilnet.ChooseHostInterface()
if err != nil {
return err
}
node.Status.Addresses = []api.NodeAddress{
{Type: api.NodeLegacyHostIP, Address: ip.String()},
{Type: api.NodeInternalIP, Address: ip.String()},
}
}
}
}
}
return nil
}
func (kl *Kubelet) setNodeStatusMachineInfo(node *api.Node) {
// TODO: Post NotReady if we cannot get MachineInfo from cAdvisor. This needs to start
// cAdvisor locally, e.g. for test-cmd.sh, and in integration test.
info, err := kl.GetCachedMachineInfo()
if err != nil {
// TODO(roberthbailey): This is required for test-cmd.sh to pass.
// See if the test should be updated instead.
node.Status.Capacity = api.ResourceList{
api.ResourceCPU: *resource.NewMilliQuantity(0, resource.DecimalSI),
api.ResourceMemory: resource.MustParse("0Gi"),
api.ResourcePods: *resource.NewQuantity(int64(kl.maxPods), resource.DecimalSI),
api.ResourceNvidiaGPU: *resource.NewQuantity(int64(kl.nvidiaGPUs), resource.DecimalSI),
}
glog.Errorf("Error getting machine info: %v", err)
} else {
node.Status.NodeInfo.MachineID = info.MachineID
node.Status.NodeInfo.SystemUUID = info.SystemUUID
node.Status.Capacity = cadvisor.CapacityFromMachineInfo(info)
if kl.podsPerCore > 0 {
node.Status.Capacity[api.ResourcePods] = *resource.NewQuantity(
int64(math.Min(float64(info.NumCores*kl.podsPerCore), float64(kl.maxPods))), resource.DecimalSI)
} else {
node.Status.Capacity[api.ResourcePods] = *resource.NewQuantity(
int64(kl.maxPods), resource.DecimalSI)
}
node.Status.Capacity[api.ResourceNvidiaGPU] = *resource.NewQuantity(
int64(kl.nvidiaGPUs), resource.DecimalSI)
if node.Status.NodeInfo.BootID != "" &&
node.Status.NodeInfo.BootID != info.BootID {
// TODO: This requires a transaction, either both node status is updated
// and event is recorded or neither should happen, see issue #6055.
kl.recorder.Eventf(kl.nodeRef, api.EventTypeWarning, events.NodeRebooted,
"Node %s has been rebooted, boot id: %s", kl.nodeName, info.BootID)
}
node.Status.NodeInfo.BootID = info.BootID
}
// Set Allocatable.
node.Status.Allocatable = make(api.ResourceList)
for k, v := range node.Status.Capacity {
value := *(v.Copy())
if kl.reservation.System != nil {
value.Sub(kl.reservation.System[k])
}
if kl.reservation.Kubernetes != nil {
value.Sub(kl.reservation.Kubernetes[k])
}
if value.Sign() < 0 {
// Negative Allocatable resources don't make sense.
value.Set(0)
}
node.Status.Allocatable[k] = value
}
}
// Set versioninfo for the node.
func (kl *Kubelet) setNodeStatusVersionInfo(node *api.Node) {
verinfo, err := kl.cadvisor.VersionInfo()
if err != nil {
glog.Errorf("Error getting version info: %v", err)
} else {
node.Status.NodeInfo.KernelVersion = verinfo.KernelVersion
node.Status.NodeInfo.OSImage = verinfo.ContainerOsVersion
runtimeVersion := "Unknown"
if runtimeVer, err := kl.containerRuntime.Version(); err == nil {
runtimeVersion = runtimeVer.String()
}
node.Status.NodeInfo.ContainerRuntimeVersion = fmt.Sprintf("%s://%s", kl.containerRuntime.Type(), runtimeVersion)
node.Status.NodeInfo.KubeletVersion = version.Get().String()
// TODO: kube-proxy might be different version from kubelet in the future
node.Status.NodeInfo.KubeProxyVersion = version.Get().String()
}
}
// Set daemonEndpoints for the node.
func (kl *Kubelet) setNodeStatusDaemonEndpoints(node *api.Node) {
node.Status.DaemonEndpoints = *kl.daemonEndpoints
}
// Set images list for the node
func (kl *Kubelet) setNodeStatusImages(node *api.Node) {
// Update image list of this node
var imagesOnNode []api.ContainerImage
containerImages, err := kl.imageManager.GetImageList()
if err != nil {
glog.Errorf("Error getting image list: %v", err)
} else {
// sort the images from max to min, and only set top N images into the node status.
sort.Sort(byImageSize(containerImages))
if maxImagesInNodeStatus < len(containerImages) {
containerImages = containerImages[0:maxImagesInNodeStatus]
}
for _, image := range containerImages {
imagesOnNode = append(imagesOnNode, api.ContainerImage{
Names: append(image.RepoTags, image.RepoDigests...),
SizeBytes: image.Size,
})
}
}
node.Status.Images = imagesOnNode
}
// Set the GOOS and GOARCH for this node
func (kl *Kubelet) setNodeStatusGoRuntime(node *api.Node) {
node.Status.NodeInfo.OperatingSystem = goRuntime.GOOS
node.Status.NodeInfo.Architecture = goRuntime.GOARCH
}
// Set status for the node.
func (kl *Kubelet) setNodeStatusInfo(node *api.Node) {
kl.setNodeStatusMachineInfo(node)
kl.setNodeStatusVersionInfo(node)
kl.setNodeStatusDaemonEndpoints(node)
kl.setNodeStatusImages(node)
kl.setNodeStatusGoRuntime(node)
}
// Set Ready condition for the node.
func (kl *Kubelet) setNodeReadyCondition(node *api.Node) {
// NOTE(aaronlevy): NodeReady condition needs to be the last in the list of node conditions.
// This is due to an issue with version skewed kubelet and master components.
// ref: https://github.com/kubernetes/kubernetes/issues/16961
currentTime := unversioned.NewTime(kl.clock.Now())
var newNodeReadyCondition api.NodeCondition
if rs := kl.runtimeState.errors(); len(rs) == 0 {
newNodeReadyCondition = api.NodeCondition{
Type: api.NodeReady,
Status: api.ConditionTrue,
Reason: "KubeletReady",
Message: "kubelet is posting ready status",
LastHeartbeatTime: currentTime,
}
} else {
newNodeReadyCondition = api.NodeCondition{
Type: api.NodeReady,
Status: api.ConditionFalse,
Reason: "KubeletNotReady",
Message: strings.Join(rs, ","),
LastHeartbeatTime: currentTime,
}
}
// Record any soft requirements that were not met in the container manager.
status := kl.containerManager.Status()
if status.SoftRequirements != nil {
newNodeReadyCondition.Message = fmt.Sprintf("%s. WARNING: %s", newNodeReadyCondition.Message, status.SoftRequirements.Error())
}
readyConditionUpdated := false
needToRecordEvent := false
for i := range node.Status.Conditions {
if node.Status.Conditions[i].Type == api.NodeReady {
if node.Status.Conditions[i].Status == newNodeReadyCondition.Status {
newNodeReadyCondition.LastTransitionTime = node.Status.Conditions[i].LastTransitionTime
} else {
newNodeReadyCondition.LastTransitionTime = currentTime
needToRecordEvent = true
}
node.Status.Conditions[i] = newNodeReadyCondition
readyConditionUpdated = true
break
}
}
if !readyConditionUpdated {
newNodeReadyCondition.LastTransitionTime = currentTime
node.Status.Conditions = append(node.Status.Conditions, newNodeReadyCondition)
}
if needToRecordEvent {
if newNodeReadyCondition.Status == api.ConditionTrue {
kl.recordNodeStatusEvent(api.EventTypeNormal, events.NodeReady)
} else {
kl.recordNodeStatusEvent(api.EventTypeNormal, events.NodeNotReady)
}
}
}
// setNodeMemoryPressureCondition for the node.
// TODO: this needs to move somewhere centralized...
func (kl *Kubelet) setNodeMemoryPressureCondition(node *api.Node) {
currentTime := unversioned.NewTime(kl.clock.Now())
var condition *api.NodeCondition
// Check if NodeMemoryPressure condition already exists and if it does, just pick it up for update.
for i := range node.Status.Conditions {
if node.Status.Conditions[i].Type == api.NodeMemoryPressure {
condition = &node.Status.Conditions[i]
}
}
newCondition := false
// If the NodeMemoryPressure condition doesn't exist, create one
if condition == nil {
condition = &api.NodeCondition{
Type: api.NodeMemoryPressure,
Status: api.ConditionUnknown,
}
// cannot be appended to node.Status.Conditions here because it gets
// copied to the slice. So if we append to the slice here none of the
// updates we make below are reflected in the slice.
newCondition = true
}
// Update the heartbeat time
condition.LastHeartbeatTime = currentTime
// Note: The conditions below take care of the case when a new NodeMemoryPressure condition is
// created and as well as the case when the condition already exists. When a new condition
// is created its status is set to api.ConditionUnknown which matches either
// condition.Status != api.ConditionTrue or
// condition.Status != api.ConditionFalse in the conditions below depending on whether
// the kubelet is under memory pressure or not.
if kl.evictionManager.IsUnderMemoryPressure() {
if condition.Status != api.ConditionTrue {
condition.Status = api.ConditionTrue
condition.Reason = "KubeletHasInsufficientMemory"
condition.Message = "kubelet has insufficient memory available"
condition.LastTransitionTime = currentTime
kl.recordNodeStatusEvent(api.EventTypeNormal, "NodeHasInsufficientMemory")
}
} else {
if condition.Status != api.ConditionFalse {
condition.Status = api.ConditionFalse
condition.Reason = "KubeletHasSufficientMemory"
condition.Message = "kubelet has sufficient memory available"
condition.LastTransitionTime = currentTime
kl.recordNodeStatusEvent(api.EventTypeNormal, "NodeHasSufficientMemory")
}
}
if newCondition {
node.Status.Conditions = append(node.Status.Conditions, *condition)
}
}
// Set OODcondition for the node.
func (kl *Kubelet) setNodeOODCondition(node *api.Node) {
currentTime := unversioned.NewTime(kl.clock.Now())
var nodeOODCondition *api.NodeCondition
// Check if NodeOutOfDisk condition already exists and if it does, just pick it up for update.
for i := range node.Status.Conditions {
if node.Status.Conditions[i].Type == api.NodeOutOfDisk {
nodeOODCondition = &node.Status.Conditions[i]
}
}
newOODCondition := false
// If the NodeOutOfDisk condition doesn't exist, create one.
if nodeOODCondition == nil {
nodeOODCondition = &api.NodeCondition{
Type: api.NodeOutOfDisk,
Status: api.ConditionUnknown,
}
// nodeOODCondition cannot be appended to node.Status.Conditions here because it gets
// copied to the slice. So if we append nodeOODCondition to the slice here none of the
// updates we make to nodeOODCondition below are reflected in the slice.
newOODCondition = true
}
// Update the heartbeat time irrespective of all the conditions.
nodeOODCondition.LastHeartbeatTime = currentTime
// Note: The conditions below take care of the case when a new NodeOutOfDisk condition is
// created and as well as the case when the condition already exists. When a new condition
// is created its status is set to api.ConditionUnknown which matches either
// nodeOODCondition.Status != api.ConditionTrue or
// nodeOODCondition.Status != api.ConditionFalse in the conditions below depending on whether
// the kubelet is out of disk or not.
if kl.isOutOfDisk() {
if nodeOODCondition.Status != api.ConditionTrue {
nodeOODCondition.Status = api.ConditionTrue
nodeOODCondition.Reason = "KubeletOutOfDisk"
nodeOODCondition.Message = "out of disk space"
nodeOODCondition.LastTransitionTime = currentTime
kl.recordNodeStatusEvent(api.EventTypeNormal, "NodeOutOfDisk")
}
} else {
if nodeOODCondition.Status != api.ConditionFalse {
// Update the out of disk condition when the condition status is unknown even if we
// are within the outOfDiskTransitionFrequency duration. We do this to set the
// condition status correctly at kubelet startup.
if nodeOODCondition.Status == api.ConditionUnknown || kl.clock.Since(nodeOODCondition.LastTransitionTime.Time) >= kl.outOfDiskTransitionFrequency {
nodeOODCondition.Status = api.ConditionFalse
nodeOODCondition.Reason = "KubeletHasSufficientDisk"
nodeOODCondition.Message = "kubelet has sufficient disk space available"
nodeOODCondition.LastTransitionTime = currentTime
kl.recordNodeStatusEvent(api.EventTypeNormal, "NodeHasSufficientDisk")
} else {
glog.Infof("Node condition status for OutOfDisk is false, but last transition time is less than %s", kl.outOfDiskTransitionFrequency)
}
}
}
if newOODCondition {
node.Status.Conditions = append(node.Status.Conditions, *nodeOODCondition)
}
}
// Maintains Node.Spec.Unschedulable value from previous run of tryUpdateNodeStatus()
// TODO: why is this a package var?
var oldNodeUnschedulable bool
// record if node schedulable change.
func (kl *Kubelet) recordNodeSchedulableEvent(node *api.Node) {
if oldNodeUnschedulable != node.Spec.Unschedulable {
if node.Spec.Unschedulable {
kl.recordNodeStatusEvent(api.EventTypeNormal, events.NodeNotSchedulable)
} else {
kl.recordNodeStatusEvent(api.EventTypeNormal, events.NodeSchedulable)
}
oldNodeUnschedulable = node.Spec.Unschedulable
}
}
// Update VolumesInUse field in Node Status
func (kl *Kubelet) setNodeVolumesInUseStatus(node *api.Node) {
node.Status.VolumesInUse = kl.volumeManager.GetVolumesInUse()
}
// setNodeStatus fills in the Status fields of the given Node, overwriting
// any fields that are currently set.
// TODO(madhusudancs): Simplify the logic for setting node conditions and
// refactor the node status condtion code out to a different file.
func (kl *Kubelet) setNodeStatus(node *api.Node) error {
for _, f := range kl.setNodeStatusFuncs {
if err := f(node); err != nil {
return err
}
}
return nil
}
// defaultNodeStatusFuncs is a factory that generates the default set of
// setNodeStatus funcs
func (kl *Kubelet) defaultNodeStatusFuncs() []func(*api.Node) error {
// initial set of node status update handlers, can be modified by Option's
withoutError := func(f func(*api.Node)) func(*api.Node) error {
return func(n *api.Node) error {
f(n)
return nil
}
}
return []func(*api.Node) error{
kl.setNodeAddress,
withoutError(kl.setNodeStatusInfo),
withoutError(kl.setNodeOODCondition),
withoutError(kl.setNodeMemoryPressureCondition),
withoutError(kl.setNodeReadyCondition),
withoutError(kl.setNodeVolumesInUseStatus),
withoutError(kl.recordNodeSchedulableEvent),
}
}
// SetNodeStatus returns a functional Option that adds the given node status
// update handler to the Kubelet
func SetNodeStatus(f func(*api.Node) error) Option {
return func(k *Kubelet) {
k.setNodeStatusFuncs = append(k.setNodeStatusFuncs, f)
}
}

View File

@ -0,0 +1,860 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kubelet
import (
"fmt"
"reflect"
goruntime "runtime"
"sort"
"strconv"
"testing"
"time"
cadvisorapi "github.com/google/cadvisor/info/v1"
cadvisorapiv2 "github.com/google/cadvisor/info/v2"
"k8s.io/kubernetes/pkg/api"
apierrors "k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/resource"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
"k8s.io/kubernetes/pkg/client/testing/core"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/diff"
"k8s.io/kubernetes/pkg/util/rand"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/version"
)
// generateTestingImageList generate randomly generated image list and corresponding expectedImageList.
func generateTestingImageList(count int) ([]kubecontainer.Image, []api.ContainerImage) {
// imageList is randomly generated image list
var imageList []kubecontainer.Image
for ; count > 0; count-- {
imageItem := kubecontainer.Image{
ID: string(util.NewUUID()),
RepoTags: generateImageTags(),
Size: rand.Int63nRange(minImgSize, maxImgSize+1),
}
imageList = append(imageList, imageItem)
}
// expectedImageList is generated by imageList according to size and maxImagesInNodeStatus
// 1. sort the imageList by size
sort.Sort(byImageSize(imageList))
// 2. convert sorted imageList to api.ContainerImage list
var expectedImageList []api.ContainerImage
for _, kubeImage := range imageList {
apiImage := api.ContainerImage{
Names: kubeImage.RepoTags,
SizeBytes: kubeImage.Size,
}
expectedImageList = append(expectedImageList, apiImage)
}
// 3. only returns the top maxImagesInNodeStatus images in expectedImageList
return imageList, expectedImageList[0:maxImagesInNodeStatus]
}
func generateImageTags() []string {
var tagList []string
count := rand.IntnRange(1, maxImageTagsForTest+1)
for ; count > 0; count-- {
tagList = append(tagList, "gcr.io/google_containers:v"+strconv.Itoa(count))
}
return tagList
}
func TestUpdateNewNodeStatus(t *testing.T) {
// generate one more than maxImagesInNodeStatus in inputImageList
inputImageList, expectedImageList := generateTestingImageList(maxImagesInNodeStatus + 1)
testKubelet := newTestKubeletWithImageList(
t, inputImageList, false /* controllerAttachDetachEnabled */)
kubelet := testKubelet.kubelet
kubeClient := testKubelet.fakeKubeClient
kubeClient.ReactionChain = fake.NewSimpleClientset(&api.NodeList{Items: []api.Node{
{ObjectMeta: api.ObjectMeta{Name: testKubeletHostname}},
}}).ReactionChain
machineInfo := &cadvisorapi.MachineInfo{
MachineID: "123",
SystemUUID: "abc",
BootID: "1b3",
NumCores: 2,
MemoryCapacity: 10E9, // 10G
}
mockCadvisor := testKubelet.fakeCadvisor
mockCadvisor.On("Start").Return(nil)
mockCadvisor.On("MachineInfo").Return(machineInfo, nil)
versionInfo := &cadvisorapi.VersionInfo{
KernelVersion: "3.16.0-0.bpo.4-amd64",
ContainerOsVersion: "Debian GNU/Linux 7 (wheezy)",
}
mockCadvisor.On("VersionInfo").Return(versionInfo, nil)
// Make kubelet report that it has sufficient disk space.
if err := updateDiskSpacePolicy(kubelet, mockCadvisor, 500, 500, 200, 200, 100, 100); err != nil {
t.Fatalf("can't update disk space manager: %v", err)
}
expectedNode := &api.Node{
ObjectMeta: api.ObjectMeta{Name: testKubeletHostname},
Spec: api.NodeSpec{},
Status: api.NodeStatus{
Conditions: []api.NodeCondition{
{
Type: api.NodeOutOfDisk,
Status: api.ConditionFalse,
Reason: "KubeletHasSufficientDisk",
Message: fmt.Sprintf("kubelet has sufficient disk space available"),
LastHeartbeatTime: unversioned.Time{},
LastTransitionTime: unversioned.Time{},
},
{
Type: api.NodeMemoryPressure,
Status: api.ConditionFalse,
Reason: "KubeletHasSufficientMemory",
Message: fmt.Sprintf("kubelet has sufficient memory available"),
LastHeartbeatTime: unversioned.Time{},
LastTransitionTime: unversioned.Time{},
},
{
Type: api.NodeReady,
Status: api.ConditionTrue,
Reason: "KubeletReady",
Message: fmt.Sprintf("kubelet is posting ready status"),
LastHeartbeatTime: unversioned.Time{},
LastTransitionTime: unversioned.Time{},
},
},
NodeInfo: api.NodeSystemInfo{
MachineID: "123",
SystemUUID: "abc",
BootID: "1b3",
KernelVersion: "3.16.0-0.bpo.4-amd64",
OSImage: "Debian GNU/Linux 7 (wheezy)",
OperatingSystem: goruntime.GOOS,
Architecture: goruntime.GOARCH,
ContainerRuntimeVersion: "test://1.5.0",
KubeletVersion: version.Get().String(),
KubeProxyVersion: version.Get().String(),
},
Capacity: api.ResourceList{
api.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI),
api.ResourceMemory: *resource.NewQuantity(10E9, resource.BinarySI),
api.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI),
api.ResourceNvidiaGPU: *resource.NewQuantity(0, resource.DecimalSI),
},
Allocatable: api.ResourceList{
api.ResourceCPU: *resource.NewMilliQuantity(1800, resource.DecimalSI),
api.ResourceMemory: *resource.NewQuantity(9900E6, resource.BinarySI),
api.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI),
api.ResourceNvidiaGPU: *resource.NewQuantity(0, resource.DecimalSI),
},
Addresses: []api.NodeAddress{
{Type: api.NodeLegacyHostIP, Address: "127.0.0.1"},
{Type: api.NodeInternalIP, Address: "127.0.0.1"},
},
Images: expectedImageList,
},
}
kubelet.updateRuntimeUp()
if err := kubelet.updateNodeStatus(); err != nil {
t.Errorf("unexpected error: %v", err)
}
actions := kubeClient.Actions()
if len(actions) != 2 {
t.Fatalf("unexpected actions: %v", actions)
}
if !actions[1].Matches("update", "nodes") || actions[1].GetSubresource() != "status" {
t.Fatalf("unexpected actions: %v", actions)
}
updatedNode, ok := actions[1].(core.UpdateAction).GetObject().(*api.Node)
if !ok {
t.Errorf("unexpected object type")
}
for i, cond := range updatedNode.Status.Conditions {
if cond.LastHeartbeatTime.IsZero() {
t.Errorf("unexpected zero last probe timestamp for %v condition", cond.Type)
}
if cond.LastTransitionTime.IsZero() {
t.Errorf("unexpected zero last transition timestamp for %v condition", cond.Type)
}
updatedNode.Status.Conditions[i].LastHeartbeatTime = unversioned.Time{}
updatedNode.Status.Conditions[i].LastTransitionTime = unversioned.Time{}
}
// Version skew workaround. See: https://github.com/kubernetes/kubernetes/issues/16961
if updatedNode.Status.Conditions[len(updatedNode.Status.Conditions)-1].Type != api.NodeReady {
t.Errorf("unexpected node condition order. NodeReady should be last.")
}
if maxImagesInNodeStatus != len(updatedNode.Status.Images) {
t.Errorf("unexpected image list length in node status, expected: %v, got: %v", maxImagesInNodeStatus, len(updatedNode.Status.Images))
} else {
if !api.Semantic.DeepEqual(expectedNode, updatedNode) {
t.Errorf("unexpected objects: %s", diff.ObjectDiff(expectedNode, updatedNode))
}
}
}
func TestUpdateNewNodeOutOfDiskStatusWithTransitionFrequency(t *testing.T) {
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
kubelet := testKubelet.kubelet
kubeClient := testKubelet.fakeKubeClient
kubeClient.ReactionChain = fake.NewSimpleClientset(&api.NodeList{Items: []api.Node{
{ObjectMeta: api.ObjectMeta{Name: testKubeletHostname}},
}}).ReactionChain
machineInfo := &cadvisorapi.MachineInfo{
MachineID: "123",
SystemUUID: "abc",
BootID: "1b3",
NumCores: 2,
MemoryCapacity: 1024,
}
mockCadvisor := testKubelet.fakeCadvisor
mockCadvisor.On("Start").Return(nil)
mockCadvisor.On("MachineInfo").Return(machineInfo, nil)
versionInfo := &cadvisorapi.VersionInfo{
KernelVersion: "3.16.0-0.bpo.4-amd64",
ContainerOsVersion: "Debian GNU/Linux 7 (wheezy)",
}
mockCadvisor.On("VersionInfo").Return(versionInfo, nil)
// Make Kubelet report that it has sufficient disk space.
if err := updateDiskSpacePolicy(kubelet, mockCadvisor, 500, 500, 200, 200, 100, 100); err != nil {
t.Fatalf("can't update disk space manager: %v", err)
}
kubelet.outOfDiskTransitionFrequency = 10 * time.Second
expectedNodeOutOfDiskCondition := api.NodeCondition{
Type: api.NodeOutOfDisk,
Status: api.ConditionFalse,
Reason: "KubeletHasSufficientDisk",
Message: fmt.Sprintf("kubelet has sufficient disk space available"),
LastHeartbeatTime: unversioned.Time{},
LastTransitionTime: unversioned.Time{},
}
kubelet.updateRuntimeUp()
if err := kubelet.updateNodeStatus(); err != nil {
t.Errorf("unexpected error: %v", err)
}
actions := kubeClient.Actions()
if len(actions) != 2 {
t.Fatalf("unexpected actions: %v", actions)
}
if !actions[1].Matches("update", "nodes") || actions[1].GetSubresource() != "status" {
t.Fatalf("unexpected actions: %v", actions)
}
updatedNode, ok := actions[1].(core.UpdateAction).GetObject().(*api.Node)
if !ok {
t.Errorf("unexpected object type")
}
var oodCondition api.NodeCondition
for i, cond := range updatedNode.Status.Conditions {
if cond.LastHeartbeatTime.IsZero() {
t.Errorf("unexpected zero last probe timestamp for %v condition", cond.Type)
}
if cond.LastTransitionTime.IsZero() {
t.Errorf("unexpected zero last transition timestamp for %v condition", cond.Type)
}
updatedNode.Status.Conditions[i].LastHeartbeatTime = unversioned.Time{}
updatedNode.Status.Conditions[i].LastTransitionTime = unversioned.Time{}
if cond.Type == api.NodeOutOfDisk {
oodCondition = updatedNode.Status.Conditions[i]
}
}
if !reflect.DeepEqual(expectedNodeOutOfDiskCondition, oodCondition) {
t.Errorf("unexpected objects: %s", diff.ObjectDiff(expectedNodeOutOfDiskCondition, oodCondition))
}
}
func TestUpdateExistingNodeStatus(t *testing.T) {
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
kubelet := testKubelet.kubelet
kubeClient := testKubelet.fakeKubeClient
kubeClient.ReactionChain = fake.NewSimpleClientset(&api.NodeList{Items: []api.Node{
{
ObjectMeta: api.ObjectMeta{Name: testKubeletHostname},
Spec: api.NodeSpec{},
Status: api.NodeStatus{
Conditions: []api.NodeCondition{
{
Type: api.NodeOutOfDisk,
Status: api.ConditionTrue,
Reason: "KubeletOutOfDisk",
Message: "out of disk space",
LastHeartbeatTime: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
LastTransitionTime: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
},
{
Type: api.NodeMemoryPressure,
Status: api.ConditionFalse,
Reason: "KubeletHasSufficientMemory",
Message: fmt.Sprintf("kubelet has sufficient memory available"),
LastHeartbeatTime: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
LastTransitionTime: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
},
{
Type: api.NodeReady,
Status: api.ConditionTrue,
Reason: "KubeletReady",
Message: fmt.Sprintf("kubelet is posting ready status"),
LastHeartbeatTime: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
LastTransitionTime: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
},
},
Capacity: api.ResourceList{
api.ResourceCPU: *resource.NewMilliQuantity(3000, resource.DecimalSI),
api.ResourceMemory: *resource.NewQuantity(20E9, resource.BinarySI),
api.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI),
},
Allocatable: api.ResourceList{
api.ResourceCPU: *resource.NewMilliQuantity(2800, resource.DecimalSI),
api.ResourceMemory: *resource.NewQuantity(19900E6, resource.BinarySI),
api.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI),
},
},
},
}}).ReactionChain
mockCadvisor := testKubelet.fakeCadvisor
mockCadvisor.On("Start").Return(nil)
machineInfo := &cadvisorapi.MachineInfo{
MachineID: "123",
SystemUUID: "abc",
BootID: "1b3",
NumCores: 2,
MemoryCapacity: 20E9,
}
mockCadvisor.On("MachineInfo").Return(machineInfo, nil)
versionInfo := &cadvisorapi.VersionInfo{
KernelVersion: "3.16.0-0.bpo.4-amd64",
ContainerOsVersion: "Debian GNU/Linux 7 (wheezy)",
}
mockCadvisor.On("VersionInfo").Return(versionInfo, nil)
// Make kubelet report that it is out of disk space.
if err := updateDiskSpacePolicy(kubelet, mockCadvisor, 500, 500, 50, 50, 100, 100); err != nil {
t.Fatalf("can't update disk space manager: %v", err)
}
expectedNode := &api.Node{
ObjectMeta: api.ObjectMeta{Name: testKubeletHostname},
Spec: api.NodeSpec{},
Status: api.NodeStatus{
Conditions: []api.NodeCondition{
{
Type: api.NodeOutOfDisk,
Status: api.ConditionTrue,
Reason: "KubeletOutOfDisk",
Message: "out of disk space",
LastHeartbeatTime: unversioned.Time{}, // placeholder
LastTransitionTime: unversioned.Time{}, // placeholder
},
{
Type: api.NodeMemoryPressure,
Status: api.ConditionFalse,
Reason: "KubeletHasSufficientMemory",
Message: fmt.Sprintf("kubelet has sufficient memory available"),
LastHeartbeatTime: unversioned.Time{},
LastTransitionTime: unversioned.Time{},
},
{
Type: api.NodeReady,
Status: api.ConditionTrue,
Reason: "KubeletReady",
Message: fmt.Sprintf("kubelet is posting ready status"),
LastHeartbeatTime: unversioned.Time{}, // placeholder
LastTransitionTime: unversioned.Time{}, // placeholder
},
},
NodeInfo: api.NodeSystemInfo{
MachineID: "123",
SystemUUID: "abc",
BootID: "1b3",
KernelVersion: "3.16.0-0.bpo.4-amd64",
OSImage: "Debian GNU/Linux 7 (wheezy)",
OperatingSystem: goruntime.GOOS,
Architecture: goruntime.GOARCH,
ContainerRuntimeVersion: "test://1.5.0",
KubeletVersion: version.Get().String(),
KubeProxyVersion: version.Get().String(),
},
Capacity: api.ResourceList{
api.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI),
api.ResourceMemory: *resource.NewQuantity(20E9, resource.BinarySI),
api.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI),
api.ResourceNvidiaGPU: *resource.NewQuantity(0, resource.DecimalSI),
},
Allocatable: api.ResourceList{
api.ResourceCPU: *resource.NewMilliQuantity(1800, resource.DecimalSI),
api.ResourceMemory: *resource.NewQuantity(19900E6, resource.BinarySI),
api.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI),
api.ResourceNvidiaGPU: *resource.NewQuantity(0, resource.DecimalSI),
},
Addresses: []api.NodeAddress{
{Type: api.NodeLegacyHostIP, Address: "127.0.0.1"},
{Type: api.NodeInternalIP, Address: "127.0.0.1"},
},
// images will be sorted from max to min in node status.
Images: []api.ContainerImage{
{
Names: []string{"gcr.io/google_containers:v3", "gcr.io/google_containers:v4"},
SizeBytes: 456,
},
{
Names: []string{"gcr.io/google_containers:v1", "gcr.io/google_containers:v2"},
SizeBytes: 123,
},
},
},
}
kubelet.updateRuntimeUp()
if err := kubelet.updateNodeStatus(); err != nil {
t.Errorf("unexpected error: %v", err)
}
actions := kubeClient.Actions()
if len(actions) != 2 {
t.Errorf("unexpected actions: %v", actions)
}
updateAction, ok := actions[1].(core.UpdateAction)
if !ok {
t.Errorf("unexpected action type. expected UpdateAction, got %#v", actions[1])
}
updatedNode, ok := updateAction.GetObject().(*api.Node)
if !ok {
t.Errorf("unexpected object type")
}
for i, cond := range updatedNode.Status.Conditions {
// Expect LastProbeTime to be updated to Now, while LastTransitionTime to be the same.
if old := unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC).Time; reflect.DeepEqual(cond.LastHeartbeatTime.Rfc3339Copy().UTC(), old) {
t.Errorf("Condition %v LastProbeTime: expected \n%v\n, got \n%v", cond.Type, unversioned.Now(), old)
}
if got, want := cond.LastTransitionTime.Rfc3339Copy().UTC(), unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC).Time; !reflect.DeepEqual(got, want) {
t.Errorf("Condition %v LastTransitionTime: expected \n%#v\n, got \n%#v", cond.Type, want, got)
}
updatedNode.Status.Conditions[i].LastHeartbeatTime = unversioned.Time{}
updatedNode.Status.Conditions[i].LastTransitionTime = unversioned.Time{}
}
// Version skew workaround. See: https://github.com/kubernetes/kubernetes/issues/16961
if updatedNode.Status.Conditions[len(updatedNode.Status.Conditions)-1].Type != api.NodeReady {
t.Errorf("unexpected node condition order. NodeReady should be last.")
}
if !api.Semantic.DeepEqual(expectedNode, updatedNode) {
t.Errorf("expected \n%v\n, got \n%v", expectedNode, updatedNode)
}
}
func TestUpdateExistingNodeOutOfDiskStatusWithTransitionFrequency(t *testing.T) {
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
kubelet := testKubelet.kubelet
clock := testKubelet.fakeClock
kubeClient := testKubelet.fakeKubeClient
kubeClient.ReactionChain = fake.NewSimpleClientset(&api.NodeList{Items: []api.Node{
{
ObjectMeta: api.ObjectMeta{Name: testKubeletHostname},
Spec: api.NodeSpec{},
Status: api.NodeStatus{
Conditions: []api.NodeCondition{
{
Type: api.NodeReady,
Status: api.ConditionTrue,
Reason: "KubeletReady",
Message: fmt.Sprintf("kubelet is posting ready status"),
LastHeartbeatTime: unversioned.NewTime(clock.Now()),
LastTransitionTime: unversioned.NewTime(clock.Now()),
},
{
Type: api.NodeOutOfDisk,
Status: api.ConditionTrue,
Reason: "KubeletOutOfDisk",
Message: "out of disk space",
LastHeartbeatTime: unversioned.NewTime(clock.Now()),
LastTransitionTime: unversioned.NewTime(clock.Now()),
},
},
},
},
}}).ReactionChain
mockCadvisor := testKubelet.fakeCadvisor
machineInfo := &cadvisorapi.MachineInfo{
MachineID: "123",
SystemUUID: "abc",
BootID: "1b3",
NumCores: 2,
MemoryCapacity: 1024,
}
mockCadvisor.On("Start").Return(nil)
mockCadvisor.On("MachineInfo").Return(machineInfo, nil)
versionInfo := &cadvisorapi.VersionInfo{
KernelVersion: "3.16.0-0.bpo.4-amd64",
ContainerOsVersion: "Debian GNU/Linux 7 (wheezy)",
DockerVersion: "1.5.0",
}
mockCadvisor.On("VersionInfo").Return(versionInfo, nil)
kubelet.outOfDiskTransitionFrequency = 5 * time.Second
ood := api.NodeCondition{
Type: api.NodeOutOfDisk,
Status: api.ConditionTrue,
Reason: "KubeletOutOfDisk",
Message: "out of disk space",
LastHeartbeatTime: unversioned.NewTime(clock.Now()), // placeholder
LastTransitionTime: unversioned.NewTime(clock.Now()), // placeholder
}
noOod := api.NodeCondition{
Type: api.NodeOutOfDisk,
Status: api.ConditionFalse,
Reason: "KubeletHasSufficientDisk",
Message: fmt.Sprintf("kubelet has sufficient disk space available"),
LastHeartbeatTime: unversioned.NewTime(clock.Now()), // placeholder
LastTransitionTime: unversioned.NewTime(clock.Now()), // placeholder
}
testCases := []struct {
rootFsAvail uint64
dockerFsAvail uint64
expected api.NodeCondition
}{
{
// NodeOutOfDisk==false
rootFsAvail: 200,
dockerFsAvail: 200,
expected: ood,
},
{
// NodeOutOfDisk==true
rootFsAvail: 50,
dockerFsAvail: 200,
expected: ood,
},
{
// NodeOutOfDisk==false
rootFsAvail: 200,
dockerFsAvail: 200,
expected: ood,
},
{
// NodeOutOfDisk==true
rootFsAvail: 200,
dockerFsAvail: 50,
expected: ood,
},
{
// NodeOutOfDisk==false
rootFsAvail: 200,
dockerFsAvail: 200,
expected: noOod,
},
}
kubelet.updateRuntimeUp()
for tcIdx, tc := range testCases {
// Step by a second
clock.Step(1 * time.Second)
// Setup expected times.
tc.expected.LastHeartbeatTime = unversioned.NewTime(clock.Now())
// In the last case, there should be a status transition for NodeOutOfDisk
if tcIdx == len(testCases)-1 {
tc.expected.LastTransitionTime = unversioned.NewTime(clock.Now())
}
// Make kubelet report that it has sufficient disk space
if err := updateDiskSpacePolicy(kubelet, mockCadvisor, 500, 500, tc.rootFsAvail, tc.dockerFsAvail, 100, 100); err != nil {
t.Fatalf("can't update disk space manager: %v", err)
}
if err := kubelet.updateNodeStatus(); err != nil {
t.Errorf("unexpected error: %v", err)
}
actions := kubeClient.Actions()
if len(actions) != 2 {
t.Errorf("%d. unexpected actions: %v", tcIdx, actions)
}
updateAction, ok := actions[1].(core.UpdateAction)
if !ok {
t.Errorf("%d. unexpected action type. expected UpdateAction, got %#v", tcIdx, actions[1])
}
updatedNode, ok := updateAction.GetObject().(*api.Node)
if !ok {
t.Errorf("%d. unexpected object type", tcIdx)
}
kubeClient.ClearActions()
var oodCondition api.NodeCondition
for i, cond := range updatedNode.Status.Conditions {
if cond.Type == api.NodeOutOfDisk {
oodCondition = updatedNode.Status.Conditions[i]
}
}
if !reflect.DeepEqual(tc.expected, oodCondition) {
t.Errorf("%d.\nwant \n%v\n, got \n%v", tcIdx, tc.expected, oodCondition)
}
}
}
func TestUpdateNodeStatusWithRuntimeStateError(t *testing.T) {
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
kubelet := testKubelet.kubelet
clock := testKubelet.fakeClock
kubeClient := testKubelet.fakeKubeClient
kubeClient.ReactionChain = fake.NewSimpleClientset(&api.NodeList{Items: []api.Node{
{ObjectMeta: api.ObjectMeta{Name: testKubeletHostname}},
}}).ReactionChain
mockCadvisor := testKubelet.fakeCadvisor
mockCadvisor.On("Start").Return(nil)
machineInfo := &cadvisorapi.MachineInfo{
MachineID: "123",
SystemUUID: "abc",
BootID: "1b3",
NumCores: 2,
MemoryCapacity: 10E9,
}
mockCadvisor.On("MachineInfo").Return(machineInfo, nil)
versionInfo := &cadvisorapi.VersionInfo{
KernelVersion: "3.16.0-0.bpo.4-amd64",
ContainerOsVersion: "Debian GNU/Linux 7 (wheezy)",
}
mockCadvisor.On("VersionInfo").Return(versionInfo, nil)
// Make kubelet report that it has sufficient disk space.
if err := updateDiskSpacePolicy(kubelet, mockCadvisor, 500, 500, 200, 200, 100, 100); err != nil {
t.Fatalf("can't update disk space manager: %v", err)
}
expectedNode := &api.Node{
ObjectMeta: api.ObjectMeta{Name: testKubeletHostname},
Spec: api.NodeSpec{},
Status: api.NodeStatus{
Conditions: []api.NodeCondition{
{
Type: api.NodeOutOfDisk,
Status: api.ConditionFalse,
Reason: "KubeletHasSufficientDisk",
Message: "kubelet has sufficient disk space available",
LastHeartbeatTime: unversioned.Time{},
LastTransitionTime: unversioned.Time{},
},
{
Type: api.NodeMemoryPressure,
Status: api.ConditionFalse,
Reason: "KubeletHasSufficientMemory",
Message: fmt.Sprintf("kubelet has sufficient memory available"),
LastHeartbeatTime: unversioned.Time{},
LastTransitionTime: unversioned.Time{},
},
{}, //placeholder
},
NodeInfo: api.NodeSystemInfo{
MachineID: "123",
SystemUUID: "abc",
BootID: "1b3",
KernelVersion: "3.16.0-0.bpo.4-amd64",
OSImage: "Debian GNU/Linux 7 (wheezy)",
OperatingSystem: goruntime.GOOS,
Architecture: goruntime.GOARCH,
ContainerRuntimeVersion: "test://1.5.0",
KubeletVersion: version.Get().String(),
KubeProxyVersion: version.Get().String(),
},
Capacity: api.ResourceList{
api.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI),
api.ResourceMemory: *resource.NewQuantity(10E9, resource.BinarySI),
api.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI),
api.ResourceNvidiaGPU: *resource.NewQuantity(0, resource.DecimalSI),
},
Allocatable: api.ResourceList{
api.ResourceCPU: *resource.NewMilliQuantity(1800, resource.DecimalSI),
api.ResourceMemory: *resource.NewQuantity(9900E6, resource.BinarySI),
api.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI),
api.ResourceNvidiaGPU: *resource.NewQuantity(0, resource.DecimalSI),
},
Addresses: []api.NodeAddress{
{Type: api.NodeLegacyHostIP, Address: "127.0.0.1"},
{Type: api.NodeInternalIP, Address: "127.0.0.1"},
},
Images: []api.ContainerImage{
{
Names: []string{"gcr.io/google_containers:v3", "gcr.io/google_containers:v4"},
SizeBytes: 456,
},
{
Names: []string{"gcr.io/google_containers:v1", "gcr.io/google_containers:v2"},
SizeBytes: 123,
},
},
},
}
checkNodeStatus := func(status api.ConditionStatus, reason, message string) {
kubeClient.ClearActions()
if err := kubelet.updateNodeStatus(); err != nil {
t.Errorf("unexpected error: %v", err)
}
actions := kubeClient.Actions()
if len(actions) != 2 {
t.Fatalf("unexpected actions: %v", actions)
}
if !actions[1].Matches("update", "nodes") || actions[1].GetSubresource() != "status" {
t.Fatalf("unexpected actions: %v", actions)
}
updatedNode, ok := actions[1].(core.UpdateAction).GetObject().(*api.Node)
if !ok {
t.Errorf("unexpected action type. expected UpdateAction, got %#v", actions[1])
}
for i, cond := range updatedNode.Status.Conditions {
if cond.LastHeartbeatTime.IsZero() {
t.Errorf("unexpected zero last probe timestamp")
}
if cond.LastTransitionTime.IsZero() {
t.Errorf("unexpected zero last transition timestamp")
}
updatedNode.Status.Conditions[i].LastHeartbeatTime = unversioned.Time{}
updatedNode.Status.Conditions[i].LastTransitionTime = unversioned.Time{}
}
// Version skew workaround. See: https://github.com/kubernetes/kubernetes/issues/16961
lastIndex := len(updatedNode.Status.Conditions) - 1
if updatedNode.Status.Conditions[lastIndex].Type != api.NodeReady {
t.Errorf("unexpected node condition order. NodeReady should be last.")
}
expectedNode.Status.Conditions[lastIndex] = api.NodeCondition{
Type: api.NodeReady,
Status: status,
Reason: reason,
Message: message,
LastHeartbeatTime: unversioned.Time{},
LastTransitionTime: unversioned.Time{},
}
if !api.Semantic.DeepEqual(expectedNode, updatedNode) {
t.Errorf("unexpected objects: %s", diff.ObjectDiff(expectedNode, updatedNode))
}
}
readyMessage := "kubelet is posting ready status"
downMessage := "container runtime is down"
// Should report kubelet not ready if the runtime check is out of date
clock.SetTime(time.Now().Add(-maxWaitForContainerRuntime))
kubelet.updateRuntimeUp()
checkNodeStatus(api.ConditionFalse, "KubeletNotReady", downMessage)
// Should report kubelet ready if the runtime check is updated
clock.SetTime(time.Now())
kubelet.updateRuntimeUp()
checkNodeStatus(api.ConditionTrue, "KubeletReady", readyMessage)
// Should report kubelet not ready if the runtime check is out of date
clock.SetTime(time.Now().Add(-maxWaitForContainerRuntime))
kubelet.updateRuntimeUp()
checkNodeStatus(api.ConditionFalse, "KubeletNotReady", downMessage)
// Should report kubelet not ready if the runtime check failed
fakeRuntime := testKubelet.fakeRuntime
// Inject error into fake runtime status check, node should be NotReady
fakeRuntime.StatusErr = fmt.Errorf("injected runtime status error")
clock.SetTime(time.Now())
kubelet.updateRuntimeUp()
checkNodeStatus(api.ConditionFalse, "KubeletNotReady", downMessage)
}
func TestUpdateNodeStatusError(t *testing.T) {
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
kubelet := testKubelet.kubelet
// No matching node for the kubelet
testKubelet.fakeKubeClient.ReactionChain = fake.NewSimpleClientset(&api.NodeList{Items: []api.Node{}}).ReactionChain
if err := kubelet.updateNodeStatus(); err == nil {
t.Errorf("unexpected non error: %v", err)
}
if len(testKubelet.fakeKubeClient.Actions()) != nodeStatusUpdateRetry {
t.Errorf("unexpected actions: %v", testKubelet.fakeKubeClient.Actions())
}
}
func TestRegisterExistingNodeWithApiserver(t *testing.T) {
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
kubelet := testKubelet.kubelet
kubeClient := testKubelet.fakeKubeClient
kubeClient.AddReactor("create", "nodes", func(action core.Action) (bool, runtime.Object, error) {
// Return an error on create.
return true, &api.Node{}, &apierrors.StatusError{
ErrStatus: unversioned.Status{Reason: unversioned.StatusReasonAlreadyExists},
}
})
kubeClient.AddReactor("get", "nodes", func(action core.Action) (bool, runtime.Object, error) {
// Return an existing (matching) node on get.
return true, &api.Node{
ObjectMeta: api.ObjectMeta{Name: testKubeletHostname},
Spec: api.NodeSpec{ExternalID: testKubeletHostname},
}, nil
})
kubeClient.AddReactor("*", "*", func(action core.Action) (bool, runtime.Object, error) {
return true, nil, fmt.Errorf("no reaction implemented for %s", action)
})
machineInfo := &cadvisorapi.MachineInfo{
MachineID: "123",
SystemUUID: "abc",
BootID: "1b3",
NumCores: 2,
MemoryCapacity: 1024,
}
mockCadvisor := testKubelet.fakeCadvisor
mockCadvisor.On("MachineInfo").Return(machineInfo, nil)
versionInfo := &cadvisorapi.VersionInfo{
KernelVersion: "3.16.0-0.bpo.4-amd64",
ContainerOsVersion: "Debian GNU/Linux 7 (wheezy)",
DockerVersion: "1.5.0",
}
mockCadvisor.On("VersionInfo").Return(versionInfo, nil)
mockCadvisor.On("ImagesFsInfo").Return(cadvisorapiv2.FsInfo{
Usage: 400 * mb,
Capacity: 1000 * mb,
Available: 600 * mb,
}, nil)
mockCadvisor.On("RootFsInfo").Return(cadvisorapiv2.FsInfo{
Usage: 9 * mb,
Capacity: 10 * mb,
}, nil)
done := make(chan struct{})
go func() {
kubelet.registerWithApiserver()
done <- struct{}{}
}()
select {
case <-time.After(wait.ForeverTestTimeout):
t.Errorf("timed out waiting for registration")
case <-done:
return
}
}

View File

@ -24,9 +24,7 @@ import (
"net"
"os"
"reflect"
goruntime "runtime"
"sort"
"strconv"
"testing"
"time"
@ -34,7 +32,6 @@ import (
cadvisorapiv2 "github.com/google/cadvisor/info/v2"
"github.com/stretchr/testify/assert"
"k8s.io/kubernetes/pkg/api"
apierrors "k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/resource"
"k8s.io/kubernetes/pkg/api/testapi"
"k8s.io/kubernetes/pkg/api/unversioned"
@ -68,12 +65,10 @@ import (
"k8s.io/kubernetes/pkg/util/diff"
"k8s.io/kubernetes/pkg/util/flowcontrol"
"k8s.io/kubernetes/pkg/util/mount"
"k8s.io/kubernetes/pkg/util/rand"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/util/term"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/version"
"k8s.io/kubernetes/pkg/volume"
_ "k8s.io/kubernetes/pkg/volume/host_path"
volumetest "k8s.io/kubernetes/pkg/volume/testing"
@ -126,45 +121,6 @@ func newTestKubelet(t *testing.T, controllerAttachDetachEnabled bool) *TestKubel
return newTestKubeletWithImageList(t, imageList, controllerAttachDetachEnabled)
}
// generateTestingImageList generate randomly generated image list and corresponding expectedImageList.
func generateTestingImageList(count int) ([]kubecontainer.Image, []api.ContainerImage) {
// imageList is randomly generated image list
var imageList []kubecontainer.Image
for ; count > 0; count-- {
imageItem := kubecontainer.Image{
ID: string(util.NewUUID()),
RepoTags: generateImageTags(),
Size: rand.Int63nRange(minImgSize, maxImgSize+1),
}
imageList = append(imageList, imageItem)
}
// expectedImageList is generated by imageList according to size and maxImagesInNodeStatus
// 1. sort the imageList by size
sort.Sort(byImageSize(imageList))
// 2. convert sorted imageList to api.ContainerImage list
var expectedImageList []api.ContainerImage
for _, kubeImage := range imageList {
apiImage := api.ContainerImage{
Names: kubeImage.RepoTags,
SizeBytes: kubeImage.Size,
}
expectedImageList = append(expectedImageList, apiImage)
}
// 3. only returns the top maxImagesInNodeStatus images in expectedImageList
return imageList, expectedImageList[0:maxImagesInNodeStatus]
}
func generateImageTags() []string {
var tagList []string
count := rand.IntnRange(1, maxImageTagsForTest+1)
for ; count > 0; count-- {
tagList = append(tagList, "gcr.io/google_containers:v"+strconv.Itoa(count))
}
return tagList
}
func newTestKubeletWithImageList(
t *testing.T,
imageList []kubecontainer.Image,
@ -2597,726 +2553,6 @@ func updateDiskSpacePolicy(kubelet *Kubelet, mockCadvisor *cadvisortest.Mock, ro
return nil
}
func TestUpdateNewNodeStatus(t *testing.T) {
// generate one more than maxImagesInNodeStatus in inputImageList
inputImageList, expectedImageList := generateTestingImageList(maxImagesInNodeStatus + 1)
testKubelet := newTestKubeletWithImageList(
t, inputImageList, false /* controllerAttachDetachEnabled */)
kubelet := testKubelet.kubelet
kubeClient := testKubelet.fakeKubeClient
kubeClient.ReactionChain = fake.NewSimpleClientset(&api.NodeList{Items: []api.Node{
{ObjectMeta: api.ObjectMeta{Name: testKubeletHostname}},
}}).ReactionChain
machineInfo := &cadvisorapi.MachineInfo{
MachineID: "123",
SystemUUID: "abc",
BootID: "1b3",
NumCores: 2,
MemoryCapacity: 10E9, // 10G
}
mockCadvisor := testKubelet.fakeCadvisor
mockCadvisor.On("Start").Return(nil)
mockCadvisor.On("MachineInfo").Return(machineInfo, nil)
versionInfo := &cadvisorapi.VersionInfo{
KernelVersion: "3.16.0-0.bpo.4-amd64",
ContainerOsVersion: "Debian GNU/Linux 7 (wheezy)",
}
mockCadvisor.On("VersionInfo").Return(versionInfo, nil)
// Make kubelet report that it has sufficient disk space.
if err := updateDiskSpacePolicy(kubelet, mockCadvisor, 500, 500, 200, 200, 100, 100); err != nil {
t.Fatalf("can't update disk space manager: %v", err)
}
expectedNode := &api.Node{
ObjectMeta: api.ObjectMeta{Name: testKubeletHostname},
Spec: api.NodeSpec{},
Status: api.NodeStatus{
Conditions: []api.NodeCondition{
{
Type: api.NodeOutOfDisk,
Status: api.ConditionFalse,
Reason: "KubeletHasSufficientDisk",
Message: fmt.Sprintf("kubelet has sufficient disk space available"),
LastHeartbeatTime: unversioned.Time{},
LastTransitionTime: unversioned.Time{},
},
{
Type: api.NodeMemoryPressure,
Status: api.ConditionFalse,
Reason: "KubeletHasSufficientMemory",
Message: fmt.Sprintf("kubelet has sufficient memory available"),
LastHeartbeatTime: unversioned.Time{},
LastTransitionTime: unversioned.Time{},
},
{
Type: api.NodeReady,
Status: api.ConditionTrue,
Reason: "KubeletReady",
Message: fmt.Sprintf("kubelet is posting ready status"),
LastHeartbeatTime: unversioned.Time{},
LastTransitionTime: unversioned.Time{},
},
},
NodeInfo: api.NodeSystemInfo{
MachineID: "123",
SystemUUID: "abc",
BootID: "1b3",
KernelVersion: "3.16.0-0.bpo.4-amd64",
OSImage: "Debian GNU/Linux 7 (wheezy)",
OperatingSystem: goruntime.GOOS,
Architecture: goruntime.GOARCH,
ContainerRuntimeVersion: "test://1.5.0",
KubeletVersion: version.Get().String(),
KubeProxyVersion: version.Get().String(),
},
Capacity: api.ResourceList{
api.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI),
api.ResourceMemory: *resource.NewQuantity(10E9, resource.BinarySI),
api.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI),
api.ResourceNvidiaGPU: *resource.NewQuantity(0, resource.DecimalSI),
},
Allocatable: api.ResourceList{
api.ResourceCPU: *resource.NewMilliQuantity(1800, resource.DecimalSI),
api.ResourceMemory: *resource.NewQuantity(9900E6, resource.BinarySI),
api.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI),
api.ResourceNvidiaGPU: *resource.NewQuantity(0, resource.DecimalSI),
},
Addresses: []api.NodeAddress{
{Type: api.NodeLegacyHostIP, Address: "127.0.0.1"},
{Type: api.NodeInternalIP, Address: "127.0.0.1"},
},
Images: expectedImageList,
},
}
kubelet.updateRuntimeUp()
if err := kubelet.updateNodeStatus(); err != nil {
t.Errorf("unexpected error: %v", err)
}
actions := kubeClient.Actions()
if len(actions) != 2 {
t.Fatalf("unexpected actions: %v", actions)
}
if !actions[1].Matches("update", "nodes") || actions[1].GetSubresource() != "status" {
t.Fatalf("unexpected actions: %v", actions)
}
updatedNode, ok := actions[1].(core.UpdateAction).GetObject().(*api.Node)
if !ok {
t.Errorf("unexpected object type")
}
for i, cond := range updatedNode.Status.Conditions {
if cond.LastHeartbeatTime.IsZero() {
t.Errorf("unexpected zero last probe timestamp for %v condition", cond.Type)
}
if cond.LastTransitionTime.IsZero() {
t.Errorf("unexpected zero last transition timestamp for %v condition", cond.Type)
}
updatedNode.Status.Conditions[i].LastHeartbeatTime = unversioned.Time{}
updatedNode.Status.Conditions[i].LastTransitionTime = unversioned.Time{}
}
// Version skew workaround. See: https://github.com/kubernetes/kubernetes/issues/16961
if updatedNode.Status.Conditions[len(updatedNode.Status.Conditions)-1].Type != api.NodeReady {
t.Errorf("unexpected node condition order. NodeReady should be last.")
}
if maxImagesInNodeStatus != len(updatedNode.Status.Images) {
t.Errorf("unexpected image list length in node status, expected: %v, got: %v", maxImagesInNodeStatus, len(updatedNode.Status.Images))
} else {
if !api.Semantic.DeepEqual(expectedNode, updatedNode) {
t.Errorf("unexpected objects: %s", diff.ObjectDiff(expectedNode, updatedNode))
}
}
}
func TestUpdateNewNodeOutOfDiskStatusWithTransitionFrequency(t *testing.T) {
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
kubelet := testKubelet.kubelet
kubeClient := testKubelet.fakeKubeClient
kubeClient.ReactionChain = fake.NewSimpleClientset(&api.NodeList{Items: []api.Node{
{ObjectMeta: api.ObjectMeta{Name: testKubeletHostname}},
}}).ReactionChain
machineInfo := &cadvisorapi.MachineInfo{
MachineID: "123",
SystemUUID: "abc",
BootID: "1b3",
NumCores: 2,
MemoryCapacity: 1024,
}
mockCadvisor := testKubelet.fakeCadvisor
mockCadvisor.On("Start").Return(nil)
mockCadvisor.On("MachineInfo").Return(machineInfo, nil)
versionInfo := &cadvisorapi.VersionInfo{
KernelVersion: "3.16.0-0.bpo.4-amd64",
ContainerOsVersion: "Debian GNU/Linux 7 (wheezy)",
}
mockCadvisor.On("VersionInfo").Return(versionInfo, nil)
// Make Kubelet report that it has sufficient disk space.
if err := updateDiskSpacePolicy(kubelet, mockCadvisor, 500, 500, 200, 200, 100, 100); err != nil {
t.Fatalf("can't update disk space manager: %v", err)
}
kubelet.outOfDiskTransitionFrequency = 10 * time.Second
expectedNodeOutOfDiskCondition := api.NodeCondition{
Type: api.NodeOutOfDisk,
Status: api.ConditionFalse,
Reason: "KubeletHasSufficientDisk",
Message: fmt.Sprintf("kubelet has sufficient disk space available"),
LastHeartbeatTime: unversioned.Time{},
LastTransitionTime: unversioned.Time{},
}
kubelet.updateRuntimeUp()
if err := kubelet.updateNodeStatus(); err != nil {
t.Errorf("unexpected error: %v", err)
}
actions := kubeClient.Actions()
if len(actions) != 2 {
t.Fatalf("unexpected actions: %v", actions)
}
if !actions[1].Matches("update", "nodes") || actions[1].GetSubresource() != "status" {
t.Fatalf("unexpected actions: %v", actions)
}
updatedNode, ok := actions[1].(core.UpdateAction).GetObject().(*api.Node)
if !ok {
t.Errorf("unexpected object type")
}
var oodCondition api.NodeCondition
for i, cond := range updatedNode.Status.Conditions {
if cond.LastHeartbeatTime.IsZero() {
t.Errorf("unexpected zero last probe timestamp for %v condition", cond.Type)
}
if cond.LastTransitionTime.IsZero() {
t.Errorf("unexpected zero last transition timestamp for %v condition", cond.Type)
}
updatedNode.Status.Conditions[i].LastHeartbeatTime = unversioned.Time{}
updatedNode.Status.Conditions[i].LastTransitionTime = unversioned.Time{}
if cond.Type == api.NodeOutOfDisk {
oodCondition = updatedNode.Status.Conditions[i]
}
}
if !reflect.DeepEqual(expectedNodeOutOfDiskCondition, oodCondition) {
t.Errorf("unexpected objects: %s", diff.ObjectDiff(expectedNodeOutOfDiskCondition, oodCondition))
}
}
func TestUpdateExistingNodeStatus(t *testing.T) {
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
kubelet := testKubelet.kubelet
kubeClient := testKubelet.fakeKubeClient
kubeClient.ReactionChain = fake.NewSimpleClientset(&api.NodeList{Items: []api.Node{
{
ObjectMeta: api.ObjectMeta{Name: testKubeletHostname},
Spec: api.NodeSpec{},
Status: api.NodeStatus{
Conditions: []api.NodeCondition{
{
Type: api.NodeOutOfDisk,
Status: api.ConditionTrue,
Reason: "KubeletOutOfDisk",
Message: "out of disk space",
LastHeartbeatTime: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
LastTransitionTime: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
},
{
Type: api.NodeMemoryPressure,
Status: api.ConditionFalse,
Reason: "KubeletHasSufficientMemory",
Message: fmt.Sprintf("kubelet has sufficient memory available"),
LastHeartbeatTime: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
LastTransitionTime: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
},
{
Type: api.NodeReady,
Status: api.ConditionTrue,
Reason: "KubeletReady",
Message: fmt.Sprintf("kubelet is posting ready status"),
LastHeartbeatTime: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
LastTransitionTime: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
},
},
Capacity: api.ResourceList{
api.ResourceCPU: *resource.NewMilliQuantity(3000, resource.DecimalSI),
api.ResourceMemory: *resource.NewQuantity(20E9, resource.BinarySI),
api.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI),
},
Allocatable: api.ResourceList{
api.ResourceCPU: *resource.NewMilliQuantity(2800, resource.DecimalSI),
api.ResourceMemory: *resource.NewQuantity(19900E6, resource.BinarySI),
api.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI),
},
},
},
}}).ReactionChain
mockCadvisor := testKubelet.fakeCadvisor
mockCadvisor.On("Start").Return(nil)
machineInfo := &cadvisorapi.MachineInfo{
MachineID: "123",
SystemUUID: "abc",
BootID: "1b3",
NumCores: 2,
MemoryCapacity: 20E9,
}
mockCadvisor.On("MachineInfo").Return(machineInfo, nil)
versionInfo := &cadvisorapi.VersionInfo{
KernelVersion: "3.16.0-0.bpo.4-amd64",
ContainerOsVersion: "Debian GNU/Linux 7 (wheezy)",
}
mockCadvisor.On("VersionInfo").Return(versionInfo, nil)
// Make kubelet report that it is out of disk space.
if err := updateDiskSpacePolicy(kubelet, mockCadvisor, 500, 500, 50, 50, 100, 100); err != nil {
t.Fatalf("can't update disk space manager: %v", err)
}
expectedNode := &api.Node{
ObjectMeta: api.ObjectMeta{Name: testKubeletHostname},
Spec: api.NodeSpec{},
Status: api.NodeStatus{
Conditions: []api.NodeCondition{
{
Type: api.NodeOutOfDisk,
Status: api.ConditionTrue,
Reason: "KubeletOutOfDisk",
Message: "out of disk space",
LastHeartbeatTime: unversioned.Time{}, // placeholder
LastTransitionTime: unversioned.Time{}, // placeholder
},
{
Type: api.NodeMemoryPressure,
Status: api.ConditionFalse,
Reason: "KubeletHasSufficientMemory",
Message: fmt.Sprintf("kubelet has sufficient memory available"),
LastHeartbeatTime: unversioned.Time{},
LastTransitionTime: unversioned.Time{},
},
{
Type: api.NodeReady,
Status: api.ConditionTrue,
Reason: "KubeletReady",
Message: fmt.Sprintf("kubelet is posting ready status"),
LastHeartbeatTime: unversioned.Time{}, // placeholder
LastTransitionTime: unversioned.Time{}, // placeholder
},
},
NodeInfo: api.NodeSystemInfo{
MachineID: "123",
SystemUUID: "abc",
BootID: "1b3",
KernelVersion: "3.16.0-0.bpo.4-amd64",
OSImage: "Debian GNU/Linux 7 (wheezy)",
OperatingSystem: goruntime.GOOS,
Architecture: goruntime.GOARCH,
ContainerRuntimeVersion: "test://1.5.0",
KubeletVersion: version.Get().String(),
KubeProxyVersion: version.Get().String(),
},
Capacity: api.ResourceList{
api.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI),
api.ResourceMemory: *resource.NewQuantity(20E9, resource.BinarySI),
api.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI),
api.ResourceNvidiaGPU: *resource.NewQuantity(0, resource.DecimalSI),
},
Allocatable: api.ResourceList{
api.ResourceCPU: *resource.NewMilliQuantity(1800, resource.DecimalSI),
api.ResourceMemory: *resource.NewQuantity(19900E6, resource.BinarySI),
api.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI),
api.ResourceNvidiaGPU: *resource.NewQuantity(0, resource.DecimalSI),
},
Addresses: []api.NodeAddress{
{Type: api.NodeLegacyHostIP, Address: "127.0.0.1"},
{Type: api.NodeInternalIP, Address: "127.0.0.1"},
},
// images will be sorted from max to min in node status.
Images: []api.ContainerImage{
{
Names: []string{"gcr.io/google_containers:v3", "gcr.io/google_containers:v4"},
SizeBytes: 456,
},
{
Names: []string{"gcr.io/google_containers:v1", "gcr.io/google_containers:v2"},
SizeBytes: 123,
},
},
},
}
kubelet.updateRuntimeUp()
if err := kubelet.updateNodeStatus(); err != nil {
t.Errorf("unexpected error: %v", err)
}
actions := kubeClient.Actions()
if len(actions) != 2 {
t.Errorf("unexpected actions: %v", actions)
}
updateAction, ok := actions[1].(core.UpdateAction)
if !ok {
t.Errorf("unexpected action type. expected UpdateAction, got %#v", actions[1])
}
updatedNode, ok := updateAction.GetObject().(*api.Node)
if !ok {
t.Errorf("unexpected object type")
}
for i, cond := range updatedNode.Status.Conditions {
// Expect LastProbeTime to be updated to Now, while LastTransitionTime to be the same.
if old := unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC).Time; reflect.DeepEqual(cond.LastHeartbeatTime.Rfc3339Copy().UTC(), old) {
t.Errorf("Condition %v LastProbeTime: expected \n%v\n, got \n%v", cond.Type, unversioned.Now(), old)
}
if got, want := cond.LastTransitionTime.Rfc3339Copy().UTC(), unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC).Time; !reflect.DeepEqual(got, want) {
t.Errorf("Condition %v LastTransitionTime: expected \n%#v\n, got \n%#v", cond.Type, want, got)
}
updatedNode.Status.Conditions[i].LastHeartbeatTime = unversioned.Time{}
updatedNode.Status.Conditions[i].LastTransitionTime = unversioned.Time{}
}
// Version skew workaround. See: https://github.com/kubernetes/kubernetes/issues/16961
if updatedNode.Status.Conditions[len(updatedNode.Status.Conditions)-1].Type != api.NodeReady {
t.Errorf("unexpected node condition order. NodeReady should be last.")
}
if !api.Semantic.DeepEqual(expectedNode, updatedNode) {
t.Errorf("expected \n%v\n, got \n%v", expectedNode, updatedNode)
}
}
func TestUpdateExistingNodeOutOfDiskStatusWithTransitionFrequency(t *testing.T) {
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
kubelet := testKubelet.kubelet
clock := testKubelet.fakeClock
kubeClient := testKubelet.fakeKubeClient
kubeClient.ReactionChain = fake.NewSimpleClientset(&api.NodeList{Items: []api.Node{
{
ObjectMeta: api.ObjectMeta{Name: testKubeletHostname},
Spec: api.NodeSpec{},
Status: api.NodeStatus{
Conditions: []api.NodeCondition{
{
Type: api.NodeReady,
Status: api.ConditionTrue,
Reason: "KubeletReady",
Message: fmt.Sprintf("kubelet is posting ready status"),
LastHeartbeatTime: unversioned.NewTime(clock.Now()),
LastTransitionTime: unversioned.NewTime(clock.Now()),
},
{
Type: api.NodeOutOfDisk,
Status: api.ConditionTrue,
Reason: "KubeletOutOfDisk",
Message: "out of disk space",
LastHeartbeatTime: unversioned.NewTime(clock.Now()),
LastTransitionTime: unversioned.NewTime(clock.Now()),
},
},
},
},
}}).ReactionChain
mockCadvisor := testKubelet.fakeCadvisor
machineInfo := &cadvisorapi.MachineInfo{
MachineID: "123",
SystemUUID: "abc",
BootID: "1b3",
NumCores: 2,
MemoryCapacity: 1024,
}
mockCadvisor.On("Start").Return(nil)
mockCadvisor.On("MachineInfo").Return(machineInfo, nil)
versionInfo := &cadvisorapi.VersionInfo{
KernelVersion: "3.16.0-0.bpo.4-amd64",
ContainerOsVersion: "Debian GNU/Linux 7 (wheezy)",
DockerVersion: "1.5.0",
}
mockCadvisor.On("VersionInfo").Return(versionInfo, nil)
kubelet.outOfDiskTransitionFrequency = 5 * time.Second
ood := api.NodeCondition{
Type: api.NodeOutOfDisk,
Status: api.ConditionTrue,
Reason: "KubeletOutOfDisk",
Message: "out of disk space",
LastHeartbeatTime: unversioned.NewTime(clock.Now()), // placeholder
LastTransitionTime: unversioned.NewTime(clock.Now()), // placeholder
}
noOod := api.NodeCondition{
Type: api.NodeOutOfDisk,
Status: api.ConditionFalse,
Reason: "KubeletHasSufficientDisk",
Message: fmt.Sprintf("kubelet has sufficient disk space available"),
LastHeartbeatTime: unversioned.NewTime(clock.Now()), // placeholder
LastTransitionTime: unversioned.NewTime(clock.Now()), // placeholder
}
testCases := []struct {
rootFsAvail uint64
dockerFsAvail uint64
expected api.NodeCondition
}{
{
// NodeOutOfDisk==false
rootFsAvail: 200,
dockerFsAvail: 200,
expected: ood,
},
{
// NodeOutOfDisk==true
rootFsAvail: 50,
dockerFsAvail: 200,
expected: ood,
},
{
// NodeOutOfDisk==false
rootFsAvail: 200,
dockerFsAvail: 200,
expected: ood,
},
{
// NodeOutOfDisk==true
rootFsAvail: 200,
dockerFsAvail: 50,
expected: ood,
},
{
// NodeOutOfDisk==false
rootFsAvail: 200,
dockerFsAvail: 200,
expected: noOod,
},
}
kubelet.updateRuntimeUp()
for tcIdx, tc := range testCases {
// Step by a second
clock.Step(1 * time.Second)
// Setup expected times.
tc.expected.LastHeartbeatTime = unversioned.NewTime(clock.Now())
// In the last case, there should be a status transition for NodeOutOfDisk
if tcIdx == len(testCases)-1 {
tc.expected.LastTransitionTime = unversioned.NewTime(clock.Now())
}
// Make kubelet report that it has sufficient disk space
if err := updateDiskSpacePolicy(kubelet, mockCadvisor, 500, 500, tc.rootFsAvail, tc.dockerFsAvail, 100, 100); err != nil {
t.Fatalf("can't update disk space manager: %v", err)
}
if err := kubelet.updateNodeStatus(); err != nil {
t.Errorf("unexpected error: %v", err)
}
actions := kubeClient.Actions()
if len(actions) != 2 {
t.Errorf("%d. unexpected actions: %v", tcIdx, actions)
}
updateAction, ok := actions[1].(core.UpdateAction)
if !ok {
t.Errorf("%d. unexpected action type. expected UpdateAction, got %#v", tcIdx, actions[1])
}
updatedNode, ok := updateAction.GetObject().(*api.Node)
if !ok {
t.Errorf("%d. unexpected object type", tcIdx)
}
kubeClient.ClearActions()
var oodCondition api.NodeCondition
for i, cond := range updatedNode.Status.Conditions {
if cond.Type == api.NodeOutOfDisk {
oodCondition = updatedNode.Status.Conditions[i]
}
}
if !reflect.DeepEqual(tc.expected, oodCondition) {
t.Errorf("%d.\nwant \n%v\n, got \n%v", tcIdx, tc.expected, oodCondition)
}
}
}
func TestUpdateNodeStatusWithRuntimeStateError(t *testing.T) {
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
kubelet := testKubelet.kubelet
clock := testKubelet.fakeClock
kubeClient := testKubelet.fakeKubeClient
kubeClient.ReactionChain = fake.NewSimpleClientset(&api.NodeList{Items: []api.Node{
{ObjectMeta: api.ObjectMeta{Name: testKubeletHostname}},
}}).ReactionChain
mockCadvisor := testKubelet.fakeCadvisor
mockCadvisor.On("Start").Return(nil)
machineInfo := &cadvisorapi.MachineInfo{
MachineID: "123",
SystemUUID: "abc",
BootID: "1b3",
NumCores: 2,
MemoryCapacity: 10E9,
}
mockCadvisor.On("MachineInfo").Return(machineInfo, nil)
versionInfo := &cadvisorapi.VersionInfo{
KernelVersion: "3.16.0-0.bpo.4-amd64",
ContainerOsVersion: "Debian GNU/Linux 7 (wheezy)",
}
mockCadvisor.On("VersionInfo").Return(versionInfo, nil)
// Make kubelet report that it has sufficient disk space.
if err := updateDiskSpacePolicy(kubelet, mockCadvisor, 500, 500, 200, 200, 100, 100); err != nil {
t.Fatalf("can't update disk space manager: %v", err)
}
expectedNode := &api.Node{
ObjectMeta: api.ObjectMeta{Name: testKubeletHostname},
Spec: api.NodeSpec{},
Status: api.NodeStatus{
Conditions: []api.NodeCondition{
{
Type: api.NodeOutOfDisk,
Status: api.ConditionFalse,
Reason: "KubeletHasSufficientDisk",
Message: "kubelet has sufficient disk space available",
LastHeartbeatTime: unversioned.Time{},
LastTransitionTime: unversioned.Time{},
},
{
Type: api.NodeMemoryPressure,
Status: api.ConditionFalse,
Reason: "KubeletHasSufficientMemory",
Message: fmt.Sprintf("kubelet has sufficient memory available"),
LastHeartbeatTime: unversioned.Time{},
LastTransitionTime: unversioned.Time{},
},
{}, //placeholder
},
NodeInfo: api.NodeSystemInfo{
MachineID: "123",
SystemUUID: "abc",
BootID: "1b3",
KernelVersion: "3.16.0-0.bpo.4-amd64",
OSImage: "Debian GNU/Linux 7 (wheezy)",
OperatingSystem: goruntime.GOOS,
Architecture: goruntime.GOARCH,
ContainerRuntimeVersion: "test://1.5.0",
KubeletVersion: version.Get().String(),
KubeProxyVersion: version.Get().String(),
},
Capacity: api.ResourceList{
api.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI),
api.ResourceMemory: *resource.NewQuantity(10E9, resource.BinarySI),
api.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI),
api.ResourceNvidiaGPU: *resource.NewQuantity(0, resource.DecimalSI),
},
Allocatable: api.ResourceList{
api.ResourceCPU: *resource.NewMilliQuantity(1800, resource.DecimalSI),
api.ResourceMemory: *resource.NewQuantity(9900E6, resource.BinarySI),
api.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI),
api.ResourceNvidiaGPU: *resource.NewQuantity(0, resource.DecimalSI),
},
Addresses: []api.NodeAddress{
{Type: api.NodeLegacyHostIP, Address: "127.0.0.1"},
{Type: api.NodeInternalIP, Address: "127.0.0.1"},
},
Images: []api.ContainerImage{
{
Names: []string{"gcr.io/google_containers:v3", "gcr.io/google_containers:v4"},
SizeBytes: 456,
},
{
Names: []string{"gcr.io/google_containers:v1", "gcr.io/google_containers:v2"},
SizeBytes: 123,
},
},
},
}
checkNodeStatus := func(status api.ConditionStatus, reason, message string) {
kubeClient.ClearActions()
if err := kubelet.updateNodeStatus(); err != nil {
t.Errorf("unexpected error: %v", err)
}
actions := kubeClient.Actions()
if len(actions) != 2 {
t.Fatalf("unexpected actions: %v", actions)
}
if !actions[1].Matches("update", "nodes") || actions[1].GetSubresource() != "status" {
t.Fatalf("unexpected actions: %v", actions)
}
updatedNode, ok := actions[1].(core.UpdateAction).GetObject().(*api.Node)
if !ok {
t.Errorf("unexpected action type. expected UpdateAction, got %#v", actions[1])
}
for i, cond := range updatedNode.Status.Conditions {
if cond.LastHeartbeatTime.IsZero() {
t.Errorf("unexpected zero last probe timestamp")
}
if cond.LastTransitionTime.IsZero() {
t.Errorf("unexpected zero last transition timestamp")
}
updatedNode.Status.Conditions[i].LastHeartbeatTime = unversioned.Time{}
updatedNode.Status.Conditions[i].LastTransitionTime = unversioned.Time{}
}
// Version skew workaround. See: https://github.com/kubernetes/kubernetes/issues/16961
lastIndex := len(updatedNode.Status.Conditions) - 1
if updatedNode.Status.Conditions[lastIndex].Type != api.NodeReady {
t.Errorf("unexpected node condition order. NodeReady should be last.")
}
expectedNode.Status.Conditions[lastIndex] = api.NodeCondition{
Type: api.NodeReady,
Status: status,
Reason: reason,
Message: message,
LastHeartbeatTime: unversioned.Time{},
LastTransitionTime: unversioned.Time{},
}
if !api.Semantic.DeepEqual(expectedNode, updatedNode) {
t.Errorf("unexpected objects: %s", diff.ObjectDiff(expectedNode, updatedNode))
}
}
readyMessage := "kubelet is posting ready status"
downMessage := "container runtime is down"
// Should report kubelet not ready if the runtime check is out of date
clock.SetTime(time.Now().Add(-maxWaitForContainerRuntime))
kubelet.updateRuntimeUp()
checkNodeStatus(api.ConditionFalse, "KubeletNotReady", downMessage)
// Should report kubelet ready if the runtime check is updated
clock.SetTime(time.Now())
kubelet.updateRuntimeUp()
checkNodeStatus(api.ConditionTrue, "KubeletReady", readyMessage)
// Should report kubelet not ready if the runtime check is out of date
clock.SetTime(time.Now().Add(-maxWaitForContainerRuntime))
kubelet.updateRuntimeUp()
checkNodeStatus(api.ConditionFalse, "KubeletNotReady", downMessage)
// Should report kubelet not ready if the runtime check failed
fakeRuntime := testKubelet.fakeRuntime
// Inject error into fake runtime status check, node should be NotReady
fakeRuntime.StatusErr = fmt.Errorf("injected runtime status error")
clock.SetTime(time.Now())
kubelet.updateRuntimeUp()
checkNodeStatus(api.ConditionFalse, "KubeletNotReady", downMessage)
}
func TestUpdateNodeStatusError(t *testing.T) {
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
kubelet := testKubelet.kubelet
// No matching node for the kubelet
testKubelet.fakeKubeClient.ReactionChain = fake.NewSimpleClientset(&api.NodeList{Items: []api.Node{}}).ReactionChain
if err := kubelet.updateNodeStatus(); err == nil {
t.Errorf("unexpected non error: %v", err)
}
if len(testKubelet.fakeKubeClient.Actions()) != nodeStatusUpdateRetry {
t.Errorf("unexpected actions: %v", testKubelet.fakeKubeClient.Actions())
}
}
func TestCreateMirrorPod(t *testing.T) {
for _, updateType := range []kubetypes.SyncPodType{kubetypes.SyncPodCreate, kubetypes.SyncPodUpdate} {
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
@ -3623,7 +2859,7 @@ func TestPrivilegeContainerAllowed(t *testing.T) {
}
}
func TestPrivilegeContainerDisallowed(t *testing.T) {
func TestPrivilegedContainerDisallowed(t *testing.T) {
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
testKubelet.fakeCadvisor.On("VersionInfo").Return(&cadvisorapi.VersionInfo{}, nil)
testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorapi.MachineInfo{}, nil)
@ -3668,64 +2904,6 @@ func TestFilterOutTerminatedPods(t *testing.T) {
}
}
func TestRegisterExistingNodeWithApiserver(t *testing.T) {
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
kubelet := testKubelet.kubelet
kubeClient := testKubelet.fakeKubeClient
kubeClient.AddReactor("create", "nodes", func(action core.Action) (bool, runtime.Object, error) {
// Return an error on create.
return true, &api.Node{}, &apierrors.StatusError{
ErrStatus: unversioned.Status{Reason: unversioned.StatusReasonAlreadyExists},
}
})
kubeClient.AddReactor("get", "nodes", func(action core.Action) (bool, runtime.Object, error) {
// Return an existing (matching) node on get.
return true, &api.Node{
ObjectMeta: api.ObjectMeta{Name: testKubeletHostname},
Spec: api.NodeSpec{ExternalID: testKubeletHostname},
}, nil
})
kubeClient.AddReactor("*", "*", func(action core.Action) (bool, runtime.Object, error) {
return true, nil, fmt.Errorf("no reaction implemented for %s", action)
})
machineInfo := &cadvisorapi.MachineInfo{
MachineID: "123",
SystemUUID: "abc",
BootID: "1b3",
NumCores: 2,
MemoryCapacity: 1024,
}
mockCadvisor := testKubelet.fakeCadvisor
mockCadvisor.On("MachineInfo").Return(machineInfo, nil)
versionInfo := &cadvisorapi.VersionInfo{
KernelVersion: "3.16.0-0.bpo.4-amd64",
ContainerOsVersion: "Debian GNU/Linux 7 (wheezy)",
DockerVersion: "1.5.0",
}
mockCadvisor.On("VersionInfo").Return(versionInfo, nil)
mockCadvisor.On("ImagesFsInfo").Return(cadvisorapiv2.FsInfo{
Usage: 400 * mb,
Capacity: 1000 * mb,
Available: 600 * mb,
}, nil)
mockCadvisor.On("RootFsInfo").Return(cadvisorapiv2.FsInfo{
Usage: 9 * mb,
Capacity: 10 * mb,
}, nil)
done := make(chan struct{})
go func() {
kubelet.registerWithApiserver()
done <- struct{}{}
}()
select {
case <-time.After(wait.ForeverTestTimeout):
t.Errorf("timed out waiting for registration")
case <-done:
return
}
}
func TestMakePortMappings(t *testing.T) {
port := func(name string, protocol api.Protocol, containerPort, hostPort int32, ip string) api.ContainerPort {
return api.ContainerPort{

View File

@ -18,6 +18,7 @@ package kubelet
import (
"fmt"
"os"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/capabilities"
@ -118,3 +119,12 @@ func allowHostIPC(pod *api.Pod) (bool, error) {
}
return false, nil
}
// dirExists returns true if the path exists and represents a directory.
func dirExists(path string) bool {
s, err := os.Stat(path)
if err != nil {
return false
}
return s.IsDir()
}