diff --git a/gc/gc.go b/gc/gc.go index 66898c5de..b5aad39e6 100644 --- a/gc/gc.go +++ b/gc/gc.go @@ -8,6 +8,7 @@ package gc import ( "context" "sync" + "time" ) // ResourceType represents type of resource at a node @@ -21,6 +22,11 @@ type Node struct { Key string } +// Stats about a garbage collection run +type Stats interface { + Elapsed() time.Duration +} + // Tricolor implements basic, single-thread tri-color GC. Given the roots, the // complete set and a refs function, this function returns a map of all // reachable objects. diff --git a/gc/scheduler/scheduler.go b/gc/scheduler/scheduler.go index 3aa37b6e9..fdb0981e6 100644 --- a/gc/scheduler/scheduler.go +++ b/gc/scheduler/scheduler.go @@ -2,14 +2,14 @@ package scheduler import ( "context" - "errors" "fmt" "sync" "time" + "github.com/containerd/containerd/gc" "github.com/containerd/containerd/log" - "github.com/containerd/containerd/metadata" "github.com/containerd/containerd/plugin" + "github.com/pkg/errors" ) // config configures the garbage collection policies. @@ -95,7 +95,12 @@ func init() { return nil, err } - m := newScheduler(md.(*metadata.DB), ic.Config.(*config)) + mdCollector, ok := md.(collector) + if !ok { + return nil, errors.Errorf("%s %T must implement collector", plugin.MetadataPlugin, md) + } + + m := newScheduler(mdCollector, ic.Config.(*config)) ic.Meta.Exports = map[string]string{ "PauseThreshold": fmt.Sprint(m.pauseThreshold), @@ -119,7 +124,7 @@ type mutationEvent struct { type collector interface { RegisterMutationCallback(func(bool)) - GarbageCollect(context.Context) (metadata.GCStats, error) + GarbageCollect(context.Context) (gc.Stats, error) } type gcScheduler struct { @@ -128,7 +133,7 @@ type gcScheduler struct { eventC chan mutationEvent waiterL sync.Mutex - waiters []chan metadata.GCStats + waiters []chan gc.Stats pauseThreshold float64 deletionThreshold int @@ -171,12 +176,12 @@ func newScheduler(c collector, cfg *config) *gcScheduler { return s } -func (s *gcScheduler) ScheduleAndWait(ctx context.Context) (metadata.GCStats, error) { +func (s *gcScheduler) ScheduleAndWait(ctx context.Context) (gc.Stats, error) { return s.wait(ctx, true) } -func (s *gcScheduler) wait(ctx context.Context, trigger bool) (metadata.GCStats, error) { - wc := make(chan metadata.GCStats, 1) +func (s *gcScheduler) wait(ctx context.Context, trigger bool) (gc.Stats, error) { + wc := make(chan gc.Stats, 1) s.waiterL.Lock() s.waiters = append(s.waiters, wc) s.waiterL.Unlock() @@ -190,15 +195,15 @@ func (s *gcScheduler) wait(ctx context.Context, trigger bool) (metadata.GCStats, }() } - var gcStats metadata.GCStats + var gcStats gc.Stats select { case stats, ok := <-wc: if !ok { - return metadata.GCStats{}, errors.New("gc failed") + return gcStats, errors.New("gc failed") } gcStats = stats case <-ctx.Done(): - return metadata.GCStats{}, ctx.Err() + return gcStats, ctx.Err() } return gcStats, nil @@ -301,9 +306,9 @@ func (s *gcScheduler) run(ctx context.Context) { continue } - log.G(ctx).WithField("d", stats.MetaD).Debug("garbage collected") + log.G(ctx).WithField("d", stats.Elapsed()).Debug("garbage collected") - gcTime += stats.MetaD + gcTime += stats.Elapsed() collections++ triggered = false deletions = 0 diff --git a/gc/scheduler/scheduler_test.go b/gc/scheduler/scheduler_test.go index 103dc7f2a..460c833ac 100644 --- a/gc/scheduler/scheduler_test.go +++ b/gc/scheduler/scheduler_test.go @@ -6,11 +6,11 @@ import ( "testing" "time" - "github.com/containerd/containerd/metadata" + "github.com/containerd/containerd/gc" + "github.com/stretchr/testify/require" ) func TestPauseThreshold(t *testing.T) { - cfg := &config{ // With 100μs, gc should run about every 5ms PauseThreshold: 0.02, @@ -99,7 +99,7 @@ func TestTrigger(t *testing.T) { } ctx, cancel = context.WithCancel(context.Background()) scheduler = newScheduler(tc, cfg) - stats metadata.GCStats + stats gc.Stats err error ) @@ -123,9 +123,7 @@ func TestTrigger(t *testing.T) { t.Fatalf("GC failed: %#v", err) } - if stats.MetaD != tc.d { - t.Fatalf("unexpected gc duration: %s, expected %d", stats.MetaD, tc.d) - } + require.Equal(t, tc.d, stats.Elapsed()) if c := tc.runCount(); c != 1 { t.Fatalf("unexpected gc run count %d, expected 1", c) @@ -180,11 +178,18 @@ func (tc *testCollector) RegisterMutationCallback(f func(bool)) { tc.callbacks = append(tc.callbacks, f) } -func (tc *testCollector) GarbageCollect(context.Context) (metadata.GCStats, error) { +func (tc *testCollector) GarbageCollect(context.Context) (gc.Stats, error) { tc.m.Lock() tc.gc++ tc.m.Unlock() - return metadata.GCStats{ - MetaD: tc.d, - }, nil + return gcStats{elapsed: tc.d}, nil +} + +type gcStats struct { + elapsed time.Duration +} + +// Elapsed returns the duration which elapsed during a collection +func (s gcStats) Elapsed() time.Duration { + return s.elapsed } diff --git a/metadata/db.go b/metadata/db.go index 8be62a95c..5e830ad82 100644 --- a/metadata/db.go +++ b/metadata/db.go @@ -204,7 +204,7 @@ func (m *DB) Update(fn func(*bolt.Tx) error) error { // RegisterMutationCallback registers a function to be called after a metadata // mutations has been performed. // -// The callback function in an argument for whether a deletion has occurred +// The callback function is an argument for whether a deletion has occurred // since the last garbage collection. func (m *DB) RegisterMutationCallback(fn func(bool)) { m.dirtyL.Lock() @@ -219,15 +219,20 @@ type GCStats struct { SnapshotD map[string]time.Duration } +// Elapsed returns the duration which elapsed during a collection +func (s GCStats) Elapsed() time.Duration { + return s.MetaD +} + // GarbageCollect starts garbage collection -func (m *DB) GarbageCollect(ctx context.Context) (stats GCStats, err error) { +func (m *DB) GarbageCollect(ctx context.Context) (gc.Stats, error) { m.wlock.Lock() t1 := time.Now() marked, err := m.getMarked(ctx) if err != nil { m.wlock.Unlock() - return GCStats{}, err + return nil, err } m.dirtyL.Lock() @@ -259,9 +264,10 @@ func (m *DB) GarbageCollect(ctx context.Context) (stats GCStats, err error) { }); err != nil { m.dirtyL.Unlock() m.wlock.Unlock() - return GCStats{}, err + return nil, err } + var stats GCStats var wg sync.WaitGroup if len(m.dirtySS) > 0 { @@ -303,7 +309,7 @@ func (m *DB) GarbageCollect(ctx context.Context) (stats GCStats, err error) { wg.Wait() - return + return stats, err } func (m *DB) getMarked(ctx context.Context) (map[gc.Node]struct{}, error) { diff --git a/services/images/service.go b/services/images/service.go index cef3401dd..0a6ccec57 100644 --- a/services/images/service.go +++ b/services/images/service.go @@ -8,6 +8,7 @@ import ( imagesapi "github.com/containerd/containerd/api/services/images/v1" "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/events" + "github.com/containerd/containerd/gc" "github.com/containerd/containerd/images" "github.com/containerd/containerd/log" "github.com/containerd/containerd/metadata" @@ -43,7 +44,7 @@ func init() { } type gcScheduler interface { - ScheduleAndWait(gocontext.Context) (metadata.GCStats, error) + ScheduleAndWait(gocontext.Context) (gc.Stats, error) } type service struct {