/*
   Copyright The containerd Authors.

   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
*/

package scheduler

import (
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/containerd/containerd/gc"
	"github.com/containerd/containerd/log"
	"github.com/containerd/containerd/plugin"
	"github.com/pkg/errors"
)

// config configures the garbage collection policies.
type config struct {
	// PauseThreshold is the maximum fraction of time that garbage
	// collection pauses may occupy, computed from the average pause time.
	// For example, a value of 0.02 means that scheduled garbage collection
	// pauses should represent at most 2% of real time,
	// or 20ms of every second.
	//
	// A maximum value of .5 is enforced to prevent over-scheduling of the
	// garbage collector; trigger options are available to run it in a more
	// predictable time frame after mutation.
	//
	// Default is 0.02
	PauseThreshold float64 `toml:"pause_threshold"`

	// DeletionThreshold is used to guarantee that a garbage collection is
	// scheduled after the configured number of deletions have occurred
	// since the previous garbage collection. A value of 0 indicates that
	// garbage collection will not be triggered by deletion count.
	//
	// Default is 0
	DeletionThreshold int `toml:"deletion_threshold"`

	// MutationThreshold is used to guarantee that a garbage collection is
	// run after a configured number of database mutations have occurred
	// since the previous garbage collection. A value of 0 indicates that
	// garbage collection will only be run after a manual trigger or
	// deletion. Unlike the deletion threshold, the mutation threshold does
	// not cause scheduling of a garbage collection, but ensures GC is run
	// at the next scheduled GC.
	//
	// Default is 100
	MutationThreshold int `toml:"mutation_threshold"`

	// ScheduleDelay is the duration in the future to schedule a garbage
	// collection triggered manually or by exceeding the configured
	// threshold for deletion or mutation. A zero value will schedule the
	// collection immediately. Use suffix "ms" for milliseconds and "s"
	// for seconds.
	//
	// Default is "0ms"
	ScheduleDelay duration `toml:"schedule_delay"`

	// StartupDelay is the delay before running an initial garbage
	// collection after startup. The initial garbage collection is used to
	// set the baseline for the pause threshold and should be scheduled in
	// the future to avoid slowing down other startup processes. Use suffix
	// "ms" for milliseconds and "s" for seconds.
	//
	// Default is "100ms"
	StartupDelay duration `toml:"startup_delay"`
}

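// For reference, these fields map to containerd's TOML configuration. The
// section header below is an assumption about how this plugin's section is
// named in a given containerd version; it is shown only to illustrate the
// field names and value formats defined above:
//
//	[plugins.scheduler]
//	  pause_threshold = 0.02
//	  deletion_threshold = 0
//	  mutation_threshold = 100
//	  schedule_delay = "0ms"
//	  startup_delay = "100ms"
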
// duration wraps time.Duration so that values such as "100ms" or "1s" can be
// parsed from and written back to TOML as text.
type duration time.Duration

func (d *duration) UnmarshalText(text []byte) error {
	ed, err := time.ParseDuration(string(text))
	if err != nil {
		return err
	}
	*d = duration(ed)
	return nil
}

func (d duration) MarshalText() (text []byte, err error) {
	return []byte(time.Duration(d).String()), nil
}

func init() {
	plugin.Register(&plugin.Registration{
		Type: plugin.GCPlugin,
		ID:   "scheduler",
		Requires: []plugin.Type{
			plugin.MetadataPlugin,
		},
		Config: &config{
			PauseThreshold:    0.02,
			DeletionThreshold: 0,
			MutationThreshold: 100,
			ScheduleDelay:     duration(0),
			StartupDelay:      duration(100 * time.Millisecond),
		},
		InitFn: func(ic *plugin.InitContext) (interface{}, error) {
			md, err := ic.Get(plugin.MetadataPlugin)
			if err != nil {
				return nil, err
			}

			mdCollector, ok := md.(collector)
			if !ok {
				return nil, errors.Errorf("%s %T must implement collector", plugin.MetadataPlugin, md)
			}

			m := newScheduler(mdCollector, ic.Config.(*config))

			ic.Meta.Exports = map[string]string{
				"PauseThreshold":    fmt.Sprint(m.pauseThreshold),
				"DeletionThreshold": fmt.Sprint(m.deletionThreshold),
				"MutationThreshold": fmt.Sprint(m.mutationThreshold),
				"ScheduleDelay":     fmt.Sprint(m.scheduleDelay),
			}

			go m.run(ic.Context)

			return m, nil
		},
	})
}

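// Other plugins that list plugin.GCPlugin in their Requires can resolve this
// scheduler through the same InitContext accessor used above for the metadata
// plugin. A minimal sketch, assuming only the ScheduleAndWait method defined
// later in this file (the anonymous interface below is illustrative, not an
// API exported by this package):
//
//	gcObj, err := ic.Get(plugin.GCPlugin)
//	if err != nil {
//		return nil, err
//	}
//	sched, ok := gcObj.(interface {
//		ScheduleAndWait(context.Context) (gc.Stats, error)
//	})
//	if !ok {
//		return nil, errors.New("GC plugin does not implement ScheduleAndWait")
//	}
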
// mutationEvent is sent on the scheduler's event channel whenever the
// metadata store is mutated or a collection is requested explicitly.
type mutationEvent struct {
	ts       time.Time
	mutation bool
	dirty    bool
}

// collector is implemented by the metadata store and provides the actual
// garbage collection along with mutation notifications.
type collector interface {
	RegisterMutationCallback(func(bool))
	GarbageCollect(context.Context) (gc.Stats, error)
}

// gcScheduler tracks mutation activity and decides when to run the collector.
type gcScheduler struct {
	c collector

	eventC chan mutationEvent

	waiterL sync.Mutex
	waiters []chan gc.Stats

	pauseThreshold    float64
	deletionThreshold int
	mutationThreshold int
	scheduleDelay     time.Duration
	startupDelay      time.Duration
}

// newScheduler constructs a gcScheduler from the given collector and config,
// clamping configuration values to their valid ranges and registering the
// mutation callback. The caller is expected to start the run loop.
func newScheduler(c collector, cfg *config) *gcScheduler {
	eventC := make(chan mutationEvent)

	s := &gcScheduler{
		c:                 c,
		eventC:            eventC,
		pauseThreshold:    cfg.PauseThreshold,
		deletionThreshold: cfg.DeletionThreshold,
		mutationThreshold: cfg.MutationThreshold,
		scheduleDelay:     time.Duration(cfg.ScheduleDelay),
		startupDelay:      time.Duration(cfg.StartupDelay),
	}

	if s.pauseThreshold < 0.0 {
		s.pauseThreshold = 0.0
	}
	if s.pauseThreshold > 0.5 {
		s.pauseThreshold = 0.5
	}
	if s.mutationThreshold < 0 {
		s.mutationThreshold = 0
	}
	if s.scheduleDelay < 0 {
		s.scheduleDelay = 0
	}
	if s.startupDelay < 0 {
		s.startupDelay = 0
	}

	c.RegisterMutationCallback(s.mutationCallback)

	return s
}

// ScheduleAndWait triggers a garbage collection and blocks until it
// completes, the collection fails, or the context is done.
func (s *gcScheduler) ScheduleAndWait(ctx context.Context) (gc.Stats, error) {
	return s.wait(ctx, true)
}

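// A minimal usage sketch (the caller variables are illustrative): because the
// collection may be deferred by scheduleDelay and the waiter channel is
// closed when collection fails, callers typically bound the wait with a
// context deadline.
//
//	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
//	defer cancel()
//	stats, err := sched.ScheduleAndWait(ctx)
//	if err != nil {
//		// either "gc failed" or the context error if the deadline expired
//	}
//	_ = stats
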
// wait registers a waiter for the next garbage collection and, when trigger
// is true, sends an event to schedule one. It returns the stats from that
// collection, an error if the collection failed, or the context error.
func (s *gcScheduler) wait(ctx context.Context, trigger bool) (gc.Stats, error) {
	wc := make(chan gc.Stats, 1)
	s.waiterL.Lock()
	s.waiters = append(s.waiters, wc)
	s.waiterL.Unlock()

	if trigger {
		e := mutationEvent{
			ts: time.Now(),
		}
		go func() {
			s.eventC <- e
		}()
	}

	var gcStats gc.Stats
	select {
	case stats, ok := <-wc:
		if !ok {
			return gcStats, errors.New("gc failed")
		}
		gcStats = stats
	case <-ctx.Done():
		return gcStats, ctx.Err()
	}

	return gcStats, nil
}

// mutationCallback is registered with the collector and forwards mutation
// notifications to the run loop without blocking the caller.
func (s *gcScheduler) mutationCallback(dirty bool) {
	e := mutationEvent{
		ts:       time.Now(),
		mutation: true,
		dirty:    dirty,
	}
	go func() {
		s.eventC <- e
	}()
}

// schedule returns a timer channel that fires after d along with the
// wall-clock time at which it will fire.
func schedule(d time.Duration) (<-chan time.Time, *time.Time) {
	next := time.Now().Add(d)
	return time.After(d), &next
}

// run is the scheduler's main loop. It listens for mutation events and timer
// ticks, decides when a collection is due, runs it, and notifies waiters.
func (s *gcScheduler) run(ctx context.Context) {
	var (
		schedC <-chan time.Time

		lastCollection *time.Time
		nextCollection *time.Time

		interval    = time.Second
		gcTime      time.Duration
		collections int
		// TODO(dmcg): expose collection stats as metrics

		triggered bool
		deletions int
		mutations int
	)
	if s.startupDelay > 0 {
		schedC, nextCollection = schedule(s.startupDelay)
	}
	for {
		select {
		case <-schedC:
			// Check if garbage collection can be skipped because
			// it is not needed or was not requested and reschedule
			// it to attempt again after another time interval.
			if !triggered && lastCollection != nil && deletions == 0 &&
				(s.mutationThreshold == 0 || mutations < s.mutationThreshold) {
				schedC, nextCollection = schedule(interval)
				continue
			}
		case e := <-s.eventC:
			if lastCollection != nil && lastCollection.After(e.ts) {
				continue
			}
			if e.dirty {
				deletions++
			}
			if e.mutation {
				mutations++
			} else {
				triggered = true
			}

			// Check if condition should cause immediate collection.
			if triggered ||
				(s.deletionThreshold > 0 && deletions >= s.deletionThreshold) ||
				(nextCollection == nil && ((s.deletionThreshold == 0 && deletions > 0) ||
					(s.mutationThreshold > 0 && mutations >= s.mutationThreshold))) {
				// Check if not already scheduled before delay threshold
				if nextCollection == nil || nextCollection.After(time.Now().Add(s.scheduleDelay)) {
					// TODO(dmcg): track re-schedules for tuning schedule config
					schedC, nextCollection = schedule(s.scheduleDelay)
				}
			}

			continue
		case <-ctx.Done():
			return
		}

		s.waiterL.Lock()

		stats, err := s.c.GarbageCollect(ctx)
		last := time.Now()
		if err != nil {
			log.G(ctx).WithError(err).Error("garbage collection failed")

			// Reschedule garbage collection for the same duration + 1 second.
			// Fall back to the base interval when there is no previous
			// collection to measure against (e.g. the startup collection
			// failed), which would otherwise dereference a nil pointer.
			retry := interval + time.Second
			if lastCollection != nil {
				retry = nextCollection.Sub(*lastCollection) + time.Second
			}
			schedC, nextCollection = schedule(retry)

			// Update last collection time even though a failure occurred
			lastCollection = &last

			for _, w := range s.waiters {
				close(w)
			}
			s.waiters = nil
			s.waiterL.Unlock()
			continue
		}

		log.G(ctx).WithField("d", stats.Elapsed()).Debug("garbage collected")

		gcTime += stats.Elapsed()
		collections++
		triggered = false
		deletions = 0
		mutations = 0

		// Calculate new interval with updated times
		if s.pauseThreshold > 0.0 {
			// Set the interval to the average gc time divided by the pause
			// threshold, minus the average gc time. This leaves enough
			// runtime between collections for pauses to stay within the
			// threshold, which is always 0.0 < threshold <= 0.5.
			avg := float64(gcTime) / float64(collections)
			interval = time.Duration(avg/s.pauseThreshold - avg)
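			// Worked example (illustrative numbers, not from the original
			// source): with collections averaging 10ms and the default
			// pause threshold of 0.02, interval = 10ms/0.02 - 10ms = 490ms,
			// so roughly 2% of wall-clock time is spent collecting.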
		}

		lastCollection = &last
		schedC, nextCollection = schedule(interval)

		for _, w := range s.waiters {
			w <- stats
		}
		s.waiters = nil
		s.waiterL.Unlock()
	}
}