Merge pull request #9661 from dmcgowan/update-cri-container-events-multisub
Add support for multiple subscribers to CRI container events
This commit is contained in:
		| @@ -36,14 +36,21 @@ func TestContainerEvents(t *testing.T) { | ||||
| 	ctx, cancel := context.WithCancel(context.Background()) | ||||
| 	t.Cleanup(cancel) | ||||
|  | ||||
| 	t.Log("Set up container events streaming client") | ||||
| 	containerEventsStreamingClient, err := runtimeService.GetContainerEvents(ctx, &runtime.GetEventsRequest{}) | ||||
| 	t.Log("Set up container events streaming clients") | ||||
| 	containerEventsStreamingClient1, err := runtimeService.GetContainerEvents(ctx, &runtime.GetEventsRequest{}) | ||||
| 	assert.NoError(t, err) | ||||
| 	containerEventsChan := make(chan *runtime.ContainerEventResponse) | ||||
| 	containerEventsStreamingClient2, err := runtimeService.GetContainerEvents(ctx, &runtime.GetEventsRequest{}) | ||||
| 	assert.NoError(t, err) | ||||
| 	containerEventsChan1 := make(chan *runtime.ContainerEventResponse) | ||||
| 	containerEventsChan2 := make(chan *runtime.ContainerEventResponse) | ||||
|  | ||||
| 	go listenToEventChannel(ctx, t, containerEventsChan, containerEventsStreamingClient) | ||||
| 	go listenToEventChannel(ctx, t, containerEventsChan1, containerEventsStreamingClient1) | ||||
| 	go listenToEventChannel(ctx, t, containerEventsChan2, containerEventsStreamingClient2) | ||||
| 	// drain all events emitted by previous tests. | ||||
| 	drainContainerEventsChan(containerEventsChan) | ||||
| 	drainContainerEventsChan(containerEventsChan1) | ||||
| 	drainContainerEventsChan(containerEventsChan2) | ||||
|  | ||||
| 	containerEventsChannels := []chan *runtime.ContainerEventResponse{containerEventsChan1, containerEventsChan2} | ||||
|  | ||||
| 	t.Logf("Step 1: RunPodSandbox and check for expected events") | ||||
| 	sandboxName := "container_events_sandbox" | ||||
| @@ -56,19 +63,19 @@ func TestContainerEvents(t *testing.T) { | ||||
| 		expectedPodSandboxStatus := &runtime.PodSandboxStatus{State: runtime.PodSandboxState_SANDBOX_NOTREADY} | ||||
| 		t.Logf("Step 6: StopPodSandbox and check events") | ||||
| 		assert.NoError(t, runtimeService.StopPodSandbox(sb)) | ||||
| 		checkContainerEventResponse(t, containerEventsChan, runtime.ContainerEventType_CONTAINER_STOPPED_EVENT, expectedPodSandboxStatus, expectedContainerStates) | ||||
| 		checkContainerEventResponse(t, containerEventsChannels, runtime.ContainerEventType_CONTAINER_STOPPED_EVENT, expectedPodSandboxStatus, expectedContainerStates) | ||||
| 		t.Logf("Step 7: RemovePodSandbox and check events") | ||||
| 		assert.NoError(t, runtimeService.RemovePodSandbox(sb)) | ||||
| 		checkContainerEventResponse(t, containerEventsChan, runtime.ContainerEventType_CONTAINER_DELETED_EVENT, nil, expectedContainerStates) | ||||
| 		checkContainerEventResponse(t, containerEventsChannels, runtime.ContainerEventType_CONTAINER_DELETED_EVENT, nil, expectedContainerStates) | ||||
| 	}) | ||||
|  | ||||
| 	// PodSandbox ready, container state list empty | ||||
| 	expectedPodSandboxStatus := &runtime.PodSandboxStatus{State: runtime.PodSandboxState_SANDBOX_READY} | ||||
| 	expectedContainerStates := []runtime.ContainerState{} | ||||
| 	// PodSandbox created. Check for start event for podsandbox container. Should be zero containers in the podsandbox | ||||
| 	checkContainerEventResponse(t, containerEventsChan, runtime.ContainerEventType_CONTAINER_CREATED_EVENT, expectedPodSandboxStatus, expectedContainerStates) | ||||
| 	checkContainerEventResponse(t, containerEventsChannels, runtime.ContainerEventType_CONTAINER_CREATED_EVENT, expectedPodSandboxStatus, expectedContainerStates) | ||||
| 	// PodSandbox started. Check for start event for podsandbox container. Should be zero containers in the podsandbox | ||||
| 	checkContainerEventResponse(t, containerEventsChan, runtime.ContainerEventType_CONTAINER_STARTED_EVENT, expectedPodSandboxStatus, expectedContainerStates) | ||||
| 	checkContainerEventResponse(t, containerEventsChannels, runtime.ContainerEventType_CONTAINER_STARTED_EVENT, expectedPodSandboxStatus, expectedContainerStates) | ||||
|  | ||||
| 	t.Logf("Step 2: CreateContainer and check events") | ||||
| 	pauseImage := images.Get(images.Pause) | ||||
| @@ -82,26 +89,26 @@ func TestContainerEvents(t *testing.T) { | ||||
| 	cn, err := runtimeService.CreateContainer(sb, containerConfig, sbConfig) | ||||
| 	require.NoError(t, err) | ||||
| 	expectedContainerStates = []runtime.ContainerState{runtime.ContainerState_CONTAINER_CREATED} | ||||
| 	checkContainerEventResponse(t, containerEventsChan, runtime.ContainerEventType_CONTAINER_CREATED_EVENT, expectedPodSandboxStatus, expectedContainerStates) | ||||
| 	checkContainerEventResponse(t, containerEventsChannels, runtime.ContainerEventType_CONTAINER_CREATED_EVENT, expectedPodSandboxStatus, expectedContainerStates) | ||||
|  | ||||
| 	t.Cleanup(func() { | ||||
| 		t.Logf("Step 5: RemoveContainer and check events") | ||||
| 		assert.NoError(t, runtimeService.RemoveContainer(cn)) | ||||
| 		// No container status after the container is removed | ||||
| 		expectedContainerStates := []runtime.ContainerState{} | ||||
| 		checkContainerEventResponse(t, containerEventsChan, runtime.ContainerEventType_CONTAINER_DELETED_EVENT, expectedPodSandboxStatus, expectedContainerStates) | ||||
| 		checkContainerEventResponse(t, containerEventsChannels, runtime.ContainerEventType_CONTAINER_DELETED_EVENT, expectedPodSandboxStatus, expectedContainerStates) | ||||
| 	}) | ||||
|  | ||||
| 	t.Logf("Step 3: StartContainer and check events") | ||||
| 	require.NoError(t, runtimeService.StartContainer(cn)) | ||||
| 	expectedContainerStates = []runtime.ContainerState{runtime.ContainerState_CONTAINER_RUNNING} | ||||
| 	checkContainerEventResponse(t, containerEventsChan, runtime.ContainerEventType_CONTAINER_STARTED_EVENT, expectedPodSandboxStatus, expectedContainerStates) | ||||
| 	checkContainerEventResponse(t, containerEventsChannels, runtime.ContainerEventType_CONTAINER_STARTED_EVENT, expectedPodSandboxStatus, expectedContainerStates) | ||||
|  | ||||
| 	t.Cleanup(func() { | ||||
| 		t.Logf("Step 4: StopContainer and check events") | ||||
| 		assert.NoError(t, runtimeService.StopContainer(cn, 10)) | ||||
| 		expectedContainerStates := []runtime.ContainerState{runtime.ContainerState_CONTAINER_EXITED} | ||||
| 		checkContainerEventResponse(t, containerEventsChan, runtime.ContainerEventType_CONTAINER_STOPPED_EVENT, expectedPodSandboxStatus, expectedContainerStates) | ||||
| 		checkContainerEventResponse(t, containerEventsChannels, runtime.ContainerEventType_CONTAINER_STOPPED_EVENT, expectedPodSandboxStatus, expectedContainerStates) | ||||
| 	}) | ||||
| } | ||||
|  | ||||
| @@ -133,24 +140,26 @@ func drainContainerEventsChan(containerEventsChan chan *runtime.ContainerEventRe | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func checkContainerEventResponse(t *testing.T, containerEventsChan chan *runtime.ContainerEventResponse, expectedType runtime.ContainerEventType, expectedPodSandboxStatus *runtime.PodSandboxStatus, expectedContainerStates []runtime.ContainerState) { | ||||
| func checkContainerEventResponse(t *testing.T, containerEventsChans []chan *runtime.ContainerEventResponse, expectedType runtime.ContainerEventType, expectedPodSandboxStatus *runtime.PodSandboxStatus, expectedContainerStates []runtime.ContainerState) { | ||||
| 	t.Helper() | ||||
| 	var resp *runtime.ContainerEventResponse | ||||
| 	select { | ||||
| 	case resp = <-containerEventsChan: | ||||
| 	case <-time.After(readContainerEventChannelTimeout): | ||||
| 		t.Error("assertContainerEventResponse: timeout waiting for events from channel") | ||||
| 	} | ||||
| 	t.Logf("Container Event response received: %+v", *resp) | ||||
| 	assert.Equal(t, expectedType, resp.ContainerEventType) | ||||
| 	for _, ch := range containerEventsChans { | ||||
| 		var resp *runtime.ContainerEventResponse | ||||
| 		select { | ||||
| 		case resp = <-ch: | ||||
| 		case <-time.After(readContainerEventChannelTimeout): | ||||
| 			t.Error("assertContainerEventResponse: timeout waiting for events from channel") | ||||
| 		} | ||||
| 		t.Logf("Container Event response received: %+v", *resp) | ||||
| 		assert.Equal(t, expectedType, resp.ContainerEventType) | ||||
|  | ||||
| 	// Checking only the State field of PodSandboxStatus | ||||
| 	if expectedPodSandboxStatus != nil { | ||||
| 		assert.Equal(t, expectedPodSandboxStatus.State, resp.PodSandboxStatus.State) | ||||
| 	} | ||||
| 		// Checking only the State field of PodSandboxStatus | ||||
| 		if expectedPodSandboxStatus != nil { | ||||
| 			assert.Equal(t, expectedPodSandboxStatus.State, resp.PodSandboxStatus.State) | ||||
| 		} | ||||
|  | ||||
| 	// Checking only the State field of ContainersStatus | ||||
| 	for i, cs := range resp.ContainersStatuses { | ||||
| 		assert.Equal(t, expectedContainerStates[i], cs.State) | ||||
| 		// Checking only the State field of ContainersStatus | ||||
| 		for i, cs := range resp.ContainersStatuses { | ||||
| 			assert.Equal(t, expectedContainerStates[i], cs.State) | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|   | ||||
| @@ -63,6 +63,7 @@ const ( | ||||
|  | ||||
| var ( | ||||
| 	runtimeService     cri.RuntimeService | ||||
| 	runtimeService2    cri.RuntimeService // to test GetContainerEvents broadcast | ||||
| 	imageService       cri.ImageManagerService | ||||
| 	containerdClient   *containerd.Client | ||||
| 	containerdEndpoint string | ||||
| @@ -87,6 +88,10 @@ func ConnectDaemons() error { | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("failed to create runtime service: %w", err) | ||||
| 	} | ||||
| 	runtimeService2, err = remote.NewRuntimeService(*criEndpoint, timeout) | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("failed to create runtime service: %w", err) | ||||
| 	} | ||||
| 	imageService, err = remote.NewImageService(*criEndpoint, timeout) | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("failed to create image service: %w", err) | ||||
| @@ -98,6 +103,10 @@ func ConnectDaemons() error { | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("failed to list containers: %w", err) | ||||
| 	} | ||||
| 	_, err = runtimeService2.ListContainers(&runtime.ContainerFilter{}) | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("failed to list containers: %w", err) | ||||
| 	} | ||||
| 	_, err = imageService.ListImages(&runtime.ImageFilter{}) | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("failed to list images: %w", err) | ||||
|   | ||||
							
								
								
									
										163
									
								
								internal/eventq/eventq.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										163
									
								
								internal/eventq/eventq.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,163 @@ | ||||
| /* | ||||
|    Copyright The containerd Authors. | ||||
|  | ||||
|    Licensed under the Apache License, Version 2.0 (the "License"); | ||||
|    you may not use this file except in compliance with the License. | ||||
|    You may obtain a copy of the License at | ||||
|  | ||||
|        http://www.apache.org/licenses/LICENSE-2.0 | ||||
|  | ||||
|    Unless required by applicable law or agreed to in writing, software | ||||
|    distributed under the License is distributed on an "AS IS" BASIS, | ||||
|    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
|    See the License for the specific language governing permissions and | ||||
|    limitations under the License. | ||||
| */ | ||||
|  | ||||
| package eventq | ||||
|  | ||||
| import ( | ||||
| 	"io" | ||||
| 	"time" | ||||
| ) | ||||
|  | ||||
| type EventQueue[T any] struct { | ||||
| 	events      chan<- T | ||||
| 	subscriberC chan<- eventSubscription[T] | ||||
| 	shutdownC   chan struct{} | ||||
| } | ||||
|  | ||||
| type eventSubscription[T any] struct { | ||||
| 	c      chan<- T | ||||
| 	closeC chan struct{} | ||||
| } | ||||
|  | ||||
| func (sub eventSubscription[T]) publish(event T) bool { | ||||
| 	select { | ||||
| 	case <-sub.closeC: | ||||
| 		return false | ||||
| 	case sub.c <- event: | ||||
| 		return true | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (sub eventSubscription[T]) Close() error { | ||||
| 	select { | ||||
| 	case <-sub.closeC: | ||||
| 	default: | ||||
| 		close(sub.closeC) | ||||
| 	} | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // New provides a queue for sending messages to one or more | ||||
| // subscribers. Messages are held for the given discardAfter duration | ||||
| // if there are no subscribers. | ||||
| func New[T any](discardAfter time.Duration, discardFn func(T)) EventQueue[T] { | ||||
| 	events := make(chan T) | ||||
| 	subscriberC := make(chan eventSubscription[T]) | ||||
| 	shutdownC := make(chan struct{}) | ||||
|  | ||||
| 	go func() { | ||||
| 		type queuedEvent struct { | ||||
| 			event     T | ||||
| 			discardAt time.Time | ||||
| 		} | ||||
|  | ||||
| 		var discardQueue []queuedEvent | ||||
| 		var discardTime <-chan time.Time | ||||
| 		var subscribers []eventSubscription[T] | ||||
| 		for { | ||||
| 			select { | ||||
| 			case <-shutdownC: | ||||
| 				for _, event := range discardQueue { | ||||
| 					discardFn(event.event) | ||||
| 				} | ||||
| 				for _, sub := range subscribers { | ||||
| 					close(sub.c) | ||||
| 				} | ||||
| 				return | ||||
| 			case event := <-events: | ||||
| 				if len(subscribers) > 0 { | ||||
| 					active := subscribers[:0] | ||||
| 					for _, sub := range subscribers { | ||||
| 						if sub.publish(event) { | ||||
| 							active = append(active, sub) | ||||
| 						} | ||||
| 					} | ||||
| 					subscribers = active | ||||
| 				} | ||||
| 				if len(subscribers) == 0 { | ||||
| 					discardQueue = append(discardQueue, queuedEvent{ | ||||
| 						event:     event, | ||||
| 						discardAt: time.Now().Add(discardAfter), | ||||
| 					}) | ||||
| 					if discardTime == nil { | ||||
| 						discardTime = time.After(time.Until(discardQueue[0].discardAt).Abs()) | ||||
| 					} | ||||
| 				} | ||||
| 			case s := <-subscriberC: | ||||
| 				var closed bool | ||||
| 				for i, event := range discardQueue { | ||||
| 					if !s.publish(event.event) { | ||||
| 						discardQueue = discardQueue[i:] | ||||
| 						closed = true | ||||
| 						break | ||||
| 					} | ||||
| 				} | ||||
| 				if !closed { | ||||
| 					discardQueue = nil | ||||
| 					discardTime = nil | ||||
| 					subscribers = append(subscribers, s) | ||||
| 				} | ||||
| 			case t := <-discardTime: | ||||
| 				toDiscard := discardQueue | ||||
| 				discardQueue = nil | ||||
| 				for i, event := range toDiscard { | ||||
| 					if t.After(event.discardAt) { | ||||
| 						discardFn(event.event) | ||||
| 					} else { | ||||
| 						discardQueue = toDiscard[i:] | ||||
| 						break | ||||
| 					} | ||||
| 				} | ||||
| 				if len(discardQueue) == 0 { | ||||
| 					discardTime = nil | ||||
| 				} else { | ||||
| 					// Wait until next item in discard queue plus a small buffer to collect a burst of events | ||||
| 					discardTime = time.After(time.Until(discardQueue[0].discardAt).Abs() + 10*time.Millisecond) | ||||
| 				} | ||||
| 			} | ||||
|  | ||||
| 		} | ||||
| 	}() | ||||
|  | ||||
| 	return EventQueue[T]{ | ||||
| 		events:      events, | ||||
| 		subscriberC: subscriberC, | ||||
| 		shutdownC:   shutdownC, | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (eq *EventQueue[T]) Shutdown() { | ||||
| 	defer close(eq.shutdownC) | ||||
| 	eq.shutdownC <- struct{}{} | ||||
| } | ||||
|  | ||||
| func (eq *EventQueue[T]) Send(event T) { | ||||
| 	select { | ||||
| 	case <-eq.shutdownC: | ||||
| 	case eq.events <- event: | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (eq *EventQueue[T]) Subscribe() (<-chan T, io.Closer) { | ||||
| 	c := make(chan T, 100) | ||||
| 	subscription := eventSubscription[T]{ | ||||
| 		c:      c, | ||||
| 		closeC: make(chan struct{}), | ||||
| 	} | ||||
| 	eq.subscriberC <- subscription | ||||
|  | ||||
| 	return c, subscription | ||||
| } | ||||
							
								
								
									
										152
									
								
								internal/eventq/eventq_test.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										152
									
								
								internal/eventq/eventq_test.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,152 @@ | ||||
| /* | ||||
|    Copyright The containerd Authors. | ||||
|  | ||||
|    Licensed under the Apache License, Version 2.0 (the "License"); | ||||
|    you may not use this file except in compliance with the License. | ||||
|    You may obtain a copy of the License at | ||||
|  | ||||
|        http://www.apache.org/licenses/LICENSE-2.0 | ||||
|  | ||||
|    Unless required by applicable law or agreed to in writing, software | ||||
|    distributed under the License is distributed on an "AS IS" BASIS, | ||||
|    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
|    See the License for the specific language governing permissions and | ||||
|    limitations under the License. | ||||
| */ | ||||
|  | ||||
| package eventq | ||||
|  | ||||
| import ( | ||||
| 	"testing" | ||||
| 	"time" | ||||
|  | ||||
| 	"github.com/stretchr/testify/assert" | ||||
| ) | ||||
|  | ||||
| func TestSingleSubscriber(t *testing.T) { | ||||
| 	eq := New[int](time.Second, func(int) {}) | ||||
| 	c := newCollector(eq) | ||||
| 	expected := []int{1, 2, 3, 4, 5} | ||||
| 	for _, i := range expected { | ||||
| 		eq.Send(i) | ||||
| 	} | ||||
| 	eq.Shutdown() | ||||
| 	assert.Equal(t, expected, c.Collected()) | ||||
| } | ||||
|  | ||||
| func TestMultiSubscriber(t *testing.T) { | ||||
| 	eq := New[int](time.Second, func(int) {}) | ||||
| 	cs := make([]*collector, 10) | ||||
| 	for i := range cs { | ||||
| 		cs[i] = newCollector(eq) | ||||
| 	} | ||||
| 	expected := []int{1, 2, 3, 4, 5, 6, 7, 8, 9} | ||||
| 	for _, i := range expected { | ||||
| 		eq.Send(i) | ||||
| 	} | ||||
| 	eq.Shutdown() | ||||
| 	for i := range cs { | ||||
| 		assert.Equal(t, expected, cs[i].Collected()) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestMissedEvents(t *testing.T) { | ||||
| 	eq := New[int](3600*time.Second, func(int) {}) | ||||
| 	expected := []int{1, 2, 3, 4, 5, 6, 7, 8, 9} | ||||
| 	for _, i := range expected[:2] { | ||||
| 		eq.Send(i) | ||||
| 	} | ||||
| 	c := newCollector(eq) | ||||
| 	for _, i := range expected[2:] { | ||||
| 		eq.Send(i) | ||||
| 	} | ||||
|  | ||||
| 	eq.Shutdown() | ||||
| 	assert.Equal(t, expected, c.Collected()) | ||||
| } | ||||
|  | ||||
| func TestSubscribersDifferentTime(t *testing.T) { | ||||
| 	eq := New[int](time.Second, func(int) {}) | ||||
| 	expected := []int{1, 2, 3, 4, 5, 6, 7, 8, 9} | ||||
| 	c1 := newCollector(eq) | ||||
| 	for _, i := range expected[:2] { | ||||
| 		eq.Send(i) | ||||
| 	} | ||||
| 	c2 := newCollector(eq) | ||||
| 	for _, i := range expected[2:] { | ||||
| 		eq.Send(i) | ||||
| 	} | ||||
|  | ||||
| 	eq.Shutdown() | ||||
| 	assert.Equal(t, expected, c1.Collected()) | ||||
| 	assert.Equal(t, expected[2:], c2.Collected()) | ||||
| } | ||||
|  | ||||
| func TestDiscardedAfterTime(t *testing.T) { | ||||
| 	discarded := []int{} | ||||
| 	eq := New[int](time.Microsecond, func(i int) { | ||||
| 		discarded = append(discarded, i) | ||||
| 	}) | ||||
| 	expected := []int{1, 2, 3, 4, 5, 6, 7, 8, 9} | ||||
| 	for _, i := range expected[:2] { | ||||
| 		eq.Send(i) | ||||
| 	} | ||||
| 	time.Sleep(50 * time.Millisecond) | ||||
| 	c := newCollector(eq) | ||||
| 	for _, i := range expected[2:] { | ||||
| 		eq.Send(i) | ||||
| 	} | ||||
|  | ||||
| 	eq.Shutdown() | ||||
| 	assert.Equal(t, expected[:2], discarded) | ||||
| 	assert.Equal(t, expected[2:], c.Collected()) | ||||
| } | ||||
|  | ||||
| func TestDiscardedAtShutdown(t *testing.T) { | ||||
| 	discarded := []int{} | ||||
| 	expected := []int{1, 2, 3, 4, 5, 6, 7, 8, 9} | ||||
| 	done := make(chan struct{}) | ||||
| 	eq := New[int](3600*time.Second, func(i int) { | ||||
| 		discarded = append(discarded, i) | ||||
| 		if len(discarded) == len(expected) { | ||||
| 			close(done) | ||||
| 		} | ||||
| 	}) | ||||
| 	for _, i := range expected { | ||||
| 		eq.Send(i) | ||||
| 	} | ||||
| 	eq.Shutdown() | ||||
| 	select { | ||||
| 	case <-done: | ||||
| 	case <-time.After(time.Second): | ||||
| 	} | ||||
| 	assert.Equal(t, expected, discarded) | ||||
| } | ||||
|  | ||||
| type collector struct { | ||||
| 	collected []int | ||||
| 	c         <-chan int | ||||
| 	done      chan struct{} | ||||
| } | ||||
|  | ||||
| func newCollector(eq EventQueue[int]) *collector { | ||||
| 	eventC, closer := eq.Subscribe() | ||||
| 	c := &collector{ | ||||
| 		c:    eventC, | ||||
| 		done: make(chan struct{}), | ||||
| 	} | ||||
| 	go func() { | ||||
| 		defer close(c.done) | ||||
| 		defer closer.Close() | ||||
| 		for i := range c.c { | ||||
| 			c.collected = append(c.collected, i) | ||||
| 		} | ||||
| 	}() | ||||
|  | ||||
| 	return c | ||||
| } | ||||
|  | ||||
| func (c *collector) Collected() []int { | ||||
| 	<-c.done | ||||
| 	return c.collected | ||||
| } | ||||
| @@ -21,10 +21,10 @@ import ( | ||||
| ) | ||||
|  | ||||
| func (c *criService) GetContainerEvents(r *runtime.GetEventsRequest, s runtime.RuntimeService_GetContainerEventsServer) error { | ||||
| 	// TODO (https://github.com/containerd/containerd/issues/7318): | ||||
| 	// replace with a real implementation that broadcasts containerEventsChan | ||||
| 	// to all subscribers. | ||||
| 	for event := range c.containerEventsChan { | ||||
| 	eventC, closer := c.containerEventsQ.Subscribe() | ||||
| 	defer closer.Close() | ||||
|  | ||||
| 	for event := range eventC { | ||||
| 		if err := s.Send(&event); err != nil { | ||||
| 			return err | ||||
| 		} | ||||
|   | ||||
| @@ -395,13 +395,7 @@ func (c *criService) generateAndSendContainerEvent(ctx context.Context, containe | ||||
| 		ContainersStatuses: containerStatuses, | ||||
| 	} | ||||
|  | ||||
| 	// TODO(ruiwen-zhao): write events to a cache, storage, or increase the size of the channel | ||||
| 	select { | ||||
| 	case c.containerEventsChan <- event: | ||||
| 	default: | ||||
| 		containerEventsDroppedCount.Inc() | ||||
| 		log.G(ctx).Debugf("containerEventsChan is full, discarding event %+v", event) | ||||
| 	} | ||||
| 	c.containerEventsQ.Send(event) | ||||
| } | ||||
|  | ||||
| func (c *criService) getPodSandboxStatus(ctx context.Context, podSandboxID string) (*runtime.PodSandboxStatus, error) { | ||||
|   | ||||
| @@ -23,6 +23,7 @@ import ( | ||||
| 	"net/http" | ||||
| 	"sync" | ||||
| 	"sync/atomic" | ||||
| 	"time" | ||||
|  | ||||
| 	"github.com/containerd/go-cni" | ||||
| 	"github.com/containerd/log" | ||||
| @@ -32,6 +33,7 @@ import ( | ||||
|  | ||||
| 	containerd "github.com/containerd/containerd/v2/client" | ||||
| 	"github.com/containerd/containerd/v2/core/sandbox" | ||||
| 	"github.com/containerd/containerd/v2/internal/eventq" | ||||
| 	"github.com/containerd/containerd/v2/internal/registrar" | ||||
| 	criconfig "github.com/containerd/containerd/v2/pkg/cri/config" | ||||
| 	"github.com/containerd/containerd/v2/pkg/cri/nri" | ||||
| @@ -118,9 +120,9 @@ type criService struct { | ||||
| 	// allCaps is the list of the capabilities. | ||||
| 	// When nil, parsed from CapEff of /proc/self/status. | ||||
| 	allCaps []string //nolint:nolintlint,unused // Ignore on non-Linux | ||||
| 	// containerEventsChan is used to capture container events and send them | ||||
| 	// to the caller of GetContainerEvents. | ||||
| 	containerEventsChan chan runtime.ContainerEventResponse | ||||
| 	// containerEventsQ is used to capture container events and send them | ||||
| 	// to the callers of GetContainerEvents. | ||||
| 	containerEventsQ eventq.EventQueue[runtime.ContainerEventResponse] | ||||
| 	// nri is used to hook NRI into CRI request processing. | ||||
| 	nri *nri.API | ||||
| 	// sandboxService is the sandbox related service for CRI | ||||
| @@ -165,8 +167,15 @@ func NewCRIService(config criconfig.Config, options *CRIServiceOptions) (CRIServ | ||||
| 		sandboxService:     newCriSandboxService(&config, options.Client), | ||||
| 	} | ||||
|  | ||||
| 	// TODO: figure out a proper channel size. | ||||
| 	c.containerEventsChan = make(chan runtime.ContainerEventResponse, 1000) | ||||
| 	// TODO: Make discard time configurable | ||||
| 	c.containerEventsQ = eventq.New[runtime.ContainerEventResponse](5*time.Minute, func(event runtime.ContainerEventResponse) { | ||||
| 		containerEventsDroppedCount.Inc() | ||||
| 		log.L.WithFields( | ||||
| 			log.Fields{ | ||||
| 				"container": event.ContainerId, | ||||
| 				"type":      event.ContainerEventType, | ||||
| 			}).Warn("container event discarded") | ||||
| 	}) | ||||
|  | ||||
| 	if err := c.initPlatform(); err != nil { | ||||
| 		return nil, nil, fmt.Errorf("initialize platform: %w", err) | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 Akihiro Suda
					Akihiro Suda