diff --git a/cmd/dist/active.go b/cmd/dist/active.go new file mode 100644 index 000000000..596f1b464 --- /dev/null +++ b/cmd/dist/active.go @@ -0,0 +1,68 @@ +package main + +import ( + "fmt" + "os" + "path/filepath" + "text/tabwriter" + "time" + + "github.com/docker/containerd/content" + units "github.com/docker/go-units" + "github.com/urfave/cli" +) + +var activeCommand = cli.Command{ + Name: "active", + Usage: "display active transfers.", + ArgsUsage: "[flags] [, ...]", + Description: `Display the ongoing transfers.`, + Flags: []cli.Flag{ + cli.DurationFlag{ + Name: "timeout, t", + Usage: "total timeout for fetch", + EnvVar: "CONTAINERD_FETCH_TIMEOUT", + }, + cli.StringFlag{ + Name: "root", + Usage: "path to content store root", + Value: ".content", // TODO(stevvooe): for now, just use the PWD/.content + }, + }, + Action: func(context *cli.Context) error { + var ( + // ctx = contextpkg.Background() + root = context.String("root") + ) + + if !filepath.IsAbs(root) { + var err error + root, err = filepath.Abs(root) + if err != nil { + return err + } + } + + cs, err := content.Open(root) + if err != nil { + return err + } + + active, err := cs.Active() + if err != nil { + return err + } + + tw := tabwriter.NewWriter(os.Stdout, 1, 8, 1, '\t', 0) + fmt.Fprintf(tw, "REF\tSIZE\tAGE\n") + for _, active := range active { + fmt.Fprintf(tw, "%s\t%s\t%s\n", + active.Ref, + units.HumanSize(float64(active.Size)), + units.HumanDuration(time.Since(active.ModTime))) + } + tw.Flush() + + return nil + }, +} diff --git a/cmd/dist/delete.go b/cmd/dist/delete.go new file mode 100644 index 000000000..9cce3b56b --- /dev/null +++ b/cmd/dist/delete.go @@ -0,0 +1,72 @@ +package main + +import ( + contextpkg "context" + "fmt" + "path/filepath" + + "github.com/docker/containerd/content" + "github.com/docker/containerd/log" + digest "github.com/opencontainers/go-digest" + "github.com/urfave/cli" +) + +var deleteCommand = cli.Command{ + Name: "delete", + Aliases: []string{"del"}, + Usage: "permanently delete one or more blobs.", + ArgsUsage: "[flags] [, ...]", + Description: `Delete one or more blobs permanently. Successfully deleted + blobs are printed to stdout.`, + Flags: []cli.Flag{ + cli.StringFlag{ + Name: "root", + Usage: "path to content store root", + Value: ".content", // TODO(stevvooe): for now, just use the PWD/.content + }, + }, + Action: func(context *cli.Context) error { + var ( + ctx = contextpkg.Background() + root = context.String("root") + args = []string(context.Args()) + exitError error + ) + + if !filepath.IsAbs(root) { + var err error + root, err = filepath.Abs(root) + if err != nil { + return err + } + } + + cs, err := content.Open(root) + if err != nil { + return err + } + + for _, arg := range args { + dgst, err := digest.Parse(arg) + if err != nil { + if exitError == nil { + exitError = err + } + log.G(ctx).WithError(err).Errorf("could not delete %v", dgst) + continue + } + + if err := cs.Delete(dgst); err != nil { + if exitError == nil { + exitError = err + } + log.G(ctx).WithError(err).Errorf("could not delete %v", dgst) + continue + } + + fmt.Println(dgst) + } + + return exitError + }, +} diff --git a/cmd/dist/ingest.go b/cmd/dist/ingest.go new file mode 100644 index 000000000..206c18631 --- /dev/null +++ b/cmd/dist/ingest.go @@ -0,0 +1,96 @@ +package main + +import ( + contextpkg "context" + "fmt" + "os" + "path/filepath" + "strings" + + "github.com/docker/containerd/content" + "github.com/opencontainers/go-digest" + "github.com/urfave/cli" +) + +var ingestCommand = cli.Command{ + Name: "ingest", + Usage: "accept content into the store", + ArgsUsage: "[flags] ", + Description: `Ingest objects into the local content store.`, + Flags: []cli.Flag{ + cli.DurationFlag{ + Name: "timeout", + Usage: "total timeout for fetch", + EnvVar: "CONTAINERD_FETCH_TIMEOUT", + }, + cli.StringFlag{ + Name: "path, p", + Usage: "path to content store", + Value: ".content", // TODO(stevvooe): for now, just use the PWD/.content + EnvVar: "CONTAINERD_DIST_CONTENT_STORE", + }, + cli.Int64Flag{ + Name: "expected-size", + Usage: "validate against provided size", + }, + cli.StringFlag{ + Name: "expected-digest", + Usage: "verify content against expected digest", + }, + }, + Action: func(context *cli.Context) error { + var ( + ctx = contextpkg.Background() + timeout = context.Duration("timeout") + root = context.String("path") + ref = context.Args().First() + expectedSize = context.Int64("expected-size") + expectedDigest = digest.Digest(context.String("expected-digest")) + ) + + if timeout > 0 { + var cancel func() + ctx, cancel = contextpkg.WithTimeout(ctx, timeout) + defer cancel() + } + + if err := expectedDigest.Validate(); expectedDigest != "" && err != nil { + return err + } + + if !filepath.IsAbs(root) { + var err error + root, err = filepath.Abs(root) + if err != nil { + return err + } + } + + cs, err := content.Open(root) + if err != nil { + return err + } + + if expectedDigest != "" { + if ok, err := cs.Exists(expectedDigest); err != nil { + return err + } else if ok { + fmt.Fprintf(os.Stderr, "content with digest %v already exists\n", expectedDigest) + return nil + } + } + + if ref == "" { + if expectedDigest == "" { + return fmt.Errorf("must specify a transaction reference or expected digest") + } + + ref = strings.Replace(expectedDigest.String(), ":", "-", -1) + } + + // TODO(stevvooe): Allow ingest to be reentrant. Currently, we expect + // all data to be written in a single invocation. Allow multiple writes + // to the same transaction key followed by a commit. + return content.WriteBlob(cs, os.Stdin, ref, expectedSize, expectedDigest) + }, +} diff --git a/cmd/dist/list.go b/cmd/dist/list.go new file mode 100644 index 000000000..11b589457 --- /dev/null +++ b/cmd/dist/list.go @@ -0,0 +1,85 @@ +package main + +import ( + contextpkg "context" + "fmt" + "os" + "path/filepath" + "text/tabwriter" + "time" + + "github.com/docker/containerd/content" + "github.com/docker/containerd/log" + units "github.com/docker/go-units" + digest "github.com/opencontainers/go-digest" + "github.com/urfave/cli" +) + +var listCommand = cli.Command{ + Name: "list", + Aliases: []string{"ls"}, + Usage: "list all blobs in the store.", + ArgsUsage: "[flags] [, ...]", + Description: `List blobs in the content store.`, + Flags: []cli.Flag{ + cli.StringFlag{ + Name: "root", + Usage: "path to content store root", + Value: ".content", // TODO(stevvooe): for now, just use the PWD/.content + }, + cli.BoolFlag{ + Name: "quiet, q", + Usage: "print only the blob digest", + }, + }, + Action: func(context *cli.Context) error { + var ( + ctx = contextpkg.Background() + root = context.String("root") + quiet = context.Bool("quiet") + args = []string(context.Args()) + ) + + if !filepath.IsAbs(root) { + var err error + root, err = filepath.Abs(root) + if err != nil { + return err + } + } + + cs, err := content.Open(root) + if err != nil { + return err + } + + if len(args) > 0 { + // TODO(stevvooe): Implement selection of a few blobs. Not sure + // what kind of efficiency gains we can actually get here. + log.G(ctx).Warnf("args ignored; need to implement matchers") + } + + var walkFn content.WalkFunc + if quiet { + walkFn = func(path string, fi os.FileInfo, dgst digest.Digest) error { + fmt.Println(dgst) + return nil + } + } else { + tw := tabwriter.NewWriter(os.Stdout, 1, 8, 1, '\t', 0) + defer tw.Flush() + + fmt.Fprintf(tw, "DIGEST\tSIZE\tAGE\n") + walkFn = func(path string, fi os.FileInfo, dgst digest.Digest) error { + fmt.Fprintf(tw, "%s\t%s\t%s\n", + dgst, + units.HumanSize(float64(fi.Size())), + units.HumanDuration(time.Since(fi.ModTime()))) + return nil + } + + } + + return cs.Walk(walkFn) + }, +} diff --git a/cmd/dist/main.go b/cmd/dist/main.go index 4a3616637..ece885eb8 100644 --- a/cmd/dist/main.go +++ b/cmd/dist/main.go @@ -30,6 +30,11 @@ distribution tool } app.Commands = []cli.Command{ fetchCommand, + ingestCommand, + activeCommand, + pathCommand, + deleteCommand, + listCommand, } app.Before = func(context *cli.Context) error { if context.GlobalBool("debug") { diff --git a/cmd/dist/path.go b/cmd/dist/path.go new file mode 100644 index 000000000..02215a8c9 --- /dev/null +++ b/cmd/dist/path.go @@ -0,0 +1,89 @@ +package main + +import ( + contextpkg "context" + "fmt" + "path/filepath" + + "github.com/docker/containerd/content" + "github.com/docker/containerd/log" + digest "github.com/opencontainers/go-digest" + "github.com/urfave/cli" +) + +var pathCommand = cli.Command{ + Name: "path", + Usage: "print the path to one or more blobs", + ArgsUsage: "[flags] [, ...]", + Description: `Display the paths to one or more blobs. + +Output paths can be used to directly access blobs on disk.`, + Flags: []cli.Flag{ + cli.StringFlag{ + Name: "root", + Usage: "path to content store root", + Value: ".content", // TODO(stevvooe): for now, just use the PWD/.content + EnvVar: "CONTAINERD_DIST_CONTENT_STORE", + }, + cli.BoolFlag{ + Name: "quiet, q", + Usage: "elide digests in output", + }, + }, + Action: func(context *cli.Context) error { + var ( + ctx = contextpkg.Background() + root = context.String("root") + args = []string(context.Args()) + quiet = context.Bool("quiet") + exitError error + ) + + if !filepath.IsAbs(root) { + var err error + root, err = filepath.Abs(root) + if err != nil { + return err + } + } + + cs, err := content.Open(root) + if err != nil { + return err + } + + // TODO(stevvooe): Take the set of paths from stdin. + + if len(args) < 1 { + return fmt.Errorf("please specify a blob digest") + } + + for _, arg := range args { + dgst, err := digest.Parse(arg) + if err != nil { + log.G(ctx).WithError(err).Errorf("parsing %q as digest failed", arg) + if exitError == nil { + exitError = err + } + continue + } + + p, err := cs.GetPath(dgst) + if err != nil { + log.G(ctx).WithError(err).Errorf("getting path for %q failed", dgst) + if exitError == nil { + exitError = err + } + continue + } + + if !quiet { + fmt.Println(dgst, p) + } else { + fmt.Println(p) + } + } + + return exitError + }, +} diff --git a/content/content.go b/content/content.go index 1f2a2df31..e1d5e61b5 100644 --- a/content/content.go +++ b/content/content.go @@ -6,6 +6,7 @@ import ( "os" "path/filepath" "sync" + "time" "github.com/docker/containerd/log" "github.com/nightlyone/lockfile" @@ -43,11 +44,58 @@ func Open(root string) (*Store, error) { } type Status struct { - Ref string - Size int64 - Meta interface{} + Ref string + Size int64 + ModTime time.Time + Meta interface{} } +func (cs *Store) Exists(dgst digest.Digest) (bool, error) { + if _, err := os.Stat(cs.blobPath(dgst)); err != nil { + if !os.IsNotExist(err) { + return false, err + } + + return false, nil + } + + return true, nil +} + +func (cs *Store) GetPath(dgst digest.Digest) (string, error) { + p := cs.blobPath(dgst) + if _, err := os.Stat(p); err != nil { + if os.IsNotExist(err) { + return "", ErrBlobNotFound + } + + return "", err + } + + return p, nil +} + +// Delete removes a blob by its digest. +// +// While this is safe to do concurrently, safe exist-removal logic must hold +// some global lock on the store. +func (cs *Store) Delete(dgst digest.Digest) error { + if err := os.RemoveAll(cs.blobPath(dgst)); err != nil { + if !os.IsNotExist(err) { + return err + } + + return nil + } + + return nil +} + +func (cs *Store) blobPath(dgst digest.Digest) string { + return filepath.Join(cs.root, "blobs", dgst.Algorithm().String(), dgst.Hex()) +} + +// Stat returns the current status of a blob by the ingest ref. func (cs *Store) Stat(ref string) (Status, error) { dp := filepath.Join(cs.ingestRoot(ref), "data") return cs.stat(dp) @@ -67,10 +115,10 @@ func (cs *Store) stat(ingestPath string) (Status, error) { } return Status{ - Ref: ref, - Size: dfi.Size(), + Ref: ref, + Size: dfi.Size(), + ModTime: dfi.ModTime(), }, nil - } func (cs *Store) Active() ([]Status, error) { @@ -114,7 +162,14 @@ func (cs *Store) Active() ([]Status, error) { // TODO(stevvooe): Allow querying the set of blobs in the blob store. -func (cs *Store) Walk(fn func(path string, dgst digest.Digest) error) error { +// WalkFunc defines the callback for a blob walk. +// +// TODO(stevvooe): Remove the file info. Just need size and modtime. Perhaps, +// not a huge deal, considering we have a path, but let's not just let this one +// go without scrunity. +type WalkFunc func(path string, fi os.FileInfo, dgst digest.Digest) error + +func (cs *Store) Walk(fn WalkFunc) error { root := filepath.Join(cs.root, "blobs") var alg digest.Algorithm return filepath.Walk(root, func(path string, fi os.FileInfo, err error) error { @@ -148,23 +203,10 @@ func (cs *Store) Walk(fn func(path string, dgst digest.Digest) error) error { // store or extra paths not expected previously. } - return fn(path, dgst) + return fn(path, fi, dgst) }) } -func (cs *Store) GetPath(dgst digest.Digest) (string, error) { - p := filepath.Join(cs.root, "blobs", dgst.Algorithm().String(), dgst.Hex()) - if _, err := os.Stat(p); err != nil { - if os.IsNotExist(err) { - return "", ErrBlobNotFound - } - - return "", err - } - - return p, nil -} - // Begin starts a new write transaction against the blob store. // // The argument `ref` is used to identify the transaction. It must be a valid @@ -267,6 +309,20 @@ func (cs *Store) Resume(ref string) (*Writer, error) { }, nil } +// Remove an active transaction keyed by ref. +func (cs *Store) Remove(ref string) error { + root := cs.ingestRoot(ref) + if err := os.RemoveAll(root); err != nil { + if os.IsNotExist(err) { + return nil + } + + return err + } + + return nil +} + func (cs *Store) ingestRoot(ref string) string { dgst := digest.FromString(ref) return filepath.Join(cs.root, "ingest", dgst.Hex()) diff --git a/content/content_test.go b/content/content_test.go index 50bfff9a2..c086874e3 100644 --- a/content/content_test.go +++ b/content/content_test.go @@ -14,6 +14,7 @@ import ( "reflect" "runtime" "testing" + "time" "github.com/opencontainers/go-digest" ) @@ -59,6 +60,12 @@ func TestContentWriter(t *testing.T) { t.Fatal(err) } + // clear out the time and meta cause we don't care for this test + for i := range ingestions { + ingestions[i].Meta = nil + ingestions[i].ModTime = time.Time{} + } + if !reflect.DeepEqual(ingestions, []Status{ { Ref: "myref", @@ -129,7 +136,7 @@ func TestWalkBlobs(t *testing.T) { expected[dgst] = struct{}{} } - if err := cs.Walk(func(path string, dgst digest.Digest) error { + if err := cs.Walk(func(path string, fi os.FileInfo, dgst digest.Digest) error { found[dgst] = struct{}{} if checked := checkBlobPath(t, cs, dgst); checked != path { t.Fatalf("blob path did not match: %v != %v", path, checked) @@ -266,7 +273,7 @@ func checkBlobPath(t *testing.T, cs *Store, dgst digest.Digest) string { } func checkWrite(t checker, cs *Store, dgst digest.Digest, p []byte) digest.Digest { - if err := WriteBlob(cs, bytes.NewReader(p), int64(len(p)), dgst); err != nil { + if err := WriteBlob(cs, bytes.NewReader(p), dgst.String(), int64(len(p)), dgst); err != nil { t.Fatal(err) } diff --git a/content/helpers.go b/content/helpers.go index 111dc55a0..37cb71e22 100644 --- a/content/helpers.go +++ b/content/helpers.go @@ -39,8 +39,8 @@ type Ingester interface { // This is useful when the digest and size are known beforehand. // // Copy is buffered, so no need to wrap reader in buffered io. -func WriteBlob(cs Ingester, r io.Reader, size int64, expected digest.Digest) error { - cw, err := cs.Begin(expected.Hex()) +func WriteBlob(cs Ingester, r io.Reader, ref string, size int64, expected digest.Digest) error { + cw, err := cs.Begin(ref) if err != nil { return err } @@ -52,7 +52,7 @@ func WriteBlob(cs Ingester, r io.Reader, size int64, expected digest.Digest) err return err } - if nn != size { + if size > 0 && nn != size { return errors.Errorf("failed size verification: %v != %v", nn, size) } diff --git a/content/writer.go b/content/writer.go index 231f68aa5..a2ff020f6 100644 --- a/content/writer.go +++ b/content/writer.go @@ -25,6 +25,21 @@ func (cw *Writer) Ref() string { return cw.ref } +// Size returns the current size written. +// +// Cannot be called concurrently with `Write`. If you need need concurrent +// status, query it with `Store.Stat`. +func (cw *Writer) Size() int64 { + return cw.offset +} + +// Digest returns the current digest of the content, up to the current write. +// +// Cannot be called concurrently with `Write`. +func (cw *Writer) Digest() digest.Digest { + return cw.digester.Digest() +} + // Write p to the transaction. // // Note that writes are unbuffered to the backing file. When writing, it is @@ -32,6 +47,7 @@ func (cw *Writer) Ref() string { func (cw *Writer) Write(p []byte) (n int, err error) { n, err = cw.fp.Write(p) cw.digester.Hash().Write(p[:n]) + cw.offset += int64(len(p)) return n, err } @@ -54,7 +70,7 @@ func (cw *Writer) Commit(size int64, expected digest.Digest) error { return errors.Wrap(err, "failed to change ingest file permissions") } - if size != fi.Size() { + if size > 0 && size != fi.Size() { return errors.Errorf("failed size validation: %v != %v", fi.Size(), size) } @@ -63,24 +79,23 @@ func (cw *Writer) Commit(size int64, expected digest.Digest) error { } dgst := cw.digester.Digest() - // TODO(stevvooe): Correctly handle missing expected digest or allow no - // expected digest at commit time. if expected != "" && expected != dgst { return errors.Errorf("unexpected digest: %v != %v", dgst, expected) } - apath := filepath.Join(cw.cs.root, "blobs", dgst.Algorithm().String()) - if err := os.MkdirAll(apath, 0755); err != nil { + var ( + ingest = filepath.Join(cw.path, "data") + target = cw.cs.blobPath(dgst) + ) + + // make sure parent directories of blob exist + if err := os.MkdirAll(filepath.Dir(target), 0755); err != nil { return err } - var ( - ingest = filepath.Join(cw.path, "data") - target = filepath.Join(apath, dgst.Hex()) - ) - // clean up!! defer os.RemoveAll(cw.path) + if err := os.Rename(ingest, target); err != nil { if os.IsExist(err) { // collision with the target file! @@ -100,6 +115,9 @@ func (cw *Writer) Commit(size int64, expected digest.Digest) error { // If one needs to resume the transaction, a new writer can be obtained from // `ContentStore.Resume` using the same key. The write can then be continued // from it was left off. +// +// To abandon a transaction completely, first call close then `Store.Remove` to +// clean up the associated resources. func (cw *Writer) Close() (err error) { if err := unlock(cw.lock); err != nil { log.Printf("unlock failed: %v", err)