containerd/integration/container_event_test.go
Yury Gargay 2e8e03389c
Add multi-subscriber support to GetContainerEvents CRI API
This commit addresses issue #7318 by introducing events broadcasting
to the current implementation. The integration/container_event_test.go
is extended to demonstrate the broadcasting capabilities
of two simultaneous connected clients.

Signed-off-by: Yury Gargay <yury.gargay@gmail.com>
2024-01-19 21:27:06 -08:00

166 lines
7.0 KiB
Go

/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package integration
import (
"context"
"testing"
"time"
"github.com/containerd/containerd/v2/integration/images"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
)
const (
drainContainerEventChannelTimeout = 2 * time.Second
readContainerEventChannelTimeout = 10 * time.Second
)
func TestContainerEvents(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
t.Log("Set up container events streaming clients")
containerEventsStreamingClient1, err := runtimeService.GetContainerEvents(ctx, &runtime.GetEventsRequest{})
assert.NoError(t, err)
containerEventsStreamingClient2, err := runtimeService.GetContainerEvents(ctx, &runtime.GetEventsRequest{})
assert.NoError(t, err)
containerEventsChan1 := make(chan *runtime.ContainerEventResponse)
containerEventsChan2 := make(chan *runtime.ContainerEventResponse)
go listenToEventChannel(ctx, t, containerEventsChan1, containerEventsStreamingClient1)
go listenToEventChannel(ctx, t, containerEventsChan2, containerEventsStreamingClient2)
// drain all events emitted by previous tests.
drainContainerEventsChan(containerEventsChan1)
drainContainerEventsChan(containerEventsChan2)
containerEventsChannels := []chan *runtime.ContainerEventResponse{containerEventsChan1, containerEventsChan2}
t.Logf("Step 1: RunPodSandbox and check for expected events")
sandboxName := "container_events_sandbox"
sbConfig := PodSandboxConfig(sandboxName, "container_events")
sb, err := runtimeService.RunPodSandbox(sbConfig, *runtimeHandler)
require.NoError(t, err)
t.Cleanup(func() {
expectedContainerStates := []runtime.ContainerState{}
expectedPodSandboxStatus := &runtime.PodSandboxStatus{State: runtime.PodSandboxState_SANDBOX_NOTREADY}
t.Logf("Step 6: StopPodSandbox and check events")
assert.NoError(t, runtimeService.StopPodSandbox(sb))
checkContainerEventResponse(t, containerEventsChannels, runtime.ContainerEventType_CONTAINER_STOPPED_EVENT, expectedPodSandboxStatus, expectedContainerStates)
t.Logf("Step 7: RemovePodSandbox and check events")
assert.NoError(t, runtimeService.RemovePodSandbox(sb))
checkContainerEventResponse(t, containerEventsChannels, runtime.ContainerEventType_CONTAINER_DELETED_EVENT, nil, expectedContainerStates)
})
// PodSandbox ready, container state list empty
expectedPodSandboxStatus := &runtime.PodSandboxStatus{State: runtime.PodSandboxState_SANDBOX_READY}
expectedContainerStates := []runtime.ContainerState{}
// PodSandbox created. Check for start event for podsandbox container. Should be zero containers in the podsandbox
checkContainerEventResponse(t, containerEventsChannels, runtime.ContainerEventType_CONTAINER_CREATED_EVENT, expectedPodSandboxStatus, expectedContainerStates)
// PodSandbox started. Check for start event for podsandbox container. Should be zero containers in the podsandbox
checkContainerEventResponse(t, containerEventsChannels, runtime.ContainerEventType_CONTAINER_STARTED_EVENT, expectedPodSandboxStatus, expectedContainerStates)
t.Logf("Step 2: CreateContainer and check events")
pauseImage := images.Get(images.Pause)
EnsureImageExists(t, pauseImage)
containerConfig := ContainerConfig(
"container1",
pauseImage,
WithTestLabels(),
WithTestAnnotations(),
)
cn, err := runtimeService.CreateContainer(sb, containerConfig, sbConfig)
require.NoError(t, err)
expectedContainerStates = []runtime.ContainerState{runtime.ContainerState_CONTAINER_CREATED}
checkContainerEventResponse(t, containerEventsChannels, runtime.ContainerEventType_CONTAINER_CREATED_EVENT, expectedPodSandboxStatus, expectedContainerStates)
t.Cleanup(func() {
t.Logf("Step 5: RemoveContainer and check events")
assert.NoError(t, runtimeService.RemoveContainer(cn))
// No container status after the container is removed
expectedContainerStates := []runtime.ContainerState{}
checkContainerEventResponse(t, containerEventsChannels, runtime.ContainerEventType_CONTAINER_DELETED_EVENT, expectedPodSandboxStatus, expectedContainerStates)
})
t.Logf("Step 3: StartContainer and check events")
require.NoError(t, runtimeService.StartContainer(cn))
expectedContainerStates = []runtime.ContainerState{runtime.ContainerState_CONTAINER_RUNNING}
checkContainerEventResponse(t, containerEventsChannels, runtime.ContainerEventType_CONTAINER_STARTED_EVENT, expectedPodSandboxStatus, expectedContainerStates)
t.Cleanup(func() {
t.Logf("Step 4: StopContainer and check events")
assert.NoError(t, runtimeService.StopContainer(cn, 10))
expectedContainerStates := []runtime.ContainerState{runtime.ContainerState_CONTAINER_EXITED}
checkContainerEventResponse(t, containerEventsChannels, runtime.ContainerEventType_CONTAINER_STOPPED_EVENT, expectedPodSandboxStatus, expectedContainerStates)
})
}
func listenToEventChannel(ctx context.Context, t *testing.T, containerEventsChan chan *runtime.ContainerEventResponse, containerEventsStreamingClient runtime.RuntimeService_GetContainerEventsClient) {
t.Helper()
for {
resp, err := containerEventsStreamingClient.Recv()
// Early return if context is canceled, in which case the error will be
// context canceled
select {
case <-ctx.Done():
return
default:
}
assert.NoError(t, err)
if resp != nil {
containerEventsChan <- resp
}
}
}
func drainContainerEventsChan(containerEventsChan chan *runtime.ContainerEventResponse) {
for {
select {
case <-containerEventsChan:
case <-time.After(drainContainerEventChannelTimeout):
return
}
}
}
func checkContainerEventResponse(t *testing.T, containerEventsChans []chan *runtime.ContainerEventResponse, expectedType runtime.ContainerEventType, expectedPodSandboxStatus *runtime.PodSandboxStatus, expectedContainerStates []runtime.ContainerState) {
t.Helper()
for _, ch := range containerEventsChans {
var resp *runtime.ContainerEventResponse
select {
case resp = <-ch:
case <-time.After(readContainerEventChannelTimeout):
t.Error("assertContainerEventResponse: timeout waiting for events from channel")
}
t.Logf("Container Event response received: %+v", *resp)
assert.Equal(t, expectedType, resp.ContainerEventType)
// Checking only the State field of PodSandboxStatus
if expectedPodSandboxStatus != nil {
assert.Equal(t, expectedPodSandboxStatus.State, resp.PodSandboxStatus.State)
}
// Checking only the State field of ContainersStatus
for i, cs := range resp.ContainersStatuses {
assert.Equal(t, expectedContainerStates[i], cs.State)
}
}
}