/*
   Copyright The containerd Authors.

   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
*/

package metadata

import (
	"context"
	"encoding/binary"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/containerd/containerd/content"
	"github.com/containerd/containerd/gc"
	"github.com/containerd/containerd/log"
	"github.com/containerd/containerd/snapshots"
	"github.com/pkg/errors"
	bolt "go.etcd.io/bbolt"
)

const (
	// schemaVersion represents the schema version of
	// the database. This schema version represents the
	// structure of the data in the database. The schema
	// can evolve at any time but any backwards
	// incompatible changes or structural changes require
	// bumping the schema version.
	schemaVersion = "v1"

	// dbVersion represents updates to the schema
	// version which are additions and compatible with
	// prior versions of the same schema.
	dbVersion = 3
)

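// These constants map onto the on-disk layout consumed by Init below: the
// current dbVersion is stored as a varint under bucketKeyDBVersion inside the
// top-level schema bucket. An illustrative, non-authoritative read of that
// value (assuming the schema bucket is named after schemaVersion, as Init's
// usage suggests) could look like:
//
//	_ = db.View(func(tx *bolt.Tx) error {
//		if bkt := tx.Bucket([]byte(schemaVersion)); bkt != nil {
//			if vb := bkt.Get(bucketKeyDBVersion); vb != nil {
//				v, _ := binary.Varint(vb)
//				fmt.Printf("schema %s, version %d\n", schemaVersion, v)
//			}
//		}
//		return nil
//	})
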
// DBOpt configures how we set up the DB
type DBOpt func(*dbOptions)

// WithPolicyIsolated isolates contents between namespaces
func WithPolicyIsolated(o *dbOptions) {
	o.shared = false
}

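// WithPolicyIsolated is passed straight to NewDB. A minimal sketch of turning
// off cross-namespace content sharing (the store and snapshotter values here
// are placeholders):
//
//	db := NewDB(boltDB, store, snapshotters, WithPolicyIsolated)
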
// dbOptions configure db options.
type dbOptions struct {
	shared bool
}

// DB represents a metadata database backed by a bolt
// database. The database is fully namespaced and stores
// image, container, namespace, snapshot, and content data
// while proxying data shared across namespaces to backend
// datastores for content and snapshots.
type DB struct {
	db *bolt.DB
	ss map[string]*snapshotter
	cs *contentStore

	// wlock is used to protect access to the data structures during garbage
	// collection. While the wlock is held no writable transactions can be
	// opened, preventing changes from occurring between the mark and
	// sweep phases without preventing read transactions.
	wlock sync.RWMutex

	// dirty flag indicates that references have been removed which require
	// a garbage collection to ensure the database is clean. This tracks
	// the number of dirty operations. This should be updated and read
	// atomically if outside of wlock.Lock.
	dirty uint32

	// dirtySS and dirtyCS flags keep track of datastores which have had
	// deletions since the last garbage collection. These datastores will
	// be garbage collected during the next garbage collection. These
	// should only be updated inside of a write transaction or wlock.Lock.
	dirtySS map[string]struct{}
	dirtyCS bool

	// mutationCallbacks are called after each mutation with the flag
	// set indicating whether any dirty flags are set
	mutationCallbacks []func(bool)

	dbopts dbOptions
}

// NewDB creates a new metadata database using the provided
// bolt database, content store, and snapshotters.
func NewDB(db *bolt.DB, cs content.Store, ss map[string]snapshots.Snapshotter, opts ...DBOpt) *DB {
	m := &DB{
		db:      db,
		ss:      make(map[string]*snapshotter, len(ss)),
		dirtySS: map[string]struct{}{},
		dbopts: dbOptions{
			shared: true,
		},
	}

	for _, opt := range opts {
		opt(&m.dbopts)
	}

	// Initialize data stores
	m.cs = newContentStore(m, m.dbopts.shared, cs)
	for name, sn := range ss {
		m.ss[name] = newSnapshotter(m, name, sn)
	}

	return m
}

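// A typical construction sequence, sketched with assumed inputs (the bolt
// path, content store, and snapshotter instances are illustrative only):
//
//	bdb, err := bolt.Open("/var/lib/containerd/meta.db", 0644, nil)
//	if err != nil {
//		return err
//	}
//	db := NewDB(bdb, localStore, map[string]snapshots.Snapshotter{
//		"overlayfs": overlay, // hypothetical snapshotter instance
//	})
//	if err := db.Init(ctx); err != nil { // run migrations before first use
//		return err
//	}
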
// Init ensures the database is at the correct version
// and performs any needed migrations.
func (m *DB) Init(ctx context.Context) error {
	// errSkip is used when no migration or version needs to be written
	// to the database and the transaction can be immediately rolled
	// back rather than performing a much slower and unnecessary commit.
	var errSkip = errors.New("skip update")

	err := m.db.Update(func(tx *bolt.Tx) error {
		var (
			// current schema and version
			schema  = "v0"
			version = 0
		)

		// i represents the index of the first migration
		// which must be run to get the database up to date.
		// The migrations' versions are checked in reverse
		// order, decrementing i for each migration which
		// represents a version newer than the current
		// database version.
		i := len(migrations)

		for ; i > 0; i-- {
			migration := migrations[i-1]

			bkt := tx.Bucket([]byte(migration.schema))
			if bkt == nil {
				// Hasn't encountered another schema, go to next migration
				if schema == "v0" {
					continue
				}
				break
			}
			if schema == "v0" {
				schema = migration.schema
				vb := bkt.Get(bucketKeyDBVersion)
				if vb != nil {
					v, _ := binary.Varint(vb)
					version = int(v)
				}
			}

			if version >= migration.version {
				break
			}
		}

		// Previous version of database found
		if schema != "v0" {
			updates := migrations[i:]

			// No migration updates, return immediately
			if len(updates) == 0 {
				return errSkip
			}

			for _, m := range updates {
				t0 := time.Now()
				if err := m.migrate(tx); err != nil {
					return errors.Wrapf(err, "failed to migrate to %s.%d", m.schema, m.version)
				}
				log.G(ctx).WithField("d", time.Since(t0)).Debugf("finished database migration to %s.%d", m.schema, m.version)
			}
		}

		bkt, err := tx.CreateBucketIfNotExists(bucketKeyVersion)
		if err != nil {
			return err
		}

		versionEncoded, err := encodeInt(dbVersion)
		if err != nil {
			return err
		}

		return bkt.Put(bucketKeyDBVersion, versionEncoded)
	})
	if err == errSkip {
		err = nil
	}
	return err
}

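// The migrations slice consumed above is defined elsewhere in this package;
// from its use here it must pair a schema name and version with a migrate
// function. A hypothetical entry, shown only to make that contract concrete:
//
//	migrations = append(migrations, migration{
//		schema:  "v1",
//		version: 3,
//		migrate: func(tx *bolt.Tx) error {
//			// walk existing buckets and rewrite them into the new layout
//			return nil
//		},
//	})
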
// ContentStore returns a namespaced content store
// proxied to a content store.
func (m *DB) ContentStore() content.Store {
	if m.cs == nil {
		return nil
	}
	return m.cs
}

// Snapshotter returns a namespaced snapshotter for
// the requested snapshotter name, proxied to the backing snapshotter.
func (m *DB) Snapshotter(name string) snapshots.Snapshotter {
	sn, ok := m.ss[name]
	if !ok {
		return nil
	}
	return sn
}

// Snapshotters returns all available snapshotters.
func (m *DB) Snapshotters() map[string]snapshots.Snapshotter {
	ss := make(map[string]snapshots.Snapshotter, len(m.ss))
	for n, sn := range m.ss {
		ss[n] = sn
	}
	return ss
}

// View runs a readonly transaction on the metadata store.
func (m *DB) View(fn func(*bolt.Tx) error) error {
	return m.db.View(fn)
}

// Update runs a writable transaction on the metadata store.
func (m *DB) Update(fn func(*bolt.Tx) error) error {
	m.wlock.RLock()
	defer m.wlock.RUnlock()
	err := m.db.Update(fn)
	if err == nil {
		dirty := atomic.LoadUint32(&m.dirty) > 0
		for _, fn := range m.mutationCallbacks {
			fn(dirty)
		}
	}

	return err
}

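// Note that Update takes wlock as a *read* lock: concurrent writers only
// serialize on bolt's own transaction lock, while GarbageCollect takes the
// write side to exclude all mutation during mark and sweep. A minimal caller
// sketch (the bucket name and keys are placeholders):
//
//	err := db.Update(func(tx *bolt.Tx) error {
//		bkt, err := tx.CreateBucketIfNotExists([]byte("example"))
//		if err != nil {
//			return err
//		}
//		return bkt.Put([]byte("k"), []byte("v"))
//	})
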
// RegisterMutationCallback registers a function to be called after a metadata
// mutation has been performed.
//
// The callback receives a boolean argument indicating whether a deletion has
// occurred since the last garbage collection.
func (m *DB) RegisterMutationCallback(fn func(bool)) {
	m.wlock.Lock()
	m.mutationCallbacks = append(m.mutationCallbacks, fn)
	m.wlock.Unlock()
}

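// An illustrative registration that nudges an external scheduler whenever a
// mutation leaves the database dirty (gcScheduler is an assumed component,
// not part of this package):
//
//	db.RegisterMutationCallback(func(dirty bool) {
//		if dirty {
//			gcScheduler.trigger() // hypothetical; e.g. wake a GC loop
//		}
//	})
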
// GCStats holds the duration for the different phases of the garbage collector
type GCStats struct {
	MetaD     time.Duration
	ContentD  time.Duration
	SnapshotD map[string]time.Duration
}

// Elapsed returns the duration which elapsed during a collection. Only the
// blocking metadata phase is counted; content and snapshot cleanup run
// asynchronously after the metadata sweep completes.
func (s GCStats) Elapsed() time.Duration {
	return s.MetaD
}

// GarbageCollect starts garbage collection
func (m *DB) GarbageCollect(ctx context.Context) (gc.Stats, error) {
	m.wlock.Lock()
	t1 := time.Now()

	marked, err := m.getMarked(ctx)
	if err != nil {
		m.wlock.Unlock()
		return nil, err
	}

	if err := m.db.Update(func(tx *bolt.Tx) error {
		ctx, cancel := context.WithCancel(ctx)
		defer cancel()

		rm := func(ctx context.Context, n gc.Node) error {
			if _, ok := marked[n]; ok {
				return nil
			}

			if n.Type == ResourceSnapshot {
				if idx := strings.IndexRune(n.Key, '/'); idx > 0 {
					m.dirtySS[n.Key[:idx]] = struct{}{}
				}
			} else if n.Type == ResourceContent || n.Type == ResourceIngest {
				m.dirtyCS = true
			}
			return remove(ctx, tx, n)
		}

		if err := scanAll(ctx, tx, rm); err != nil {
			return errors.Wrap(err, "failed to scan and remove")
		}

		return nil
	}); err != nil {
		m.wlock.Unlock()
		return nil, err
	}

	var stats GCStats
	var wg sync.WaitGroup

	// reset dirty, no need for atomic inside of wlock.Lock
	m.dirty = 0

	if len(m.dirtySS) > 0 {
		var sl sync.Mutex
		stats.SnapshotD = map[string]time.Duration{}
		wg.Add(len(m.dirtySS))
		for snapshotterName := range m.dirtySS {
			log.G(ctx).WithField("snapshotter", snapshotterName).Debug("schedule snapshotter cleanup")
			go func(snapshotterName string) {
				st1 := time.Now()
				m.cleanupSnapshotter(snapshotterName)

				sl.Lock()
				stats.SnapshotD[snapshotterName] = time.Since(st1)
				sl.Unlock()

				wg.Done()
			}(snapshotterName)
		}
		m.dirtySS = map[string]struct{}{}
	}

	if m.dirtyCS {
		wg.Add(1)
		log.G(ctx).Debug("schedule content cleanup")
		go func() {
			ct1 := time.Now()
			m.cleanupContent()
			stats.ContentD = time.Since(ct1)
			wg.Done()
		}()
		m.dirtyCS = false
	}

	stats.MetaD = time.Since(t1)
	m.wlock.Unlock()

	wg.Wait()

	return stats, err
}

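// Driving a collection and reading per-phase timings back out; the type
// assertion reflects that GarbageCollect returns the gc.Stats interface but
// always builds a GCStats value (ctx and db are assumed to be in scope):
//
//	stats, err := db.GarbageCollect(ctx)
//	if err != nil {
//		return err
//	}
//	if gs, ok := stats.(GCStats); ok {
//		log.G(ctx).WithField("metadata", gs.MetaD).Info("gc finished")
//	}
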
func (m *DB) getMarked(ctx context.Context) (map[gc.Node]struct{}, error) {
	var marked map[gc.Node]struct{}
	if err := m.db.View(func(tx *bolt.Tx) error {
		ctx, cancel := context.WithCancel(ctx)
		defer cancel()

		var (
			nodes []gc.Node
			wg    sync.WaitGroup
			roots = make(chan gc.Node)
		)
		wg.Add(1)
		go func() {
			defer wg.Done()
			for n := range roots {
				nodes = append(nodes, n)
			}
		}()
		// Call roots
		if err := scanRoots(ctx, tx, roots); err != nil {
			cancel()
			return err
		}
		close(roots)
		wg.Wait()

		refs := func(n gc.Node) ([]gc.Node, error) {
			var sn []gc.Node
			if err := references(ctx, tx, n, func(nn gc.Node) {
				sn = append(sn, nn)
			}); err != nil {
				return nil, err
			}
			return sn, nil
		}

		reachable, err := gc.Tricolor(nodes, refs)
		if err != nil {
			return err
		}
		marked = reachable
		return nil
	}); err != nil {
		return nil, err
	}
	return marked, nil
}

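// gc.Tricolor is the generic mark phase: it walks from the collected roots
// through the refs function and returns the reachable set. A toy invocation
// against a two-node graph (keys are made up for illustration):
//
//	root := gc.Node{Type: ResourceContent, Namespace: "default", Key: "sha256:aaa"}
//	child := gc.Node{Type: ResourceContent, Namespace: "default", Key: "sha256:bbb"}
//	refs := func(n gc.Node) ([]gc.Node, error) {
//		if n == root {
//			return []gc.Node{child}, nil
//		}
//		return nil, nil
//	}
//	reachable, _ := gc.Tricolor([]gc.Node{root}, refs)
//	// reachable contains both root and child; anything absent would be swept.
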
func (m *DB) cleanupSnapshotter(name string) (time.Duration, error) {
	ctx := context.Background()
	sn, ok := m.ss[name]
	if !ok {
		return 0, nil
	}

	d, err := sn.garbageCollect(ctx)
	logger := log.G(ctx).WithField("snapshotter", name)
	if err != nil {
		logger.WithError(err).Warn("snapshot garbage collection failed")
	} else {
		logger.WithField("d", d).Debug("snapshot garbage collected")
	}
	return d, err
}

func (m *DB) cleanupContent() (time.Duration, error) {
	ctx := context.Background()
	if m.cs == nil {
		return 0, nil
	}

	d, err := m.cs.garbageCollect(ctx)
	if err != nil {
		log.G(ctx).WithError(err).Warn("content garbage collection failed")
	} else {
		log.G(ctx).WithField("d", d).Debug("content garbage collected")
	}

	return d, err
}