Merge pull request #31395 from yujuhong/getpods
Automatic merge from submit-queue Instruct PLEG to detect pod sandbox state changes This PR adds a Sandboxes list in `kubecontainer.Pod`, so that PLEG can check sandbox changes using `GetPods()`. The sandboxes are treated as regular containers (type `kubecontainer.Container`) for now to avoid additional changes in PLEG. /cc @feiskyer @yifan-gu @euank
This commit is contained in:
commit
b2d02bd1ab
@ -154,6 +154,11 @@ type Pod struct {
|
||||
// List of containers that belong to this pod. It may contain only
|
||||
// running containers, or mixed with dead ones (when GetPods(true)).
|
||||
Containers []*Container
|
||||
// List of sandboxes associated with this pod. The sandboxes are converted
|
||||
// to Container temporarily to avoid substantial changes to other
|
||||
// components. This is only populated by kuberuntime.
|
||||
// TODO: use the runtimeApi.PodSandbox type directly.
|
||||
Sandboxes []*Container
|
||||
}
|
||||
|
||||
// PodPair contains both runtime#Pod and api#Pod
|
||||
@ -463,6 +468,15 @@ func (p *Pod) FindContainerByID(id ContainerID) *Container {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *Pod) FindSandboxByID(id ContainerID) *Container {
|
||||
for _, c := range p.Sandboxes {
|
||||
if c.ID == id {
|
||||
return c
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// ToAPIPod converts Pod to api.Pod. Note that if a field in api.Pod has no
|
||||
// corresponding field in Pod, the field would not be populated.
|
||||
func (p *Pod) ToAPIPod() *api.Pod {
|
||||
|
@ -77,6 +77,21 @@ func toKubeContainerState(state runtimeApi.ContainerState) kubecontainer.Contain
|
||||
return kubecontainer.ContainerStateUnknown
|
||||
}
|
||||
|
||||
// sandboxToKubeContainerState converts runtimeApi.PodSandboxState to
|
||||
// kubecontainer.ContainerState.
|
||||
// This is only needed because we need to return sandboxes as if they were
|
||||
// kubecontainer.Containers to avoid substantial changes to PLEG.
|
||||
// TODO: Remove this once it becomes obsolete.
|
||||
func sandboxToKubeContainerState(state runtimeApi.PodSandBoxState) kubecontainer.ContainerState {
|
||||
switch state {
|
||||
case runtimeApi.PodSandBoxState_READY:
|
||||
return kubecontainer.ContainerStateRunning
|
||||
case runtimeApi.PodSandBoxState_NOTREADY:
|
||||
return kubecontainer.ContainerStateExited
|
||||
}
|
||||
return kubecontainer.ContainerStateUnknown
|
||||
}
|
||||
|
||||
// toRuntimeProtocol converts api.Protocol to runtimeApi.Protocol.
|
||||
func toRuntimeProtocol(protocol api.Protocol) runtimeApi.Protocol {
|
||||
switch protocol {
|
||||
@ -107,6 +122,21 @@ func (m *kubeGenericRuntimeManager) toKubeContainer(c *runtimeApi.Container) (*k
|
||||
}, nil
|
||||
}
|
||||
|
||||
// sandboxToKubeContainer converts runtimeApi.PodSandbox to kubecontainer.Container.
|
||||
// This is only needed because we need to return sandboxes as if they were
|
||||
// kubecontainer.Containers to avoid substantial changes to PLEG.
|
||||
// TODO: Remove this once it becomes obsolete.
|
||||
func (m *kubeGenericRuntimeManager) sandboxToKubeContainer(s *runtimeApi.PodSandbox) (*kubecontainer.Container, error) {
|
||||
if s == nil || s.Id == nil || s.State == nil {
|
||||
return nil, fmt.Errorf("unable to convert a nil pointer to a runtime container")
|
||||
}
|
||||
|
||||
return &kubecontainer.Container{
|
||||
ID: kubecontainer.ContainerID{Type: m.runtimeName, ID: s.GetId()},
|
||||
State: sandboxToKubeContainerState(s.GetState()),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// milliCPUToShares converts milliCPU to CPU shares
|
||||
func milliCPUToShares(milliCPU int64) int64 {
|
||||
if milliCPU == 0 {
|
||||
|
@ -230,20 +230,40 @@ func (m *kubeGenericRuntimeManager) GetPods(all bool) ([]*kubecontainer.Pod, err
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for _, s := range sandboxes {
|
||||
podUID := kubetypes.UID(s.Metadata.GetUid())
|
||||
pods[podUID] = &kubecontainer.Pod{
|
||||
ID: podUID,
|
||||
Name: s.Metadata.GetName(),
|
||||
Namespace: s.Metadata.GetNamespace(),
|
||||
for i := range sandboxes {
|
||||
s := sandboxes[i]
|
||||
if s.Metadata == nil {
|
||||
glog.V(4).Infof("Sandbox does not have metadata: %+v", s)
|
||||
continue
|
||||
}
|
||||
podUID := kubetypes.UID(s.Metadata.GetUid())
|
||||
if _, ok := pods[podUID]; !ok {
|
||||
pods[podUID] = &kubecontainer.Pod{
|
||||
ID: podUID,
|
||||
Name: s.Metadata.GetName(),
|
||||
Namespace: s.Metadata.GetNamespace(),
|
||||
}
|
||||
}
|
||||
p := pods[podUID]
|
||||
converted, err := m.sandboxToKubeContainer(s)
|
||||
if err != nil {
|
||||
glog.V(4).Infof("Convert %q sandbox %v of pod %q failed: %v", m.runtimeName, s, podUID, err)
|
||||
continue
|
||||
}
|
||||
p.Sandboxes = append(p.Sandboxes, converted)
|
||||
}
|
||||
|
||||
containers, err := m.getKubeletContainers(all)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for _, c := range containers {
|
||||
for i := range containers {
|
||||
c := containers[i]
|
||||
if c.Metadata == nil {
|
||||
glog.V(4).Infof("Container does not have metadata: %+v", c)
|
||||
continue
|
||||
}
|
||||
|
||||
labelledInfo := getContainerInfoFromLabels(c.Labels)
|
||||
pod, found := pods[labelledInfo.PodUID]
|
||||
if !found {
|
||||
@ -257,7 +277,7 @@ func (m *kubeGenericRuntimeManager) GetPods(all bool) ([]*kubecontainer.Pod, err
|
||||
|
||||
converted, err := m.toKubeContainer(c)
|
||||
if err != nil {
|
||||
glog.Warningf("Convert %s container %v of pod %q failed: %v", m.runtimeName, c, labelledInfo.PodUID, err)
|
||||
glog.V(4).Infof("Convert %s container %v of pod %q failed: %v", m.runtimeName, c, labelledInfo.PodUID, err)
|
||||
continue
|
||||
}
|
||||
|
||||
|
@ -239,7 +239,7 @@ func TestGetPods(t *testing.T) {
|
||||
}
|
||||
|
||||
// Set fake sandbox and fake containers to fakeRuntime.
|
||||
_, fakeContainers, err := makeAndSetFakePod(m, fakeRuntime, pod)
|
||||
fakeSandbox, fakeContainers, err := makeAndSetFakePod(m, fakeRuntime, pod)
|
||||
assert.NoError(t, err)
|
||||
|
||||
// Convert the fakeContainers to kubecontainer.Container
|
||||
@ -259,12 +259,25 @@ func TestGetPods(t *testing.T) {
|
||||
}
|
||||
containers[i] = c
|
||||
}
|
||||
// Convert fakeSandbox to kubecontainer.Container
|
||||
sandbox, err := m.sandboxToKubeContainer(&runtimeApi.PodSandbox{
|
||||
Id: fakeSandbox.Id,
|
||||
Metadata: fakeSandbox.Metadata,
|
||||
State: fakeSandbox.State,
|
||||
CreatedAt: fakeSandbox.CreatedAt,
|
||||
Labels: fakeSandbox.Labels,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error %v", err)
|
||||
}
|
||||
|
||||
expected := []*kubecontainer.Pod{
|
||||
{
|
||||
ID: kubetypes.UID("12345678"),
|
||||
Name: "foo",
|
||||
Namespace: "new",
|
||||
Containers: []*kubecontainer.Container{containers[0], containers[1]},
|
||||
Sandboxes: []*kubecontainer.Container{sandbox},
|
||||
},
|
||||
}
|
||||
|
||||
|
@ -141,6 +141,7 @@ func generateEvents(podID types.UID, cid string, oldState, newState plegContaine
|
||||
if newState == oldState {
|
||||
return nil
|
||||
}
|
||||
|
||||
glog.V(4).Infof("GenericPLEG: %v/%v: %v -> %v", podID, cid, oldState, newState)
|
||||
switch newState {
|
||||
case plegContainerRunning:
|
||||
@ -294,6 +295,17 @@ func getContainersFromPods(pods ...*kubecontainer.Pod) []*kubecontainer.Containe
|
||||
cidSet.Insert(cid)
|
||||
containers = append(containers, c)
|
||||
}
|
||||
// Update sandboxes as containers
|
||||
// TODO: keep track of sandboxes explicitly.
|
||||
for _, c := range p.Sandboxes {
|
||||
cid := string(c.ID.ID)
|
||||
if cidSet.Has(cid) {
|
||||
continue
|
||||
}
|
||||
cidSet.Insert(cid)
|
||||
containers = append(containers, c)
|
||||
}
|
||||
|
||||
}
|
||||
return containers
|
||||
}
|
||||
@ -345,11 +357,17 @@ func getContainerState(pod *kubecontainer.Pod, cid *kubecontainer.ContainerID) p
|
||||
if pod == nil {
|
||||
return state
|
||||
}
|
||||
container := pod.FindContainerByID(*cid)
|
||||
if container == nil {
|
||||
return state
|
||||
c := pod.FindContainerByID(*cid)
|
||||
if c != nil {
|
||||
return convertState(c.State)
|
||||
}
|
||||
return convertState(container.State)
|
||||
// Search through sandboxes too.
|
||||
c = pod.FindSandboxByID(*cid)
|
||||
if c != nil {
|
||||
return convertState(c.State)
|
||||
}
|
||||
|
||||
return state
|
||||
}
|
||||
|
||||
func (pr podRecords) getOld(id types.UID) *kubecontainer.Pod {
|
||||
|
@ -123,7 +123,7 @@ func TestRelisting(t *testing.T) {
|
||||
actual := getEventsFromChannel(ch)
|
||||
verifyEvents(t, expected, actual)
|
||||
|
||||
// The second relist should not send out any event because no container
|
||||
// The second relist should not send out any event because no container has
|
||||
// changed.
|
||||
pleg.relist()
|
||||
verifyEvents(t, expected, actual)
|
||||
@ -430,3 +430,69 @@ func TestRelistWithReinspection(t *testing.T) {
|
||||
// containers was the same as relist #1, nothing "changed", so there are no new events.
|
||||
assert.Exactly(t, []*PodLifecycleEvent{}, actualEvents)
|
||||
}
|
||||
|
||||
// Test detecting sandbox state changes.
// Sandboxes are surfaced to PLEG through Pod.Sandboxes as regular
// kubecontainer.Containers, so relist should generate the same lifecycle
// events for them as it does for ordinary containers.
func TestRelistingWithSandboxes(t *testing.T) {
	testPleg := newTestGenericPLEG()
	pleg, runtime := testPleg.pleg, testPleg.runtime
	ch := pleg.Watch()
	// The first relist should send a PodSync event to each pod.
	runtime.AllPodList = []*containertest.FakePod{
		{Pod: &kubecontainer.Pod{
			ID: "1234",
			Sandboxes: []*kubecontainer.Container{
				createTestContainer("c1", kubecontainer.ContainerStateExited),
				createTestContainer("c2", kubecontainer.ContainerStateRunning),
				createTestContainer("c3", kubecontainer.ContainerStateUnknown),
			},
		}},
		{Pod: &kubecontainer.Pod{
			ID: "4567",
			Sandboxes: []*kubecontainer.Container{
				createTestContainer("c1", kubecontainer.ContainerStateExited),
			},
		}},
	}
	pleg.relist()
	// Report every running/exited container if we see them for the first time.
	// Note "c3" (unknown state) produces no event on first sight.
	expected := []*PodLifecycleEvent{
		{ID: "1234", Type: ContainerStarted, Data: "c2"},
		{ID: "4567", Type: ContainerDied, Data: "c1"},
		{ID: "1234", Type: ContainerDied, Data: "c1"},
	}
	actual := getEventsFromChannel(ch)
	verifyEvents(t, expected, actual)

	// The second relist should not send out any event because no container has
	// changed.
	pleg.relist()
	verifyEvents(t, expected, actual)

	// Replace the pod list: "c1" disappears from both pods, "c2" exits,
	// "c3" starts running, and "c4" appears running in pod 4567.
	runtime.AllPodList = []*containertest.FakePod{
		{Pod: &kubecontainer.Pod{
			ID: "1234",
			Sandboxes: []*kubecontainer.Container{
				createTestContainer("c2", kubecontainer.ContainerStateExited),
				createTestContainer("c3", kubecontainer.ContainerStateRunning),
			},
		}},
		{Pod: &kubecontainer.Pod{
			ID: "4567",
			Sandboxes: []*kubecontainer.Container{
				createTestContainer("c4", kubecontainer.ContainerStateRunning),
			},
		}},
	}
	pleg.relist()
	// Only report containers that transitioned to running or exited status.
	expected = []*PodLifecycleEvent{
		{ID: "1234", Type: ContainerRemoved, Data: "c1"},
		{ID: "1234", Type: ContainerDied, Data: "c2"},
		{ID: "1234", Type: ContainerStarted, Data: "c3"},
		{ID: "4567", Type: ContainerRemoved, Data: "c1"},
		{ID: "4567", Type: ContainerStarted, Data: "c4"},
	}

	actual = getEventsFromChannel(ch)
	verifyEvents(t, expected, actual)
}
|
||||
|
Loading…
Reference in New Issue
Block a user