From e7eb08eb560a50b7304681079d1d53d953367f6f Mon Sep 17 00:00:00 2001 From: Derek McGowan Date: Fri, 19 Jan 2024 13:46:12 -0800 Subject: [PATCH] Add support for multiple subscribers to CRI container events Signed-off-by: Derek McGowan --- internal/eventq/eventq.go | 163 +++++++++++++++++++++++++++++ internal/eventq/eventq_test.go | 152 +++++++++++++++++++++++++++ pkg/cri/server/container_events.go | 8 +- pkg/cri/server/helpers.go | 8 +- pkg/cri/server/service.go | 19 +++- 5 files changed, 334 insertions(+), 16 deletions(-) create mode 100644 internal/eventq/eventq.go create mode 100644 internal/eventq/eventq_test.go diff --git a/internal/eventq/eventq.go b/internal/eventq/eventq.go new file mode 100644 index 000000000..91a57ce53 --- /dev/null +++ b/internal/eventq/eventq.go @@ -0,0 +1,163 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package eventq + +import ( + "io" + "time" +) + +type EventQueue[T any] struct { + events chan<- T + subscriberC chan<- eventSubscription[T] + shutdownC chan struct{} +} + +type eventSubscription[T any] struct { + c chan<- T + closeC chan struct{} +} + +func (sub eventSubscription[T]) publish(event T) bool { + select { + case <-sub.closeC: + return false + case sub.c <- event: + return true + } +} + +func (sub eventSubscription[T]) Close() error { + select { + case <-sub.closeC: + default: + close(sub.closeC) + } + return nil +} + +// New provides a queue for sending messages to one or more +// subscribers. Messages are held for the given discardAfter duration +// if there are no subscribers. +func New[T any](discardAfter time.Duration, discardFn func(T)) EventQueue[T] { + events := make(chan T) + subscriberC := make(chan eventSubscription[T]) + shutdownC := make(chan struct{}) + + go func() { + type queuedEvent struct { + event T + discardAt time.Time + } + + var discardQueue []queuedEvent + var discardTime <-chan time.Time + var subscribers []eventSubscription[T] + for { + select { + case <-shutdownC: + for _, event := range discardQueue { + discardFn(event.event) + } + for _, sub := range subscribers { + close(sub.c) + } + return + case event := <-events: + if len(subscribers) > 0 { + active := subscribers[:0] + for _, sub := range subscribers { + if sub.publish(event) { + active = append(active, sub) + } + } + subscribers = active + } + if len(subscribers) == 0 { + discardQueue = append(discardQueue, queuedEvent{ + event: event, + discardAt: time.Now().Add(discardAfter), + }) + if discardTime == nil { + discardTime = time.After(time.Until(discardQueue[0].discardAt).Abs()) + } + } + case s := <-subscriberC: + var closed bool + for i, event := range discardQueue { + if !s.publish(event.event) { + discardQueue = discardQueue[i:] + closed = true + break + } + } + if !closed { + discardQueue = nil + discardTime = nil + subscribers = append(subscribers, s) + } + case t := <-discardTime: + toDiscard := discardQueue + discardQueue = nil + for i, event := range toDiscard { + if t.After(event.discardAt) { + discardFn(event.event) + } else { + discardQueue = toDiscard[i:] + break + } + } + if len(discardQueue) == 0 { + discardTime = nil + } else { + // Wait until next item in discard queue plus a small buffer to collect a burst of events + discardTime = time.After(time.Until(discardQueue[0].discardAt).Abs() + 10*time.Millisecond) + } + } + + } + }() + + return EventQueue[T]{ + events: events, + subscriberC: subscriberC, + shutdownC: shutdownC, + } +} + +func (eq *EventQueue[T]) Shutdown() { + defer close(eq.shutdownC) + eq.shutdownC <- struct{}{} +} + +func (eq *EventQueue[T]) Send(event T) { + select { + case <-eq.shutdownC: + case eq.events <- event: + } +} + +func (eq *EventQueue[T]) Subscribe() (<-chan T, io.Closer) { + c := make(chan T, 100) + subscription := eventSubscription[T]{ + c: c, + closeC: make(chan struct{}), + } + eq.subscriberC <- subscription + + return c, subscription +} diff --git a/internal/eventq/eventq_test.go b/internal/eventq/eventq_test.go new file mode 100644 index 000000000..261729230 --- /dev/null +++ b/internal/eventq/eventq_test.go @@ -0,0 +1,152 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package eventq + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestSingleSubscriber(t *testing.T) { + eq := New[int](time.Second, func(int) {}) + c := newCollector(eq) + expected := []int{1, 2, 3, 4, 5} + for _, i := range expected { + eq.Send(i) + } + eq.Shutdown() + assert.Equal(t, expected, c.Collected()) +} + +func TestMultiSubscriber(t *testing.T) { + eq := New[int](time.Second, func(int) {}) + cs := make([]*collector, 10) + for i := range cs { + cs[i] = newCollector(eq) + } + expected := []int{1, 2, 3, 4, 5, 6, 7, 8, 9} + for _, i := range expected { + eq.Send(i) + } + eq.Shutdown() + for i := range cs { + assert.Equal(t, expected, cs[i].Collected()) + } +} + +func TestMissedEvents(t *testing.T) { + eq := New[int](3600*time.Second, func(int) {}) + expected := []int{1, 2, 3, 4, 5, 6, 7, 8, 9} + for _, i := range expected[:2] { + eq.Send(i) + } + c := newCollector(eq) + for _, i := range expected[2:] { + eq.Send(i) + } + + eq.Shutdown() + assert.Equal(t, expected, c.Collected()) +} + +func TestSubscribersDifferentTime(t *testing.T) { + eq := New[int](time.Second, func(int) {}) + expected := []int{1, 2, 3, 4, 5, 6, 7, 8, 9} + c1 := newCollector(eq) + for _, i := range expected[:2] { + eq.Send(i) + } + c2 := newCollector(eq) + for _, i := range expected[2:] { + eq.Send(i) + } + + eq.Shutdown() + assert.Equal(t, expected, c1.Collected()) + assert.Equal(t, expected[2:], c2.Collected()) +} + +func TestDiscardedAfterTime(t *testing.T) { + discarded := []int{} + eq := New[int](time.Microsecond, func(i int) { + discarded = append(discarded, i) + }) + expected := []int{1, 2, 3, 4, 5, 6, 7, 8, 9} + for _, i := range expected[:2] { + eq.Send(i) + } + time.Sleep(50 * time.Millisecond) + c := newCollector(eq) + for _, i := range expected[2:] { + eq.Send(i) + } + + eq.Shutdown() + assert.Equal(t, expected[:2], discarded) + assert.Equal(t, expected[2:], c.Collected()) +} + +func TestDiscardedAtShutdown(t *testing.T) { + discarded := []int{} + expected := []int{1, 2, 3, 4, 5, 6, 7, 8, 9} + done := make(chan struct{}) + eq := New[int](3600*time.Second, func(i int) { + discarded = append(discarded, i) + if len(discarded) == len(expected) { + close(done) + } + }) + for _, i := range expected { + eq.Send(i) + } + eq.Shutdown() + select { + case <-done: + case <-time.After(time.Second): + } + assert.Equal(t, expected, discarded) +} + +type collector struct { + collected []int + c <-chan int + done chan struct{} +} + +func newCollector(eq EventQueue[int]) *collector { + eventC, closer := eq.Subscribe() + c := &collector{ + c: eventC, + done: make(chan struct{}), + } + go func() { + defer close(c.done) + defer closer.Close() + for i := range c.c { + c.collected = append(c.collected, i) + } + }() + + return c +} + +func (c *collector) Collected() []int { + <-c.done + return c.collected +} diff --git a/pkg/cri/server/container_events.go b/pkg/cri/server/container_events.go index 83e322cd9..f8d157baa 100644 --- a/pkg/cri/server/container_events.go +++ b/pkg/cri/server/container_events.go @@ -21,10 +21,10 @@ import ( ) func (c *criService) GetContainerEvents(r *runtime.GetEventsRequest, s runtime.RuntimeService_GetContainerEventsServer) error { - // TODO (https://github.com/containerd/containerd/issues/7318): - // replace with a real implementation that broadcasts containerEventsChan - // to all subscribers. - for event := range c.containerEventsChan { + eventC, closer := c.containerEventsQ.Subscribe() + defer closer.Close() + + for event := range eventC { if err := s.Send(&event); err != nil { return err } diff --git a/pkg/cri/server/helpers.go b/pkg/cri/server/helpers.go index 8fcae64c1..1a671cf56 100644 --- a/pkg/cri/server/helpers.go +++ b/pkg/cri/server/helpers.go @@ -395,13 +395,7 @@ func (c *criService) generateAndSendContainerEvent(ctx context.Context, containe ContainersStatuses: containerStatuses, } - // TODO(ruiwen-zhao): write events to a cache, storage, or increase the size of the channel - select { - case c.containerEventsChan <- event: - default: - containerEventsDroppedCount.Inc() - log.G(ctx).Debugf("containerEventsChan is full, discarding event %+v", event) - } + c.containerEventsQ.Send(event) } func (c *criService) getPodSandboxStatus(ctx context.Context, podSandboxID string) (*runtime.PodSandboxStatus, error) { diff --git a/pkg/cri/server/service.go b/pkg/cri/server/service.go index 4fb8e28b7..708088b92 100644 --- a/pkg/cri/server/service.go +++ b/pkg/cri/server/service.go @@ -23,6 +23,7 @@ import ( "net/http" "sync" "sync/atomic" + "time" "github.com/containerd/go-cni" "github.com/containerd/log" @@ -32,6 +33,7 @@ import ( containerd "github.com/containerd/containerd/v2/client" "github.com/containerd/containerd/v2/core/sandbox" + "github.com/containerd/containerd/v2/internal/eventq" "github.com/containerd/containerd/v2/internal/registrar" criconfig "github.com/containerd/containerd/v2/pkg/cri/config" "github.com/containerd/containerd/v2/pkg/cri/nri" @@ -118,9 +120,9 @@ type criService struct { // allCaps is the list of the capabilities. // When nil, parsed from CapEff of /proc/self/status. allCaps []string //nolint:nolintlint,unused // Ignore on non-Linux - // containerEventsChan is used to capture container events and send them - // to the caller of GetContainerEvents. - containerEventsChan chan runtime.ContainerEventResponse + // containerEventsQ is used to capture container events and send them + // to the callers of GetContainerEvents. + containerEventsQ eventq.EventQueue[runtime.ContainerEventResponse] // nri is used to hook NRI into CRI request processing. nri *nri.API // sandboxService is the sandbox related service for CRI @@ -165,8 +167,15 @@ func NewCRIService(config criconfig.Config, options *CRIServiceOptions) (CRIServ sandboxService: newCriSandboxService(&config, options.Client), } - // TODO: figure out a proper channel size. - c.containerEventsChan = make(chan runtime.ContainerEventResponse, 1000) + // TODO: Make discard time configurable + c.containerEventsQ = eventq.New[runtime.ContainerEventResponse](5*time.Minute, func(event runtime.ContainerEventResponse) { + containerEventsDroppedCount.Inc() + log.L.WithFields( + log.Fields{ + "container": event.ContainerId, + "type": event.ContainerEventType, + }).Warn("container event discarded") + }) if err := c.initPlatform(); err != nil { return nil, nil, fmt.Errorf("initialize platform: %w", err)