@@ -530,12 +530,14 @@ func writeInfo(info *content.Info, bkt *bolt.Bucket) error {
|
||||
return bkt.Put(bucketKeySize, sizeEncoded)
|
||||
}
|
||||
|
||||
func (cs *contentStore) garbageCollect(ctx context.Context) error {
|
||||
lt1 := time.Now()
|
||||
func (cs *contentStore) garbageCollect(ctx context.Context) (d time.Duration, err error) {
|
||||
cs.l.Lock()
|
||||
t1 := time.Now()
|
||||
defer func() {
|
||||
if err == nil {
|
||||
d = time.Now().Sub(t1)
|
||||
}
|
||||
cs.l.Unlock()
|
||||
log.G(ctx).WithField("t", time.Now().Sub(lt1)).Debugf("content garbage collected")
|
||||
}()
|
||||
|
||||
seen := map[string]struct{}{}
|
||||
@@ -570,10 +572,10 @@ func (cs *contentStore) garbageCollect(ctx context.Context) error {
|
||||
|
||||
return nil
|
||||
}); err != nil {
|
||||
return err
|
||||
return 0, err
|
||||
}
|
||||
|
||||
return cs.Store.Walk(ctx, func(info content.Info) error {
|
||||
err = cs.Store.Walk(ctx, func(info content.Info) error {
|
||||
if _, ok := seen[info.Digest.String()]; !ok {
|
||||
if err := cs.Store.Delete(ctx, info.Digest); err != nil {
|
||||
return err
|
||||
@@ -582,4 +584,5 @@ func (cs *contentStore) garbageCollect(ctx context.Context) error {
|
||||
}
|
||||
return nil
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
108
metadata/db.go
108
metadata/db.go
@@ -53,8 +53,9 @@ type DB struct {
|
||||
dirtySS map[string]struct{}
|
||||
dirtyCS bool
|
||||
|
||||
// TODO: Keep track of stats such as pause time, number of collected objects, errors
|
||||
lastCollection time.Time
|
||||
// mutationCallbacks are called after each mutation with the flag
|
||||
// set indicating whether any dirty flags are set
|
||||
mutationCallbacks []func(bool)
|
||||
}
|
||||
|
||||
// NewDB creates a new metadata database using the provided
|
||||
@@ -183,29 +184,53 @@ func (m *DB) View(fn func(*bolt.Tx) error) error {
|
||||
return m.db.View(fn)
|
||||
}
|
||||
|
||||
// Update runs a writable transation on the metadata store.
|
||||
// Update runs a writable transaction on the metadata store.
|
||||
func (m *DB) Update(fn func(*bolt.Tx) error) error {
|
||||
m.wlock.RLock()
|
||||
defer m.wlock.RUnlock()
|
||||
return m.db.Update(fn)
|
||||
err := m.db.Update(fn)
|
||||
if err == nil {
|
||||
m.dirtyL.Lock()
|
||||
dirty := m.dirtyCS || len(m.dirtySS) > 0
|
||||
for _, fn := range m.mutationCallbacks {
|
||||
fn(dirty)
|
||||
}
|
||||
m.dirtyL.Unlock()
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// RegisterMutationCallback registers a function to be called after a metadata
|
||||
// mutations has been performed.
|
||||
//
|
||||
// The callback function in an argument for whether a deletion has occurred
|
||||
// since the last garbage collection.
|
||||
func (m *DB) RegisterMutationCallback(fn func(bool)) {
|
||||
m.dirtyL.Lock()
|
||||
m.mutationCallbacks = append(m.mutationCallbacks, fn)
|
||||
m.dirtyL.Unlock()
|
||||
}
|
||||
|
||||
// GCStats holds the duration for the different phases of the garbage collector
|
||||
type GCStats struct {
|
||||
MetaD time.Duration
|
||||
ContentD time.Duration
|
||||
SnapshotD map[string]time.Duration
|
||||
}
|
||||
|
||||
// GarbageCollect starts garbage collection
|
||||
func (m *DB) GarbageCollect(ctx context.Context) error {
|
||||
lt1 := time.Now()
|
||||
func (m *DB) GarbageCollect(ctx context.Context) (stats GCStats, err error) {
|
||||
m.wlock.Lock()
|
||||
defer func() {
|
||||
m.wlock.Unlock()
|
||||
log.G(ctx).WithField("d", time.Now().Sub(lt1)).Debug("metadata garbage collected")
|
||||
}()
|
||||
t1 := time.Now()
|
||||
|
||||
marked, err := m.getMarked(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
m.wlock.Unlock()
|
||||
return GCStats{}, err
|
||||
}
|
||||
|
||||
m.dirtyL.Lock()
|
||||
defer m.dirtyL.Unlock()
|
||||
|
||||
if err := m.db.Update(func(tx *bolt.Tx) error {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
@@ -232,26 +257,53 @@ func (m *DB) GarbageCollect(ctx context.Context) error {
|
||||
|
||||
return nil
|
||||
}); err != nil {
|
||||
return err
|
||||
m.dirtyL.Unlock()
|
||||
m.wlock.Unlock()
|
||||
return GCStats{}, err
|
||||
}
|
||||
|
||||
m.lastCollection = time.Now()
|
||||
var wg sync.WaitGroup
|
||||
|
||||
if len(m.dirtySS) > 0 {
|
||||
var sl sync.Mutex
|
||||
stats.SnapshotD = map[string]time.Duration{}
|
||||
wg.Add(len(m.dirtySS))
|
||||
for snapshotterName := range m.dirtySS {
|
||||
log.G(ctx).WithField("snapshotter", snapshotterName).Debug("scheduling snapshotter cleanup")
|
||||
go m.cleanupSnapshotter(snapshotterName)
|
||||
go func(snapshotterName string) {
|
||||
st1 := time.Now()
|
||||
m.cleanupSnapshotter(snapshotterName)
|
||||
|
||||
sl.Lock()
|
||||
stats.SnapshotD[snapshotterName] = time.Now().Sub(st1)
|
||||
sl.Unlock()
|
||||
|
||||
wg.Done()
|
||||
}(snapshotterName)
|
||||
}
|
||||
m.dirtySS = map[string]struct{}{}
|
||||
}
|
||||
|
||||
if m.dirtyCS {
|
||||
wg.Add(1)
|
||||
log.G(ctx).Debug("scheduling content cleanup")
|
||||
go m.cleanupContent()
|
||||
go func() {
|
||||
ct1 := time.Now()
|
||||
m.cleanupContent()
|
||||
stats.ContentD = time.Now().Sub(ct1)
|
||||
wg.Done()
|
||||
}()
|
||||
m.dirtyCS = false
|
||||
}
|
||||
|
||||
return nil
|
||||
m.dirtyL.Unlock()
|
||||
|
||||
stats.MetaD = time.Now().Sub(t1)
|
||||
m.wlock.Unlock()
|
||||
|
||||
wg.Wait()
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (m *DB) getMarked(ctx context.Context) (map[gc.Node]struct{}, error) {
|
||||
@@ -302,27 +354,35 @@ func (m *DB) getMarked(ctx context.Context) (map[gc.Node]struct{}, error) {
|
||||
return marked, nil
|
||||
}
|
||||
|
||||
func (m *DB) cleanupSnapshotter(name string) {
|
||||
func (m *DB) cleanupSnapshotter(name string) (time.Duration, error) {
|
||||
ctx := context.Background()
|
||||
sn, ok := m.ss[name]
|
||||
if !ok {
|
||||
return
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
err := sn.garbageCollect(ctx)
|
||||
d, err := sn.garbageCollect(ctx)
|
||||
logger := log.G(ctx).WithField("snapshotter", name)
|
||||
if err != nil {
|
||||
log.G(ctx).WithError(err).WithField("snapshotter", name).Warn("garbage collection failed")
|
||||
logger.WithError(err).Warn("snapshot garbage collection failed")
|
||||
} else {
|
||||
logger.WithField("d", d).Debugf("snapshot garbage collected")
|
||||
}
|
||||
return d, err
|
||||
}
|
||||
|
||||
func (m *DB) cleanupContent() {
|
||||
func (m *DB) cleanupContent() (time.Duration, error) {
|
||||
ctx := context.Background()
|
||||
if m.cs == nil {
|
||||
return
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
err := m.cs.garbageCollect(ctx)
|
||||
d, err := m.cs.garbageCollect(ctx)
|
||||
if err != nil {
|
||||
log.G(ctx).WithError(err).Warn("content garbage collection failed")
|
||||
} else {
|
||||
log.G(ctx).WithField("d", d).Debugf("content garbage collected")
|
||||
}
|
||||
|
||||
return d, err
|
||||
}
|
||||
|
||||
@@ -235,7 +235,7 @@ func TestMetadataCollector(t *testing.T) {
|
||||
t.Fatalf("Creation failed: %+v", err)
|
||||
}
|
||||
|
||||
if err := mdb.GarbageCollect(ctx); err != nil {
|
||||
if _, err := mdb.GarbageCollect(ctx); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
@@ -322,7 +322,7 @@ func benchmarkTrigger(n int) func(b *testing.B) {
|
||||
|
||||
//b.StartTimer()
|
||||
|
||||
if err := mdb.GarbageCollect(ctx); err != nil {
|
||||
if _, err := mdb.GarbageCollect(ctx); err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
|
||||
|
||||
@@ -183,7 +183,7 @@ func (s *imageStore) Update(ctx context.Context, image images.Image, fieldpaths
|
||||
})
|
||||
}
|
||||
|
||||
func (s *imageStore) Delete(ctx context.Context, name string) error {
|
||||
func (s *imageStore) Delete(ctx context.Context, name string, opts ...images.DeleteOpt) error {
|
||||
namespace, err := namespaces.NamespaceRequired(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
|
||||
@@ -596,13 +596,14 @@ func validateSnapshot(info *snapshot.Info) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *snapshotter) garbageCollect(ctx context.Context) error {
|
||||
logger := log.G(ctx).WithField("snapshotter", s.name)
|
||||
lt1 := time.Now()
|
||||
func (s *snapshotter) garbageCollect(ctx context.Context) (d time.Duration, err error) {
|
||||
s.l.Lock()
|
||||
t1 := time.Now()
|
||||
defer func() {
|
||||
if err == nil {
|
||||
d = time.Now().Sub(t1)
|
||||
}
|
||||
s.l.Unlock()
|
||||
logger.WithField("t", time.Now().Sub(lt1)).Debugf("garbage collected")
|
||||
}()
|
||||
|
||||
seen := map[string]struct{}{}
|
||||
@@ -646,23 +647,26 @@ func (s *snapshotter) garbageCollect(ctx context.Context) error {
|
||||
|
||||
return nil
|
||||
}); err != nil {
|
||||
return err
|
||||
return 0, err
|
||||
}
|
||||
|
||||
roots, err := s.walkTree(ctx, seen)
|
||||
if err != nil {
|
||||
return err
|
||||
return 0, err
|
||||
}
|
||||
|
||||
// TODO: Unlock before prune (once nodes are fully unavailable)
|
||||
// TODO: Unlock before removal (once nodes are fully unavailable).
|
||||
// This could be achieved through doing prune inside the lock
|
||||
// and having a cleanup method which actually performs the
|
||||
// deletions on the snapshotters which support it.
|
||||
|
||||
for _, node := range roots {
|
||||
if err := s.pruneBranch(ctx, node); err != nil {
|
||||
return err
|
||||
return 0, err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
return
|
||||
}
|
||||
|
||||
type treeNode struct {
|
||||
|
||||
Reference in New Issue
Block a user