diff --git a/cmd/containerd/builtins.go b/cmd/containerd/builtins.go index 2112548a6..78c477d2d 100644 --- a/cmd/containerd/builtins.go +++ b/cmd/containerd/builtins.go @@ -3,6 +3,7 @@ package main // register containerd builtins here import ( _ "github.com/containerd/containerd/diff/walking" + _ "github.com/containerd/containerd/gc/policy" _ "github.com/containerd/containerd/services/containers" _ "github.com/containerd/containerd/services/content" _ "github.com/containerd/containerd/services/diff" diff --git a/gc/scheduler/scheduler.go b/gc/scheduler/scheduler.go new file mode 100644 index 000000000..d0504cb10 --- /dev/null +++ b/gc/scheduler/scheduler.go @@ -0,0 +1,318 @@ +package scheduler + +import ( + "context" + "errors" + "fmt" + "sync" + "time" + + "github.com/containerd/containerd/log" + "github.com/containerd/containerd/metadata" + "github.com/containerd/containerd/plugin" +) + +// Config configures the garbage collection policies. +type Config struct { + // PauseThreshold represents the maximum amount of time garbage + // collection should be scheduled based on the average pause time. + // For example, a value of 0.02 means that scheduled garbage collection + // pauses should present at most 2% of real time, + // or 20ms of every second. + // + // A maximum value of .5 is enforced to prevent over scheduling of the + // garbage collector, trigger options are available to run in a more + // predictable time frame after mutation. + // + // Default is 0.02 + PauseThreshold float64 `toml:"pause_threshold"` + + // DeletionThreshold is used to guarantee that a garbage collection is + // scheduled after configured number of deletions have occurred + // since the previous garbage collection. A value of 0 indicates that + // garbage collection will not be triggered by deletion count. + // + // Default 0 + DeletionThreshold int `toml:"deletion_threshold"` + + // MutationThreshold is used to guarantee that a garbage collection is + // run after a configured number of database mutations have occurred + // since the previous garbage collection. A value of 0 indicates that + // garbage collection will only be run after a manual trigger or + // deletion. Unlike the deletion threshold, the mutation threshold does + // not cause scheduling of a garbage collection, but ensures GC is run + // at the next scheduled GC. + // + // Default 100 + MutationThreshold int `toml:"mutation_threshold"` + + // ScheduleDelayMs is the number of milliseconds in the future to + // schedule a garbage collection triggered manually or by exceeding + // the configured threshold for deletion or mutation. A zero value + // will immediately schedule. + // + // Default is 0 + ScheduleDelayMs int `toml:"schedule_delay_ms"` + + // StartupDelayMs is the number of milliseconds to do an initial + // garbage collection after startup. The initial garbage collection + // is used to set the base for pause threshold and should be scheduled + // in the future to avoid slowing down other startup processes. + // + // Default is 100 + StartupDelayMs int `toml:"startup_delay_ms"` +} + +func init() { + plugin.Register(&plugin.Registration{ + Type: plugin.GCPlugin, + ID: "scheduler", + Requires: []plugin.Type{ + plugin.MetadataPlugin, + }, + Config: &Config{ + PauseThreshold: 0.02, + DeletionThreshold: 0, + MutationThreshold: 100, + ScheduleDelayMs: 0, + StartupDelayMs: 100, + }, + InitFn: func(ic *plugin.InitContext) (interface{}, error) { + md, err := ic.Get(plugin.MetadataPlugin) + if err != nil { + return nil, err + } + + m := newScheduler(md.(*metadata.DB), ic.Config.(*Config)) + + ic.Meta.Exports = map[string]string{ + "PauseThreshold": fmt.Sprint(m.pauseThreshold), + "DeletionThreshold": fmt.Sprint(m.deletionThreshold), + "MutationThreshold": fmt.Sprint(m.mutationThreshold), + "ScheduleDelay": fmt.Sprint(m.scheduleDelay), + } + + go m.run(ic.Context) + + return m, nil + }, + }) +} + +type mutationEvent struct { + ts time.Time + mutation bool + dirty bool +} + +type collector interface { + RegisterMutationCallback(func(bool)) + GarbageCollect(context.Context) (metadata.GCStats, error) +} + +type gcScheduler struct { + c collector + + eventC chan mutationEvent + + waiterL sync.Mutex + waiters []chan metadata.GCStats + + pauseThreshold float64 + deletionThreshold int + mutationThreshold int + scheduleDelay time.Duration + startupDelay time.Duration +} + +func newScheduler(c collector, cfg *Config) *gcScheduler { + eventC := make(chan mutationEvent) + + s := &gcScheduler{ + c: c, + eventC: eventC, + pauseThreshold: cfg.PauseThreshold, + deletionThreshold: cfg.DeletionThreshold, + mutationThreshold: cfg.MutationThreshold, + scheduleDelay: time.Duration(cfg.ScheduleDelayMs) * time.Millisecond, + startupDelay: time.Duration(cfg.StartupDelayMs) * time.Millisecond, + } + + if s.pauseThreshold < 0.0 { + s.pauseThreshold = 0.0 + } + if s.pauseThreshold > 0.5 { + s.pauseThreshold = 0.5 + } + if s.mutationThreshold < 0 { + s.mutationThreshold = 0 + } + if s.scheduleDelay < 0 { + s.scheduleDelay = 0 + } + if s.startupDelay < 0 { + s.startupDelay = 0 + } + + c.RegisterMutationCallback(s.mutationCallback) + + return s +} + +func (s *gcScheduler) ScheduleAndWait(ctx context.Context) (metadata.GCStats, error) { + return s.wait(ctx, true) +} + +func (s *gcScheduler) wait(ctx context.Context, trigger bool) (metadata.GCStats, error) { + wc := make(chan metadata.GCStats, 1) + s.waiterL.Lock() + s.waiters = append(s.waiters, wc) + s.waiterL.Unlock() + + if trigger { + e := mutationEvent{ + ts: time.Now(), + } + go func() { + s.eventC <- e + }() + } + + var gcStats metadata.GCStats + select { + case stats, ok := <-wc: + if !ok { + return metadata.GCStats{}, errors.New("gc failed") + } + gcStats = stats + case <-ctx.Done(): + return metadata.GCStats{}, ctx.Err() + } + + return gcStats, nil +} + +func (s *gcScheduler) mutationCallback(dirty bool) { + e := mutationEvent{ + ts: time.Now(), + mutation: true, + dirty: dirty, + } + go func() { + s.eventC <- e + }() +} + +func schedule(d time.Duration) (<-chan time.Time, *time.Time) { + next := time.Now().Add(d) + return time.After(d), &next +} + +func (s *gcScheduler) run(ctx context.Context) { + var ( + schedC <-chan time.Time + + lastCollection *time.Time + nextCollection *time.Time + + interval = time.Second + gcTime time.Duration + collections int + + triggered bool + deletions int + mutations int + ) + if s.startupDelay > 0 { + schedC, nextCollection = schedule(s.startupDelay) + } + for { + select { + case <-schedC: + // Check if garbage collection can be skipped because + // it is not needed or was not requested and reschedule + // it to attempt again after another time interval. + if !triggered && lastCollection != nil && deletions == 0 && + (s.mutationThreshold == 0 || mutations < s.mutationThreshold) { + schedC, nextCollection = schedule(interval) + continue + } + break + case e := <-s.eventC: + if lastCollection != nil && lastCollection.After(e.ts) { + continue + } + if e.dirty { + deletions++ + } + if e.mutation { + mutations++ + } else { + triggered = true + } + + // Check if condition should cause immediate collection. + if triggered || + (s.deletionThreshold > 0 && deletions >= s.deletionThreshold) || + (nextCollection == nil && ((s.deletionThreshold == 0 && deletions > 0) || + (s.mutationThreshold > 0 && mutations >= s.mutationThreshold))) { + // Check if not already scheduled before delay threshold + if nextCollection == nil || nextCollection.After(time.Now().Add(s.scheduleDelay)) { + schedC, nextCollection = schedule(s.scheduleDelay) + } + } + + continue + case <-ctx.Done(): + return + } + + s.waiterL.Lock() + + stats, err := s.c.GarbageCollect(ctx) + last := time.Now() + if err != nil { + log.G(ctx).WithError(err).Error("garbage collection failed") + + // Reschedule garbage collection for same duration + 1 second + schedC, nextCollection = schedule(nextCollection.Sub(*lastCollection) + time.Second) + + // Update last collection time even though failure occured + lastCollection = &last + + for _, w := range s.waiters { + close(w) + } + s.waiters = nil + s.waiterL.Unlock() + continue + } + + log.G(ctx).WithField("d", stats.MetaD).Debug("garbage collected") + + gcTime += stats.MetaD + collections++ + triggered = false + deletions = 0 + mutations = 0 + + // Calculate new interval with updated times + if s.pauseThreshold > 0.0 { + // Set interval to average gc time divided by the pause threshold + // This algorithm ensures that a gc is scheduled to allow enough + // runtime in between gc to reach the pause threshold. + // Pause threshold is always 0.0 < threshold <= 0.5 + avg := float64(gcTime) / float64(collections) + interval = time.Duration(avg/s.pauseThreshold - avg) + } + + lastCollection = &last + schedC, nextCollection = schedule(interval) + + for _, w := range s.waiters { + w <- stats + } + s.waiters = nil + s.waiterL.Unlock() + } +} diff --git a/gc/scheduler/scheduler_test.go b/gc/scheduler/scheduler_test.go new file mode 100644 index 000000000..ee77297ce --- /dev/null +++ b/gc/scheduler/scheduler_test.go @@ -0,0 +1,188 @@ +package scheduler + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/containerd/containerd/metadata" +) + +func TestPauseThreshold(t *testing.T) { + cfg := &Config{ + // With 100μs, gc should run about every 5ms + PauseThreshold: 0.02, + } + tc := &testCollector{ + d: time.Microsecond * 100, + } + + scheduler := newScheduler(tc, cfg) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go scheduler.run(ctx) + + // Ensure every possible GC cycle runs + go func() { + tick := time.NewTicker(time.Microsecond * 100) + for { + select { + case <-tick.C: + tc.trigger(true) + case <-ctx.Done(): + return + } + } + }() + + time.Sleep(time.Millisecond * 15) + if c := tc.runCount(); c < 3 || c > 4 { + t.Fatalf("unexpected gc run count %d, expected between 5 and 6", c) + } +} + +func TestDeletionThreshold(t *testing.T) { + cfg := &Config{ + // Prevent GC from scheduling again before check + PauseThreshold: 0.001, + DeletionThreshold: 5, + } + tc := &testCollector{ + d: time.Second, + } + + scheduler := newScheduler(tc, cfg) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go scheduler.run(ctx) + + // Block until next GC finishes + gcWait := make(chan struct{}) + go func() { + scheduler.wait(ctx, false) + close(gcWait) + }() + + // Increment deletion count 5, checking GC hasn't run in + // between each call + for i := 0; i < 5; i++ { + time.Sleep(time.Millisecond) + if c := tc.runCount(); c != 0 { + t.Fatalf("GC ran unexpectedly") + } + tc.trigger(true) + } + + select { + case <-gcWait: + case <-time.After(time.Millisecond * 10): + t.Fatal("GC wait timed out") + } + + if c := tc.runCount(); c != 1 { + t.Fatalf("unexpected gc run count %d, expected 1", c) + } +} + +func TestTrigger(t *testing.T) { + var ( + cfg = &Config{} + tc = &testCollector{ + d: time.Millisecond * 10, + } + ctx, cancel = context.WithCancel(context.Background()) + scheduler = newScheduler(tc, cfg) + stats metadata.GCStats + err error + ) + + defer cancel() + go scheduler.run(ctx) + + // Block until next GC finishes + gcWait := make(chan struct{}) + go func() { + stats, err = scheduler.ScheduleAndWait(ctx) + close(gcWait) + }() + + select { + case <-gcWait: + case <-time.After(time.Millisecond * 10): + t.Fatal("GC wait timed out") + } + + if err != nil { + t.Fatalf("GC failed: %#v", err) + } + + if stats.MetaD != tc.d { + t.Fatalf("unexpected gc duration: %s, expected %d", stats.MetaD, tc.d) + } + + if c := tc.runCount(); c != 1 { + t.Fatalf("unexpected gc run count %d, expected 1", c) + } +} + +func TestStartupDelay(t *testing.T) { + var ( + cfg = &Config{ + // Prevent GC from scheduling again before check + PauseThreshold: 0.001, + StartupDelayMs: 1, + } + tc = &testCollector{ + d: time.Second, + } + ctx, cancel = context.WithCancel(context.Background()) + scheduler = newScheduler(tc, cfg) + ) + defer cancel() + go scheduler.run(ctx) + + time.Sleep(time.Millisecond * 5) + + if c := tc.runCount(); c != 1 { + t.Fatalf("unexpected gc run count %d, expected 1", c) + } +} + +type testCollector struct { + d time.Duration + gc int + m sync.Mutex + + callbacks []func(bool) +} + +func (tc *testCollector) trigger(delete bool) { + for _, f := range tc.callbacks { + f(delete) + } +} + +func (tc *testCollector) runCount() int { + tc.m.Lock() + c := tc.gc + tc.m.Unlock() + return c +} + +func (tc *testCollector) RegisterMutationCallback(f func(bool)) { + tc.callbacks = append(tc.callbacks, f) +} + +func (tc *testCollector) GarbageCollect(context.Context) (metadata.GCStats, error) { + tc.m.Lock() + tc.gc++ + tc.m.Unlock() + return metadata.GCStats{ + MetaD: tc.d, + }, nil +} diff --git a/metadata/content.go b/metadata/content.go index 1a9b16af9..c13f7867e 100644 --- a/metadata/content.go +++ b/metadata/content.go @@ -530,12 +530,14 @@ func writeInfo(info *content.Info, bkt *bolt.Bucket) error { return bkt.Put(bucketKeySize, sizeEncoded) } -func (cs *contentStore) garbageCollect(ctx context.Context) error { - lt1 := time.Now() +func (cs *contentStore) garbageCollect(ctx context.Context) (d time.Duration, err error) { cs.l.Lock() + t1 := time.Now() defer func() { + if err == nil { + d = time.Now().Sub(t1) + } cs.l.Unlock() - log.G(ctx).WithField("t", time.Now().Sub(lt1)).Debugf("content garbage collected") }() seen := map[string]struct{}{} @@ -570,10 +572,10 @@ func (cs *contentStore) garbageCollect(ctx context.Context) error { return nil }); err != nil { - return err + return 0, err } - return cs.Store.Walk(ctx, func(info content.Info) error { + err = cs.Store.Walk(ctx, func(info content.Info) error { if _, ok := seen[info.Digest.String()]; !ok { if err := cs.Store.Delete(ctx, info.Digest); err != nil { return err @@ -582,4 +584,5 @@ func (cs *contentStore) garbageCollect(ctx context.Context) error { } return nil }) + return } diff --git a/metadata/db.go b/metadata/db.go index 7c366ebcc..0bbc0c1ef 100644 --- a/metadata/db.go +++ b/metadata/db.go @@ -53,8 +53,9 @@ type DB struct { dirtySS map[string]struct{} dirtyCS bool - // TODO: Keep track of stats such as pause time, number of collected objects, errors - lastCollection time.Time + // mutationCallbacks are called after each mutation with the flag + // set indicating whether any dirty flags are set + mutationCallbacks []func(bool) } // NewDB creates a new metadata database using the provided @@ -183,29 +184,53 @@ func (m *DB) View(fn func(*bolt.Tx) error) error { return m.db.View(fn) } -// Update runs a writable transation on the metadata store. +// Update runs a writable transaction on the metadata store. func (m *DB) Update(fn func(*bolt.Tx) error) error { m.wlock.RLock() defer m.wlock.RUnlock() - return m.db.Update(fn) + err := m.db.Update(fn) + if err == nil { + m.dirtyL.Lock() + dirty := m.dirtyCS || len(m.dirtySS) > 0 + for _, fn := range m.mutationCallbacks { + fn(dirty) + } + m.dirtyL.Unlock() + } + + return err +} + +// RegisterMutationCallback registers a function to be called after a metadata +// mutations has been performed. +// +// The callback function in an argument for whether a deletion has occurred +// since the last garbage collection. +func (m *DB) RegisterMutationCallback(fn func(bool)) { + m.dirtyL.Lock() + m.mutationCallbacks = append(m.mutationCallbacks, fn) + m.dirtyL.Unlock() +} + +// GCStats holds the duration for the different phases of the garbage collector +type GCStats struct { + MetaD time.Duration + ContentD time.Duration + SnapshotD map[string]time.Duration } // GarbageCollect starts garbage collection -func (m *DB) GarbageCollect(ctx context.Context) error { - lt1 := time.Now() +func (m *DB) GarbageCollect(ctx context.Context) (stats GCStats, err error) { m.wlock.Lock() - defer func() { - m.wlock.Unlock() - log.G(ctx).WithField("d", time.Now().Sub(lt1)).Debug("metadata garbage collected") - }() + t1 := time.Now() marked, err := m.getMarked(ctx) if err != nil { - return err + m.wlock.Unlock() + return GCStats{}, err } m.dirtyL.Lock() - defer m.dirtyL.Unlock() if err := m.db.Update(func(tx *bolt.Tx) error { ctx, cancel := context.WithCancel(ctx) @@ -232,26 +257,53 @@ func (m *DB) GarbageCollect(ctx context.Context) error { return nil }); err != nil { - return err + m.dirtyL.Unlock() + m.wlock.Unlock() + return GCStats{}, err } - m.lastCollection = time.Now() + var wg sync.WaitGroup if len(m.dirtySS) > 0 { + var sl sync.Mutex + stats.SnapshotD = map[string]time.Duration{} + wg.Add(len(m.dirtySS)) for snapshotterName := range m.dirtySS { log.G(ctx).WithField("snapshotter", snapshotterName).Debug("scheduling snapshotter cleanup") - go m.cleanupSnapshotter(snapshotterName) + go func(snapshotterName string) { + st1 := time.Now() + m.cleanupSnapshotter(snapshotterName) + + sl.Lock() + stats.SnapshotD[snapshotterName] = time.Now().Sub(st1) + sl.Unlock() + + wg.Done() + }(snapshotterName) } m.dirtySS = map[string]struct{}{} } if m.dirtyCS { + wg.Add(1) log.G(ctx).Debug("scheduling content cleanup") - go m.cleanupContent() + go func() { + ct1 := time.Now() + m.cleanupContent() + stats.ContentD = time.Now().Sub(ct1) + wg.Done() + }() m.dirtyCS = false } - return nil + m.dirtyL.Unlock() + + stats.MetaD = time.Now().Sub(t1) + m.wlock.Unlock() + + wg.Wait() + + return } func (m *DB) getMarked(ctx context.Context) (map[gc.Node]struct{}, error) { @@ -302,27 +354,35 @@ func (m *DB) getMarked(ctx context.Context) (map[gc.Node]struct{}, error) { return marked, nil } -func (m *DB) cleanupSnapshotter(name string) { +func (m *DB) cleanupSnapshotter(name string) (time.Duration, error) { ctx := context.Background() sn, ok := m.ss[name] if !ok { - return + return 0, nil } - err := sn.garbageCollect(ctx) + d, err := sn.garbageCollect(ctx) + logger := log.G(ctx).WithField("snapshotter", name) if err != nil { - log.G(ctx).WithError(err).WithField("snapshotter", name).Warn("garbage collection failed") + logger.WithError(err).Warn("snapshot garbage collection failed") + } else { + logger.WithField("d", d).Debugf("snapshot garbage collected") } + return d, err } -func (m *DB) cleanupContent() { +func (m *DB) cleanupContent() (time.Duration, error) { ctx := context.Background() if m.cs == nil { - return + return 0, nil } - err := m.cs.garbageCollect(ctx) + d, err := m.cs.garbageCollect(ctx) if err != nil { log.G(ctx).WithError(err).Warn("content garbage collection failed") + } else { + log.G(ctx).WithField("d", d).Debugf("content garbage collected") } + + return d, err } diff --git a/metadata/db_test.go b/metadata/db_test.go index cbaa22f2e..5dd721ec2 100644 --- a/metadata/db_test.go +++ b/metadata/db_test.go @@ -235,7 +235,7 @@ func TestMetadataCollector(t *testing.T) { t.Fatalf("Creation failed: %+v", err) } - if err := mdb.GarbageCollect(ctx); err != nil { + if _, err := mdb.GarbageCollect(ctx); err != nil { t.Fatal(err) } @@ -322,7 +322,7 @@ func benchmarkTrigger(n int) func(b *testing.B) { //b.StartTimer() - if err := mdb.GarbageCollect(ctx); err != nil { + if _, err := mdb.GarbageCollect(ctx); err != nil { b.Fatal(err) } diff --git a/metadata/snapshot.go b/metadata/snapshot.go index cdc0768c9..4dcd845d7 100644 --- a/metadata/snapshot.go +++ b/metadata/snapshot.go @@ -604,13 +604,14 @@ func validateSnapshot(info *snapshot.Info) error { return nil } -func (s *snapshotter) garbageCollect(ctx context.Context) error { - logger := log.G(ctx).WithField("snapshotter", s.name) - lt1 := time.Now() +func (s *snapshotter) garbageCollect(ctx context.Context) (d time.Duration, err error) { s.l.Lock() + t1 := time.Now() defer func() { + if err == nil { + d = time.Now().Sub(t1) + } s.l.Unlock() - logger.WithField("t", time.Now().Sub(lt1)).Debugf("garbage collected") }() seen := map[string]struct{}{} @@ -654,23 +655,26 @@ func (s *snapshotter) garbageCollect(ctx context.Context) error { return nil }); err != nil { - return err + return 0, err } roots, err := s.walkTree(ctx, seen) if err != nil { - return err + return 0, err } - // TODO: Unlock before prune (once nodes are fully unavailable) + // TODO: Unlock before removal (once nodes are fully unavailable). + // This could be achieved through doing prune inside the lock + // and having a cleanup method which actually performs the + // deletions on the snapshotters which support it. for _, node := range roots { if err := s.pruneBranch(ctx, node); err != nil { - return err + return 0, err } } - return nil + return } type treeNode struct { diff --git a/plugin/plugin.go b/plugin/plugin.go index 9bda46cbf..5746bf72d 100644 --- a/plugin/plugin.go +++ b/plugin/plugin.go @@ -54,6 +54,8 @@ const ( MetadataPlugin Type = "io.containerd.metadata.v1" // ContentPlugin implements a content store ContentPlugin Type = "io.containerd.content.v1" + // GCPlugin implements garbage collection policy + GCPlugin Type = "io.containerd.gc.v1" ) // Registration contains information for registering a plugin diff --git a/services/containers/service.go b/services/containers/service.go index dfcb1957d..40fe97ad3 100644 --- a/services/containers/service.go +++ b/services/containers/service.go @@ -10,7 +10,6 @@ import ( "github.com/containerd/containerd/metadata" "github.com/containerd/containerd/plugin" ptypes "github.com/gogo/protobuf/types" - "github.com/pkg/errors" "golang.org/x/net/context" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -162,10 +161,6 @@ func (s *service) Delete(ctx context.Context, req *api.DeleteContainerRequest) ( return &ptypes.Empty{}, err } - if err := s.db.GarbageCollect(ctx); err != nil { - return &ptypes.Empty{}, errdefs.ToGRPC(errors.Wrap(err, "garbage collection failed")) - } - return &ptypes.Empty{}, nil } diff --git a/services/images/service.go b/services/images/service.go index 23dcdf33e..4e52d51de 100644 --- a/services/images/service.go +++ b/services/images/service.go @@ -10,7 +10,6 @@ import ( "github.com/containerd/containerd/metadata" "github.com/containerd/containerd/plugin" ptypes "github.com/gogo/protobuf/types" - "github.com/pkg/errors" "golang.org/x/net/context" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -163,10 +162,6 @@ func (s *service) Delete(ctx context.Context, req *imagesapi.DeleteImageRequest) return nil, err } - if err := s.db.GarbageCollect(ctx); err != nil { - return nil, errdefs.ToGRPC(errors.Wrap(err, "garbage collection failed")) - } - return &ptypes.Empty{}, nil }