Add multi-subscriber support to GetContainerEvents CRI API

This commit addresses issue #7318 by introducing events broadcasting
to the current implementation. The integration/container_event_test.go
is extended to demonstrate the broadcasting capabilities
of two simultaneous connected clients.

Signed-off-by: Yury Gargay <yury.gargay@gmail.com>
This commit is contained in:
Yury Gargay 2023-08-03 18:41:58 +00:00 committed by Derek McGowan
parent e7eb08eb56
commit 2e8e03389c
No known key found for this signature in database
GPG Key ID: F58C5D0A4405ACDB
2 changed files with 47 additions and 29 deletions

View File

@ -36,14 +36,21 @@ func TestContainerEvents(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
t.Log("Set up container events streaming client")
containerEventsStreamingClient, err := runtimeService.GetContainerEvents(ctx, &runtime.GetEventsRequest{})
t.Log("Set up container events streaming clients")
containerEventsStreamingClient1, err := runtimeService.GetContainerEvents(ctx, &runtime.GetEventsRequest{})
assert.NoError(t, err)
containerEventsChan := make(chan *runtime.ContainerEventResponse)
containerEventsStreamingClient2, err := runtimeService.GetContainerEvents(ctx, &runtime.GetEventsRequest{})
assert.NoError(t, err)
containerEventsChan1 := make(chan *runtime.ContainerEventResponse)
containerEventsChan2 := make(chan *runtime.ContainerEventResponse)
go listenToEventChannel(ctx, t, containerEventsChan, containerEventsStreamingClient)
go listenToEventChannel(ctx, t, containerEventsChan1, containerEventsStreamingClient1)
go listenToEventChannel(ctx, t, containerEventsChan2, containerEventsStreamingClient2)
// drain all events emitted by previous tests.
drainContainerEventsChan(containerEventsChan)
drainContainerEventsChan(containerEventsChan1)
drainContainerEventsChan(containerEventsChan2)
containerEventsChannels := []chan *runtime.ContainerEventResponse{containerEventsChan1, containerEventsChan2}
t.Logf("Step 1: RunPodSandbox and check for expected events")
sandboxName := "container_events_sandbox"
@ -56,19 +63,19 @@ func TestContainerEvents(t *testing.T) {
expectedPodSandboxStatus := &runtime.PodSandboxStatus{State: runtime.PodSandboxState_SANDBOX_NOTREADY}
t.Logf("Step 6: StopPodSandbox and check events")
assert.NoError(t, runtimeService.StopPodSandbox(sb))
checkContainerEventResponse(t, containerEventsChan, runtime.ContainerEventType_CONTAINER_STOPPED_EVENT, expectedPodSandboxStatus, expectedContainerStates)
checkContainerEventResponse(t, containerEventsChannels, runtime.ContainerEventType_CONTAINER_STOPPED_EVENT, expectedPodSandboxStatus, expectedContainerStates)
t.Logf("Step 7: RemovePodSandbox and check events")
assert.NoError(t, runtimeService.RemovePodSandbox(sb))
checkContainerEventResponse(t, containerEventsChan, runtime.ContainerEventType_CONTAINER_DELETED_EVENT, nil, expectedContainerStates)
checkContainerEventResponse(t, containerEventsChannels, runtime.ContainerEventType_CONTAINER_DELETED_EVENT, nil, expectedContainerStates)
})
// PodSandbox ready, container state list empty
expectedPodSandboxStatus := &runtime.PodSandboxStatus{State: runtime.PodSandboxState_SANDBOX_READY}
expectedContainerStates := []runtime.ContainerState{}
// PodSandbox created. Check for start event for podsandbox container. Should be zero containers in the podsandbox
checkContainerEventResponse(t, containerEventsChan, runtime.ContainerEventType_CONTAINER_CREATED_EVENT, expectedPodSandboxStatus, expectedContainerStates)
checkContainerEventResponse(t, containerEventsChannels, runtime.ContainerEventType_CONTAINER_CREATED_EVENT, expectedPodSandboxStatus, expectedContainerStates)
// PodSandbox started. Check for start event for podsandbox container. Should be zero containers in the podsandbox
checkContainerEventResponse(t, containerEventsChan, runtime.ContainerEventType_CONTAINER_STARTED_EVENT, expectedPodSandboxStatus, expectedContainerStates)
checkContainerEventResponse(t, containerEventsChannels, runtime.ContainerEventType_CONTAINER_STARTED_EVENT, expectedPodSandboxStatus, expectedContainerStates)
t.Logf("Step 2: CreateContainer and check events")
pauseImage := images.Get(images.Pause)
@ -82,26 +89,26 @@ func TestContainerEvents(t *testing.T) {
cn, err := runtimeService.CreateContainer(sb, containerConfig, sbConfig)
require.NoError(t, err)
expectedContainerStates = []runtime.ContainerState{runtime.ContainerState_CONTAINER_CREATED}
checkContainerEventResponse(t, containerEventsChan, runtime.ContainerEventType_CONTAINER_CREATED_EVENT, expectedPodSandboxStatus, expectedContainerStates)
checkContainerEventResponse(t, containerEventsChannels, runtime.ContainerEventType_CONTAINER_CREATED_EVENT, expectedPodSandboxStatus, expectedContainerStates)
t.Cleanup(func() {
t.Logf("Step 5: RemoveContainer and check events")
assert.NoError(t, runtimeService.RemoveContainer(cn))
// No container status after the container is removed
expectedContainerStates := []runtime.ContainerState{}
checkContainerEventResponse(t, containerEventsChan, runtime.ContainerEventType_CONTAINER_DELETED_EVENT, expectedPodSandboxStatus, expectedContainerStates)
checkContainerEventResponse(t, containerEventsChannels, runtime.ContainerEventType_CONTAINER_DELETED_EVENT, expectedPodSandboxStatus, expectedContainerStates)
})
t.Logf("Step 3: StartContainer and check events")
require.NoError(t, runtimeService.StartContainer(cn))
expectedContainerStates = []runtime.ContainerState{runtime.ContainerState_CONTAINER_RUNNING}
checkContainerEventResponse(t, containerEventsChan, runtime.ContainerEventType_CONTAINER_STARTED_EVENT, expectedPodSandboxStatus, expectedContainerStates)
checkContainerEventResponse(t, containerEventsChannels, runtime.ContainerEventType_CONTAINER_STARTED_EVENT, expectedPodSandboxStatus, expectedContainerStates)
t.Cleanup(func() {
t.Logf("Step 4: StopContainer and check events")
assert.NoError(t, runtimeService.StopContainer(cn, 10))
expectedContainerStates := []runtime.ContainerState{runtime.ContainerState_CONTAINER_EXITED}
checkContainerEventResponse(t, containerEventsChan, runtime.ContainerEventType_CONTAINER_STOPPED_EVENT, expectedPodSandboxStatus, expectedContainerStates)
checkContainerEventResponse(t, containerEventsChannels, runtime.ContainerEventType_CONTAINER_STOPPED_EVENT, expectedPodSandboxStatus, expectedContainerStates)
})
}
@ -133,11 +140,12 @@ func drainContainerEventsChan(containerEventsChan chan *runtime.ContainerEventRe
}
}
func checkContainerEventResponse(t *testing.T, containerEventsChan chan *runtime.ContainerEventResponse, expectedType runtime.ContainerEventType, expectedPodSandboxStatus *runtime.PodSandboxStatus, expectedContainerStates []runtime.ContainerState) {
func checkContainerEventResponse(t *testing.T, containerEventsChans []chan *runtime.ContainerEventResponse, expectedType runtime.ContainerEventType, expectedPodSandboxStatus *runtime.PodSandboxStatus, expectedContainerStates []runtime.ContainerState) {
t.Helper()
for _, ch := range containerEventsChans {
var resp *runtime.ContainerEventResponse
select {
case resp = <-containerEventsChan:
case resp = <-ch:
case <-time.After(readContainerEventChannelTimeout):
t.Error("assertContainerEventResponse: timeout waiting for events from channel")
}
@ -154,3 +162,4 @@ func checkContainerEventResponse(t *testing.T, containerEventsChan chan *runtime
assert.Equal(t, expectedContainerStates[i], cs.State)
}
}
}

View File

@ -63,6 +63,7 @@ const (
var (
runtimeService cri.RuntimeService
runtimeService2 cri.RuntimeService // to test GetContainerEvents broadcast
imageService cri.ImageManagerService
containerdClient *containerd.Client
containerdEndpoint string
@ -87,6 +88,10 @@ func ConnectDaemons() error {
if err != nil {
return fmt.Errorf("failed to create runtime service: %w", err)
}
runtimeService2, err = remote.NewRuntimeService(*criEndpoint, timeout)
if err != nil {
return fmt.Errorf("failed to create runtime service: %w", err)
}
imageService, err = remote.NewImageService(*criEndpoint, timeout)
if err != nil {
return fmt.Errorf("failed to create image service: %w", err)
@ -98,6 +103,10 @@ func ConnectDaemons() error {
if err != nil {
return fmt.Errorf("failed to list containers: %w", err)
}
_, err = runtimeService2.ListContainers(&runtime.ContainerFilter{})
if err != nil {
return fmt.Errorf("failed to list containers: %w", err)
}
_, err = imageService.ListImages(&runtime.ImageFilter{})
if err != nil {
return fmt.Errorf("failed to list images: %w", err)