Move image event publishing to metadata store

The metadata store is in the best place to handle events directly after
the database has been updated. This prevents every user of the image
store interface from having to know whether or not they are responsible
for publishing events and avoid double events if the grpc local service
is used.

Signed-off-by: Derek McGowan <derek@mcg.dev>
This commit is contained in:
Derek McGowan 2024-02-06 21:24:55 -08:00
parent 2f807b606a
commit 86530c0afb
No known key found for this signature in database
GPG Key ID: F58C5D0A4405ACDB
6 changed files with 39 additions and 77 deletions

View File

@ -25,6 +25,7 @@ import (
"sync/atomic"
"time"
eventstypes "github.com/containerd/containerd/v2/api/events"
"github.com/containerd/containerd/v2/core/images"
"github.com/containerd/containerd/v2/core/metadata/boltutil"
"github.com/containerd/containerd/v2/pkg/epoch"
@ -164,6 +165,15 @@ func (s *imageStore) Create(ctx context.Context, image images.Image) (images.Ima
return images.Image{}, err
}
if s.db.dbopts.publisher != nil {
if err := s.db.dbopts.publisher.Publish(ctx, "/images/create", &eventstypes.ImageCreate{
Name: image.Name,
Labels: image.Labels,
}); err != nil {
return images.Image{}, err
}
}
return image, nil
}
@ -256,6 +266,15 @@ func (s *imageStore) Update(ctx context.Context, image images.Image, fieldpaths
return images.Image{}, err
}
if s.db.dbopts.publisher != nil {
if err := s.db.dbopts.publisher.Publish(ctx, "/images/update", &eventstypes.ImageUpdate{
Name: updated.Name,
Labels: updated.Labels,
}); err != nil {
return images.Image{}, err
}
}
return updated, nil
}
@ -273,7 +292,7 @@ func (s *imageStore) Delete(ctx context.Context, name string, opts ...images.Del
}
}
return update(ctx, s.db, func(tx *bolt.Tx) error {
err = update(ctx, s.db, func(tx *bolt.Tx) error {
bkt := getImagesBucket(tx, namespace)
if bkt == nil {
return fmt.Errorf("image %q: %w", name, errdefs.ErrNotFound)
@ -310,6 +329,19 @@ func (s *imageStore) Delete(ctx context.Context, name string, opts ...images.Del
return nil
})
if err != nil {
return err
}
if s.db.dbopts.publisher != nil {
if err := s.db.dbopts.publisher.Publish(ctx, "/images/delete", &eventstypes.ImageDelete{
Name: name,
}); err != nil {
return err
}
}
return nil
}
func validateImage(image *images.Image) error {

View File

@ -38,7 +38,6 @@ import (
imagespec "github.com/opencontainers/image-spec/specs-go/v1"
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
eventstypes "github.com/containerd/containerd/v2/api/events"
containerd "github.com/containerd/containerd/v2/client"
"github.com/containerd/containerd/v2/core/diff"
containerdimages "github.com/containerd/containerd/v2/core/images"
@ -322,14 +321,6 @@ func (c *CRIImageService) createImageReference(ctx context.Context, name string,
// TODO: Call CRIImageService directly
oldImg, err := c.images.Create(ctx, img)
if err == nil {
if c.publisher != nil {
if err := c.publisher.Publish(ctx, "/images/create", &eventstypes.ImageCreate{
Name: img.Name,
Labels: img.Labels,
}); err != nil {
return err
}
}
return nil
} else if !errdefs.IsAlreadyExists(err) {
return err
@ -338,16 +329,6 @@ func (c *CRIImageService) createImageReference(ctx context.Context, name string,
return nil
}
_, err = c.images.Update(ctx, img, "target", "labels."+crilabels.ImageLabelKey)
if err == nil && c.publisher != nil {
if c.publisher != nil {
if err := c.publisher.Publish(ctx, "/images/update", &eventstypes.ImageUpdate{
Name: img.Name,
Labels: img.Labels,
}); err != nil {
return err
}
}
}
return err
}

View File

@ -20,7 +20,6 @@ import (
"context"
"fmt"
eventstypes "github.com/containerd/containerd/v2/api/events"
"github.com/containerd/containerd/v2/core/images"
"github.com/containerd/containerd/v2/pkg/tracing"
"github.com/containerd/errdefs"
@ -63,14 +62,6 @@ func (c *GRPCCRIImageService) RemoveImage(ctx context.Context, r *runtime.Remove
if err := c.imageStore.Update(ctx, ref); err != nil {
return nil, fmt.Errorf("failed to update image reference %q for %q: %w", ref, image.ID, err)
}
if c.publisher != nil {
if err := c.publisher.Publish(ctx, "/images/delete", &eventstypes.ImageDelete{
Name: ref,
}); err != nil {
return nil, err
}
}
continue
}
return nil, fmt.Errorf("failed to delete image reference %q for %q: %w", ref, image.ID, err)

View File

@ -28,7 +28,6 @@ import (
imagestore "github.com/containerd/containerd/v2/internal/cri/store/image"
snapshotstore "github.com/containerd/containerd/v2/internal/cri/store/snapshot"
"github.com/containerd/containerd/v2/internal/kmutex"
"github.com/containerd/containerd/v2/pkg/events"
"github.com/containerd/log"
"github.com/containerd/platforms"
docker "github.com/distribution/reference"
@ -54,8 +53,6 @@ type CRIImageService struct {
// images is the lower level image store used for raw storage,
// no event publishing should currently be assumed
images images.Store
// publisher is the events publisher
publisher events.Publisher
// client is a subset of the containerd client
// and will be replaced by image store and transfer service
client imageClient
@ -88,8 +85,6 @@ type CRIImageServiceOptions struct {
Snapshotters map[string]snapshots.Snapshotter
Publisher events.Publisher
Client imageClient
}

View File

@ -28,7 +28,6 @@ import (
criconfig "github.com/containerd/containerd/v2/internal/cri/config"
"github.com/containerd/containerd/v2/internal/cri/constants"
"github.com/containerd/containerd/v2/internal/cri/server/images"
"github.com/containerd/containerd/v2/pkg/events"
"github.com/containerd/containerd/v2/plugins"
"github.com/containerd/containerd/v2/plugins/services/warning"
"github.com/containerd/log"
@ -46,7 +45,6 @@ func init() {
Config: &config,
Requires: []plugin.Type{
plugins.LeasePlugin,
plugins.EventPlugin,
plugins.MetadataPlugin,
plugins.SandboxStorePlugin,
plugins.ServicePlugin, // For client
@ -60,11 +58,6 @@ func init() {
}
mdb := m.(*metadata.DB)
ep, err := ic.GetSingle(plugins.EventPlugin)
if err != nil {
return nil, err
}
if warnings, err := criconfig.ValidateImageConfig(ic.Context, &config); err != nil {
return nil, fmt.Errorf("invalid cri image config: %w", err)
} else if len(warnings) > 0 {
@ -84,7 +77,6 @@ func init() {
RuntimePlatforms: map[string]images.ImagePlatform{},
Snapshotters: map[string]snapshots.Snapshotter{},
ImageFSPaths: map[string]string{},
Publisher: ep.(events.Publisher),
}
options.Client, err = containerd.New(

View File

@ -24,13 +24,11 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
eventstypes "github.com/containerd/containerd/v2/api/events"
imagesapi "github.com/containerd/containerd/v2/api/services/images/v1"
"github.com/containerd/containerd/v2/core/images"
"github.com/containerd/containerd/v2/core/metadata"
"github.com/containerd/containerd/v2/pkg/deprecation"
"github.com/containerd/containerd/v2/pkg/epoch"
"github.com/containerd/containerd/v2/pkg/events"
"github.com/containerd/containerd/v2/pkg/gc"
"github.com/containerd/containerd/v2/plugins"
"github.com/containerd/containerd/v2/plugins/services"
@ -46,7 +44,6 @@ func init() {
Type: plugins.ServicePlugin,
ID: services.ImagesService,
Requires: []plugin.Type{
plugins.EventPlugin,
plugins.MetadataPlugin,
plugins.GCPlugin,
plugins.WarningPlugin,
@ -60,10 +57,6 @@ func init() {
if err != nil {
return nil, err
}
ep, err := ic.GetSingle(plugins.EventPlugin)
if err != nil {
return nil, err
}
w, err := ic.GetSingle(plugins.WarningPlugin)
if err != nil {
return nil, err
@ -71,7 +64,6 @@ func init() {
return &local{
store: metadata.NewImageStore(m.(*metadata.DB)),
publisher: ep.(events.Publisher),
gc: g.(gcScheduler),
warnings: w.(warning.Service),
}, nil
@ -86,7 +78,6 @@ type gcScheduler interface {
type local struct {
store images.Store
gc gcScheduler
publisher events.Publisher
warnings warning.Service
}
@ -136,13 +127,6 @@ func (l *local) Create(ctx context.Context, req *imagesapi.CreateImageRequest, _
resp.Image = imageToProto(&created)
if err := l.publisher.Publish(ctx, "/images/create", &eventstypes.ImageCreate{
Name: resp.Image.Name,
Labels: resp.Image.Labels,
}); err != nil {
return nil, err
}
l.emitSchema1DeprecationWarning(ctx, &image)
return &resp, nil
@ -175,13 +159,6 @@ func (l *local) Update(ctx context.Context, req *imagesapi.UpdateImageRequest, _
resp.Image = imageToProto(&updated)
if err := l.publisher.Publish(ctx, "/images/update", &eventstypes.ImageUpdate{
Name: resp.Image.Name,
Labels: resp.Image.Labels,
}); err != nil {
return nil, err
}
l.emitSchema1DeprecationWarning(ctx, &image)
return &resp, nil
}
@ -200,12 +177,6 @@ func (l *local) Delete(ctx context.Context, req *imagesapi.DeleteImageRequest, _
return nil, errdefs.ToGRPC(err)
}
if err := l.publisher.Publish(ctx, "/images/delete", &eventstypes.ImageDelete{
Name: req.Name,
}); err != nil {
return nil, err
}
if req.Sync {
if _, err := l.gc.ScheduleAndWait(ctx); err != nil {
return nil, err