Merge pull request #9779 from dmcgowan/move-image-event-publishing

Move image event publishing to metadata store
This commit is contained in:
Fu Wei 2024-02-07 14:10:42 +00:00 committed by GitHub
commit ff464f3687
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 59 additions and 83 deletions

View File

@ -270,6 +270,20 @@ func (m *DB) Update(fn func(*bolt.Tx) error) error {
return err
}
// Publisher returns an event publisher if one is configured
// and the current context is not inside a transaction.
func (m *DB) Publisher(ctx context.Context) events.Publisher {
_, ok := ctx.Value(transactionKey{}).(*bolt.Tx)
if ok {
// Do no publish events within a transaction
return nil
}
if m.dbopts.publisher != nil {
return m.dbopts.publisher
}
return nil
}
// RegisterMutationCallback registers a function to be called after a metadata
// mutations has been performed.
//

View File

@ -25,6 +25,7 @@ import (
"sync/atomic"
"time"
eventstypes "github.com/containerd/containerd/v2/api/events"
"github.com/containerd/containerd/v2/core/images"
"github.com/containerd/containerd/v2/core/metadata/boltutil"
"github.com/containerd/containerd/v2/pkg/epoch"
@ -164,6 +165,15 @@ func (s *imageStore) Create(ctx context.Context, image images.Image) (images.Ima
return images.Image{}, err
}
if publisher := s.db.Publisher(ctx); publisher != nil {
if err := publisher.Publish(ctx, "/images/create", &eventstypes.ImageCreate{
Name: image.Name,
Labels: image.Labels,
}); err != nil {
return images.Image{}, err
}
}
return image, nil
}
@ -256,6 +266,15 @@ func (s *imageStore) Update(ctx context.Context, image images.Image, fieldpaths
return images.Image{}, err
}
if publisher := s.db.Publisher(ctx); publisher != nil {
if err := publisher.Publish(ctx, "/images/update", &eventstypes.ImageUpdate{
Name: updated.Name,
Labels: updated.Labels,
}); err != nil {
return images.Image{}, err
}
}
return updated, nil
}
@ -273,7 +292,7 @@ func (s *imageStore) Delete(ctx context.Context, name string, opts ...images.Del
}
}
return update(ctx, s.db, func(tx *bolt.Tx) error {
err = update(ctx, s.db, func(tx *bolt.Tx) error {
bkt := getImagesBucket(tx, namespace)
if bkt == nil {
return fmt.Errorf("image %q: %w", name, errdefs.ErrNotFound)
@ -310,6 +329,19 @@ func (s *imageStore) Delete(ctx context.Context, name string, opts ...images.Del
return nil
})
if err != nil {
return err
}
if publisher := s.db.Publisher(ctx); publisher != nil {
if err := publisher.Publish(ctx, "/images/delete", &eventstypes.ImageDelete{
Name: name,
}); err != nil {
return err
}
}
return nil
}
func validateImage(image *images.Image) error {

View File

@ -279,8 +279,8 @@ func (s *snapshotter) Prepare(ctx context.Context, key, parent string, opts ...s
return nil, err
}
if s.db.dbopts.publisher != nil {
if err := s.db.dbopts.publisher.Publish(ctx, "/snapshot/prepare", &eventstypes.SnapshotPrepare{
if publisher := s.db.Publisher(ctx); publisher != nil {
if err := publisher.Publish(ctx, "/snapshot/prepare", &eventstypes.SnapshotPrepare{
Key: key,
Parent: parent,
Snapshotter: s.name,
@ -634,8 +634,8 @@ func (s *snapshotter) Commit(ctx context.Context, name, key string, opts ...snap
return err
}
if s.db.dbopts.publisher != nil {
if err := s.db.dbopts.publisher.Publish(ctx, "/snapshot/commit", &eventstypes.SnapshotCommit{
if publisher := s.db.Publisher(ctx); publisher != nil {
if err := publisher.Publish(ctx, "/snapshot/commit", &eventstypes.SnapshotCommit{
Key: key,
Name: name,
Snapshotter: s.name,
@ -704,8 +704,8 @@ func (s *snapshotter) Remove(ctx context.Context, key string) error {
return err
}
if s.db.dbopts.publisher != nil {
return s.db.dbopts.publisher.Publish(ctx, "/snapshot/remove", &eventstypes.SnapshotRemove{
if publisher := s.db.Publisher(ctx); publisher != nil {
return publisher.Publish(ctx, "/snapshot/remove", &eventstypes.SnapshotRemove{
Key: key,
Snapshotter: s.name,
})

View File

@ -38,7 +38,6 @@ import (
imagespec "github.com/opencontainers/image-spec/specs-go/v1"
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
eventstypes "github.com/containerd/containerd/v2/api/events"
containerd "github.com/containerd/containerd/v2/client"
"github.com/containerd/containerd/v2/core/diff"
containerdimages "github.com/containerd/containerd/v2/core/images"
@ -322,14 +321,6 @@ func (c *CRIImageService) createImageReference(ctx context.Context, name string,
// TODO: Call CRIImageService directly
oldImg, err := c.images.Create(ctx, img)
if err == nil {
if c.publisher != nil {
if err := c.publisher.Publish(ctx, "/images/create", &eventstypes.ImageCreate{
Name: img.Name,
Labels: img.Labels,
}); err != nil {
return err
}
}
return nil
} else if !errdefs.IsAlreadyExists(err) {
return err
@ -338,16 +329,6 @@ func (c *CRIImageService) createImageReference(ctx context.Context, name string,
return nil
}
_, err = c.images.Update(ctx, img, "target", "labels."+crilabels.ImageLabelKey)
if err == nil && c.publisher != nil {
if c.publisher != nil {
if err := c.publisher.Publish(ctx, "/images/update", &eventstypes.ImageUpdate{
Name: img.Name,
Labels: img.Labels,
}); err != nil {
return err
}
}
}
return err
}

View File

@ -20,7 +20,6 @@ import (
"context"
"fmt"
eventstypes "github.com/containerd/containerd/v2/api/events"
"github.com/containerd/containerd/v2/core/images"
"github.com/containerd/containerd/v2/pkg/tracing"
"github.com/containerd/errdefs"
@ -63,14 +62,6 @@ func (c *GRPCCRIImageService) RemoveImage(ctx context.Context, r *runtime.Remove
if err := c.imageStore.Update(ctx, ref); err != nil {
return nil, fmt.Errorf("failed to update image reference %q for %q: %w", ref, image.ID, err)
}
if c.publisher != nil {
if err := c.publisher.Publish(ctx, "/images/delete", &eventstypes.ImageDelete{
Name: ref,
}); err != nil {
return nil, err
}
}
continue
}
return nil, fmt.Errorf("failed to delete image reference %q for %q: %w", ref, image.ID, err)

View File

@ -28,7 +28,6 @@ import (
imagestore "github.com/containerd/containerd/v2/internal/cri/store/image"
snapshotstore "github.com/containerd/containerd/v2/internal/cri/store/snapshot"
"github.com/containerd/containerd/v2/internal/kmutex"
"github.com/containerd/containerd/v2/pkg/events"
"github.com/containerd/log"
"github.com/containerd/platforms"
docker "github.com/distribution/reference"
@ -54,8 +53,6 @@ type CRIImageService struct {
// images is the lower level image store used for raw storage,
// no event publishing should currently be assumed
images images.Store
// publisher is the events publisher
publisher events.Publisher
// client is a subset of the containerd client
// and will be replaced by image store and transfer service
client imageClient
@ -88,8 +85,6 @@ type CRIImageServiceOptions struct {
Snapshotters map[string]snapshots.Snapshotter
Publisher events.Publisher
Client imageClient
}

View File

@ -28,7 +28,6 @@ import (
criconfig "github.com/containerd/containerd/v2/internal/cri/config"
"github.com/containerd/containerd/v2/internal/cri/constants"
"github.com/containerd/containerd/v2/internal/cri/server/images"
"github.com/containerd/containerd/v2/pkg/events"
"github.com/containerd/containerd/v2/plugins"
"github.com/containerd/containerd/v2/plugins/services/warning"
"github.com/containerd/log"
@ -46,7 +45,6 @@ func init() {
Config: &config,
Requires: []plugin.Type{
plugins.LeasePlugin,
plugins.EventPlugin,
plugins.MetadataPlugin,
plugins.SandboxStorePlugin,
plugins.ServicePlugin, // For client
@ -60,11 +58,6 @@ func init() {
}
mdb := m.(*metadata.DB)
ep, err := ic.GetSingle(plugins.EventPlugin)
if err != nil {
return nil, err
}
if warnings, err := criconfig.ValidateImageConfig(ic.Context, &config); err != nil {
return nil, fmt.Errorf("invalid cri image config: %w", err)
} else if len(warnings) > 0 {
@ -84,7 +77,6 @@ func init() {
RuntimePlatforms: map[string]images.ImagePlatform{},
Snapshotters: map[string]snapshots.Snapshotter{},
ImageFSPaths: map[string]string{},
Publisher: ep.(events.Publisher),
}
options.Client, err = containerd.New(

View File

@ -24,13 +24,11 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
eventstypes "github.com/containerd/containerd/v2/api/events"
imagesapi "github.com/containerd/containerd/v2/api/services/images/v1"
"github.com/containerd/containerd/v2/core/images"
"github.com/containerd/containerd/v2/core/metadata"
"github.com/containerd/containerd/v2/pkg/deprecation"
"github.com/containerd/containerd/v2/pkg/epoch"
"github.com/containerd/containerd/v2/pkg/events"
"github.com/containerd/containerd/v2/pkg/gc"
"github.com/containerd/containerd/v2/plugins"
"github.com/containerd/containerd/v2/plugins/services"
@ -46,7 +44,6 @@ func init() {
Type: plugins.ServicePlugin,
ID: services.ImagesService,
Requires: []plugin.Type{
plugins.EventPlugin,
plugins.MetadataPlugin,
plugins.GCPlugin,
plugins.WarningPlugin,
@ -60,20 +57,15 @@ func init() {
if err != nil {
return nil, err
}
ep, err := ic.GetSingle(plugins.EventPlugin)
if err != nil {
return nil, err
}
w, err := ic.GetSingle(plugins.WarningPlugin)
if err != nil {
return nil, err
}
return &local{
store: metadata.NewImageStore(m.(*metadata.DB)),
publisher: ep.(events.Publisher),
gc: g.(gcScheduler),
warnings: w.(warning.Service),
store: metadata.NewImageStore(m.(*metadata.DB)),
gc: g.(gcScheduler),
warnings: w.(warning.Service),
}, nil
},
})
@ -84,10 +76,9 @@ type gcScheduler interface {
}
type local struct {
store images.Store
gc gcScheduler
publisher events.Publisher
warnings warning.Service
store images.Store
gc gcScheduler
warnings warning.Service
}
var _ imagesapi.ImagesClient = &local{}
@ -136,13 +127,6 @@ func (l *local) Create(ctx context.Context, req *imagesapi.CreateImageRequest, _
resp.Image = imageToProto(&created)
if err := l.publisher.Publish(ctx, "/images/create", &eventstypes.ImageCreate{
Name: resp.Image.Name,
Labels: resp.Image.Labels,
}); err != nil {
return nil, err
}
l.emitSchema1DeprecationWarning(ctx, &image)
return &resp, nil
@ -175,13 +159,6 @@ func (l *local) Update(ctx context.Context, req *imagesapi.UpdateImageRequest, _
resp.Image = imageToProto(&updated)
if err := l.publisher.Publish(ctx, "/images/update", &eventstypes.ImageUpdate{
Name: resp.Image.Name,
Labels: resp.Image.Labels,
}); err != nil {
return nil, err
}
l.emitSchema1DeprecationWarning(ctx, &image)
return &resp, nil
}
@ -200,12 +177,6 @@ func (l *local) Delete(ctx context.Context, req *imagesapi.DeleteImageRequest, _
return nil, errdefs.ToGRPC(err)
}
if err := l.publisher.Publish(ctx, "/images/delete", &eventstypes.ImageDelete{
Name: req.Name,
}); err != nil {
return nil, err
}
if req.Sync {
if _, err := l.gc.ScheduleAndWait(ctx); err != nil {
return nil, err