Merge pull request #2496 from dmcgowan/lease-content-uploads

Add content ingests to lease and gc
This commit is contained in:
Michael Crosby 2018-07-31 11:27:57 -04:00 committed by GitHub
commit d0ab8c8510
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 405 additions and 24 deletions

View File

@ -322,6 +322,40 @@ func (s *store) ListStatuses(ctx context.Context, fs ...string) ([]content.Statu
return active, nil
}
// WalkStatusRefs is used to walk all status references
// Failed status reads will be logged and ignored, if
// this function is called while references are being altered,
// these error messages may be produced.
func (s *store) WalkStatusRefs(ctx context.Context, fn func(string) error) error {
fp, err := os.Open(filepath.Join(s.root, "ingest"))
if err != nil {
return err
}
defer fp.Close()
fis, err := fp.Readdir(-1)
if err != nil {
return err
}
for _, fi := range fis {
rf := filepath.Join(s.root, "ingest", fi.Name(), "ref")
ref, err := readFileString(rf)
if err != nil {
log.G(ctx).WithError(err).WithField("path", rf).Error("failed to read ingest ref")
continue
}
if err := fn(ref); err != nil {
return err
}
}
return nil
}
// status works like stat above except uses the path to the ingest.
func (s *store) status(ingestPath string) (content.Status, error) {
dp := filepath.Join(ingestPath, "data")

View File

@ -72,6 +72,7 @@ var (
bucketKeyCreatedAt = []byte("createdat")
bucketKeyExpected = []byte("expected")
bucketKeyRef = []byte("ref")
bucketKeyExpireAt = []byte("expireat")
deprecatedBucketKeyObjectIngest = []byte("ingest") // stores ingest links, deprecated in v1.2
)

View File

@ -328,6 +328,10 @@ func (cs *contentStore) Abort(ctx context.Context, ref string) error {
return err
}
if err := removeIngestLease(ctx, tx, ref); err != nil {
return err
}
// if not shared content, delete active ingest on backend
if expected == "" {
return cs.Store.Abort(ctx, bref)
@ -395,6 +399,11 @@ func (cs *contentStore) Writer(ctx context.Context, opts ...content.WriterOpt) (
return err
}
leased, err := addIngestLease(ctx, tx, wOpts.Ref)
if err != nil {
return err
}
brefb := bkt.Get(bucketKeyRef)
if brefb == nil {
sid, err := bkt.NextSequence()
@ -409,6 +418,18 @@ func (cs *contentStore) Writer(ctx context.Context, opts ...content.WriterOpt) (
} else {
bref = string(brefb)
}
if !leased {
// Add timestamp to allow aborting once stale
// When lease is set the ingest shoudl be aborted
// after lease it belonged to is deleted.
// Expiration can be configurable in the future to
// give more control to the daemon, however leases
// already give users more control of expiration.
expireAt := time.Now().UTC().Add(24 * 3600 * time.Second)
if err := writeExpireAt(expireAt, bkt); err != nil {
return err
}
}
if shared {
if err := bkt.Put(bucketKeyExpected, []byte(wOpts.Desc.Digest)); err != nil {
@ -543,6 +564,9 @@ func (nw *namespacedWriter) Commit(ctx context.Context, size int64, expected dig
if err != nil {
return err
}
if err := removeIngestLease(ctx, tx, nw.ref); err != nil {
return err
}
return addContentLease(ctx, tx, dgst)
})
}
@ -697,6 +721,30 @@ func writeInfo(info *content.Info, bkt *bolt.Bucket) error {
return bkt.Put(bucketKeySize, sizeEncoded)
}
func readExpireAt(bkt *bolt.Bucket) (*time.Time, error) {
v := bkt.Get(bucketKeyExpireAt)
if v == nil {
return nil, nil
}
t := &time.Time{}
if err := t.UnmarshalBinary(v); err != nil {
return nil, err
}
return t, nil
}
func writeExpireAt(expire time.Time, bkt *bolt.Bucket) error {
expireAt, err := expire.MarshalBinary()
if err != nil {
return err
}
if err := bkt.Put(bucketKeyExpireAt, expireAt); err != nil {
return err
}
return nil
}
func (cs *contentStore) garbageCollect(ctx context.Context) (d time.Duration, err error) {
cs.l.Lock()
t1 := time.Now()
@ -707,7 +755,8 @@ func (cs *contentStore) garbageCollect(ctx context.Context) (d time.Duration, er
cs.l.Unlock()
}()
seen := map[string]struct{}{}
contentSeen := map[string]struct{}{}
ingestSeen := map[string]struct{}{}
if err := cs.db.View(func(tx *bolt.Tx) error {
v1bkt := tx.Bucket(bucketKeyVersion)
if v1bkt == nil {
@ -730,7 +779,7 @@ func (cs *contentStore) garbageCollect(ctx context.Context) (d time.Duration, er
if bbkt != nil {
if err := bbkt.ForEach(func(ck, cv []byte) error {
if cv == nil {
seen[string(ck)] = struct{}{}
contentSeen[string(ck)] = struct{}{}
}
return nil
}); err != nil {
@ -742,9 +791,17 @@ func (cs *contentStore) garbageCollect(ctx context.Context) (d time.Duration, er
if ibkt != nil {
if err := ibkt.ForEach(func(ref, v []byte) error {
if v == nil {
expected := ibkt.Bucket(ref).Get(bucketKeyExpected)
bkt := ibkt.Bucket(ref)
// expected here may be from a different namespace
// so much be explicitly retained from the ingest
// in case it was removed from the other namespace
expected := bkt.Get(bucketKeyExpected)
if len(expected) > 0 {
seen[string(expected)] = struct{}{}
contentSeen[string(expected)] = struct{}{}
}
bref := bkt.Get(bucketKeyRef)
if len(bref) > 0 {
ingestSeen[string(bref)] = struct{}{}
}
}
return nil
@ -760,7 +817,7 @@ func (cs *contentStore) garbageCollect(ctx context.Context) (d time.Duration, er
}
err = cs.Store.Walk(ctx, func(info content.Info) error {
if _, ok := seen[info.Digest.String()]; !ok {
if _, ok := contentSeen[info.Digest.String()]; !ok {
if err := cs.Store.Delete(ctx, info.Digest); err != nil {
return err
}
@ -768,5 +825,40 @@ func (cs *contentStore) garbageCollect(ctx context.Context) (d time.Duration, er
}
return nil
})
if err != nil {
return
}
// If the content store has implemented a more efficient walk function
// then use that else fallback to reading all statuses which may
// cause reading of unneeded metadata.
type statusWalker interface {
WalkStatusRefs(context.Context, func(string) error) error
}
if w, ok := cs.Store.(statusWalker); ok {
err = w.WalkStatusRefs(ctx, func(ref string) error {
if _, ok := ingestSeen[ref]; !ok {
if err := cs.Store.Abort(ctx, ref); err != nil {
return err
}
log.G(ctx).WithField("ref", ref).Debug("cleanup aborting ingest")
}
return nil
})
} else {
var statuses []content.Status
statuses, err = cs.Store.ListStatuses(ctx)
if err != nil {
return 0, err
}
for _, status := range statuses {
if _, ok := ingestSeen[status.Ref]; !ok {
if err = cs.Store.Abort(ctx, status.Ref); err != nil {
return
}
log.G(ctx).WithField("ref", status.Ref).Debug("cleanup aborting ingest")
}
}
}
return
}

View File

@ -89,6 +89,11 @@ func TestContentLeased(t *testing.T) {
if err := checkContentLeased(lctx, db, expected); err != nil {
t.Fatal("lease checked failed:", err)
}
if err := checkIngestLeased(lctx, db, "test-1"); err == nil {
t.Fatal("test-1 should not be leased after write")
} else if !errdefs.IsNotFound(err) {
t.Fatal("lease checked failed:", err)
}
lctx, _, err = createLease(ctx, db, "lease-2")
if err != nil {
@ -105,6 +110,48 @@ func TestContentLeased(t *testing.T) {
if err := checkContentLeased(lctx, db, expected); err != nil {
t.Fatal("lease checked failed:", err)
}
if err := checkIngestLeased(lctx, db, "test-2"); err == nil {
t.Fatal("test-2 should not be leased")
} else if !errdefs.IsNotFound(err) {
t.Fatal("lease checked failed:", err)
}
}
func TestIngestLeased(t *testing.T) {
ctx, db, cancel := testDB(t)
defer cancel()
cs := db.ContentStore()
blob := []byte("any content")
expected := digest.FromBytes(blob)
lctx, _, err := createLease(ctx, db, "lease-1")
if err != nil {
t.Fatal(err)
}
w, err := cs.Writer(lctx,
content.WithRef("test-1"),
content.WithDescriptor(ocispec.Descriptor{Size: int64(len(blob)), Digest: expected}))
if err != nil {
t.Fatal(err)
}
err = checkIngestLeased(lctx, db, "test-1")
w.Close()
if err != nil {
t.Fatal("lease checked failed:", err)
}
if err := cs.Abort(lctx, "test-1"); err != nil {
t.Fatal(err)
}
if err := checkIngestLeased(lctx, db, "test-1"); err == nil {
t.Fatal("test-1 should not be leased after write")
} else if !errdefs.IsNotFound(err) {
t.Fatal("lease checked failed:", err)
}
}
func createLease(ctx context.Context, db *DB, name string) (context.Context, func() error, error) {
@ -146,3 +193,27 @@ func checkContentLeased(ctx context.Context, db *DB, dgst digest.Digest) error {
return nil
})
}
func checkIngestLeased(ctx context.Context, db *DB, ref string) error {
ns, ok := namespaces.Namespace(ctx)
if !ok {
return errors.New("no namespace in context")
}
lease, ok := leases.FromContext(ctx)
if !ok {
return errors.New("no lease in context")
}
return db.View(func(tx *bolt.Tx) error {
bkt := getBucket(tx, bucketKeyVersion, []byte(ns), bucketKeyObjectLeases, []byte(lease), bucketKeyObjectIngests)
if bkt == nil {
return errors.Wrapf(errdefs.ErrNotFound, "bucket not found %s", lease)
}
v := bkt.Get([]byte(ref))
if v == nil {
return errors.Wrap(errdefs.ErrNotFound, "object not leased")
}
return nil
})
}

View File

@ -275,7 +275,7 @@ func (m *DB) GarbageCollect(ctx context.Context) (gc.Stats, error) {
if idx := strings.IndexRune(n.Key, '/'); idx > 0 {
m.dirtySS[n.Key[:idx]] = struct{}{}
}
} else if n.Type == ResourceContent {
} else if n.Type == ResourceContent || n.Type == ResourceIngest {
m.dirtyCS = true
}
return remove(ctx, tx, n)

View File

@ -42,6 +42,8 @@ const (
ResourceTask
// ResourceLease specifies a lease
ResourceLease
// ResourceIngest specifies a content ingest
ResourceIngest
)
var (
@ -136,6 +138,20 @@ func scanRoots(ctx context.Context, tx *bolt.Tx, nc chan<- gc.Node) error {
}
}
ibkt := libkt.Bucket(bucketKeyObjectIngests)
if ibkt != nil {
if err := ibkt.ForEach(func(k, v []byte) error {
select {
case nc <- gcnode(ResourceIngest, ns, string(k)):
case <-ctx.Done():
return ctx.Err()
}
return nil
}); err != nil {
return err
}
}
return nil
}); err != nil {
return err
@ -171,16 +187,39 @@ func scanRoots(ctx context.Context, tx *bolt.Tx, nc chan<- gc.Node) error {
cbkt := nbkt.Bucket(bucketKeyObjectContent)
if cbkt != nil {
cbkt = cbkt.Bucket(bucketKeyObjectBlob)
}
if cbkt != nil {
if err := cbkt.ForEach(func(k, v []byte) error {
if v != nil {
ibkt := cbkt.Bucket(bucketKeyObjectIngests)
if ibkt != nil {
if err := ibkt.ForEach(func(k, v []byte) error {
if v != nil {
return nil
}
ea, err := readExpireAt(ibkt.Bucket(k))
if err != nil {
return err
}
if ea == nil || expThreshold.After(*ea) {
return nil
}
select {
case nc <- gcnode(ResourceIngest, ns, string(k)):
case <-ctx.Done():
return ctx.Err()
}
return nil
}); err != nil {
return err
}
}
cbkt = cbkt.Bucket(bucketKeyObjectBlob)
if cbkt != nil {
if err := cbkt.ForEach(func(k, v []byte) error {
if v != nil {
return nil
}
return sendRootRef(ctx, nc, gcnode(ResourceContent, ns, string(k)), cbkt.Bucket(k))
}); err != nil {
return err
}
return sendRootRef(ctx, nc, gcnode(ResourceContent, ns, string(k)), cbkt.Bucket(k))
}); err != nil {
return err
}
}
@ -270,6 +309,19 @@ func references(ctx context.Context, tx *bolt.Tx, node gc.Node, fn func(gc.Node)
}
return sendSnapshotRefs(node.Namespace, bkt, fn)
} else if node.Type == ResourceIngest {
// Send expected value
bkt := getBucket(tx, bucketKeyVersion, []byte(node.Namespace), bucketKeyObjectContent, bucketKeyObjectIngests, []byte(node.Key))
if bkt == nil {
// Node may be created from dead edge
return nil
}
// Load expected
expected := bkt.Get(bucketKeyExpected)
if len(expected) > 0 {
fn(gcnode(ResourceContent, node.Namespace, string(expected)))
}
return nil
}
return nil
@ -324,17 +376,30 @@ func scanAll(ctx context.Context, tx *bolt.Tx, fn func(ctx context.Context, n gc
cbkt := nbkt.Bucket(bucketKeyObjectContent)
if cbkt != nil {
cbkt = cbkt.Bucket(bucketKeyObjectBlob)
}
if cbkt != nil {
if err := cbkt.ForEach(func(k, v []byte) error {
if v != nil {
return nil
ibkt := cbkt.Bucket(bucketKeyObjectIngests)
if ibkt != nil {
if err := ibkt.ForEach(func(k, v []byte) error {
if v != nil {
return nil
}
node := gcnode(ResourceIngest, ns, string(k))
return fn(ctx, node)
}); err != nil {
return err
}
}
cbkt = cbkt.Bucket(bucketKeyObjectBlob)
if cbkt != nil {
if err := cbkt.ForEach(func(k, v []byte) error {
if v != nil {
return nil
}
node := gcnode(ResourceContent, ns, string(k))
return fn(ctx, node)
}); err != nil {
return err
}
node := gcnode(ResourceContent, ns, string(k))
return fn(ctx, node)
}); err != nil {
return err
}
}
}
@ -381,6 +446,15 @@ func remove(ctx context.Context, tx *bolt.Tx, node gc.Node) error {
if lbkt != nil {
return lbkt.DeleteBucket([]byte(node.Key))
}
case ResourceIngest:
ibkt := nsbkt.Bucket(bucketKeyObjectContent)
if ibkt != nil {
ibkt = ibkt.Bucket(bucketKeyObjectIngests)
}
if ibkt != nil {
log.G(ctx).WithField("ref", node.Key).Debug("remove ingest")
return ibkt.DeleteBucket([]byte(node.Key))
}
}
return nil

View File

@ -51,6 +51,15 @@ func TestGCRoots(t *testing.T) {
addContent("ns1", dgst(3), nil),
addContent("ns2", dgst(1), nil),
addContent("ns2", dgst(2), labelmap(string(labelGCRoot), "always")),
addContent("ns2", dgst(8), nil),
addContent("ns2", dgst(9), nil),
addIngest("ns1", "ingest-1", "", nil), // will be seen as expired
addIngest("ns1", "ingest-2", "", timeIn(0)), // expired
addIngest("ns1", "ingest-3", "", timeIn(3600*time.Second)),
addIngest("ns2", "ingest-4", "", nil),
addIngest("ns2", "ingest-5", dgst(8), nil),
addIngest("ns2", "ingest-6", "", nil), // added to expired lease
addIngest("ns2", "ingest-7", dgst(9), nil), // added to expired lease
addSnapshot("ns1", "overlay", "sn1", "", nil),
addSnapshot("ns1", "overlay", "sn2", "", nil),
addSnapshot("ns1", "overlay", "sn3", "", labelmap(string(labelGCRoot), "always")),
@ -67,9 +76,13 @@ func TestGCRoots(t *testing.T) {
addLease("ns2", "l3", labelmap(string(labelGCExpire), time.Now().Add(3600*time.Second).Format(time.RFC3339))),
addLeaseContent("ns2", "l3", dgst(6)),
addLeaseSnapshot("ns2", "l3", "overlay", "sn7"),
addLeaseIngest("ns2", "l3", "ingest-4"),
addLeaseIngest("ns2", "l3", "ingest-5"),
addLease("ns2", "l4", labelmap(string(labelGCExpire), time.Now().Format(time.RFC3339))),
addLeaseContent("ns2", "l4", dgst(7)),
addLeaseSnapshot("ns2", "l4", "overlay", "sn8"),
addLeaseIngest("ns2", "l4", "ingest-6"),
addLeaseIngest("ns2", "l4", "ingest-7"),
}
expected := []gc.Node{
@ -93,6 +106,9 @@ func TestGCRoots(t *testing.T) {
gcnode(ResourceLease, "ns2", "l1"),
gcnode(ResourceLease, "ns2", "l2"),
gcnode(ResourceLease, "ns2", "l3"),
gcnode(ResourceIngest, "ns1", "ingest-3"),
gcnode(ResourceIngest, "ns2", "ingest-4"),
gcnode(ResourceIngest, "ns2", "ingest-5"),
}
if err := db.Update(func(tx *bolt.Tx) error {
@ -133,6 +149,8 @@ func TestGCRemove(t *testing.T) {
addContent("ns1", dgst(3), nil),
addContent("ns2", dgst(1), nil),
addContent("ns2", dgst(2), labelmap(string(labelGCRoot), "always")),
addIngest("ns1", "ingest-1", "", nil),
addIngest("ns2", "ingest-2", "", timeIn(0)),
addSnapshot("ns1", "overlay", "sn1", "", nil),
addSnapshot("ns1", "overlay", "sn2", "", nil),
addSnapshot("ns1", "overlay", "sn3", "", labelmap(string(labelGCRoot), "always")),
@ -155,6 +173,8 @@ func TestGCRemove(t *testing.T) {
gcnode(ResourceSnapshot, "ns2", "overlay/sn1"),
gcnode(ResourceLease, "ns1", "l1"),
gcnode(ResourceLease, "ns2", "l2"),
gcnode(ResourceIngest, "ns1", "ingest-1"),
gcnode(ResourceIngest, "ns2", "ingest-2"),
}
var deleted, remaining []gc.Node
@ -223,6 +243,8 @@ func TestGCRefs(t *testing.T) {
addContent("ns1", dgst(7), labelmap(string(labelGCContentRef)+"/anything-1", dgst(2).String(), string(labelGCContentRef)+"/anything-2", dgst(3).String())),
addContent("ns2", dgst(1), nil),
addContent("ns2", dgst(2), nil),
addIngest("ns1", "ingest-1", "", nil),
addIngest("ns2", "ingest-2", dgst(8), nil),
addSnapshot("ns1", "overlay", "sn1", "", nil),
addSnapshot("ns1", "overlay", "sn2", "sn1", nil),
addSnapshot("ns1", "overlay", "sn3", "sn2", nil),
@ -271,6 +293,10 @@ func TestGCRefs(t *testing.T) {
gcnode(ResourceSnapshot, "ns2", "overlay/sn2"): {
gcnode(ResourceSnapshot, "ns2", "overlay/sn1"),
},
gcnode(ResourceIngest, "ns1", "ingest-1"): nil,
gcnode(ResourceIngest, "ns2", "ingest-2"): {
gcnode(ResourceContent, "ns2", dgst(8).String()),
},
}
if err := db.Update(func(tx *bolt.Tx) error {
@ -441,6 +467,26 @@ func addContent(ns string, dgst digest.Digest, labels map[string]string) alterFu
}
}
func addIngest(ns, ref string, expected digest.Digest, expires *time.Time) alterFunc {
return func(bkt *bolt.Bucket) error {
cbkt, err := createBuckets(bkt, ns, string(bucketKeyObjectContent), string(bucketKeyObjectIngests), ref)
if err != nil {
return err
}
if expected != "" {
if err := cbkt.Put(bucketKeyExpected, []byte(expected)); err != nil {
return err
}
}
if expires != nil {
if err := writeExpireAt(*expires, cbkt); err != nil {
return err
}
}
return nil
}
}
func addLease(ns, lid string, labels map[string]string) alterFunc {
return func(bkt *bolt.Bucket) error {
lbkt, err := createBuckets(bkt, ns, string(bucketKeyObjectLeases), lid)
@ -471,6 +517,16 @@ func addLeaseContent(ns, lid string, dgst digest.Digest) alterFunc {
}
}
func addLeaseIngest(ns, lid, ref string) alterFunc {
return func(bkt *bolt.Bucket) error {
cbkt, err := createBuckets(bkt, ns, string(bucketKeyObjectLeases), lid, string(bucketKeyObjectIngests))
if err != nil {
return err
}
return cbkt.Put([]byte(ref), nil)
}
}
func addContainer(ns, name, snapshotter, snapshot string, labels map[string]string) alterFunc {
return func(bkt *bolt.Bucket) error {
cbkt, err := createBuckets(bkt, ns, string(bucketKeyObjectContainers), name)
@ -517,3 +573,8 @@ func dgst(i int64) digest.Digest {
}
return dgstr.Digest()
}
func timeIn(d time.Duration) *time.Time {
t := time.Now().UTC().Add(d)
return &t
}

View File

@ -256,3 +256,51 @@ func removeContentLease(ctx context.Context, tx *bolt.Tx, dgst digest.Digest) er
return bkt.Delete([]byte(dgst.String()))
}
func addIngestLease(ctx context.Context, tx *bolt.Tx, ref string) (bool, error) {
lid, ok := leases.FromContext(ctx)
if !ok {
return false, nil
}
namespace, ok := namespaces.Namespace(ctx)
if !ok {
panic("namespace must already be required")
}
bkt := getBucket(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectLeases, []byte(lid))
if bkt == nil {
return false, errors.Wrap(errdefs.ErrNotFound, "lease does not exist")
}
bkt, err := bkt.CreateBucketIfNotExists(bucketKeyObjectIngests)
if err != nil {
return false, err
}
if err := bkt.Put([]byte(ref), nil); err != nil {
return false, err
}
return true, nil
}
func removeIngestLease(ctx context.Context, tx *bolt.Tx, ref string) error {
lid, ok := leases.FromContext(ctx)
if !ok {
return nil
}
namespace, ok := namespaces.Namespace(ctx)
if !ok {
panic("namespace must already be checked")
}
bkt := getBucket(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectLeases, []byte(lid), bucketKeyObjectIngests)
if bkt == nil {
// Key does not exist so we return nil
return nil
}
return bkt.Delete([]byte(ref))
}