Merge pull request #18240 from timstclair/readiness
Update PodReady condition when updating container readiness
This commit is contained in:
commit
5457f5ace2
@ -3064,59 +3064,6 @@ func GetPhase(spec *api.PodSpec, info []api.ContainerStatus) api.PodPhase {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func readyPodCondition(isPodReady bool, reason, message string) []api.PodCondition {
|
|
||||||
condition := api.PodCondition{
|
|
||||||
Type: api.PodReady,
|
|
||||||
}
|
|
||||||
if isPodReady {
|
|
||||||
condition.Status = api.ConditionTrue
|
|
||||||
} else {
|
|
||||||
condition.Status = api.ConditionFalse
|
|
||||||
}
|
|
||||||
condition.Reason = reason
|
|
||||||
condition.Message = message
|
|
||||||
return []api.PodCondition{condition}
|
|
||||||
}
|
|
||||||
|
|
||||||
// getPodReadyCondition returns ready condition if all containers in a pod are ready, else it returns an unready condition.
|
|
||||||
func getPodReadyCondition(spec *api.PodSpec, containerStatuses []api.ContainerStatus, podPhase api.PodPhase) []api.PodCondition {
|
|
||||||
// Find if all containers are ready or not.
|
|
||||||
if containerStatuses == nil {
|
|
||||||
return readyPodCondition(false, "UnknownContainerStatuses", "")
|
|
||||||
}
|
|
||||||
unknownContainers := []string{}
|
|
||||||
unreadyContainers := []string{}
|
|
||||||
for _, container := range spec.Containers {
|
|
||||||
if containerStatus, ok := api.GetContainerStatus(containerStatuses, container.Name); ok {
|
|
||||||
if !containerStatus.Ready {
|
|
||||||
unreadyContainers = append(unreadyContainers, container.Name)
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
unknownContainers = append(unknownContainers, container.Name)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// In case of unexist unknowContainers, If pod has derminated successed and it has unreadyContainers, just return PodCompleted
|
|
||||||
if podPhase == api.PodSucceeded && len(unknownContainers) == 0 {
|
|
||||||
return readyPodCondition(false, fmt.Sprint("PodCompleted"), "")
|
|
||||||
}
|
|
||||||
|
|
||||||
unreadyMessages := []string{}
|
|
||||||
if len(unknownContainers) > 0 {
|
|
||||||
unreadyMessages = append(unreadyMessages, fmt.Sprintf("containers with unknown status: %s", unknownContainers))
|
|
||||||
}
|
|
||||||
if len(unreadyContainers) > 0 {
|
|
||||||
unreadyMessages = append(unreadyMessages, fmt.Sprintf("containers with unready status: %s", unreadyContainers))
|
|
||||||
}
|
|
||||||
unreadyMessage := strings.Join(unreadyMessages, ", ")
|
|
||||||
if unreadyMessage != "" {
|
|
||||||
// return unready status.
|
|
||||||
return readyPodCondition(false, "ContainersNotReady", unreadyMessage)
|
|
||||||
}
|
|
||||||
// return ready status.
|
|
||||||
return readyPodCondition(true, "", "")
|
|
||||||
}
|
|
||||||
|
|
||||||
// By passing the pod directly, this method avoids pod lookup, which requires
|
// By passing the pod directly, this method avoids pod lookup, which requires
|
||||||
// grabbing a lock.
|
// grabbing a lock.
|
||||||
// TODO (random-liu) api.PodStatus is named as podStatus, this maybe confusing, this may happen in other functions
|
// TODO (random-liu) api.PodStatus is named as podStatus, this maybe confusing, this may happen in other functions
|
||||||
@ -3164,7 +3111,7 @@ func (kl *Kubelet) generatePodStatus(pod *api.Pod) (api.PodStatus, error) {
|
|||||||
podStatus.Phase = GetPhase(spec, podStatus.ContainerStatuses)
|
podStatus.Phase = GetPhase(spec, podStatus.ContainerStatuses)
|
||||||
kl.probeManager.UpdatePodStatus(pod.UID, podStatus)
|
kl.probeManager.UpdatePodStatus(pod.UID, podStatus)
|
||||||
|
|
||||||
podStatus.Conditions = append(podStatus.Conditions, getPodReadyCondition(spec, podStatus.ContainerStatuses, podStatus.Phase)...)
|
podStatus.Conditions = append(podStatus.Conditions, status.GeneratePodReadyCondition(spec, podStatus.ContainerStatuses, podStatus.Phase))
|
||||||
|
|
||||||
if !kl.standaloneMode {
|
if !kl.standaloneMode {
|
||||||
hostIP, err := kl.GetHostIP()
|
hostIP, err := kl.GetHostIP()
|
||||||
|
@ -1909,119 +1909,6 @@ func TestPodPhaseWithRestartOnFailure(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func getReadyStatus(cName string) api.ContainerStatus {
|
|
||||||
return api.ContainerStatus{
|
|
||||||
Name: cName,
|
|
||||||
Ready: true,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
func getNotReadyStatus(cName string) api.ContainerStatus {
|
|
||||||
return api.ContainerStatus{
|
|
||||||
Name: cName,
|
|
||||||
Ready: false,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
func getReadyCondition(status api.ConditionStatus, reason, message string) []api.PodCondition {
|
|
||||||
return []api.PodCondition{{
|
|
||||||
Type: api.PodReady,
|
|
||||||
Status: status,
|
|
||||||
Reason: reason,
|
|
||||||
Message: message,
|
|
||||||
}}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestGetPodReadyCondition(t *testing.T) {
|
|
||||||
tests := []struct {
|
|
||||||
spec *api.PodSpec
|
|
||||||
containerStatuses []api.ContainerStatus
|
|
||||||
podPhase api.PodPhase
|
|
||||||
expected []api.PodCondition
|
|
||||||
}{
|
|
||||||
{
|
|
||||||
spec: nil,
|
|
||||||
containerStatuses: nil,
|
|
||||||
podPhase: api.PodRunning,
|
|
||||||
expected: getReadyCondition(api.ConditionFalse, "UnknownContainerStatuses", ""),
|
|
||||||
},
|
|
||||||
{
|
|
||||||
spec: &api.PodSpec{},
|
|
||||||
containerStatuses: []api.ContainerStatus{},
|
|
||||||
podPhase: api.PodRunning,
|
|
||||||
expected: getReadyCondition(api.ConditionTrue, "", ""),
|
|
||||||
},
|
|
||||||
{
|
|
||||||
spec: &api.PodSpec{
|
|
||||||
Containers: []api.Container{
|
|
||||||
{Name: "1234"},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
containerStatuses: []api.ContainerStatus{},
|
|
||||||
podPhase: api.PodRunning,
|
|
||||||
expected: getReadyCondition(api.ConditionFalse, "ContainersNotReady", "containers with unknown status: [1234]"),
|
|
||||||
},
|
|
||||||
{
|
|
||||||
spec: &api.PodSpec{
|
|
||||||
Containers: []api.Container{
|
|
||||||
{Name: "1234"},
|
|
||||||
{Name: "5678"},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
containerStatuses: []api.ContainerStatus{
|
|
||||||
getReadyStatus("1234"),
|
|
||||||
getReadyStatus("5678"),
|
|
||||||
},
|
|
||||||
podPhase: api.PodRunning,
|
|
||||||
expected: getReadyCondition(api.ConditionTrue, "", ""),
|
|
||||||
},
|
|
||||||
{
|
|
||||||
spec: &api.PodSpec{
|
|
||||||
Containers: []api.Container{
|
|
||||||
{Name: "1234"},
|
|
||||||
{Name: "5678"},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
containerStatuses: []api.ContainerStatus{
|
|
||||||
getReadyStatus("1234"),
|
|
||||||
},
|
|
||||||
podPhase: api.PodRunning,
|
|
||||||
expected: getReadyCondition(api.ConditionFalse, "ContainersNotReady", "containers with unknown status: [5678]"),
|
|
||||||
},
|
|
||||||
{
|
|
||||||
spec: &api.PodSpec{
|
|
||||||
Containers: []api.Container{
|
|
||||||
{Name: "1234"},
|
|
||||||
{Name: "5678"},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
containerStatuses: []api.ContainerStatus{
|
|
||||||
getReadyStatus("1234"),
|
|
||||||
getNotReadyStatus("5678"),
|
|
||||||
},
|
|
||||||
podPhase: api.PodRunning,
|
|
||||||
expected: getReadyCondition(api.ConditionFalse, "ContainersNotReady", "containers with unready status: [5678]"),
|
|
||||||
},
|
|
||||||
{
|
|
||||||
spec: &api.PodSpec{
|
|
||||||
Containers: []api.Container{
|
|
||||||
{Name: "1234"},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
containerStatuses: []api.ContainerStatus{
|
|
||||||
getNotReadyStatus("1234"),
|
|
||||||
},
|
|
||||||
podPhase: api.PodSucceeded,
|
|
||||||
expected: getReadyCondition(api.ConditionFalse, "PodCompleted", ""),
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
for i, test := range tests {
|
|
||||||
condition := getPodReadyCondition(test.spec, test.containerStatuses, test.podPhase)
|
|
||||||
if !reflect.DeepEqual(condition, test.expected) {
|
|
||||||
t.Errorf("On test case %v, expected:\n%+v\ngot\n%+v\n", i, test.expected, condition)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestExecInContainerNoSuchPod(t *testing.T) {
|
func TestExecInContainerNoSuchPod(t *testing.T) {
|
||||||
testKubelet := newTestKubelet(t)
|
testKubelet := newTestKubelet(t)
|
||||||
kubelet := testKubelet.kubelet
|
kubelet := testKubelet.kubelet
|
||||||
|
79
pkg/kubelet/status/generate.go
Normal file
79
pkg/kubelet/status/generate.go
Normal file
@ -0,0 +1,79 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2014 The Kubernetes Authors All rights reserved.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package status
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"k8s.io/kubernetes/pkg/api"
|
||||||
|
)
|
||||||
|
|
||||||
|
// GeneratePodReadyCondition returns ready condition if all containers in a pod are ready, else it
|
||||||
|
// returns an unready condition.
|
||||||
|
func GeneratePodReadyCondition(spec *api.PodSpec, containerStatuses []api.ContainerStatus, podPhase api.PodPhase) api.PodCondition {
|
||||||
|
// Find if all containers are ready or not.
|
||||||
|
if containerStatuses == nil {
|
||||||
|
return api.PodCondition{
|
||||||
|
Type: api.PodReady,
|
||||||
|
Status: api.ConditionFalse,
|
||||||
|
Reason: "UnknownContainerStatuses",
|
||||||
|
}
|
||||||
|
}
|
||||||
|
unknownContainers := []string{}
|
||||||
|
unreadyContainers := []string{}
|
||||||
|
for _, container := range spec.Containers {
|
||||||
|
if containerStatus, ok := api.GetContainerStatus(containerStatuses, container.Name); ok {
|
||||||
|
if !containerStatus.Ready {
|
||||||
|
unreadyContainers = append(unreadyContainers, container.Name)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
unknownContainers = append(unknownContainers, container.Name)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// If all containers are known and succeeded, just return PodCompleted.
|
||||||
|
if podPhase == api.PodSucceeded && len(unknownContainers) == 0 {
|
||||||
|
return api.PodCondition{
|
||||||
|
Type: api.PodReady,
|
||||||
|
Status: api.ConditionFalse,
|
||||||
|
Reason: "PodCompleted",
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
unreadyMessages := []string{}
|
||||||
|
if len(unknownContainers) > 0 {
|
||||||
|
unreadyMessages = append(unreadyMessages, fmt.Sprintf("containers with unknown status: %s", unknownContainers))
|
||||||
|
}
|
||||||
|
if len(unreadyContainers) > 0 {
|
||||||
|
unreadyMessages = append(unreadyMessages, fmt.Sprintf("containers with unready status: %s", unreadyContainers))
|
||||||
|
}
|
||||||
|
unreadyMessage := strings.Join(unreadyMessages, ", ")
|
||||||
|
if unreadyMessage != "" {
|
||||||
|
return api.PodCondition{
|
||||||
|
Type: api.PodReady,
|
||||||
|
Status: api.ConditionFalse,
|
||||||
|
Reason: "ContainersNotReady",
|
||||||
|
Message: unreadyMessage,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return api.PodCondition{
|
||||||
|
Type: api.PodReady,
|
||||||
|
Status: api.ConditionTrue,
|
||||||
|
}
|
||||||
|
}
|
143
pkg/kubelet/status/generate_test.go
Normal file
143
pkg/kubelet/status/generate_test.go
Normal file
@ -0,0 +1,143 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2014 The Kubernetes Authors All rights reserved.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package status
|
||||||
|
|
||||||
|
import (
|
||||||
|
"reflect"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"k8s.io/kubernetes/pkg/api"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestGeneratePodReadyCondition(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
spec *api.PodSpec
|
||||||
|
containerStatuses []api.ContainerStatus
|
||||||
|
podPhase api.PodPhase
|
||||||
|
expected api.PodCondition
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
spec: nil,
|
||||||
|
containerStatuses: nil,
|
||||||
|
podPhase: api.PodRunning,
|
||||||
|
expected: getReadyCondition(false, "UnknownContainerStatuses", ""),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
spec: &api.PodSpec{},
|
||||||
|
containerStatuses: []api.ContainerStatus{},
|
||||||
|
podPhase: api.PodRunning,
|
||||||
|
expected: getReadyCondition(true, "", ""),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
spec: &api.PodSpec{
|
||||||
|
Containers: []api.Container{
|
||||||
|
{Name: "1234"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
containerStatuses: []api.ContainerStatus{},
|
||||||
|
podPhase: api.PodRunning,
|
||||||
|
expected: getReadyCondition(false, "ContainersNotReady", "containers with unknown status: [1234]"),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
spec: &api.PodSpec{
|
||||||
|
Containers: []api.Container{
|
||||||
|
{Name: "1234"},
|
||||||
|
{Name: "5678"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
containerStatuses: []api.ContainerStatus{
|
||||||
|
getReadyStatus("1234"),
|
||||||
|
getReadyStatus("5678"),
|
||||||
|
},
|
||||||
|
podPhase: api.PodRunning,
|
||||||
|
expected: getReadyCondition(true, "", ""),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
spec: &api.PodSpec{
|
||||||
|
Containers: []api.Container{
|
||||||
|
{Name: "1234"},
|
||||||
|
{Name: "5678"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
containerStatuses: []api.ContainerStatus{
|
||||||
|
getReadyStatus("1234"),
|
||||||
|
},
|
||||||
|
podPhase: api.PodRunning,
|
||||||
|
expected: getReadyCondition(false, "ContainersNotReady", "containers with unknown status: [5678]"),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
spec: &api.PodSpec{
|
||||||
|
Containers: []api.Container{
|
||||||
|
{Name: "1234"},
|
||||||
|
{Name: "5678"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
containerStatuses: []api.ContainerStatus{
|
||||||
|
getReadyStatus("1234"),
|
||||||
|
getNotReadyStatus("5678"),
|
||||||
|
},
|
||||||
|
podPhase: api.PodRunning,
|
||||||
|
expected: getReadyCondition(false, "ContainersNotReady", "containers with unready status: [5678]"),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
spec: &api.PodSpec{
|
||||||
|
Containers: []api.Container{
|
||||||
|
{Name: "1234"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
containerStatuses: []api.ContainerStatus{
|
||||||
|
getNotReadyStatus("1234"),
|
||||||
|
},
|
||||||
|
podPhase: api.PodSucceeded,
|
||||||
|
expected: getReadyCondition(false, "PodCompleted", ""),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for i, test := range tests {
|
||||||
|
condition := GeneratePodReadyCondition(test.spec, test.containerStatuses, test.podPhase)
|
||||||
|
if !reflect.DeepEqual(condition, test.expected) {
|
||||||
|
t.Errorf("On test case %v, expected:\n%+v\ngot\n%+v\n", i, test.expected, condition)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func getReadyCondition(ready bool, reason, message string) api.PodCondition {
|
||||||
|
status := api.ConditionFalse
|
||||||
|
if ready {
|
||||||
|
status = api.ConditionTrue
|
||||||
|
}
|
||||||
|
return api.PodCondition{
|
||||||
|
Type: api.PodReady,
|
||||||
|
Status: status,
|
||||||
|
Reason: reason,
|
||||||
|
Message: message,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func getReadyStatus(cName string) api.ContainerStatus {
|
||||||
|
return api.ContainerStatus{
|
||||||
|
Name: cName,
|
||||||
|
Ready: true,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func getNotReadyStatus(cName string) api.ContainerStatus {
|
||||||
|
return api.ContainerStatus{
|
||||||
|
Name: cName,
|
||||||
|
Ready: false,
|
||||||
|
}
|
||||||
|
}
|
@ -147,8 +147,93 @@ func (m *manager) SetPodStatus(pod *api.Pod, status api.PodStatus) {
|
|||||||
m.podStatusesLock.Lock()
|
m.podStatusesLock.Lock()
|
||||||
defer m.podStatusesLock.Unlock()
|
defer m.podStatusesLock.Unlock()
|
||||||
|
|
||||||
|
m.updateStatusInternal(pod, status)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *manager) SetContainerReadiness(pod *api.Pod, containerID kubecontainer.ContainerID, ready bool) {
|
||||||
|
m.podStatusesLock.Lock()
|
||||||
|
defer m.podStatusesLock.Unlock()
|
||||||
|
|
||||||
|
oldStatus, found := m.podStatuses[pod.UID]
|
||||||
|
if !found {
|
||||||
|
glog.Warningf("Container readiness changed before pod has synced: %q - %q",
|
||||||
|
format.Pod(pod), containerID.String())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Find the container to update.
|
||||||
|
containerIndex := -1
|
||||||
|
for i, c := range oldStatus.status.ContainerStatuses {
|
||||||
|
if c.ContainerID == containerID.String() {
|
||||||
|
containerIndex = i
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if containerIndex == -1 {
|
||||||
|
glog.Warningf("Container readiness changed for unknown container: %q - %q",
|
||||||
|
format.Pod(pod), containerID.String())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if oldStatus.status.ContainerStatuses[containerIndex].Ready == ready {
|
||||||
|
glog.V(4).Infof("Container readiness unchanged (%v): %q - %q", ready,
|
||||||
|
format.Pod(pod), containerID.String())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Make sure we're not updating the cached version.
|
||||||
|
clone, err := api.Scheme.DeepCopy(&oldStatus.status)
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("Failed to clone status %+v: %v", oldStatus.status, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
status := *clone.(*api.PodStatus)
|
||||||
|
status.ContainerStatuses[containerIndex].Ready = ready
|
||||||
|
|
||||||
|
// Update pod condition.
|
||||||
|
readyConditionIndex := -1
|
||||||
|
for i, condition := range status.Conditions {
|
||||||
|
if condition.Type == api.PodReady {
|
||||||
|
readyConditionIndex = i
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
readyCondition := GeneratePodReadyCondition(&pod.Spec, status.ContainerStatuses, status.Phase)
|
||||||
|
if readyConditionIndex != -1 {
|
||||||
|
status.Conditions[readyConditionIndex] = readyCondition
|
||||||
|
} else {
|
||||||
|
glog.Warningf("PodStatus missing PodReady condition: %+v", status)
|
||||||
|
status.Conditions = append(status.Conditions, readyCondition)
|
||||||
|
}
|
||||||
|
|
||||||
|
m.updateStatusInternal(pod, status)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *manager) TerminatePods(pods []*api.Pod) bool {
|
||||||
|
allSent := true
|
||||||
|
m.podStatusesLock.Lock()
|
||||||
|
defer m.podStatusesLock.Unlock()
|
||||||
|
for _, pod := range pods {
|
||||||
|
for i := range pod.Status.ContainerStatuses {
|
||||||
|
pod.Status.ContainerStatuses[i].State = api.ContainerState{
|
||||||
|
Terminated: &api.ContainerStateTerminated{},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if sent := m.updateStatusInternal(pod, pod.Status); !sent {
|
||||||
|
glog.V(4).Infof("Termination notice for %q was dropped because the status channel is full", format.Pod(pod))
|
||||||
|
allSent = false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return allSent
|
||||||
|
}
|
||||||
|
|
||||||
|
// updateStatusInternal updates the internal status cache, and queues an update to the api server if
|
||||||
|
// necessary. Returns whether an update was triggered.
|
||||||
|
// This method IS NOT THREAD SAFE and must be called from a locked function.
|
||||||
|
func (m *manager) updateStatusInternal(pod *api.Pod, status api.PodStatus) bool {
|
||||||
var oldStatus api.PodStatus
|
var oldStatus api.PodStatus
|
||||||
if cachedStatus, ok := m.podStatuses[pod.UID]; ok {
|
cachedStatus, isCached := m.podStatuses[pod.UID]
|
||||||
|
if isCached {
|
||||||
oldStatus = cachedStatus.status
|
oldStatus = cachedStatus.status
|
||||||
} else if mirrorPod, ok := m.podManager.GetMirrorPodByPod(pod); ok {
|
} else if mirrorPod, ok := m.podManager.GetMirrorPodByPod(pod); ok {
|
||||||
oldStatus = mirrorPod.Status
|
oldStatus = mirrorPod.Status
|
||||||
@ -176,81 +261,16 @@ func (m *manager) SetPodStatus(pod *api.Pod, status api.PodStatus) {
|
|||||||
status.StartTime = &now
|
status.StartTime = &now
|
||||||
}
|
}
|
||||||
|
|
||||||
m.updateStatusInternal(pod, status)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *manager) SetContainerReadiness(pod *api.Pod, containerID kubecontainer.ContainerID, ready bool) {
|
|
||||||
m.podStatusesLock.Lock()
|
|
||||||
defer m.podStatusesLock.Unlock()
|
|
||||||
|
|
||||||
oldStatus, found := m.podStatuses[pod.UID]
|
|
||||||
if !found {
|
|
||||||
glog.Warningf("Container readiness changed before pod has synced: %q - %q",
|
|
||||||
format.Pod(pod), containerID.String())
|
|
||||||
return
|
|
||||||
}
|
|
||||||
status := oldStatus.status
|
|
||||||
|
|
||||||
// Find the container to update.
|
|
||||||
containerIndex := -1
|
|
||||||
for i, c := range status.ContainerStatuses {
|
|
||||||
if c.ContainerID == containerID.String() {
|
|
||||||
containerIndex = i
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if containerIndex == -1 {
|
|
||||||
glog.Warningf("Container readiness changed for unknown container: %q - %q",
|
|
||||||
format.Pod(pod), containerID.String())
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if status.ContainerStatuses[containerIndex].Ready == ready {
|
|
||||||
glog.V(4).Infof("Container readiness unchanged (%v): %q - %q", ready,
|
|
||||||
format.Pod(pod), containerID.String())
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Make sure we're not updating the cached version.
|
|
||||||
status.ContainerStatuses = make([]api.ContainerStatus, len(status.ContainerStatuses))
|
|
||||||
copy(status.ContainerStatuses, oldStatus.status.ContainerStatuses)
|
|
||||||
status.ContainerStatuses[containerIndex].Ready = ready
|
|
||||||
m.updateStatusInternal(pod, status)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *manager) TerminatePods(pods []*api.Pod) bool {
|
|
||||||
allSent := true
|
|
||||||
m.podStatusesLock.Lock()
|
|
||||||
defer m.podStatusesLock.Unlock()
|
|
||||||
for _, pod := range pods {
|
|
||||||
for i := range pod.Status.ContainerStatuses {
|
|
||||||
pod.Status.ContainerStatuses[i].State = api.ContainerState{
|
|
||||||
Terminated: &api.ContainerStateTerminated{},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if sent := m.updateStatusInternal(pod, pod.Status); !sent {
|
|
||||||
glog.V(4).Infof("Termination notice for %q was dropped because the status channel is full", format.Pod(pod))
|
|
||||||
allSent = false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return allSent
|
|
||||||
}
|
|
||||||
|
|
||||||
// updateStatusInternal updates the internal status cache, and queues an update to the api server if
|
|
||||||
// necessary. Returns whether an update was triggered.
|
|
||||||
// This method IS NOT THREAD SAFE and must be called from a locked function.
|
|
||||||
func (m *manager) updateStatusInternal(pod *api.Pod, status api.PodStatus) bool {
|
|
||||||
// The intent here is to prevent concurrent updates to a pod's status from
|
// The intent here is to prevent concurrent updates to a pod's status from
|
||||||
// clobbering each other so the phase of a pod progresses monotonically.
|
// clobbering each other so the phase of a pod progresses monotonically.
|
||||||
oldStatus, found := m.podStatuses[pod.UID]
|
if isCached && isStatusEqual(&cachedStatus.status, &status) && pod.DeletionTimestamp == nil {
|
||||||
if found && isStatusEqual(&oldStatus.status, &status) && pod.DeletionTimestamp == nil {
|
|
||||||
glog.V(3).Infof("Ignoring same status for pod %q, status: %+v", format.Pod(pod), status)
|
glog.V(3).Infof("Ignoring same status for pod %q, status: %+v", format.Pod(pod), status)
|
||||||
return false // No new status.
|
return false // No new status.
|
||||||
}
|
}
|
||||||
|
|
||||||
newStatus := versionedPodStatus{
|
newStatus := versionedPodStatus{
|
||||||
status: status,
|
status: status,
|
||||||
version: oldStatus.version + 1,
|
version: cachedStatus.version + 1,
|
||||||
podName: pod.Name,
|
podName: pod.Name,
|
||||||
podNamespace: pod.Namespace,
|
podNamespace: pod.Namespace,
|
||||||
}
|
}
|
||||||
@ -361,7 +381,7 @@ func (m *manager) syncPod(uid types.UID, status versionedPodStatus) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// We failed to update status, wait for periodic sync to retry.
|
// We failed to update status, wait for periodic sync to retry.
|
||||||
glog.Warningf("Failed to updated status for pod %q: %v", format.Pod(pod), err)
|
glog.Warningf("Failed to update status for pod %q: %v", format.Pod(pod), err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// needsUpdate returns whether the status is stale for the given pod UID.
|
// needsUpdate returns whether the status is stale for the given pod UID.
|
||||||
|
@ -102,7 +102,7 @@ func TestNewStatus(t *testing.T) {
|
|||||||
syncer.SetPodStatus(testPod, getRandomPodStatus())
|
syncer.SetPodStatus(testPod, getRandomPodStatus())
|
||||||
verifyUpdates(t, syncer, 1)
|
verifyUpdates(t, syncer, 1)
|
||||||
|
|
||||||
status, _ := syncer.GetPodStatus(testPod.UID)
|
status := expectPodStatus(t, syncer, testPod)
|
||||||
if status.StartTime.IsZero() {
|
if status.StartTime.IsZero() {
|
||||||
t.Errorf("SetPodStatus did not set a proper start time value")
|
t.Errorf("SetPodStatus did not set a proper start time value")
|
||||||
}
|
}
|
||||||
@ -123,7 +123,7 @@ func TestNewStatusPreservesPodStartTime(t *testing.T) {
|
|||||||
pod.Status.StartTime = &startTime
|
pod.Status.StartTime = &startTime
|
||||||
syncer.SetPodStatus(pod, getRandomPodStatus())
|
syncer.SetPodStatus(pod, getRandomPodStatus())
|
||||||
|
|
||||||
status, _ := syncer.GetPodStatus(pod.UID)
|
status := expectPodStatus(t, syncer, pod)
|
||||||
if !status.StartTime.Time.Equal(startTime.Time) {
|
if !status.StartTime.Time.Equal(startTime.Time) {
|
||||||
t.Errorf("Unexpected start time, expected %v, actual %v", startTime, status.StartTime)
|
t.Errorf("Unexpected start time, expected %v, actual %v", startTime, status.StartTime)
|
||||||
}
|
}
|
||||||
@ -153,7 +153,7 @@ func TestNewStatusSetsReadyTransitionTime(t *testing.T) {
|
|||||||
}
|
}
|
||||||
syncer.SetPodStatus(pod, podStatus)
|
syncer.SetPodStatus(pod, podStatus)
|
||||||
verifyUpdates(t, syncer, 1)
|
verifyUpdates(t, syncer, 1)
|
||||||
status, _ := syncer.GetPodStatus(pod.UID)
|
status := expectPodStatus(t, syncer, pod)
|
||||||
readyCondition := api.GetPodReadyCondition(status)
|
readyCondition := api.GetPodReadyCondition(status)
|
||||||
if readyCondition.LastTransitionTime.IsZero() {
|
if readyCondition.LastTransitionTime.IsZero() {
|
||||||
t.Errorf("Unexpected: last transition time not set")
|
t.Errorf("Unexpected: last transition time not set")
|
||||||
@ -175,7 +175,7 @@ func TestChangedStatusKeepsStartTime(t *testing.T) {
|
|||||||
syncer.SetPodStatus(testPod, firstStatus)
|
syncer.SetPodStatus(testPod, firstStatus)
|
||||||
syncer.SetPodStatus(testPod, getRandomPodStatus())
|
syncer.SetPodStatus(testPod, getRandomPodStatus())
|
||||||
verifyUpdates(t, syncer, 2)
|
verifyUpdates(t, syncer, 2)
|
||||||
finalStatus, _ := syncer.GetPodStatus(testPod.UID)
|
finalStatus := expectPodStatus(t, syncer, testPod)
|
||||||
if finalStatus.StartTime.IsZero() {
|
if finalStatus.StartTime.IsZero() {
|
||||||
t.Errorf("StartTime should not be zero")
|
t.Errorf("StartTime should not be zero")
|
||||||
}
|
}
|
||||||
@ -197,12 +197,12 @@ func TestChangedStatusUpdatesLastTransitionTime(t *testing.T) {
|
|||||||
}
|
}
|
||||||
syncer.SetPodStatus(pod, podStatus)
|
syncer.SetPodStatus(pod, podStatus)
|
||||||
verifyUpdates(t, syncer, 1)
|
verifyUpdates(t, syncer, 1)
|
||||||
oldStatus, _ := syncer.GetPodStatus(pod.UID)
|
oldStatus := expectPodStatus(t, syncer, pod)
|
||||||
anotherStatus := getReadyPodStatus()
|
anotherStatus := getReadyPodStatus()
|
||||||
anotherStatus.Conditions[0].Status = api.ConditionFalse
|
anotherStatus.Conditions[0].Status = api.ConditionFalse
|
||||||
syncer.SetPodStatus(pod, anotherStatus)
|
syncer.SetPodStatus(pod, anotherStatus)
|
||||||
verifyUpdates(t, syncer, 1)
|
verifyUpdates(t, syncer, 1)
|
||||||
newStatus, _ := syncer.GetPodStatus(pod.UID)
|
newStatus := expectPodStatus(t, syncer, pod)
|
||||||
|
|
||||||
oldReadyCondition := api.GetPodReadyCondition(oldStatus)
|
oldReadyCondition := api.GetPodReadyCondition(oldStatus)
|
||||||
newReadyCondition := api.GetPodReadyCondition(newStatus)
|
newReadyCondition := api.GetPodReadyCondition(newStatus)
|
||||||
@ -235,12 +235,12 @@ func TestUnchangedStatusPreservesLastTransitionTime(t *testing.T) {
|
|||||||
}
|
}
|
||||||
syncer.SetPodStatus(pod, podStatus)
|
syncer.SetPodStatus(pod, podStatus)
|
||||||
verifyUpdates(t, syncer, 1)
|
verifyUpdates(t, syncer, 1)
|
||||||
oldStatus, _ := syncer.GetPodStatus(pod.UID)
|
oldStatus := expectPodStatus(t, syncer, pod)
|
||||||
anotherStatus := getReadyPodStatus()
|
anotherStatus := getReadyPodStatus()
|
||||||
syncer.SetPodStatus(pod, anotherStatus)
|
syncer.SetPodStatus(pod, anotherStatus)
|
||||||
// No update.
|
// No update.
|
||||||
verifyUpdates(t, syncer, 0)
|
verifyUpdates(t, syncer, 0)
|
||||||
newStatus, _ := syncer.GetPodStatus(pod.UID)
|
newStatus := expectPodStatus(t, syncer, pod)
|
||||||
|
|
||||||
oldReadyCondition := api.GetPodReadyCondition(oldStatus)
|
oldReadyCondition := api.GetPodReadyCondition(oldStatus)
|
||||||
newReadyCondition := api.GetPodReadyCondition(newStatus)
|
newReadyCondition := api.GetPodReadyCondition(newStatus)
|
||||||
@ -468,7 +468,7 @@ func TestStaticPodStatus(t *testing.T) {
|
|||||||
status.StartTime = &now
|
status.StartTime = &now
|
||||||
|
|
||||||
m.SetPodStatus(&staticPod, status)
|
m.SetPodStatus(&staticPod, status)
|
||||||
retrievedStatus, _ := m.GetPodStatus(staticPod.UID)
|
retrievedStatus := expectPodStatus(t, m, &staticPod)
|
||||||
assert.True(t, isStatusEqual(&status, &retrievedStatus), "Expected: %+v, Got: %+v", status, retrievedStatus)
|
assert.True(t, isStatusEqual(&status, &retrievedStatus), "Expected: %+v, Got: %+v", status, retrievedStatus)
|
||||||
retrievedStatus, _ = m.GetPodStatus(mirrorPod.UID)
|
retrievedStatus, _ = m.GetPodStatus(mirrorPod.UID)
|
||||||
assert.True(t, isStatusEqual(&status, &retrievedStatus), "Expected: %+v, Got: %+v", status, retrievedStatus)
|
assert.True(t, isStatusEqual(&status, &retrievedStatus), "Expected: %+v, Got: %+v", status, retrievedStatus)
|
||||||
@ -506,37 +506,91 @@ func TestStaticPodStatus(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestSetContainerReadiness(t *testing.T) {
|
func TestSetContainerReadiness(t *testing.T) {
|
||||||
containerID := kubecontainer.ContainerID{"test", "cOnTaInEr_Id"}
|
cID1 := kubecontainer.ContainerID{"test", "1"}
|
||||||
containerStatus := api.ContainerStatus{
|
cID2 := kubecontainer.ContainerID{"test", "2"}
|
||||||
Name: "cOnTaInEr_NaMe",
|
containerStatuses := []api.ContainerStatus{
|
||||||
ContainerID: containerID.String(),
|
{
|
||||||
Ready: false,
|
Name: "c1",
|
||||||
|
ContainerID: cID1.String(),
|
||||||
|
Ready: false,
|
||||||
|
}, {
|
||||||
|
Name: "c2",
|
||||||
|
ContainerID: cID2.String(),
|
||||||
|
Ready: false,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
status := api.PodStatus{
|
status := api.PodStatus{
|
||||||
ContainerStatuses: []api.ContainerStatus{containerStatus},
|
ContainerStatuses: containerStatuses,
|
||||||
|
Conditions: []api.PodCondition{{
|
||||||
|
Type: api.PodReady,
|
||||||
|
Status: api.ConditionFalse,
|
||||||
|
}},
|
||||||
|
}
|
||||||
|
pod := new(api.Pod)
|
||||||
|
*pod = *testPod
|
||||||
|
pod.Spec.Containers = []api.Container{{Name: "c1"}, {Name: "c2"}}
|
||||||
|
|
||||||
|
// Verify expected readiness of containers & pod.
|
||||||
|
verifyReadiness := func(step string, status *api.PodStatus, c1Ready, c2Ready, podReady bool) {
|
||||||
|
for _, c := range status.ContainerStatuses {
|
||||||
|
switch c.ContainerID {
|
||||||
|
case cID1.String():
|
||||||
|
if c.Ready != c1Ready {
|
||||||
|
t.Errorf("[%s] Expected readiness of c1 to be %v but was %v", step, c1Ready, c.Ready)
|
||||||
|
}
|
||||||
|
case cID2.String():
|
||||||
|
if c.Ready != c2Ready {
|
||||||
|
t.Errorf("[%s] Expected readiness of c2 to be %v but was %v", step, c2Ready, c.Ready)
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
t.Fatalf("[%s] Unexpected container: %+v", step, c)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if status.Conditions[0].Type != api.PodReady {
|
||||||
|
t.Fatalf("[%s] Unexpected condition: %+v", step, status.Conditions[0])
|
||||||
|
} else if ready := (status.Conditions[0].Status == api.ConditionTrue); ready != podReady {
|
||||||
|
t.Errorf("[%s] Expected readiness of pod to be %v but was %v", step, podReady, ready)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
m := newTestManager(&testclient.Fake{})
|
m := newTestManager(&testclient.Fake{})
|
||||||
|
|
||||||
t.Log("Setting readiness before status should fail.")
|
t.Log("Setting readiness before status should fail.")
|
||||||
m.SetContainerReadiness(testPod, containerID, true)
|
m.SetContainerReadiness(pod, cID1, true)
|
||||||
verifyUpdates(t, m, 0)
|
verifyUpdates(t, m, 0)
|
||||||
|
if status, ok := m.GetPodStatus(pod.UID); ok {
|
||||||
|
t.Errorf("Unexpected PodStatus: %+v", status)
|
||||||
|
}
|
||||||
|
|
||||||
t.Log("Setting initial status.")
|
t.Log("Setting initial status.")
|
||||||
m.SetPodStatus(testPod, status)
|
m.SetPodStatus(pod, status)
|
||||||
verifyUpdates(t, m, 1)
|
verifyUpdates(t, m, 1)
|
||||||
|
status = expectPodStatus(t, m, pod)
|
||||||
|
verifyReadiness("initial", &status, false, false, false)
|
||||||
|
|
||||||
t.Log("Setting unchanged readiness should do nothing.")
|
t.Log("Setting unchanged readiness should do nothing.")
|
||||||
m.SetContainerReadiness(testPod, containerID, false)
|
m.SetContainerReadiness(pod, cID1, false)
|
||||||
verifyUpdates(t, m, 0)
|
verifyUpdates(t, m, 0)
|
||||||
|
status = expectPodStatus(t, m, pod)
|
||||||
|
verifyReadiness("unchanged", &status, false, false, false)
|
||||||
|
|
||||||
t.Log("Setting different readiness should generate update.")
|
t.Log("Setting container readiness should generate update but not pod readiness.")
|
||||||
m.SetContainerReadiness(testPod, containerID, true)
|
m.SetContainerReadiness(pod, cID1, true)
|
||||||
verifyUpdates(t, m, 1)
|
verifyUpdates(t, m, 1)
|
||||||
|
status = expectPodStatus(t, m, pod)
|
||||||
|
verifyReadiness("c1 ready", &status, true, false, false)
|
||||||
|
|
||||||
|
t.Log("Setting both containers to ready should update pod readiness.")
|
||||||
|
m.SetContainerReadiness(pod, cID2, true)
|
||||||
|
verifyUpdates(t, m, 1)
|
||||||
|
status = expectPodStatus(t, m, pod)
|
||||||
|
verifyReadiness("all ready", &status, true, true, true)
|
||||||
|
|
||||||
t.Log("Setting non-existant container readiness should fail.")
|
t.Log("Setting non-existant container readiness should fail.")
|
||||||
m.SetContainerReadiness(testPod, kubecontainer.ContainerID{"test", "foo"}, true)
|
m.SetContainerReadiness(pod, kubecontainer.ContainerID{"test", "foo"}, true)
|
||||||
verifyUpdates(t, m, 0)
|
verifyUpdates(t, m, 0)
|
||||||
|
status = expectPodStatus(t, m, pod)
|
||||||
|
verifyReadiness("ignore non-existant", &status, true, true, true)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestSyncBatchCleanupVersions(t *testing.T) {
|
func TestSyncBatchCleanupVersions(t *testing.T) {
|
||||||
@ -577,3 +631,11 @@ func TestSyncBatchCleanupVersions(t *testing.T) {
|
|||||||
t.Errorf("Should not have cleared status for mirrorPod")
|
t.Errorf("Should not have cleared status for mirrorPod")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func expectPodStatus(t *testing.T, m *manager, pod *api.Pod) api.PodStatus {
|
||||||
|
status, ok := m.GetPodStatus(pod.UID)
|
||||||
|
if !ok {
|
||||||
|
t.Fatalf("Expected PodStatus for %q not found", pod.UID)
|
||||||
|
}
|
||||||
|
return status
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user