diff --git a/core/metadata/db.go b/core/metadata/db.go index 8fb8409f2..303af643d 100644 --- a/core/metadata/db.go +++ b/core/metadata/db.go @@ -270,6 +270,20 @@ func (m *DB) Update(fn func(*bolt.Tx) error) error { return err } +// Publisher returns an event publisher if one is configured +// and the current context is not inside a transaction. +func (m *DB) Publisher(ctx context.Context) events.Publisher { + _, ok := ctx.Value(transactionKey{}).(*bolt.Tx) + if ok { + // Do no publish events within a transaction + return nil + } + if m.dbopts.publisher != nil { + return m.dbopts.publisher + } + return nil +} + // RegisterMutationCallback registers a function to be called after a metadata // mutations has been performed. // diff --git a/core/metadata/images.go b/core/metadata/images.go index 8da3c6ed4..e01ad31d2 100644 --- a/core/metadata/images.go +++ b/core/metadata/images.go @@ -25,6 +25,7 @@ import ( "sync/atomic" "time" + eventstypes "github.com/containerd/containerd/v2/api/events" "github.com/containerd/containerd/v2/core/images" "github.com/containerd/containerd/v2/core/metadata/boltutil" "github.com/containerd/containerd/v2/pkg/epoch" @@ -164,6 +165,15 @@ func (s *imageStore) Create(ctx context.Context, image images.Image) (images.Ima return images.Image{}, err } + if publisher := s.db.Publisher(ctx); publisher != nil { + if err := publisher.Publish(ctx, "/images/create", &eventstypes.ImageCreate{ + Name: image.Name, + Labels: image.Labels, + }); err != nil { + return images.Image{}, err + } + } + return image, nil } @@ -256,6 +266,15 @@ func (s *imageStore) Update(ctx context.Context, image images.Image, fieldpaths return images.Image{}, err } + if publisher := s.db.Publisher(ctx); publisher != nil { + if err := publisher.Publish(ctx, "/images/update", &eventstypes.ImageUpdate{ + Name: updated.Name, + Labels: updated.Labels, + }); err != nil { + return images.Image{}, err + } + } + return updated, nil } @@ -273,7 +292,7 @@ func (s *imageStore) Delete(ctx context.Context, name string, opts ...images.Del } } - return update(ctx, s.db, func(tx *bolt.Tx) error { + err = update(ctx, s.db, func(tx *bolt.Tx) error { bkt := getImagesBucket(tx, namespace) if bkt == nil { return fmt.Errorf("image %q: %w", name, errdefs.ErrNotFound) @@ -310,6 +329,19 @@ func (s *imageStore) Delete(ctx context.Context, name string, opts ...images.Del return nil }) + if err != nil { + return err + } + + if publisher := s.db.Publisher(ctx); publisher != nil { + if err := publisher.Publish(ctx, "/images/delete", &eventstypes.ImageDelete{ + Name: name, + }); err != nil { + return err + } + } + + return nil } func validateImage(image *images.Image) error { diff --git a/core/metadata/snapshot.go b/core/metadata/snapshot.go index af4234baa..d77c95fca 100644 --- a/core/metadata/snapshot.go +++ b/core/metadata/snapshot.go @@ -279,8 +279,8 @@ func (s *snapshotter) Prepare(ctx context.Context, key, parent string, opts ...s return nil, err } - if s.db.dbopts.publisher != nil { - if err := s.db.dbopts.publisher.Publish(ctx, "/snapshot/prepare", &eventstypes.SnapshotPrepare{ + if publisher := s.db.Publisher(ctx); publisher != nil { + if err := publisher.Publish(ctx, "/snapshot/prepare", &eventstypes.SnapshotPrepare{ Key: key, Parent: parent, Snapshotter: s.name, @@ -634,8 +634,8 @@ func (s *snapshotter) Commit(ctx context.Context, name, key string, opts ...snap return err } - if s.db.dbopts.publisher != nil { - if err := s.db.dbopts.publisher.Publish(ctx, "/snapshot/commit", &eventstypes.SnapshotCommit{ + if publisher := s.db.Publisher(ctx); publisher != nil { + if err := publisher.Publish(ctx, "/snapshot/commit", &eventstypes.SnapshotCommit{ Key: key, Name: name, Snapshotter: s.name, @@ -704,8 +704,8 @@ func (s *snapshotter) Remove(ctx context.Context, key string) error { return err } - if s.db.dbopts.publisher != nil { - return s.db.dbopts.publisher.Publish(ctx, "/snapshot/remove", &eventstypes.SnapshotRemove{ + if publisher := s.db.Publisher(ctx); publisher != nil { + return publisher.Publish(ctx, "/snapshot/remove", &eventstypes.SnapshotRemove{ Key: key, Snapshotter: s.name, }) diff --git a/internal/cri/server/images/image_pull.go b/internal/cri/server/images/image_pull.go index a1df05c1b..81990437c 100644 --- a/internal/cri/server/images/image_pull.go +++ b/internal/cri/server/images/image_pull.go @@ -38,7 +38,6 @@ import ( imagespec "github.com/opencontainers/image-spec/specs-go/v1" runtime "k8s.io/cri-api/pkg/apis/runtime/v1" - eventstypes "github.com/containerd/containerd/v2/api/events" containerd "github.com/containerd/containerd/v2/client" "github.com/containerd/containerd/v2/core/diff" containerdimages "github.com/containerd/containerd/v2/core/images" @@ -322,14 +321,6 @@ func (c *CRIImageService) createImageReference(ctx context.Context, name string, // TODO: Call CRIImageService directly oldImg, err := c.images.Create(ctx, img) if err == nil { - if c.publisher != nil { - if err := c.publisher.Publish(ctx, "/images/create", &eventstypes.ImageCreate{ - Name: img.Name, - Labels: img.Labels, - }); err != nil { - return err - } - } return nil } else if !errdefs.IsAlreadyExists(err) { return err @@ -338,16 +329,6 @@ func (c *CRIImageService) createImageReference(ctx context.Context, name string, return nil } _, err = c.images.Update(ctx, img, "target", "labels."+crilabels.ImageLabelKey) - if err == nil && c.publisher != nil { - if c.publisher != nil { - if err := c.publisher.Publish(ctx, "/images/update", &eventstypes.ImageUpdate{ - Name: img.Name, - Labels: img.Labels, - }); err != nil { - return err - } - } - } return err } diff --git a/internal/cri/server/images/image_remove.go b/internal/cri/server/images/image_remove.go index 33fd10f4e..1311e33bf 100644 --- a/internal/cri/server/images/image_remove.go +++ b/internal/cri/server/images/image_remove.go @@ -20,7 +20,6 @@ import ( "context" "fmt" - eventstypes "github.com/containerd/containerd/v2/api/events" "github.com/containerd/containerd/v2/core/images" "github.com/containerd/containerd/v2/pkg/tracing" "github.com/containerd/errdefs" @@ -63,14 +62,6 @@ func (c *GRPCCRIImageService) RemoveImage(ctx context.Context, r *runtime.Remove if err := c.imageStore.Update(ctx, ref); err != nil { return nil, fmt.Errorf("failed to update image reference %q for %q: %w", ref, image.ID, err) } - - if c.publisher != nil { - if err := c.publisher.Publish(ctx, "/images/delete", &eventstypes.ImageDelete{ - Name: ref, - }); err != nil { - return nil, err - } - } continue } return nil, fmt.Errorf("failed to delete image reference %q for %q: %w", ref, image.ID, err) diff --git a/internal/cri/server/images/service.go b/internal/cri/server/images/service.go index 40d695cc6..94a4113d8 100644 --- a/internal/cri/server/images/service.go +++ b/internal/cri/server/images/service.go @@ -28,7 +28,6 @@ import ( imagestore "github.com/containerd/containerd/v2/internal/cri/store/image" snapshotstore "github.com/containerd/containerd/v2/internal/cri/store/snapshot" "github.com/containerd/containerd/v2/internal/kmutex" - "github.com/containerd/containerd/v2/pkg/events" "github.com/containerd/log" "github.com/containerd/platforms" docker "github.com/distribution/reference" @@ -54,8 +53,6 @@ type CRIImageService struct { // images is the lower level image store used for raw storage, // no event publishing should currently be assumed images images.Store - // publisher is the events publisher - publisher events.Publisher // client is a subset of the containerd client // and will be replaced by image store and transfer service client imageClient @@ -88,8 +85,6 @@ type CRIImageServiceOptions struct { Snapshotters map[string]snapshots.Snapshotter - Publisher events.Publisher - Client imageClient } diff --git a/plugins/cri/images/plugin.go b/plugins/cri/images/plugin.go index f2ae02f4e..3267d0068 100644 --- a/plugins/cri/images/plugin.go +++ b/plugins/cri/images/plugin.go @@ -28,7 +28,6 @@ import ( criconfig "github.com/containerd/containerd/v2/internal/cri/config" "github.com/containerd/containerd/v2/internal/cri/constants" "github.com/containerd/containerd/v2/internal/cri/server/images" - "github.com/containerd/containerd/v2/pkg/events" "github.com/containerd/containerd/v2/plugins" "github.com/containerd/containerd/v2/plugins/services/warning" "github.com/containerd/log" @@ -46,7 +45,6 @@ func init() { Config: &config, Requires: []plugin.Type{ plugins.LeasePlugin, - plugins.EventPlugin, plugins.MetadataPlugin, plugins.SandboxStorePlugin, plugins.ServicePlugin, // For client @@ -60,11 +58,6 @@ func init() { } mdb := m.(*metadata.DB) - ep, err := ic.GetSingle(plugins.EventPlugin) - if err != nil { - return nil, err - } - if warnings, err := criconfig.ValidateImageConfig(ic.Context, &config); err != nil { return nil, fmt.Errorf("invalid cri image config: %w", err) } else if len(warnings) > 0 { @@ -84,7 +77,6 @@ func init() { RuntimePlatforms: map[string]images.ImagePlatform{}, Snapshotters: map[string]snapshots.Snapshotter{}, ImageFSPaths: map[string]string{}, - Publisher: ep.(events.Publisher), } options.Client, err = containerd.New( diff --git a/plugins/services/images/local.go b/plugins/services/images/local.go index 9adb90c8f..1108a502d 100644 --- a/plugins/services/images/local.go +++ b/plugins/services/images/local.go @@ -24,13 +24,11 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/status" - eventstypes "github.com/containerd/containerd/v2/api/events" imagesapi "github.com/containerd/containerd/v2/api/services/images/v1" "github.com/containerd/containerd/v2/core/images" "github.com/containerd/containerd/v2/core/metadata" "github.com/containerd/containerd/v2/pkg/deprecation" "github.com/containerd/containerd/v2/pkg/epoch" - "github.com/containerd/containerd/v2/pkg/events" "github.com/containerd/containerd/v2/pkg/gc" "github.com/containerd/containerd/v2/plugins" "github.com/containerd/containerd/v2/plugins/services" @@ -46,7 +44,6 @@ func init() { Type: plugins.ServicePlugin, ID: services.ImagesService, Requires: []plugin.Type{ - plugins.EventPlugin, plugins.MetadataPlugin, plugins.GCPlugin, plugins.WarningPlugin, @@ -60,20 +57,15 @@ func init() { if err != nil { return nil, err } - ep, err := ic.GetSingle(plugins.EventPlugin) - if err != nil { - return nil, err - } w, err := ic.GetSingle(plugins.WarningPlugin) if err != nil { return nil, err } return &local{ - store: metadata.NewImageStore(m.(*metadata.DB)), - publisher: ep.(events.Publisher), - gc: g.(gcScheduler), - warnings: w.(warning.Service), + store: metadata.NewImageStore(m.(*metadata.DB)), + gc: g.(gcScheduler), + warnings: w.(warning.Service), }, nil }, }) @@ -84,10 +76,9 @@ type gcScheduler interface { } type local struct { - store images.Store - gc gcScheduler - publisher events.Publisher - warnings warning.Service + store images.Store + gc gcScheduler + warnings warning.Service } var _ imagesapi.ImagesClient = &local{} @@ -136,13 +127,6 @@ func (l *local) Create(ctx context.Context, req *imagesapi.CreateImageRequest, _ resp.Image = imageToProto(&created) - if err := l.publisher.Publish(ctx, "/images/create", &eventstypes.ImageCreate{ - Name: resp.Image.Name, - Labels: resp.Image.Labels, - }); err != nil { - return nil, err - } - l.emitSchema1DeprecationWarning(ctx, &image) return &resp, nil @@ -175,13 +159,6 @@ func (l *local) Update(ctx context.Context, req *imagesapi.UpdateImageRequest, _ resp.Image = imageToProto(&updated) - if err := l.publisher.Publish(ctx, "/images/update", &eventstypes.ImageUpdate{ - Name: resp.Image.Name, - Labels: resp.Image.Labels, - }); err != nil { - return nil, err - } - l.emitSchema1DeprecationWarning(ctx, &image) return &resp, nil } @@ -200,12 +177,6 @@ func (l *local) Delete(ctx context.Context, req *imagesapi.DeleteImageRequest, _ return nil, errdefs.ToGRPC(err) } - if err := l.publisher.Publish(ctx, "/images/delete", &eventstypes.ImageDelete{ - Name: req.Name, - }); err != nil { - return nil, err - } - if req.Sync { if _, err := l.gc.ScheduleAndWait(ctx); err != nil { return nil, err