diff --git a/gc/gc.go b/gc/gc.go index 47421a822..70838a762 100644 --- a/gc/gc.go +++ b/gc/gc.go @@ -145,10 +145,10 @@ func ConcurrentMark(ctx context.Context, root <-chan Node, refs func(context.Con // Sweep removes all nodes returned through the channel which are not in // the reachable set by calling the provided remove function. -func Sweep(reachable map[Node]struct{}, all <-chan Node, remove func(Node) error) error { +func Sweep(reachable map[Node]struct{}, all []Node, remove func(Node) error) error { // All black objects are now reachable, and all white objects are // unreachable. Free those that are white! - for node := range all { + for _, node := range all { if _, ok := reachable[node]; !ok { if err := remove(node); err != nil { return err diff --git a/metadata/db.go b/metadata/db.go index 18eba909e..8e9944876 100644 --- a/metadata/db.go +++ b/metadata/db.go @@ -199,42 +199,10 @@ func (m *DB) GarbageCollect(ctx context.Context) error { log.G(ctx).WithField("d", time.Now().Sub(lt1)).Debug("metadata garbage collected") }() - var marked map[gc.Node]struct{} - - if err := m.db.View(func(tx *bolt.Tx) error { - ctx, cancel := context.WithCancel(ctx) - defer cancel() - - roots := make(chan gc.Node) - errChan := make(chan error) - go func() { - defer close(errChan) - defer close(roots) - - // Call roots - if err := scanRoots(ctx, tx, roots); err != nil { - cancel() - errChan <- err - } - }() - - refs := func(ctx context.Context, n gc.Node, fn func(gc.Node)) error { - return references(ctx, tx, n, fn) - } - - reachable, err := gc.ConcurrentMark(ctx, roots, refs) - if rerr := <-errChan; rerr != nil { - return rerr - } - if err != nil { - return err - } - marked = reachable - return nil - }); err != nil { + marked, err := m.getMarked(ctx) + if err != nil { return err } - m.dirtyL.Lock() defer m.dirtyL.Unlock() @@ -242,15 +210,25 @@ func (m *DB) GarbageCollect(ctx context.Context) error { ctx, cancel := context.WithCancel(ctx) defer cancel() - nodeC := make(chan gc.Node) - var scanErr error - + var ( + nodes []gc.Node + wg sync.WaitGroup + nodeC = make(chan gc.Node) + ) + wg.Add(1) go func() { - defer close(nodeC) - scanErr = scanAll(ctx, tx, nodeC) + defer wg.Done() + for n := range nodeC { + nodes = append(nodes, n) + } }() + if err := scanAll(ctx, tx, nodeC); err != nil { + return errors.Wrap(err, "failed to scan all") + } + close(nodeC) + wg.Wait() - rm := func(n gc.Node) error { + return gc.Sweep(marked, nodes, func(n gc.Node) error { if n.Type == ResourceSnapshot { if idx := strings.IndexRune(n.Key, '/'); idx > 0 { m.dirtySS[n.Key[:idx]] = struct{}{} @@ -259,17 +237,7 @@ func (m *DB) GarbageCollect(ctx context.Context) error { m.dirtyCS = true } return remove(ctx, tx, n) - } - - if err := gc.Sweep(marked, nodeC, rm); err != nil { - return errors.Wrap(err, "failed to sweep") - } - - if scanErr != nil { - return errors.Wrap(scanErr, "failed to scan all") - } - - return nil + }) }); err != nil { return err } @@ -293,6 +261,54 @@ func (m *DB) GarbageCollect(ctx context.Context) error { return nil } +func (m *DB) getMarked(ctx context.Context) (map[gc.Node]struct{}, error) { + var marked map[gc.Node]struct{} + if err := m.db.View(func(tx *bolt.Tx) error { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + var ( + nodes []gc.Node + wg sync.WaitGroup + roots = make(chan gc.Node) + ) + wg.Add(1) + go func() { + defer wg.Done() + for n := range roots { + nodes = append(nodes, n) + } + }() + // Call roots + if err := scanRoots(ctx, tx, roots); err != nil { + cancel() + return err + } + close(roots) + wg.Wait() + + refs := func(n gc.Node) ([]gc.Node, error) { + var sn []gc.Node + if err := references(ctx, tx, n, func(nn gc.Node) { + sn = append(sn, nn) + }); err != nil { + return nil, err + } + return sn, nil + } + + reachable, err := gc.Tricolor(nodes, refs) + if err != nil { + return err + } + marked = reachable + return nil + }); err != nil { + return nil, err + } + return marked, nil +} + func (m *DB) cleanupSnapshotter(name string) { ctx := context.Background() sn, ok := m.ss[name]