Small refactor of gc/scheduler to remove import of metadata
Replace metadata.GCStats with an interface for exposing elapsed time Signed-off-by: Daniel Nephin <dnephin@gmail.com>
This commit is contained in:
parent
e479165a38
commit
06edd193ef
6
gc/gc.go
6
gc/gc.go
@ -8,6 +8,7 @@ package gc
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
// ResourceType represents type of resource at a node
|
// ResourceType represents type of resource at a node
|
||||||
@ -21,6 +22,11 @@ type Node struct {
|
|||||||
Key string
|
Key string
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Stats about a garbage collection run
|
||||||
|
type Stats interface {
|
||||||
|
Elapsed() time.Duration
|
||||||
|
}
|
||||||
|
|
||||||
// Tricolor implements basic, single-thread tri-color GC. Given the roots, the
|
// Tricolor implements basic, single-thread tri-color GC. Given the roots, the
|
||||||
// complete set and a refs function, this function returns a map of all
|
// complete set and a refs function, this function returns a map of all
|
||||||
// reachable objects.
|
// reachable objects.
|
||||||
|
@ -2,14 +2,14 @@ package scheduler
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/containerd/containerd/gc"
|
||||||
"github.com/containerd/containerd/log"
|
"github.com/containerd/containerd/log"
|
||||||
"github.com/containerd/containerd/metadata"
|
|
||||||
"github.com/containerd/containerd/plugin"
|
"github.com/containerd/containerd/plugin"
|
||||||
|
"github.com/pkg/errors"
|
||||||
)
|
)
|
||||||
|
|
||||||
// config configures the garbage collection policies.
|
// config configures the garbage collection policies.
|
||||||
@ -95,7 +95,12 @@ func init() {
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
m := newScheduler(md.(*metadata.DB), ic.Config.(*config))
|
mdCollector, ok := md.(collector)
|
||||||
|
if !ok {
|
||||||
|
return nil, errors.Errorf("%s %T must implement collector", plugin.MetadataPlugin, md)
|
||||||
|
}
|
||||||
|
|
||||||
|
m := newScheduler(mdCollector, ic.Config.(*config))
|
||||||
|
|
||||||
ic.Meta.Exports = map[string]string{
|
ic.Meta.Exports = map[string]string{
|
||||||
"PauseThreshold": fmt.Sprint(m.pauseThreshold),
|
"PauseThreshold": fmt.Sprint(m.pauseThreshold),
|
||||||
@ -119,7 +124,7 @@ type mutationEvent struct {
|
|||||||
|
|
||||||
type collector interface {
|
type collector interface {
|
||||||
RegisterMutationCallback(func(bool))
|
RegisterMutationCallback(func(bool))
|
||||||
GarbageCollect(context.Context) (metadata.GCStats, error)
|
GarbageCollect(context.Context) (gc.Stats, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
type gcScheduler struct {
|
type gcScheduler struct {
|
||||||
@ -128,7 +133,7 @@ type gcScheduler struct {
|
|||||||
eventC chan mutationEvent
|
eventC chan mutationEvent
|
||||||
|
|
||||||
waiterL sync.Mutex
|
waiterL sync.Mutex
|
||||||
waiters []chan metadata.GCStats
|
waiters []chan gc.Stats
|
||||||
|
|
||||||
pauseThreshold float64
|
pauseThreshold float64
|
||||||
deletionThreshold int
|
deletionThreshold int
|
||||||
@ -171,12 +176,12 @@ func newScheduler(c collector, cfg *config) *gcScheduler {
|
|||||||
return s
|
return s
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *gcScheduler) ScheduleAndWait(ctx context.Context) (metadata.GCStats, error) {
|
func (s *gcScheduler) ScheduleAndWait(ctx context.Context) (gc.Stats, error) {
|
||||||
return s.wait(ctx, true)
|
return s.wait(ctx, true)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *gcScheduler) wait(ctx context.Context, trigger bool) (metadata.GCStats, error) {
|
func (s *gcScheduler) wait(ctx context.Context, trigger bool) (gc.Stats, error) {
|
||||||
wc := make(chan metadata.GCStats, 1)
|
wc := make(chan gc.Stats, 1)
|
||||||
s.waiterL.Lock()
|
s.waiterL.Lock()
|
||||||
s.waiters = append(s.waiters, wc)
|
s.waiters = append(s.waiters, wc)
|
||||||
s.waiterL.Unlock()
|
s.waiterL.Unlock()
|
||||||
@ -190,15 +195,15 @@ func (s *gcScheduler) wait(ctx context.Context, trigger bool) (metadata.GCStats,
|
|||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
var gcStats metadata.GCStats
|
var gcStats gc.Stats
|
||||||
select {
|
select {
|
||||||
case stats, ok := <-wc:
|
case stats, ok := <-wc:
|
||||||
if !ok {
|
if !ok {
|
||||||
return metadata.GCStats{}, errors.New("gc failed")
|
return gcStats, errors.New("gc failed")
|
||||||
}
|
}
|
||||||
gcStats = stats
|
gcStats = stats
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return metadata.GCStats{}, ctx.Err()
|
return gcStats, ctx.Err()
|
||||||
}
|
}
|
||||||
|
|
||||||
return gcStats, nil
|
return gcStats, nil
|
||||||
@ -301,9 +306,9 @@ func (s *gcScheduler) run(ctx context.Context) {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
log.G(ctx).WithField("d", stats.MetaD).Debug("garbage collected")
|
log.G(ctx).WithField("d", stats.Elapsed()).Debug("garbage collected")
|
||||||
|
|
||||||
gcTime += stats.MetaD
|
gcTime += stats.Elapsed()
|
||||||
collections++
|
collections++
|
||||||
triggered = false
|
triggered = false
|
||||||
deletions = 0
|
deletions = 0
|
||||||
|
@ -6,11 +6,11 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/containerd/containerd/metadata"
|
"github.com/containerd/containerd/gc"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestPauseThreshold(t *testing.T) {
|
func TestPauseThreshold(t *testing.T) {
|
||||||
|
|
||||||
cfg := &config{
|
cfg := &config{
|
||||||
// With 100μs, gc should run about every 5ms
|
// With 100μs, gc should run about every 5ms
|
||||||
PauseThreshold: 0.02,
|
PauseThreshold: 0.02,
|
||||||
@ -99,7 +99,7 @@ func TestTrigger(t *testing.T) {
|
|||||||
}
|
}
|
||||||
ctx, cancel = context.WithCancel(context.Background())
|
ctx, cancel = context.WithCancel(context.Background())
|
||||||
scheduler = newScheduler(tc, cfg)
|
scheduler = newScheduler(tc, cfg)
|
||||||
stats metadata.GCStats
|
stats gc.Stats
|
||||||
err error
|
err error
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -123,9 +123,7 @@ func TestTrigger(t *testing.T) {
|
|||||||
t.Fatalf("GC failed: %#v", err)
|
t.Fatalf("GC failed: %#v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if stats.MetaD != tc.d {
|
require.Equal(t, tc.d, stats.Elapsed())
|
||||||
t.Fatalf("unexpected gc duration: %s, expected %d", stats.MetaD, tc.d)
|
|
||||||
}
|
|
||||||
|
|
||||||
if c := tc.runCount(); c != 1 {
|
if c := tc.runCount(); c != 1 {
|
||||||
t.Fatalf("unexpected gc run count %d, expected 1", c)
|
t.Fatalf("unexpected gc run count %d, expected 1", c)
|
||||||
@ -180,11 +178,18 @@ func (tc *testCollector) RegisterMutationCallback(f func(bool)) {
|
|||||||
tc.callbacks = append(tc.callbacks, f)
|
tc.callbacks = append(tc.callbacks, f)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (tc *testCollector) GarbageCollect(context.Context) (metadata.GCStats, error) {
|
func (tc *testCollector) GarbageCollect(context.Context) (gc.Stats, error) {
|
||||||
tc.m.Lock()
|
tc.m.Lock()
|
||||||
tc.gc++
|
tc.gc++
|
||||||
tc.m.Unlock()
|
tc.m.Unlock()
|
||||||
return metadata.GCStats{
|
return gcStats{elapsed: tc.d}, nil
|
||||||
MetaD: tc.d,
|
}
|
||||||
}, nil
|
|
||||||
|
type gcStats struct {
|
||||||
|
elapsed time.Duration
|
||||||
|
}
|
||||||
|
|
||||||
|
// Elapsed returns the duration which elapsed during a collection
|
||||||
|
func (s gcStats) Elapsed() time.Duration {
|
||||||
|
return s.elapsed
|
||||||
}
|
}
|
||||||
|
@ -204,7 +204,7 @@ func (m *DB) Update(fn func(*bolt.Tx) error) error {
|
|||||||
// RegisterMutationCallback registers a function to be called after a metadata
|
// RegisterMutationCallback registers a function to be called after a metadata
|
||||||
// mutations has been performed.
|
// mutations has been performed.
|
||||||
//
|
//
|
||||||
// The callback function in an argument for whether a deletion has occurred
|
// The callback function is an argument for whether a deletion has occurred
|
||||||
// since the last garbage collection.
|
// since the last garbage collection.
|
||||||
func (m *DB) RegisterMutationCallback(fn func(bool)) {
|
func (m *DB) RegisterMutationCallback(fn func(bool)) {
|
||||||
m.dirtyL.Lock()
|
m.dirtyL.Lock()
|
||||||
@ -219,15 +219,20 @@ type GCStats struct {
|
|||||||
SnapshotD map[string]time.Duration
|
SnapshotD map[string]time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Elapsed returns the duration which elapsed during a collection
|
||||||
|
func (s GCStats) Elapsed() time.Duration {
|
||||||
|
return s.MetaD
|
||||||
|
}
|
||||||
|
|
||||||
// GarbageCollect starts garbage collection
|
// GarbageCollect starts garbage collection
|
||||||
func (m *DB) GarbageCollect(ctx context.Context) (stats GCStats, err error) {
|
func (m *DB) GarbageCollect(ctx context.Context) (gc.Stats, error) {
|
||||||
m.wlock.Lock()
|
m.wlock.Lock()
|
||||||
t1 := time.Now()
|
t1 := time.Now()
|
||||||
|
|
||||||
marked, err := m.getMarked(ctx)
|
marked, err := m.getMarked(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
m.wlock.Unlock()
|
m.wlock.Unlock()
|
||||||
return GCStats{}, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
m.dirtyL.Lock()
|
m.dirtyL.Lock()
|
||||||
@ -259,9 +264,10 @@ func (m *DB) GarbageCollect(ctx context.Context) (stats GCStats, err error) {
|
|||||||
}); err != nil {
|
}); err != nil {
|
||||||
m.dirtyL.Unlock()
|
m.dirtyL.Unlock()
|
||||||
m.wlock.Unlock()
|
m.wlock.Unlock()
|
||||||
return GCStats{}, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var stats GCStats
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
|
|
||||||
if len(m.dirtySS) > 0 {
|
if len(m.dirtySS) > 0 {
|
||||||
@ -303,7 +309,7 @@ func (m *DB) GarbageCollect(ctx context.Context) (stats GCStats, err error) {
|
|||||||
|
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
|
|
||||||
return
|
return stats, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *DB) getMarked(ctx context.Context) (map[gc.Node]struct{}, error) {
|
func (m *DB) getMarked(ctx context.Context) (map[gc.Node]struct{}, error) {
|
||||||
|
@ -8,6 +8,7 @@ import (
|
|||||||
imagesapi "github.com/containerd/containerd/api/services/images/v1"
|
imagesapi "github.com/containerd/containerd/api/services/images/v1"
|
||||||
"github.com/containerd/containerd/errdefs"
|
"github.com/containerd/containerd/errdefs"
|
||||||
"github.com/containerd/containerd/events"
|
"github.com/containerd/containerd/events"
|
||||||
|
"github.com/containerd/containerd/gc"
|
||||||
"github.com/containerd/containerd/images"
|
"github.com/containerd/containerd/images"
|
||||||
"github.com/containerd/containerd/log"
|
"github.com/containerd/containerd/log"
|
||||||
"github.com/containerd/containerd/metadata"
|
"github.com/containerd/containerd/metadata"
|
||||||
@ -43,7 +44,7 @@ func init() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type gcScheduler interface {
|
type gcScheduler interface {
|
||||||
ScheduleAndWait(gocontext.Context) (metadata.GCStats, error)
|
ScheduleAndWait(gocontext.Context) (gc.Stats, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
type service struct {
|
type service struct {
|
||||||
|
Loading…
Reference in New Issue
Block a user