Add support for multiple subscribers to CRI container events

Signed-off-by: Derek McGowan <derek@mcg.dev>
This commit is contained in:
Derek McGowan
2024-01-19 13:46:12 -08:00
parent 0817c9798f
commit e7eb08eb56
5 changed files with 334 additions and 16 deletions

View File

@@ -23,6 +23,7 @@ import (
"net/http"
"sync"
"sync/atomic"
"time"
"github.com/containerd/go-cni"
"github.com/containerd/log"
@@ -32,6 +33,7 @@ import (
containerd "github.com/containerd/containerd/v2/client"
"github.com/containerd/containerd/v2/core/sandbox"
"github.com/containerd/containerd/v2/internal/eventq"
"github.com/containerd/containerd/v2/internal/registrar"
criconfig "github.com/containerd/containerd/v2/pkg/cri/config"
"github.com/containerd/containerd/v2/pkg/cri/nri"
@@ -118,9 +120,9 @@ type criService struct {
// allCaps is the list of the capabilities.
// When nil, parsed from CapEff of /proc/self/status.
allCaps []string //nolint:nolintlint,unused // Ignore on non-Linux
// containerEventsChan is used to capture container events and send them
// to the caller of GetContainerEvents.
containerEventsChan chan runtime.ContainerEventResponse
// containerEventsQ is used to capture container events and send them
// to the callers of GetContainerEvents.
containerEventsQ eventq.EventQueue[runtime.ContainerEventResponse]
// nri is used to hook NRI into CRI request processing.
nri *nri.API
// sandboxService is the sandbox related service for CRI
@@ -165,8 +167,15 @@ func NewCRIService(config criconfig.Config, options *CRIServiceOptions) (CRIServ
sandboxService: newCriSandboxService(&config, options.Client),
}
// TODO: figure out a proper channel size.
c.containerEventsChan = make(chan runtime.ContainerEventResponse, 1000)
// TODO: Make discard time configurable
c.containerEventsQ = eventq.New[runtime.ContainerEventResponse](5*time.Minute, func(event runtime.ContainerEventResponse) {
containerEventsDroppedCount.Inc()
log.L.WithFields(
log.Fields{
"container": event.ContainerId,
"type": event.ContainerEventType,
}).Warn("container event discarded")
})
if err := c.initPlatform(); err != nil {
return nil, nil, fmt.Errorf("initialize platform: %w", err)