Merge pull request #1692 from dmcgowan/gc-fix-sweep-race
Fix race in gc sweep
This commit is contained in:
commit
474d4df2aa
@ -203,6 +203,7 @@ func (m *DB) GarbageCollect(ctx context.Context) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
m.dirtyL.Lock()
|
||||
defer m.dirtyL.Unlock()
|
||||
|
||||
@ -210,25 +211,11 @@ func (m *DB) GarbageCollect(ctx context.Context) error {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
var (
|
||||
nodes []gc.Node
|
||||
wg sync.WaitGroup
|
||||
nodeC = make(chan gc.Node)
|
||||
)
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for n := range nodeC {
|
||||
nodes = append(nodes, n)
|
||||
rm := func(ctx context.Context, n gc.Node) error {
|
||||
if _, ok := marked[n]; ok {
|
||||
return nil
|
||||
}
|
||||
}()
|
||||
if err := scanAll(ctx, tx, nodeC); err != nil {
|
||||
return errors.Wrap(err, "failed to scan all")
|
||||
}
|
||||
close(nodeC)
|
||||
wg.Wait()
|
||||
|
||||
return gc.Sweep(marked, nodes, func(n gc.Node) error {
|
||||
if n.Type == ResourceSnapshot {
|
||||
if idx := strings.IndexRune(n.Key, '/'); idx > 0 {
|
||||
m.dirtySS[n.Key[:idx]] = struct{}{}
|
||||
@ -237,7 +224,13 @@ func (m *DB) GarbageCollect(ctx context.Context) error {
|
||||
m.dirtyCS = true
|
||||
}
|
||||
return remove(ctx, tx, n)
|
||||
})
|
||||
}
|
||||
|
||||
if err := scanAll(ctx, tx, rm); err != nil {
|
||||
return errors.Wrap(err, "failed to scan and remove")
|
||||
}
|
||||
|
||||
return nil
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -242,16 +242,11 @@ func TestMetadataCollector(t *testing.T) {
|
||||
var actual []gc.Node
|
||||
|
||||
if err := mdb.View(func(tx *bolt.Tx) error {
|
||||
nodeC := make(chan gc.Node)
|
||||
var scanErr error
|
||||
go func() {
|
||||
defer close(nodeC)
|
||||
scanErr = scanAll(ctx, tx, nodeC)
|
||||
}()
|
||||
for node := range nodeC {
|
||||
scanFn := func(ctx context.Context, node gc.Node) error {
|
||||
actual = append(actual, node)
|
||||
return nil
|
||||
}
|
||||
return scanErr
|
||||
return scanAll(ctx, tx, scanFn)
|
||||
}); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
@ -179,7 +179,7 @@ func references(ctx context.Context, tx *bolt.Tx, node gc.Node, fn func(gc.Node)
|
||||
return nil
|
||||
}
|
||||
|
||||
func scanAll(ctx context.Context, tx *bolt.Tx, nc chan<- gc.Node) error {
|
||||
func scanAll(ctx context.Context, tx *bolt.Tx, fn func(ctx context.Context, n gc.Node) error) error {
|
||||
v1bkt := tx.Bucket(bucketKeyVersion)
|
||||
if v1bkt == nil {
|
||||
return nil
|
||||
@ -206,12 +206,8 @@ func scanAll(ctx context.Context, tx *bolt.Tx, nc chan<- gc.Node) error {
|
||||
if v != nil {
|
||||
return nil
|
||||
}
|
||||
select {
|
||||
case nc <- gcnode(ResourceSnapshot, ns, fmt.Sprintf("%s/%s", sk, k)):
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
return nil
|
||||
node := gcnode(ResourceSnapshot, ns, fmt.Sprintf("%s/%s", sk, k))
|
||||
return fn(ctx, node)
|
||||
})
|
||||
}); err != nil {
|
||||
return err
|
||||
@ -227,12 +223,8 @@ func scanAll(ctx context.Context, tx *bolt.Tx, nc chan<- gc.Node) error {
|
||||
if v != nil {
|
||||
return nil
|
||||
}
|
||||
select {
|
||||
case nc <- gcnode(ResourceContent, ns, string(k)):
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
return nil
|
||||
node := gcnode(ResourceContent, ns, string(k))
|
||||
return fn(ctx, node)
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -61,7 +61,7 @@ func TestGCRoots(t *testing.T) {
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
checkNodes(ctx, t, db, expected, func(ctx context.Context, tx *bolt.Tx, nc chan<- gc.Node) error {
|
||||
checkNodeC(ctx, t, db, expected, func(ctx context.Context, tx *bolt.Tx, nc chan<- gc.Node) error {
|
||||
return scanRoots(ctx, tx, nc)
|
||||
})
|
||||
}
|
||||
@ -125,8 +125,8 @@ func TestGCRemove(t *testing.T) {
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
checkNodes(ctx, t, db, all, func(ctx context.Context, tx *bolt.Tx, nc chan<- gc.Node) error {
|
||||
return scanAll(ctx, tx, nc)
|
||||
checkNodes(ctx, t, db, all, func(ctx context.Context, tx *bolt.Tx, fn func(context.Context, gc.Node) error) error {
|
||||
return scanAll(ctx, tx, fn)
|
||||
})
|
||||
if t.Failed() {
|
||||
t.Fatal("Scan all failed")
|
||||
@ -143,8 +143,8 @@ func TestGCRemove(t *testing.T) {
|
||||
t.Fatalf("Update failed: %+v", err)
|
||||
}
|
||||
|
||||
checkNodes(ctx, t, db, remaining, func(ctx context.Context, tx *bolt.Tx, nc chan<- gc.Node) error {
|
||||
return scanAll(ctx, tx, nc)
|
||||
checkNodes(ctx, t, db, remaining, func(ctx context.Context, tx *bolt.Tx, fn func(context.Context, gc.Node) error) error {
|
||||
return scanAll(ctx, tx, fn)
|
||||
})
|
||||
}
|
||||
|
||||
@ -223,7 +223,7 @@ func TestGCRefs(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
for n, nodes := range refs {
|
||||
checkNodes(ctx, t, db, nodes, func(ctx context.Context, tx *bolt.Tx, nc chan<- gc.Node) error {
|
||||
checkNodeC(ctx, t, db, nodes, func(ctx context.Context, tx *bolt.Tx, nc chan<- gc.Node) error {
|
||||
return references(ctx, tx, n, func(n gc.Node) {
|
||||
select {
|
||||
case nc <- n:
|
||||
@ -255,7 +255,7 @@ func newDatabase() (*bolt.DB, func(), error) {
|
||||
}, nil
|
||||
}
|
||||
|
||||
func checkNodes(ctx context.Context, t *testing.T, db *bolt.DB, expected []gc.Node, fn func(context.Context, *bolt.Tx, chan<- gc.Node) error) {
|
||||
func checkNodeC(ctx context.Context, t *testing.T, db *bolt.DB, expected []gc.Node, fn func(context.Context, *bolt.Tx, chan<- gc.Node) error) {
|
||||
var actual []gc.Node
|
||||
nc := make(chan gc.Node)
|
||||
done := make(chan struct{})
|
||||
@ -276,6 +276,22 @@ func checkNodes(ctx context.Context, t *testing.T, db *bolt.DB, expected []gc.No
|
||||
checkNodesEqual(t, actual, expected)
|
||||
}
|
||||
|
||||
func checkNodes(ctx context.Context, t *testing.T, db *bolt.DB, expected []gc.Node, fn func(context.Context, *bolt.Tx, func(context.Context, gc.Node) error) error) {
|
||||
var actual []gc.Node
|
||||
scanFn := func(ctx context.Context, n gc.Node) error {
|
||||
actual = append(actual, n)
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := db.View(func(tx *bolt.Tx) error {
|
||||
return fn(ctx, tx, scanFn)
|
||||
}); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
checkNodesEqual(t, actual, expected)
|
||||
}
|
||||
|
||||
func checkNodesEqual(t *testing.T, n1, n2 []gc.Node) {
|
||||
sort.Sort(nodeList(n1))
|
||||
sort.Sort(nodeList(n2))
|
||||
|
Loading…
Reference in New Issue
Block a user