Merge pull request #21448 from timstclair/worker-race
Auto commit by PR queue bot
This commit is contained in:
		| @@ -109,8 +109,6 @@ func NewManager(kubeClient clientset.Interface, podManager kubepod.Manager) Mana | |||||||
| // This method normalizes the status before comparing so as to make sure that meaningless | // This method normalizes the status before comparing so as to make sure that meaningless | ||||||
| // changes will be ignored. | // changes will be ignored. | ||||||
| func isStatusEqual(oldStatus, status *api.PodStatus) bool { | func isStatusEqual(oldStatus, status *api.PodStatus) bool { | ||||||
| 	normalizeStatus(oldStatus) |  | ||||||
| 	normalizeStatus(status) |  | ||||||
| 	return api.Semantic.DeepEqual(status, oldStatus) | 	return api.Semantic.DeepEqual(status, oldStatus) | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -146,7 +144,11 @@ func (m *manager) GetPodStatus(uid types.UID) (api.PodStatus, bool) { | |||||||
| func (m *manager) SetPodStatus(pod *api.Pod, status api.PodStatus) { | func (m *manager) SetPodStatus(pod *api.Pod, status api.PodStatus) { | ||||||
| 	m.podStatusesLock.Lock() | 	m.podStatusesLock.Lock() | ||||||
| 	defer m.podStatusesLock.Unlock() | 	defer m.podStatusesLock.Unlock() | ||||||
|  | 	// Make sure we're caching a deep copy. | ||||||
|  | 	status, err := copyStatus(&status) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return | ||||||
|  | 	} | ||||||
| 	m.updateStatusInternal(pod, status) | 	m.updateStatusInternal(pod, status) | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -188,12 +190,10 @@ func (m *manager) SetContainerReadiness(podUID types.UID, containerID kubecontai | |||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	// Make sure we're not updating the cached version. | 	// Make sure we're not updating the cached version. | ||||||
| 	clone, err := api.Scheme.DeepCopy(&oldStatus.status) | 	status, err := copyStatus(&oldStatus.status) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		glog.Errorf("Failed to clone status %+v: %v", oldStatus.status, err) |  | ||||||
| 		return | 		return | ||||||
| 	} | 	} | ||||||
| 	status := *clone.(*api.PodStatus) |  | ||||||
| 	status.ContainerStatuses[containerIndex].Ready = ready | 	status.ContainerStatuses[containerIndex].Ready = ready | ||||||
|  |  | ||||||
| 	// Update pod condition. | 	// Update pod condition. | ||||||
| @@ -267,6 +267,7 @@ func (m *manager) updateStatusInternal(pod *api.Pod, status api.PodStatus) bool | |||||||
| 		status.StartTime = &now | 		status.StartTime = &now | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|  | 	normalizeStatus(&status) | ||||||
| 	// The intent here is to prevent concurrent updates to a pod's status from | 	// The intent here is to prevent concurrent updates to a pod's status from | ||||||
| 	// clobbering each other so the phase of a pod progresses monotonically. | 	// clobbering each other so the phase of a pod progresses monotonically. | ||||||
| 	if isCached && isStatusEqual(&cachedStatus.status, &status) && pod.DeletionTimestamp == nil { | 	if isCached && isStatusEqual(&cachedStatus.status, &status) && pod.DeletionTimestamp == nil { | ||||||
| @@ -435,13 +436,19 @@ func (m *manager) needsReconcile(uid types.UID, status api.PodStatus) bool { | |||||||
| 		pod = mirrorPod | 		pod = mirrorPod | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	if isStatusEqual(&pod.Status, &status) { | 	podStatus, err := copyStatus(&pod.Status) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return false | ||||||
|  | 	} | ||||||
|  | 	normalizeStatus(&podStatus) | ||||||
|  |  | ||||||
|  | 	if isStatusEqual(&podStatus, &status) { | ||||||
| 		// If the status from the source is the same with the cached status, | 		// If the status from the source is the same with the cached status, | ||||||
| 		// reconcile is not needed. Just return. | 		// reconcile is not needed. Just return. | ||||||
| 		return false | 		return false | ||||||
| 	} | 	} | ||||||
| 	glog.V(3).Infof("Pod status is inconsistent with cached status for pod %q, a reconciliation should be triggered:\n %+v", format.Pod(pod), | 	glog.V(3).Infof("Pod status is inconsistent with cached status for pod %q, a reconciliation should be triggered:\n %+v", format.Pod(pod), | ||||||
| 		util.ObjectDiff(pod.Status, status)) | 		util.ObjectDiff(podStatus, status)) | ||||||
|  |  | ||||||
| 	return true | 	return true | ||||||
| } | } | ||||||
| @@ -495,3 +502,13 @@ func notRunning(statuses []api.ContainerStatus) bool { | |||||||
| 	} | 	} | ||||||
| 	return true | 	return true | ||||||
| } | } | ||||||
|  |  | ||||||
|  | func copyStatus(source *api.PodStatus) (api.PodStatus, error) { | ||||||
|  | 	clone, err := api.Scheme.DeepCopy(source) | ||||||
|  | 	if err != nil { | ||||||
|  | 		glog.Errorf("Failed to clone status %+v: %v", source, err) | ||||||
|  | 		return api.PodStatus{}, err | ||||||
|  | 	} | ||||||
|  | 	status := *clone.(*api.PodStatus) | ||||||
|  | 	return status, nil | ||||||
|  | } | ||||||
|   | |||||||
| @@ -205,8 +205,9 @@ func TestChangedStatusKeepsStartTime(t *testing.T) { | |||||||
| 	if finalStatus.StartTime.IsZero() { | 	if finalStatus.StartTime.IsZero() { | ||||||
| 		t.Errorf("StartTime should not be zero") | 		t.Errorf("StartTime should not be zero") | ||||||
| 	} | 	} | ||||||
| 	if !finalStatus.StartTime.Time.Equal(now.Time) { | 	expected := now.Rfc3339Copy() | ||||||
| 		t.Errorf("Expected %v, but got %v", now.Time, finalStatus.StartTime.Time) | 	if !finalStatus.StartTime.Equal(expected) { | ||||||
|  | 		t.Errorf("Expected %v, but got %v", expected, finalStatus.StartTime) | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -464,8 +465,10 @@ func TestStatusEquality(t *testing.T) { | |||||||
| 		oldPodStatus := api.PodStatus{ | 		oldPodStatus := api.PodStatus{ | ||||||
| 			ContainerStatuses: shuffle(podStatus.ContainerStatuses), | 			ContainerStatuses: shuffle(podStatus.ContainerStatuses), | ||||||
| 		} | 		} | ||||||
|  | 		normalizeStatus(&oldPodStatus) | ||||||
|  | 		normalizeStatus(&podStatus) | ||||||
| 		if !isStatusEqual(&oldPodStatus, &podStatus) { | 		if !isStatusEqual(&oldPodStatus, &podStatus) { | ||||||
| 			t.Fatalf("Order of container statuses should not affect equality.") | 			t.Fatalf("Order of container statuses should not affect normalized equality.") | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
| @@ -494,6 +497,7 @@ func TestStaticPodStatus(t *testing.T) { | |||||||
|  |  | ||||||
| 	m.SetPodStatus(staticPod, status) | 	m.SetPodStatus(staticPod, status) | ||||||
| 	retrievedStatus := expectPodStatus(t, m, staticPod) | 	retrievedStatus := expectPodStatus(t, m, staticPod) | ||||||
|  | 	normalizeStatus(&status) | ||||||
| 	assert.True(t, isStatusEqual(&status, &retrievedStatus), "Expected: %+v, Got: %+v", status, retrievedStatus) | 	assert.True(t, isStatusEqual(&status, &retrievedStatus), "Expected: %+v, Got: %+v", status, retrievedStatus) | ||||||
| 	retrievedStatus, _ = m.GetPodStatus(mirrorPod.UID) | 	retrievedStatus, _ = m.GetPodStatus(mirrorPod.UID) | ||||||
| 	assert.True(t, isStatusEqual(&status, &retrievedStatus), "Expected: %+v, Got: %+v", status, retrievedStatus) | 	assert.True(t, isStatusEqual(&status, &retrievedStatus), "Expected: %+v, Got: %+v", status, retrievedStatus) | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user
	 k8s-merge-robot
					k8s-merge-robot