Fix races with concurrent GC

Signed-off-by: Michael Crosby <crosbymichael@gmail.com>
This commit is contained in:
Michael Crosby 2017-10-27 14:47:23 -04:00
parent b8c4d85564
commit e74c423f78
2 changed files with 69 additions and 53 deletions

View File

@ -145,10 +145,10 @@ func ConcurrentMark(ctx context.Context, root <-chan Node, refs func(context.Con
// Sweep removes all nodes returned through the channel which are not in // Sweep removes all nodes returned through the channel which are not in
// the reachable set by calling the provided remove function. // the reachable set by calling the provided remove function.
func Sweep(reachable map[Node]struct{}, all <-chan Node, remove func(Node) error) error { func Sweep(reachable map[Node]struct{}, all []Node, remove func(Node) error) error {
// All black objects are now reachable, and all white objects are // All black objects are now reachable, and all white objects are
// unreachable. Free those that are white! // unreachable. Free those that are white!
for node := range all { for _, node := range all {
if _, ok := reachable[node]; !ok { if _, ok := reachable[node]; !ok {
if err := remove(node); err != nil { if err := remove(node); err != nil {
return err return err

View File

@ -199,42 +199,10 @@ func (m *DB) GarbageCollect(ctx context.Context) error {
log.G(ctx).WithField("d", time.Now().Sub(lt1)).Debug("metadata garbage collected") log.G(ctx).WithField("d", time.Now().Sub(lt1)).Debug("metadata garbage collected")
}() }()
var marked map[gc.Node]struct{} marked, err := m.getMarked(ctx)
if err != nil {
if err := m.db.View(func(tx *bolt.Tx) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
roots := make(chan gc.Node)
errChan := make(chan error)
go func() {
defer close(errChan)
defer close(roots)
// Call roots
if err := scanRoots(ctx, tx, roots); err != nil {
cancel()
errChan <- err
}
}()
refs := func(ctx context.Context, n gc.Node, fn func(gc.Node)) error {
return references(ctx, tx, n, fn)
}
reachable, err := gc.ConcurrentMark(ctx, roots, refs)
if rerr := <-errChan; rerr != nil {
return rerr
}
if err != nil {
return err
}
marked = reachable
return nil
}); err != nil {
return err return err
} }
m.dirtyL.Lock() m.dirtyL.Lock()
defer m.dirtyL.Unlock() defer m.dirtyL.Unlock()
@ -242,15 +210,25 @@ func (m *DB) GarbageCollect(ctx context.Context) error {
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
nodeC := make(chan gc.Node) var (
var scanErr error nodes []gc.Node
wg sync.WaitGroup
nodeC = make(chan gc.Node)
)
wg.Add(1)
go func() { go func() {
defer close(nodeC) defer wg.Done()
scanErr = scanAll(ctx, tx, nodeC) for n := range nodeC {
nodes = append(nodes, n)
}
}() }()
if err := scanAll(ctx, tx, nodeC); err != nil {
return errors.Wrap(err, "failed to scan all")
}
close(nodeC)
wg.Wait()
rm := func(n gc.Node) error { return gc.Sweep(marked, nodes, func(n gc.Node) error {
if n.Type == ResourceSnapshot { if n.Type == ResourceSnapshot {
if idx := strings.IndexRune(n.Key, '/'); idx > 0 { if idx := strings.IndexRune(n.Key, '/'); idx > 0 {
m.dirtySS[n.Key[:idx]] = struct{}{} m.dirtySS[n.Key[:idx]] = struct{}{}
@ -259,17 +237,7 @@ func (m *DB) GarbageCollect(ctx context.Context) error {
m.dirtyCS = true m.dirtyCS = true
} }
return remove(ctx, tx, n) return remove(ctx, tx, n)
} })
if err := gc.Sweep(marked, nodeC, rm); err != nil {
return errors.Wrap(err, "failed to sweep")
}
if scanErr != nil {
return errors.Wrap(scanErr, "failed to scan all")
}
return nil
}); err != nil { }); err != nil {
return err return err
} }
@ -293,6 +261,54 @@ func (m *DB) GarbageCollect(ctx context.Context) error {
return nil return nil
} }
func (m *DB) getMarked(ctx context.Context) (map[gc.Node]struct{}, error) {
var marked map[gc.Node]struct{}
if err := m.db.View(func(tx *bolt.Tx) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
var (
nodes []gc.Node
wg sync.WaitGroup
roots = make(chan gc.Node)
)
wg.Add(1)
go func() {
defer wg.Done()
for n := range roots {
nodes = append(nodes, n)
}
}()
// Call roots
if err := scanRoots(ctx, tx, roots); err != nil {
cancel()
return err
}
close(roots)
wg.Wait()
refs := func(n gc.Node) ([]gc.Node, error) {
var sn []gc.Node
if err := references(ctx, tx, n, func(nn gc.Node) {
sn = append(sn, nn)
}); err != nil {
return nil, err
}
return sn, nil
}
reachable, err := gc.Tricolor(nodes, refs)
if err != nil {
return err
}
marked = reachable
return nil
}); err != nil {
return nil, err
}
return marked, nil
}
func (m *DB) cleanupSnapshotter(name string) { func (m *DB) cleanupSnapshotter(name string) {
ctx := context.Background() ctx := context.Background()
sn, ok := m.ss[name] sn, ok := m.ss[name]