Move gc/scheduler to plugins/gc

Signed-off-by: Derek McGowan <derek@mcg.dev>
This commit is contained in:
Derek McGowan
2024-01-17 09:55:03 -08:00
parent c38f2ab724
commit 11114b0a9a
6 changed files with 3 additions and 3 deletions

34
plugins/gc/metrics.go Normal file
View File

@@ -0,0 +1,34 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package scheduler
import "github.com/docker/go-metrics"
var (
// collectionCounter metrics for counter of gc scheduler collections.
collectionCounter metrics.LabeledCounter
// gcTimeHist histogram metrics for duration of gc scheduler collections.
gcTimeHist metrics.Timer
)
func init() {
ns := metrics.NewNamespace("containerd", "gc", nil)
collectionCounter = ns.NewLabeledCounter("collections", "counter of gc scheduler collections", "status")
gcTimeHist = ns.NewTimer("gc", "duration of gc scheduler collections")
metrics.Register(ns)
}

360
plugins/gc/scheduler.go Normal file
View File

@@ -0,0 +1,360 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package scheduler
import (
"context"
"errors"
"fmt"
"sync"
"time"
"github.com/containerd/containerd/v2/gc"
"github.com/containerd/containerd/v2/plugins"
"github.com/containerd/log"
"github.com/containerd/plugin"
"github.com/containerd/plugin/registry"
)
// config configures the garbage collection policies.
type config struct {
// PauseThreshold represents the maximum amount of time garbage
// collection should be scheduled based on the average pause time.
// For example, a value of 0.02 means that scheduled garbage collection
// pauses should present at most 2% of real time,
// or 20ms of every second.
//
// A maximum value of .5 is enforced to prevent over scheduling of the
// garbage collector, trigger options are available to run in a more
// predictable time frame after mutation.
//
// Default is 0.02
PauseThreshold float64 `toml:"pause_threshold"`
// DeletionThreshold is used to guarantee that a garbage collection is
// scheduled after configured number of deletions have occurred
// since the previous garbage collection. A value of 0 indicates that
// garbage collection will not be triggered by deletion count.
//
// Default 0
DeletionThreshold int `toml:"deletion_threshold"`
// MutationThreshold is used to guarantee that a garbage collection is
// run after a configured number of database mutations have occurred
// since the previous garbage collection. A value of 0 indicates that
// garbage collection will only be run after a manual trigger or
// deletion. Unlike the deletion threshold, the mutation threshold does
// not cause scheduling of a garbage collection, but ensures GC is run
// at the next scheduled GC.
//
// Default 100
MutationThreshold int `toml:"mutation_threshold"`
// ScheduleDelay is the duration in the future to schedule a garbage
// collection triggered manually or by exceeding the configured
// threshold for deletion or mutation. A zero value will immediately
// schedule. Use suffix "ms" for millisecond and "s" for second.
//
// Default is "0ms"
ScheduleDelay duration `toml:"schedule_delay"`
// StartupDelay is the delay duration to do an initial garbage
// collection after startup. The initial garbage collection is used to
// set the base for pause threshold and should be scheduled in the
// future to avoid slowing down other startup processes. Use suffix
// "ms" for millisecond and "s" for second.
//
// Default is "100ms"
StartupDelay duration `toml:"startup_delay"`
}
type duration time.Duration
func (d *duration) UnmarshalText(text []byte) error {
ed, err := time.ParseDuration(string(text))
if err != nil {
return err
}
*d = duration(ed)
return nil
}
func (d duration) MarshalText() (text []byte, err error) {
return []byte(time.Duration(d).String()), nil
}
func init() {
registry.Register(&plugin.Registration{
Type: plugins.GCPlugin,
ID: "scheduler",
Requires: []plugin.Type{
plugins.MetadataPlugin,
},
Config: &config{
PauseThreshold: 0.02,
DeletionThreshold: 0,
MutationThreshold: 100,
ScheduleDelay: duration(0),
StartupDelay: duration(100 * time.Millisecond),
},
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
md, err := ic.GetSingle(plugins.MetadataPlugin)
if err != nil {
return nil, err
}
mdCollector, ok := md.(collector)
if !ok {
return nil, fmt.Errorf("%s %T must implement collector", plugins.MetadataPlugin, md)
}
m := newScheduler(mdCollector, ic.Config.(*config))
ic.Meta.Exports = map[string]string{
"PauseThreshold": fmt.Sprint(m.pauseThreshold),
"DeletionThreshold": fmt.Sprint(m.deletionThreshold),
"MutationThreshold": fmt.Sprint(m.mutationThreshold),
"ScheduleDelay": fmt.Sprint(m.scheduleDelay),
}
go m.run(ic.Context)
return m, nil
},
})
}
type mutationEvent struct {
ts time.Time
mutation bool
dirty bool
}
type collector interface {
RegisterMutationCallback(func(bool))
GarbageCollect(context.Context) (gc.Stats, error)
}
type gcScheduler struct {
c collector
eventC chan mutationEvent
waiterL sync.Mutex
waiters []chan gc.Stats
pauseThreshold float64
deletionThreshold int
mutationThreshold int
scheduleDelay time.Duration
startupDelay time.Duration
}
func newScheduler(c collector, cfg *config) *gcScheduler {
eventC := make(chan mutationEvent)
s := &gcScheduler{
c: c,
eventC: eventC,
pauseThreshold: cfg.PauseThreshold,
deletionThreshold: cfg.DeletionThreshold,
mutationThreshold: cfg.MutationThreshold,
scheduleDelay: time.Duration(cfg.ScheduleDelay),
startupDelay: time.Duration(cfg.StartupDelay),
}
if s.pauseThreshold < 0.0 {
s.pauseThreshold = 0.0
}
if s.pauseThreshold > 0.5 {
s.pauseThreshold = 0.5
}
if s.mutationThreshold < 0 {
s.mutationThreshold = 0
}
if s.scheduleDelay < 0 {
s.scheduleDelay = 0
}
if s.startupDelay < 0 {
s.startupDelay = 0
}
c.RegisterMutationCallback(s.mutationCallback)
return s
}
func (s *gcScheduler) ScheduleAndWait(ctx context.Context) (gc.Stats, error) {
return s.wait(ctx, true)
}
func (s *gcScheduler) wait(ctx context.Context, trigger bool) (gc.Stats, error) {
wc := make(chan gc.Stats, 1)
s.waiterL.Lock()
s.waiters = append(s.waiters, wc)
s.waiterL.Unlock()
if trigger {
e := mutationEvent{
ts: time.Now(),
}
go func() {
s.eventC <- e
}()
}
var gcStats gc.Stats
select {
case stats, ok := <-wc:
if !ok {
return gcStats, errors.New("gc failed")
}
gcStats = stats
case <-ctx.Done():
return gcStats, ctx.Err()
}
return gcStats, nil
}
func (s *gcScheduler) mutationCallback(dirty bool) {
e := mutationEvent{
ts: time.Now(),
mutation: true,
dirty: dirty,
}
go func() {
s.eventC <- e
}()
}
func schedule(d time.Duration) (<-chan time.Time, *time.Time) {
next := time.Now().Add(d)
return time.After(d), &next
}
func (s *gcScheduler) run(ctx context.Context) {
var (
schedC <-chan time.Time
lastCollection *time.Time
nextCollection *time.Time
interval = time.Second
gcTimeSum time.Duration
collections int
triggered bool
deletions int
mutations int
)
if s.startupDelay > 0 {
schedC, nextCollection = schedule(s.startupDelay)
}
for {
select {
case <-schedC:
// Check if garbage collection can be skipped because
// it is not needed or was not requested and reschedule
// it to attempt again after another time interval.
if !triggered && lastCollection != nil && deletions == 0 &&
(s.mutationThreshold == 0 || mutations < s.mutationThreshold) {
schedC, nextCollection = schedule(interval)
continue
}
case e := <-s.eventC:
if lastCollection != nil && lastCollection.After(e.ts) {
continue
}
if e.dirty {
deletions++
}
if e.mutation {
mutations++
} else {
triggered = true
}
// Check if condition should cause immediate collection.
if triggered ||
(s.deletionThreshold > 0 && deletions >= s.deletionThreshold) ||
(nextCollection == nil && ((s.deletionThreshold == 0 && deletions > 0) ||
(s.mutationThreshold > 0 && mutations >= s.mutationThreshold))) {
// Check if not already scheduled before delay threshold
if nextCollection == nil || nextCollection.After(time.Now().Add(s.scheduleDelay)) {
// TODO(dmcg): track re-schedules for tuning schedule config
schedC, nextCollection = schedule(s.scheduleDelay)
}
}
continue
case <-ctx.Done():
return
}
s.waiterL.Lock()
stats, err := s.c.GarbageCollect(ctx)
last := time.Now()
if err != nil {
log.G(ctx).WithError(err).Error("garbage collection failed")
collectionCounter.WithValues("fail").Inc()
// Reschedule garbage collection for same duration + 1 second
schedC, nextCollection = schedule(nextCollection.Sub(*lastCollection) + time.Second)
// Update last collection time even though failure occurred
lastCollection = &last
for _, w := range s.waiters {
close(w)
}
s.waiters = nil
s.waiterL.Unlock()
continue
}
gcTime := stats.Elapsed()
gcTimeHist.Update(gcTime)
log.G(ctx).WithField("d", gcTime).Trace("garbage collected")
gcTimeSum += gcTime
collections++
collectionCounter.WithValues("success").Inc()
triggered = false
deletions = 0
mutations = 0
// Calculate new interval with updated times
if s.pauseThreshold > 0.0 {
// Set interval to average gc time divided by the pause threshold
// This algorithm ensures that a gc is scheduled to allow enough
// runtime in between gc to reach the pause threshold.
// Pause threshold is always 0.0 < threshold <= 0.5
avg := float64(gcTimeSum) / float64(collections)
interval = time.Duration(avg/s.pauseThreshold - avg)
}
lastCollection = &last
schedC, nextCollection = schedule(interval)
for _, w := range s.waiters {
w <- stats
}
s.waiters = nil
s.waiterL.Unlock()
}
}

View File

@@ -0,0 +1,222 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package scheduler
import (
"context"
"sync"
"testing"
"time"
"github.com/containerd/containerd/v2/gc"
"github.com/stretchr/testify/assert"
)
func TestPauseThreshold(t *testing.T) {
cfg := &config{
// With 100μs, gc should run about every 5ms
PauseThreshold: 0.02,
}
tc := &testCollector{
d: time.Microsecond * 100,
}
scheduler := newScheduler(tc, cfg)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go scheduler.run(ctx)
// Ensure every possible GC cycle runs
go func() {
tick := time.NewTicker(time.Microsecond * 100)
for {
select {
case <-tick.C:
tc.trigger(true)
case <-ctx.Done():
return
}
}
}()
time.Sleep(time.Millisecond * 15)
if c := tc.runCount(); c > 4 {
t.Fatalf("unexpected gc run count %d, expected less than 5", c)
}
}
func TestDeletionThreshold(t *testing.T) {
cfg := &config{
// Prevent GC from scheduling again before check
PauseThreshold: 0.001,
DeletionThreshold: 5,
}
tc := &testCollector{
d: time.Second,
}
scheduler := newScheduler(tc, cfg)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go scheduler.run(ctx)
// Block until next GC finishes
gcWait := make(chan struct{})
go func() {
scheduler.wait(ctx, false)
close(gcWait)
}()
// Increment deletion count 5, checking GC hasn't run in
// between each call
for i := 0; i < 5; i++ {
time.Sleep(time.Millisecond)
if c := tc.runCount(); c != 0 {
t.Fatalf("GC ran unexpectedly")
}
tc.trigger(true)
}
select {
case <-gcWait:
case <-time.After(time.Millisecond * 30):
t.Fatal("GC wait timed out")
}
if c := tc.runCount(); c != 1 {
t.Fatalf("unexpected gc run count %d, expected 1", c)
}
}
func TestTrigger(t *testing.T) {
var (
cfg = &config{}
tc = &testCollector{
d: time.Millisecond * 10,
}
ctx, cancel = context.WithCancel(context.Background())
scheduler = newScheduler(tc, cfg)
stats gc.Stats
err error
)
defer cancel()
go scheduler.run(ctx)
// Block until next GC finishes
gcWait := make(chan struct{})
go func() {
stats, err = scheduler.ScheduleAndWait(ctx)
close(gcWait)
}()
select {
case <-gcWait:
case <-time.After(time.Millisecond * 10):
t.Fatal("GC wait timed out")
}
if err != nil {
t.Fatalf("GC failed: %#v", err)
}
assert.Equal(t, tc.d, stats.Elapsed())
if c := tc.runCount(); c != 1 {
t.Fatalf("unexpected gc run count %d, expected 1", c)
}
}
func TestStartupDelay(t *testing.T) {
var (
startupDelay = time.Millisecond * 20
cfg = &config{
// Prevent GC from scheduling again before check
PauseThreshold: 0.001,
StartupDelay: duration(startupDelay),
}
tc = &testCollector{
d: time.Second,
}
ctx, cancel = context.WithCancel(context.Background())
scheduler = newScheduler(tc, cfg)
)
defer cancel()
t1 := time.Now()
go scheduler.run(ctx)
_, err := scheduler.wait(ctx, false)
if err != nil {
t.Fatalf("gc failed with error: %s", err)
}
d := time.Since(t1)
if c := tc.runCount(); c != 1 {
t.Fatalf("unexpected gc run count %d, expected 1", c)
}
if d < startupDelay {
t.Fatalf("expected startup delay to be longer than %s, actual %s", startupDelay, d)
}
}
type testCollector struct {
d time.Duration
gc int
m sync.Mutex
callbacks []func(bool)
}
func (tc *testCollector) trigger(delete bool) {
for _, f := range tc.callbacks {
f(delete)
}
}
func (tc *testCollector) runCount() int {
tc.m.Lock()
c := tc.gc
tc.m.Unlock()
return c
}
func (tc *testCollector) RegisterMutationCallback(f func(bool)) {
tc.callbacks = append(tc.callbacks, f)
}
func (tc *testCollector) GarbageCollect(context.Context) (gc.Stats, error) {
tc.m.Lock()
tc.gc++
tc.m.Unlock()
return gcStats{elapsed: tc.d}, nil
}
type gcStats struct {
elapsed time.Duration
}
// Elapsed returns the duration which elapsed during a collection
func (s gcStats) Elapsed() time.Duration {
return s.elapsed
}