diff --git a/core/metadata/db.go b/core/metadata/db.go index 8fb8409f2..303af643d 100644 --- a/core/metadata/db.go +++ b/core/metadata/db.go @@ -270,6 +270,20 @@ func (m *DB) Update(fn func(*bolt.Tx) error) error { return err } +// Publisher returns an event publisher if one is configured +// and the current context is not inside a transaction. +func (m *DB) Publisher(ctx context.Context) events.Publisher { + _, ok := ctx.Value(transactionKey{}).(*bolt.Tx) + if ok { + // Do no publish events within a transaction + return nil + } + if m.dbopts.publisher != nil { + return m.dbopts.publisher + } + return nil +} + // RegisterMutationCallback registers a function to be called after a metadata // mutations has been performed. // diff --git a/core/metadata/images.go b/core/metadata/images.go index 49c9fb7bd..e01ad31d2 100644 --- a/core/metadata/images.go +++ b/core/metadata/images.go @@ -165,8 +165,8 @@ func (s *imageStore) Create(ctx context.Context, image images.Image) (images.Ima return images.Image{}, err } - if s.db.dbopts.publisher != nil { - if err := s.db.dbopts.publisher.Publish(ctx, "/images/create", &eventstypes.ImageCreate{ + if publisher := s.db.Publisher(ctx); publisher != nil { + if err := publisher.Publish(ctx, "/images/create", &eventstypes.ImageCreate{ Name: image.Name, Labels: image.Labels, }); err != nil { @@ -266,8 +266,8 @@ func (s *imageStore) Update(ctx context.Context, image images.Image, fieldpaths return images.Image{}, err } - if s.db.dbopts.publisher != nil { - if err := s.db.dbopts.publisher.Publish(ctx, "/images/update", &eventstypes.ImageUpdate{ + if publisher := s.db.Publisher(ctx); publisher != nil { + if err := publisher.Publish(ctx, "/images/update", &eventstypes.ImageUpdate{ Name: updated.Name, Labels: updated.Labels, }); err != nil { @@ -333,8 +333,8 @@ func (s *imageStore) Delete(ctx context.Context, name string, opts ...images.Del return err } - if s.db.dbopts.publisher != nil { - if err := s.db.dbopts.publisher.Publish(ctx, "/images/delete", &eventstypes.ImageDelete{ + if publisher := s.db.Publisher(ctx); publisher != nil { + if err := publisher.Publish(ctx, "/images/delete", &eventstypes.ImageDelete{ Name: name, }); err != nil { return err diff --git a/core/metadata/snapshot.go b/core/metadata/snapshot.go index af4234baa..d77c95fca 100644 --- a/core/metadata/snapshot.go +++ b/core/metadata/snapshot.go @@ -279,8 +279,8 @@ func (s *snapshotter) Prepare(ctx context.Context, key, parent string, opts ...s return nil, err } - if s.db.dbopts.publisher != nil { - if err := s.db.dbopts.publisher.Publish(ctx, "/snapshot/prepare", &eventstypes.SnapshotPrepare{ + if publisher := s.db.Publisher(ctx); publisher != nil { + if err := publisher.Publish(ctx, "/snapshot/prepare", &eventstypes.SnapshotPrepare{ Key: key, Parent: parent, Snapshotter: s.name, @@ -634,8 +634,8 @@ func (s *snapshotter) Commit(ctx context.Context, name, key string, opts ...snap return err } - if s.db.dbopts.publisher != nil { - if err := s.db.dbopts.publisher.Publish(ctx, "/snapshot/commit", &eventstypes.SnapshotCommit{ + if publisher := s.db.Publisher(ctx); publisher != nil { + if err := publisher.Publish(ctx, "/snapshot/commit", &eventstypes.SnapshotCommit{ Key: key, Name: name, Snapshotter: s.name, @@ -704,8 +704,8 @@ func (s *snapshotter) Remove(ctx context.Context, key string) error { return err } - if s.db.dbopts.publisher != nil { - return s.db.dbopts.publisher.Publish(ctx, "/snapshot/remove", &eventstypes.SnapshotRemove{ + if publisher := s.db.Publisher(ctx); publisher != nil { + return publisher.Publish(ctx, "/snapshot/remove", &eventstypes.SnapshotRemove{ Key: key, Snapshotter: s.name, })