diff --git a/content/local/store.go b/content/local/store.go index 11dfd61a3..6df3df618 100644 --- a/content/local/store.go +++ b/content/local/store.go @@ -322,6 +322,40 @@ func (s *store) ListStatuses(ctx context.Context, fs ...string) ([]content.Statu return active, nil } +// WalkStatusRefs is used to walk all status references +// Failed status reads will be logged and ignored, if +// this function is called while references are being altered, +// these error messages may be produced. +func (s *store) WalkStatusRefs(ctx context.Context, fn func(string) error) error { + fp, err := os.Open(filepath.Join(s.root, "ingest")) + if err != nil { + return err + } + + defer fp.Close() + + fis, err := fp.Readdir(-1) + if err != nil { + return err + } + + for _, fi := range fis { + rf := filepath.Join(s.root, "ingest", fi.Name(), "ref") + + ref, err := readFileString(rf) + if err != nil { + log.G(ctx).WithError(err).WithField("path", rf).Error("failed to read ingest ref") + continue + } + + if err := fn(ref); err != nil { + return err + } + } + + return nil +} + // status works like stat above except uses the path to the ingest. func (s *store) status(ingestPath string) (content.Status, error) { dp := filepath.Join(ingestPath, "data") diff --git a/metadata/buckets.go b/metadata/buckets.go index a82c32fd0..fcf4c2959 100644 --- a/metadata/buckets.go +++ b/metadata/buckets.go @@ -72,6 +72,7 @@ var ( bucketKeyCreatedAt = []byte("createdat") bucketKeyExpected = []byte("expected") bucketKeyRef = []byte("ref") + bucketKeyExpireAt = []byte("expireat") deprecatedBucketKeyObjectIngest = []byte("ingest") // stores ingest links, deprecated in v1.2 ) diff --git a/metadata/content.go b/metadata/content.go index b8f0ae15d..5835bc37d 100644 --- a/metadata/content.go +++ b/metadata/content.go @@ -328,6 +328,10 @@ func (cs *contentStore) Abort(ctx context.Context, ref string) error { return err } + if err := removeIngestLease(ctx, tx, ref); err != nil { + return err + } + // if not shared content, delete active ingest on backend if expected == "" { return cs.Store.Abort(ctx, bref) @@ -395,6 +399,11 @@ func (cs *contentStore) Writer(ctx context.Context, opts ...content.WriterOpt) ( return err } + leased, err := addIngestLease(ctx, tx, wOpts.Ref) + if err != nil { + return err + } + brefb := bkt.Get(bucketKeyRef) if brefb == nil { sid, err := bkt.NextSequence() @@ -409,6 +418,18 @@ func (cs *contentStore) Writer(ctx context.Context, opts ...content.WriterOpt) ( } else { bref = string(brefb) } + if !leased { + // Add timestamp to allow aborting once stale + // When lease is set the ingest shoudl be aborted + // after lease it belonged to is deleted. + // Expiration can be configurable in the future to + // give more control to the daemon, however leases + // already give users more control of expiration. + expireAt := time.Now().UTC().Add(24 * 3600 * time.Second) + if err := writeExpireAt(expireAt, bkt); err != nil { + return err + } + } if shared { if err := bkt.Put(bucketKeyExpected, []byte(wOpts.Desc.Digest)); err != nil { @@ -543,6 +564,9 @@ func (nw *namespacedWriter) Commit(ctx context.Context, size int64, expected dig if err != nil { return err } + if err := removeIngestLease(ctx, tx, nw.ref); err != nil { + return err + } return addContentLease(ctx, tx, dgst) }) } @@ -697,6 +721,30 @@ func writeInfo(info *content.Info, bkt *bolt.Bucket) error { return bkt.Put(bucketKeySize, sizeEncoded) } +func readExpireAt(bkt *bolt.Bucket) (*time.Time, error) { + v := bkt.Get(bucketKeyExpireAt) + if v == nil { + return nil, nil + } + t := &time.Time{} + if err := t.UnmarshalBinary(v); err != nil { + return nil, err + } + return t, nil +} + +func writeExpireAt(expire time.Time, bkt *bolt.Bucket) error { + expireAt, err := expire.MarshalBinary() + if err != nil { + return err + } + if err := bkt.Put(bucketKeyExpireAt, expireAt); err != nil { + return err + } + + return nil +} + func (cs *contentStore) garbageCollect(ctx context.Context) (d time.Duration, err error) { cs.l.Lock() t1 := time.Now() @@ -707,7 +755,8 @@ func (cs *contentStore) garbageCollect(ctx context.Context) (d time.Duration, er cs.l.Unlock() }() - seen := map[string]struct{}{} + contentSeen := map[string]struct{}{} + ingestSeen := map[string]struct{}{} if err := cs.db.View(func(tx *bolt.Tx) error { v1bkt := tx.Bucket(bucketKeyVersion) if v1bkt == nil { @@ -730,7 +779,7 @@ func (cs *contentStore) garbageCollect(ctx context.Context) (d time.Duration, er if bbkt != nil { if err := bbkt.ForEach(func(ck, cv []byte) error { if cv == nil { - seen[string(ck)] = struct{}{} + contentSeen[string(ck)] = struct{}{} } return nil }); err != nil { @@ -742,9 +791,17 @@ func (cs *contentStore) garbageCollect(ctx context.Context) (d time.Duration, er if ibkt != nil { if err := ibkt.ForEach(func(ref, v []byte) error { if v == nil { - expected := ibkt.Bucket(ref).Get(bucketKeyExpected) + bkt := ibkt.Bucket(ref) + // expected here may be from a different namespace + // so much be explicitly retained from the ingest + // in case it was removed from the other namespace + expected := bkt.Get(bucketKeyExpected) if len(expected) > 0 { - seen[string(expected)] = struct{}{} + contentSeen[string(expected)] = struct{}{} + } + bref := bkt.Get(bucketKeyRef) + if len(bref) > 0 { + ingestSeen[string(bref)] = struct{}{} } } return nil @@ -760,7 +817,7 @@ func (cs *contentStore) garbageCollect(ctx context.Context) (d time.Duration, er } err = cs.Store.Walk(ctx, func(info content.Info) error { - if _, ok := seen[info.Digest.String()]; !ok { + if _, ok := contentSeen[info.Digest.String()]; !ok { if err := cs.Store.Delete(ctx, info.Digest); err != nil { return err } @@ -768,5 +825,40 @@ func (cs *contentStore) garbageCollect(ctx context.Context) (d time.Duration, er } return nil }) + if err != nil { + return + } + + // If the content store has implemented a more efficient walk function + // then use that else fallback to reading all statuses which may + // cause reading of unneeded metadata. + type statusWalker interface { + WalkStatusRefs(context.Context, func(string) error) error + } + if w, ok := cs.Store.(statusWalker); ok { + err = w.WalkStatusRefs(ctx, func(ref string) error { + if _, ok := ingestSeen[ref]; !ok { + if err := cs.Store.Abort(ctx, ref); err != nil { + return err + } + log.G(ctx).WithField("ref", ref).Debug("cleanup aborting ingest") + } + return nil + }) + } else { + var statuses []content.Status + statuses, err = cs.Store.ListStatuses(ctx) + if err != nil { + return 0, err + } + for _, status := range statuses { + if _, ok := ingestSeen[status.Ref]; !ok { + if err = cs.Store.Abort(ctx, status.Ref); err != nil { + return + } + log.G(ctx).WithField("ref", status.Ref).Debug("cleanup aborting ingest") + } + } + } return } diff --git a/metadata/content_test.go b/metadata/content_test.go index 7fab6cfce..3645a85d5 100644 --- a/metadata/content_test.go +++ b/metadata/content_test.go @@ -89,6 +89,11 @@ func TestContentLeased(t *testing.T) { if err := checkContentLeased(lctx, db, expected); err != nil { t.Fatal("lease checked failed:", err) } + if err := checkIngestLeased(lctx, db, "test-1"); err == nil { + t.Fatal("test-1 should not be leased after write") + } else if !errdefs.IsNotFound(err) { + t.Fatal("lease checked failed:", err) + } lctx, _, err = createLease(ctx, db, "lease-2") if err != nil { @@ -105,6 +110,48 @@ func TestContentLeased(t *testing.T) { if err := checkContentLeased(lctx, db, expected); err != nil { t.Fatal("lease checked failed:", err) } + if err := checkIngestLeased(lctx, db, "test-2"); err == nil { + t.Fatal("test-2 should not be leased") + } else if !errdefs.IsNotFound(err) { + t.Fatal("lease checked failed:", err) + } +} + +func TestIngestLeased(t *testing.T) { + ctx, db, cancel := testDB(t) + defer cancel() + + cs := db.ContentStore() + + blob := []byte("any content") + expected := digest.FromBytes(blob) + + lctx, _, err := createLease(ctx, db, "lease-1") + if err != nil { + t.Fatal(err) + } + + w, err := cs.Writer(lctx, + content.WithRef("test-1"), + content.WithDescriptor(ocispec.Descriptor{Size: int64(len(blob)), Digest: expected})) + if err != nil { + t.Fatal(err) + } + err = checkIngestLeased(lctx, db, "test-1") + w.Close() + if err != nil { + t.Fatal("lease checked failed:", err) + } + + if err := cs.Abort(lctx, "test-1"); err != nil { + t.Fatal(err) + } + + if err := checkIngestLeased(lctx, db, "test-1"); err == nil { + t.Fatal("test-1 should not be leased after write") + } else if !errdefs.IsNotFound(err) { + t.Fatal("lease checked failed:", err) + } } func createLease(ctx context.Context, db *DB, name string) (context.Context, func() error, error) { @@ -146,3 +193,27 @@ func checkContentLeased(ctx context.Context, db *DB, dgst digest.Digest) error { return nil }) } + +func checkIngestLeased(ctx context.Context, db *DB, ref string) error { + ns, ok := namespaces.Namespace(ctx) + if !ok { + return errors.New("no namespace in context") + } + lease, ok := leases.FromContext(ctx) + if !ok { + return errors.New("no lease in context") + } + + return db.View(func(tx *bolt.Tx) error { + bkt := getBucket(tx, bucketKeyVersion, []byte(ns), bucketKeyObjectLeases, []byte(lease), bucketKeyObjectIngests) + if bkt == nil { + return errors.Wrapf(errdefs.ErrNotFound, "bucket not found %s", lease) + } + v := bkt.Get([]byte(ref)) + if v == nil { + return errors.Wrap(errdefs.ErrNotFound, "object not leased") + } + + return nil + }) +} diff --git a/metadata/db.go b/metadata/db.go index 4398fc20f..0ea8ad78f 100644 --- a/metadata/db.go +++ b/metadata/db.go @@ -275,7 +275,7 @@ func (m *DB) GarbageCollect(ctx context.Context) (gc.Stats, error) { if idx := strings.IndexRune(n.Key, '/'); idx > 0 { m.dirtySS[n.Key[:idx]] = struct{}{} } - } else if n.Type == ResourceContent { + } else if n.Type == ResourceContent || n.Type == ResourceIngest { m.dirtyCS = true } return remove(ctx, tx, n) diff --git a/metadata/gc.go b/metadata/gc.go index 986a6e52b..eb4d0c7c3 100644 --- a/metadata/gc.go +++ b/metadata/gc.go @@ -42,6 +42,8 @@ const ( ResourceTask // ResourceLease specifies a lease ResourceLease + // ResourceIngest specifies a content ingest + ResourceIngest ) var ( @@ -136,6 +138,20 @@ func scanRoots(ctx context.Context, tx *bolt.Tx, nc chan<- gc.Node) error { } } + ibkt := libkt.Bucket(bucketKeyObjectIngests) + if ibkt != nil { + if err := ibkt.ForEach(func(k, v []byte) error { + select { + case nc <- gcnode(ResourceIngest, ns, string(k)): + case <-ctx.Done(): + return ctx.Err() + } + return nil + }); err != nil { + return err + } + } + return nil }); err != nil { return err @@ -171,16 +187,39 @@ func scanRoots(ctx context.Context, tx *bolt.Tx, nc chan<- gc.Node) error { cbkt := nbkt.Bucket(bucketKeyObjectContent) if cbkt != nil { - cbkt = cbkt.Bucket(bucketKeyObjectBlob) - } - if cbkt != nil { - if err := cbkt.ForEach(func(k, v []byte) error { - if v != nil { + ibkt := cbkt.Bucket(bucketKeyObjectIngests) + if ibkt != nil { + if err := ibkt.ForEach(func(k, v []byte) error { + if v != nil { + return nil + } + ea, err := readExpireAt(ibkt.Bucket(k)) + if err != nil { + return err + } + if ea == nil || expThreshold.After(*ea) { + return nil + } + select { + case nc <- gcnode(ResourceIngest, ns, string(k)): + case <-ctx.Done(): + return ctx.Err() + } return nil + }); err != nil { + return err + } + } + cbkt = cbkt.Bucket(bucketKeyObjectBlob) + if cbkt != nil { + if err := cbkt.ForEach(func(k, v []byte) error { + if v != nil { + return nil + } + return sendRootRef(ctx, nc, gcnode(ResourceContent, ns, string(k)), cbkt.Bucket(k)) + }); err != nil { + return err } - return sendRootRef(ctx, nc, gcnode(ResourceContent, ns, string(k)), cbkt.Bucket(k)) - }); err != nil { - return err } } @@ -270,6 +309,19 @@ func references(ctx context.Context, tx *bolt.Tx, node gc.Node, fn func(gc.Node) } return sendSnapshotRefs(node.Namespace, bkt, fn) + } else if node.Type == ResourceIngest { + // Send expected value + bkt := getBucket(tx, bucketKeyVersion, []byte(node.Namespace), bucketKeyObjectContent, bucketKeyObjectIngests, []byte(node.Key)) + if bkt == nil { + // Node may be created from dead edge + return nil + } + // Load expected + expected := bkt.Get(bucketKeyExpected) + if len(expected) > 0 { + fn(gcnode(ResourceContent, node.Namespace, string(expected))) + } + return nil } return nil @@ -324,17 +376,30 @@ func scanAll(ctx context.Context, tx *bolt.Tx, fn func(ctx context.Context, n gc cbkt := nbkt.Bucket(bucketKeyObjectContent) if cbkt != nil { - cbkt = cbkt.Bucket(bucketKeyObjectBlob) - } - if cbkt != nil { - if err := cbkt.ForEach(func(k, v []byte) error { - if v != nil { - return nil + ibkt := cbkt.Bucket(bucketKeyObjectIngests) + if ibkt != nil { + if err := ibkt.ForEach(func(k, v []byte) error { + if v != nil { + return nil + } + node := gcnode(ResourceIngest, ns, string(k)) + return fn(ctx, node) + }); err != nil { + return err + } + } + + cbkt = cbkt.Bucket(bucketKeyObjectBlob) + if cbkt != nil { + if err := cbkt.ForEach(func(k, v []byte) error { + if v != nil { + return nil + } + node := gcnode(ResourceContent, ns, string(k)) + return fn(ctx, node) + }); err != nil { + return err } - node := gcnode(ResourceContent, ns, string(k)) - return fn(ctx, node) - }); err != nil { - return err } } } @@ -381,6 +446,15 @@ func remove(ctx context.Context, tx *bolt.Tx, node gc.Node) error { if lbkt != nil { return lbkt.DeleteBucket([]byte(node.Key)) } + case ResourceIngest: + ibkt := nsbkt.Bucket(bucketKeyObjectContent) + if ibkt != nil { + ibkt = ibkt.Bucket(bucketKeyObjectIngests) + } + if ibkt != nil { + log.G(ctx).WithField("ref", node.Key).Debug("remove ingest") + return ibkt.DeleteBucket([]byte(node.Key)) + } } return nil diff --git a/metadata/gc_test.go b/metadata/gc_test.go index d58c8fb5d..5dd41bea1 100644 --- a/metadata/gc_test.go +++ b/metadata/gc_test.go @@ -51,6 +51,15 @@ func TestGCRoots(t *testing.T) { addContent("ns1", dgst(3), nil), addContent("ns2", dgst(1), nil), addContent("ns2", dgst(2), labelmap(string(labelGCRoot), "always")), + addContent("ns2", dgst(8), nil), + addContent("ns2", dgst(9), nil), + addIngest("ns1", "ingest-1", "", nil), // will be seen as expired + addIngest("ns1", "ingest-2", "", timeIn(0)), // expired + addIngest("ns1", "ingest-3", "", timeIn(3600*time.Second)), + addIngest("ns2", "ingest-4", "", nil), + addIngest("ns2", "ingest-5", dgst(8), nil), + addIngest("ns2", "ingest-6", "", nil), // added to expired lease + addIngest("ns2", "ingest-7", dgst(9), nil), // added to expired lease addSnapshot("ns1", "overlay", "sn1", "", nil), addSnapshot("ns1", "overlay", "sn2", "", nil), addSnapshot("ns1", "overlay", "sn3", "", labelmap(string(labelGCRoot), "always")), @@ -67,9 +76,13 @@ func TestGCRoots(t *testing.T) { addLease("ns2", "l3", labelmap(string(labelGCExpire), time.Now().Add(3600*time.Second).Format(time.RFC3339))), addLeaseContent("ns2", "l3", dgst(6)), addLeaseSnapshot("ns2", "l3", "overlay", "sn7"), + addLeaseIngest("ns2", "l3", "ingest-4"), + addLeaseIngest("ns2", "l3", "ingest-5"), addLease("ns2", "l4", labelmap(string(labelGCExpire), time.Now().Format(time.RFC3339))), addLeaseContent("ns2", "l4", dgst(7)), addLeaseSnapshot("ns2", "l4", "overlay", "sn8"), + addLeaseIngest("ns2", "l4", "ingest-6"), + addLeaseIngest("ns2", "l4", "ingest-7"), } expected := []gc.Node{ @@ -93,6 +106,9 @@ func TestGCRoots(t *testing.T) { gcnode(ResourceLease, "ns2", "l1"), gcnode(ResourceLease, "ns2", "l2"), gcnode(ResourceLease, "ns2", "l3"), + gcnode(ResourceIngest, "ns1", "ingest-3"), + gcnode(ResourceIngest, "ns2", "ingest-4"), + gcnode(ResourceIngest, "ns2", "ingest-5"), } if err := db.Update(func(tx *bolt.Tx) error { @@ -133,6 +149,8 @@ func TestGCRemove(t *testing.T) { addContent("ns1", dgst(3), nil), addContent("ns2", dgst(1), nil), addContent("ns2", dgst(2), labelmap(string(labelGCRoot), "always")), + addIngest("ns1", "ingest-1", "", nil), + addIngest("ns2", "ingest-2", "", timeIn(0)), addSnapshot("ns1", "overlay", "sn1", "", nil), addSnapshot("ns1", "overlay", "sn2", "", nil), addSnapshot("ns1", "overlay", "sn3", "", labelmap(string(labelGCRoot), "always")), @@ -155,6 +173,8 @@ func TestGCRemove(t *testing.T) { gcnode(ResourceSnapshot, "ns2", "overlay/sn1"), gcnode(ResourceLease, "ns1", "l1"), gcnode(ResourceLease, "ns2", "l2"), + gcnode(ResourceIngest, "ns1", "ingest-1"), + gcnode(ResourceIngest, "ns2", "ingest-2"), } var deleted, remaining []gc.Node @@ -223,6 +243,8 @@ func TestGCRefs(t *testing.T) { addContent("ns1", dgst(7), labelmap(string(labelGCContentRef)+"/anything-1", dgst(2).String(), string(labelGCContentRef)+"/anything-2", dgst(3).String())), addContent("ns2", dgst(1), nil), addContent("ns2", dgst(2), nil), + addIngest("ns1", "ingest-1", "", nil), + addIngest("ns2", "ingest-2", dgst(8), nil), addSnapshot("ns1", "overlay", "sn1", "", nil), addSnapshot("ns1", "overlay", "sn2", "sn1", nil), addSnapshot("ns1", "overlay", "sn3", "sn2", nil), @@ -271,6 +293,10 @@ func TestGCRefs(t *testing.T) { gcnode(ResourceSnapshot, "ns2", "overlay/sn2"): { gcnode(ResourceSnapshot, "ns2", "overlay/sn1"), }, + gcnode(ResourceIngest, "ns1", "ingest-1"): nil, + gcnode(ResourceIngest, "ns2", "ingest-2"): { + gcnode(ResourceContent, "ns2", dgst(8).String()), + }, } if err := db.Update(func(tx *bolt.Tx) error { @@ -441,6 +467,26 @@ func addContent(ns string, dgst digest.Digest, labels map[string]string) alterFu } } +func addIngest(ns, ref string, expected digest.Digest, expires *time.Time) alterFunc { + return func(bkt *bolt.Bucket) error { + cbkt, err := createBuckets(bkt, ns, string(bucketKeyObjectContent), string(bucketKeyObjectIngests), ref) + if err != nil { + return err + } + if expected != "" { + if err := cbkt.Put(bucketKeyExpected, []byte(expected)); err != nil { + return err + } + } + if expires != nil { + if err := writeExpireAt(*expires, cbkt); err != nil { + return err + } + } + return nil + } +} + func addLease(ns, lid string, labels map[string]string) alterFunc { return func(bkt *bolt.Bucket) error { lbkt, err := createBuckets(bkt, ns, string(bucketKeyObjectLeases), lid) @@ -471,6 +517,16 @@ func addLeaseContent(ns, lid string, dgst digest.Digest) alterFunc { } } +func addLeaseIngest(ns, lid, ref string) alterFunc { + return func(bkt *bolt.Bucket) error { + cbkt, err := createBuckets(bkt, ns, string(bucketKeyObjectLeases), lid, string(bucketKeyObjectIngests)) + if err != nil { + return err + } + return cbkt.Put([]byte(ref), nil) + } +} + func addContainer(ns, name, snapshotter, snapshot string, labels map[string]string) alterFunc { return func(bkt *bolt.Bucket) error { cbkt, err := createBuckets(bkt, ns, string(bucketKeyObjectContainers), name) @@ -517,3 +573,8 @@ func dgst(i int64) digest.Digest { } return dgstr.Digest() } + +func timeIn(d time.Duration) *time.Time { + t := time.Now().UTC().Add(d) + return &t +} diff --git a/metadata/leases.go b/metadata/leases.go index 25a2a6692..bf68f1766 100644 --- a/metadata/leases.go +++ b/metadata/leases.go @@ -256,3 +256,51 @@ func removeContentLease(ctx context.Context, tx *bolt.Tx, dgst digest.Digest) er return bkt.Delete([]byte(dgst.String())) } + +func addIngestLease(ctx context.Context, tx *bolt.Tx, ref string) (bool, error) { + lid, ok := leases.FromContext(ctx) + if !ok { + return false, nil + } + + namespace, ok := namespaces.Namespace(ctx) + if !ok { + panic("namespace must already be required") + } + + bkt := getBucket(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectLeases, []byte(lid)) + if bkt == nil { + return false, errors.Wrap(errdefs.ErrNotFound, "lease does not exist") + } + + bkt, err := bkt.CreateBucketIfNotExists(bucketKeyObjectIngests) + if err != nil { + return false, err + } + + if err := bkt.Put([]byte(ref), nil); err != nil { + return false, err + } + + return true, nil +} + +func removeIngestLease(ctx context.Context, tx *bolt.Tx, ref string) error { + lid, ok := leases.FromContext(ctx) + if !ok { + return nil + } + + namespace, ok := namespaces.Namespace(ctx) + if !ok { + panic("namespace must already be checked") + } + + bkt := getBucket(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectLeases, []byte(lid), bucketKeyObjectIngests) + if bkt == nil { + // Key does not exist so we return nil + return nil + } + + return bkt.Delete([]byte(ref)) +}