From 86530c0afbe282f24a053b56183857e732470e56 Mon Sep 17 00:00:00 2001 From: Derek McGowan Date: Tue, 6 Feb 2024 21:24:55 -0800 Subject: [PATCH 1/2] Move image event publishing to metadata store The metadata store is in the best place to handle events directly after the database has been updated. This prevents every user of the image store interface from having to know whether or not they are responsible for publishing events and avoid double events if the grpc local service is used. Signed-off-by: Derek McGowan --- core/metadata/images.go | 34 +++++++++++++++++- internal/cri/server/images/image_pull.go | 19 ---------- internal/cri/server/images/image_remove.go | 9 ----- internal/cri/server/images/service.go | 5 --- plugins/cri/images/plugin.go | 8 ----- plugins/services/images/local.go | 41 ++++------------------ 6 files changed, 39 insertions(+), 77 deletions(-) diff --git a/core/metadata/images.go b/core/metadata/images.go index 8da3c6ed4..49c9fb7bd 100644 --- a/core/metadata/images.go +++ b/core/metadata/images.go @@ -25,6 +25,7 @@ import ( "sync/atomic" "time" + eventstypes "github.com/containerd/containerd/v2/api/events" "github.com/containerd/containerd/v2/core/images" "github.com/containerd/containerd/v2/core/metadata/boltutil" "github.com/containerd/containerd/v2/pkg/epoch" @@ -164,6 +165,15 @@ func (s *imageStore) Create(ctx context.Context, image images.Image) (images.Ima return images.Image{}, err } + if s.db.dbopts.publisher != nil { + if err := s.db.dbopts.publisher.Publish(ctx, "/images/create", &eventstypes.ImageCreate{ + Name: image.Name, + Labels: image.Labels, + }); err != nil { + return images.Image{}, err + } + } + return image, nil } @@ -256,6 +266,15 @@ func (s *imageStore) Update(ctx context.Context, image images.Image, fieldpaths return images.Image{}, err } + if s.db.dbopts.publisher != nil { + if err := s.db.dbopts.publisher.Publish(ctx, "/images/update", &eventstypes.ImageUpdate{ + Name: updated.Name, + Labels: updated.Labels, + }); err != nil { + return images.Image{}, err + } + } + return updated, nil } @@ -273,7 +292,7 @@ func (s *imageStore) Delete(ctx context.Context, name string, opts ...images.Del } } - return update(ctx, s.db, func(tx *bolt.Tx) error { + err = update(ctx, s.db, func(tx *bolt.Tx) error { bkt := getImagesBucket(tx, namespace) if bkt == nil { return fmt.Errorf("image %q: %w", name, errdefs.ErrNotFound) @@ -310,6 +329,19 @@ func (s *imageStore) Delete(ctx context.Context, name string, opts ...images.Del return nil }) + if err != nil { + return err + } + + if s.db.dbopts.publisher != nil { + if err := s.db.dbopts.publisher.Publish(ctx, "/images/delete", &eventstypes.ImageDelete{ + Name: name, + }); err != nil { + return err + } + } + + return nil } func validateImage(image *images.Image) error { diff --git a/internal/cri/server/images/image_pull.go b/internal/cri/server/images/image_pull.go index a1df05c1b..81990437c 100644 --- a/internal/cri/server/images/image_pull.go +++ b/internal/cri/server/images/image_pull.go @@ -38,7 +38,6 @@ import ( imagespec "github.com/opencontainers/image-spec/specs-go/v1" runtime "k8s.io/cri-api/pkg/apis/runtime/v1" - eventstypes "github.com/containerd/containerd/v2/api/events" containerd "github.com/containerd/containerd/v2/client" "github.com/containerd/containerd/v2/core/diff" containerdimages "github.com/containerd/containerd/v2/core/images" @@ -322,14 +321,6 @@ func (c *CRIImageService) createImageReference(ctx context.Context, name string, // TODO: Call CRIImageService directly oldImg, err := c.images.Create(ctx, img) if err == nil { - if c.publisher != nil { - if err := c.publisher.Publish(ctx, "/images/create", &eventstypes.ImageCreate{ - Name: img.Name, - Labels: img.Labels, - }); err != nil { - return err - } - } return nil } else if !errdefs.IsAlreadyExists(err) { return err @@ -338,16 +329,6 @@ func (c *CRIImageService) createImageReference(ctx context.Context, name string, return nil } _, err = c.images.Update(ctx, img, "target", "labels."+crilabels.ImageLabelKey) - if err == nil && c.publisher != nil { - if c.publisher != nil { - if err := c.publisher.Publish(ctx, "/images/update", &eventstypes.ImageUpdate{ - Name: img.Name, - Labels: img.Labels, - }); err != nil { - return err - } - } - } return err } diff --git a/internal/cri/server/images/image_remove.go b/internal/cri/server/images/image_remove.go index 33fd10f4e..1311e33bf 100644 --- a/internal/cri/server/images/image_remove.go +++ b/internal/cri/server/images/image_remove.go @@ -20,7 +20,6 @@ import ( "context" "fmt" - eventstypes "github.com/containerd/containerd/v2/api/events" "github.com/containerd/containerd/v2/core/images" "github.com/containerd/containerd/v2/pkg/tracing" "github.com/containerd/errdefs" @@ -63,14 +62,6 @@ func (c *GRPCCRIImageService) RemoveImage(ctx context.Context, r *runtime.Remove if err := c.imageStore.Update(ctx, ref); err != nil { return nil, fmt.Errorf("failed to update image reference %q for %q: %w", ref, image.ID, err) } - - if c.publisher != nil { - if err := c.publisher.Publish(ctx, "/images/delete", &eventstypes.ImageDelete{ - Name: ref, - }); err != nil { - return nil, err - } - } continue } return nil, fmt.Errorf("failed to delete image reference %q for %q: %w", ref, image.ID, err) diff --git a/internal/cri/server/images/service.go b/internal/cri/server/images/service.go index 40d695cc6..94a4113d8 100644 --- a/internal/cri/server/images/service.go +++ b/internal/cri/server/images/service.go @@ -28,7 +28,6 @@ import ( imagestore "github.com/containerd/containerd/v2/internal/cri/store/image" snapshotstore "github.com/containerd/containerd/v2/internal/cri/store/snapshot" "github.com/containerd/containerd/v2/internal/kmutex" - "github.com/containerd/containerd/v2/pkg/events" "github.com/containerd/log" "github.com/containerd/platforms" docker "github.com/distribution/reference" @@ -54,8 +53,6 @@ type CRIImageService struct { // images is the lower level image store used for raw storage, // no event publishing should currently be assumed images images.Store - // publisher is the events publisher - publisher events.Publisher // client is a subset of the containerd client // and will be replaced by image store and transfer service client imageClient @@ -88,8 +85,6 @@ type CRIImageServiceOptions struct { Snapshotters map[string]snapshots.Snapshotter - Publisher events.Publisher - Client imageClient } diff --git a/plugins/cri/images/plugin.go b/plugins/cri/images/plugin.go index f2ae02f4e..3267d0068 100644 --- a/plugins/cri/images/plugin.go +++ b/plugins/cri/images/plugin.go @@ -28,7 +28,6 @@ import ( criconfig "github.com/containerd/containerd/v2/internal/cri/config" "github.com/containerd/containerd/v2/internal/cri/constants" "github.com/containerd/containerd/v2/internal/cri/server/images" - "github.com/containerd/containerd/v2/pkg/events" "github.com/containerd/containerd/v2/plugins" "github.com/containerd/containerd/v2/plugins/services/warning" "github.com/containerd/log" @@ -46,7 +45,6 @@ func init() { Config: &config, Requires: []plugin.Type{ plugins.LeasePlugin, - plugins.EventPlugin, plugins.MetadataPlugin, plugins.SandboxStorePlugin, plugins.ServicePlugin, // For client @@ -60,11 +58,6 @@ func init() { } mdb := m.(*metadata.DB) - ep, err := ic.GetSingle(plugins.EventPlugin) - if err != nil { - return nil, err - } - if warnings, err := criconfig.ValidateImageConfig(ic.Context, &config); err != nil { return nil, fmt.Errorf("invalid cri image config: %w", err) } else if len(warnings) > 0 { @@ -84,7 +77,6 @@ func init() { RuntimePlatforms: map[string]images.ImagePlatform{}, Snapshotters: map[string]snapshots.Snapshotter{}, ImageFSPaths: map[string]string{}, - Publisher: ep.(events.Publisher), } options.Client, err = containerd.New( diff --git a/plugins/services/images/local.go b/plugins/services/images/local.go index 9adb90c8f..1108a502d 100644 --- a/plugins/services/images/local.go +++ b/plugins/services/images/local.go @@ -24,13 +24,11 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/status" - eventstypes "github.com/containerd/containerd/v2/api/events" imagesapi "github.com/containerd/containerd/v2/api/services/images/v1" "github.com/containerd/containerd/v2/core/images" "github.com/containerd/containerd/v2/core/metadata" "github.com/containerd/containerd/v2/pkg/deprecation" "github.com/containerd/containerd/v2/pkg/epoch" - "github.com/containerd/containerd/v2/pkg/events" "github.com/containerd/containerd/v2/pkg/gc" "github.com/containerd/containerd/v2/plugins" "github.com/containerd/containerd/v2/plugins/services" @@ -46,7 +44,6 @@ func init() { Type: plugins.ServicePlugin, ID: services.ImagesService, Requires: []plugin.Type{ - plugins.EventPlugin, plugins.MetadataPlugin, plugins.GCPlugin, plugins.WarningPlugin, @@ -60,20 +57,15 @@ func init() { if err != nil { return nil, err } - ep, err := ic.GetSingle(plugins.EventPlugin) - if err != nil { - return nil, err - } w, err := ic.GetSingle(plugins.WarningPlugin) if err != nil { return nil, err } return &local{ - store: metadata.NewImageStore(m.(*metadata.DB)), - publisher: ep.(events.Publisher), - gc: g.(gcScheduler), - warnings: w.(warning.Service), + store: metadata.NewImageStore(m.(*metadata.DB)), + gc: g.(gcScheduler), + warnings: w.(warning.Service), }, nil }, }) @@ -84,10 +76,9 @@ type gcScheduler interface { } type local struct { - store images.Store - gc gcScheduler - publisher events.Publisher - warnings warning.Service + store images.Store + gc gcScheduler + warnings warning.Service } var _ imagesapi.ImagesClient = &local{} @@ -136,13 +127,6 @@ func (l *local) Create(ctx context.Context, req *imagesapi.CreateImageRequest, _ resp.Image = imageToProto(&created) - if err := l.publisher.Publish(ctx, "/images/create", &eventstypes.ImageCreate{ - Name: resp.Image.Name, - Labels: resp.Image.Labels, - }); err != nil { - return nil, err - } - l.emitSchema1DeprecationWarning(ctx, &image) return &resp, nil @@ -175,13 +159,6 @@ func (l *local) Update(ctx context.Context, req *imagesapi.UpdateImageRequest, _ resp.Image = imageToProto(&updated) - if err := l.publisher.Publish(ctx, "/images/update", &eventstypes.ImageUpdate{ - Name: resp.Image.Name, - Labels: resp.Image.Labels, - }); err != nil { - return nil, err - } - l.emitSchema1DeprecationWarning(ctx, &image) return &resp, nil } @@ -200,12 +177,6 @@ func (l *local) Delete(ctx context.Context, req *imagesapi.DeleteImageRequest, _ return nil, errdefs.ToGRPC(err) } - if err := l.publisher.Publish(ctx, "/images/delete", &eventstypes.ImageDelete{ - Name: req.Name, - }); err != nil { - return nil, err - } - if req.Sync { if _, err := l.gc.ScheduleAndWait(ctx); err != nil { return nil, err From 9eb9038a9e16707e276a4781263702020e61785d Mon Sep 17 00:00:00 2001 From: Derek McGowan Date: Tue, 6 Feb 2024 21:38:32 -0800 Subject: [PATCH 2/2] Avoid publishing data events during transaction Signed-off-by: Derek McGowan --- core/metadata/db.go | 14 ++++++++++++++ core/metadata/images.go | 12 ++++++------ core/metadata/snapshot.go | 12 ++++++------ 3 files changed, 26 insertions(+), 12 deletions(-) diff --git a/core/metadata/db.go b/core/metadata/db.go index 8fb8409f2..303af643d 100644 --- a/core/metadata/db.go +++ b/core/metadata/db.go @@ -270,6 +270,20 @@ func (m *DB) Update(fn func(*bolt.Tx) error) error { return err } +// Publisher returns an event publisher if one is configured +// and the current context is not inside a transaction. +func (m *DB) Publisher(ctx context.Context) events.Publisher { + _, ok := ctx.Value(transactionKey{}).(*bolt.Tx) + if ok { + // Do no publish events within a transaction + return nil + } + if m.dbopts.publisher != nil { + return m.dbopts.publisher + } + return nil +} + // RegisterMutationCallback registers a function to be called after a metadata // mutations has been performed. // diff --git a/core/metadata/images.go b/core/metadata/images.go index 49c9fb7bd..e01ad31d2 100644 --- a/core/metadata/images.go +++ b/core/metadata/images.go @@ -165,8 +165,8 @@ func (s *imageStore) Create(ctx context.Context, image images.Image) (images.Ima return images.Image{}, err } - if s.db.dbopts.publisher != nil { - if err := s.db.dbopts.publisher.Publish(ctx, "/images/create", &eventstypes.ImageCreate{ + if publisher := s.db.Publisher(ctx); publisher != nil { + if err := publisher.Publish(ctx, "/images/create", &eventstypes.ImageCreate{ Name: image.Name, Labels: image.Labels, }); err != nil { @@ -266,8 +266,8 @@ func (s *imageStore) Update(ctx context.Context, image images.Image, fieldpaths return images.Image{}, err } - if s.db.dbopts.publisher != nil { - if err := s.db.dbopts.publisher.Publish(ctx, "/images/update", &eventstypes.ImageUpdate{ + if publisher := s.db.Publisher(ctx); publisher != nil { + if err := publisher.Publish(ctx, "/images/update", &eventstypes.ImageUpdate{ Name: updated.Name, Labels: updated.Labels, }); err != nil { @@ -333,8 +333,8 @@ func (s *imageStore) Delete(ctx context.Context, name string, opts ...images.Del return err } - if s.db.dbopts.publisher != nil { - if err := s.db.dbopts.publisher.Publish(ctx, "/images/delete", &eventstypes.ImageDelete{ + if publisher := s.db.Publisher(ctx); publisher != nil { + if err := publisher.Publish(ctx, "/images/delete", &eventstypes.ImageDelete{ Name: name, }); err != nil { return err diff --git a/core/metadata/snapshot.go b/core/metadata/snapshot.go index af4234baa..d77c95fca 100644 --- a/core/metadata/snapshot.go +++ b/core/metadata/snapshot.go @@ -279,8 +279,8 @@ func (s *snapshotter) Prepare(ctx context.Context, key, parent string, opts ...s return nil, err } - if s.db.dbopts.publisher != nil { - if err := s.db.dbopts.publisher.Publish(ctx, "/snapshot/prepare", &eventstypes.SnapshotPrepare{ + if publisher := s.db.Publisher(ctx); publisher != nil { + if err := publisher.Publish(ctx, "/snapshot/prepare", &eventstypes.SnapshotPrepare{ Key: key, Parent: parent, Snapshotter: s.name, @@ -634,8 +634,8 @@ func (s *snapshotter) Commit(ctx context.Context, name, key string, opts ...snap return err } - if s.db.dbopts.publisher != nil { - if err := s.db.dbopts.publisher.Publish(ctx, "/snapshot/commit", &eventstypes.SnapshotCommit{ + if publisher := s.db.Publisher(ctx); publisher != nil { + if err := publisher.Publish(ctx, "/snapshot/commit", &eventstypes.SnapshotCommit{ Key: key, Name: name, Snapshotter: s.name, @@ -704,8 +704,8 @@ func (s *snapshotter) Remove(ctx context.Context, key string) error { return err } - if s.db.dbopts.publisher != nil { - return s.db.dbopts.publisher.Publish(ctx, "/snapshot/remove", &eventstypes.SnapshotRemove{ + if publisher := s.db.Publisher(ctx); publisher != nil { + return publisher.Publish(ctx, "/snapshot/remove", &eventstypes.SnapshotRemove{ Key: key, Snapshotter: s.name, })