diff --git a/client.go b/client.go index 7d3c4ccff..d7a2f917a 100644 --- a/client.go +++ b/client.go @@ -2,7 +2,6 @@ package containerd import ( "context" - "encoding/json" "io/ioutil" "log" "net/http" @@ -21,7 +20,6 @@ import ( "github.com/containerd/containerd/images" "github.com/containerd/containerd/remotes" "github.com/containerd/containerd/remotes/docker" - "github.com/containerd/containerd/rootfs" contentservice "github.com/containerd/containerd/services/content" "github.com/containerd/containerd/services/diff" diffservice "github.com/containerd/containerd/services/diff" @@ -29,7 +27,6 @@ import ( snapshotservice "github.com/containerd/containerd/services/snapshot" "github.com/containerd/containerd/snapshot" "github.com/opencontainers/image-spec/identity" - "github.com/opencontainers/image-spec/specs-go/v1" ocispec "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" "google.golang.org/grpc" @@ -205,10 +202,10 @@ type RemoteContext struct { // If no resolver is provided, defaults to Docker registry resolver. Resolver remotes.Resolver - // Unpacker is used after an image is pulled to extract into a registry. + // Unpack is done after an image is pulled to extract into a snapshotter. // If an image is not unpacked on pull, it can be unpacked any time // afterwards. Unpacking is required to run an image. - Unpacker Unpacker + Unpack bool // PushWrapper allows hooking into the push method. This can be used // track content that is being sent to the remote. @@ -232,11 +229,7 @@ func defaultRemoteContext() *RemoteContext { // uses the snapshotter, content store, and diff service // configured for the client. func WithPullUnpack(client *Client, c *RemoteContext) error { - c.Unpacker = &snapshotUnpacker{ - store: client.ContentStore(), - diff: client.DiffService(), - snapshotter: client.SnapshotService(), - } + c.Unpack = true return nil } @@ -265,55 +258,6 @@ func WithPushWrapper(w func(remotes.Pusher) remotes.Pusher) RemoteOpts { } } -type Unpacker interface { - Unpack(context.Context, images.Image) error -} - -type snapshotUnpacker struct { - snapshotter snapshot.Snapshotter - store content.Store - diff diff.DiffService -} - -func (s *snapshotUnpacker) Unpack(ctx context.Context, image images.Image) error { - layers, err := s.getLayers(ctx, image) - if err != nil { - return err - } - if _, err := rootfs.ApplyLayers(ctx, layers, s.snapshotter, s.diff); err != nil { - return err - } - return nil -} - -func (s *snapshotUnpacker) getLayers(ctx context.Context, image images.Image) ([]rootfs.Layer, error) { - p, err := content.ReadBlob(ctx, s.store, image.Target.Digest) - if err != nil { - return nil, errors.Wrapf(err, "failed to read manifest blob") - } - var manifest v1.Manifest - if err := json.Unmarshal(p, &manifest); err != nil { - return nil, errors.Wrap(err, "failed to unmarshal manifest") - } - diffIDs, err := image.RootFS(ctx, s.store) - if err != nil { - return nil, errors.Wrap(err, "failed to resolve rootfs") - } - if len(diffIDs) != len(manifest.Layers) { - return nil, errors.Errorf("mismatched image rootfs and manifest layers") - } - layers := make([]rootfs.Layer, len(diffIDs)) - for i := range diffIDs { - layers[i].Diff = v1.Descriptor{ - // TODO: derive media type from compressed type - MediaType: v1.MediaTypeImageLayer, - Digest: diffIDs[i], - } - layers[i].Blob = manifest.Layers[i] - } - return layers, nil -} - func (c *Client) Pull(ctx context.Context, ref string, opts ...RemoteOpts) (Image, error) { pullCtx := defaultRemoteContext() for _, o := range opts { @@ -347,27 +291,16 @@ func (c *Client) Pull(ctx context.Context, ref string, opts ...RemoteOpts) (Imag if err != nil { return nil, err } - if pullCtx.Unpacker != nil { - if err := pullCtx.Unpacker.Unpack(ctx, i); err != nil { + img := &image{ + client: c, + i: i, + } + if pullCtx.Unpack { + if err := img.Unpack(ctx); err != nil { return nil, err } } - return &image{ - client: c, - i: i, - }, nil -} - -// GetImage returns an existing image -func (c *Client) GetImage(ctx context.Context, ref string) (Image, error) { - i, err := c.ImageService().Get(ctx, ref) - if err != nil { - return nil, err - } - return &image{ - client: c, - i: i, - }, nil + return img, nil } func (c *Client) Push(ctx context.Context, ref string, desc ocispec.Descriptor, opts ...RemoteOpts) error { @@ -426,6 +359,34 @@ func (c *Client) Push(ctx context.Context, ref string, desc ocispec.Descriptor, return nil } +// GetImage returns an existing image +func (c *Client) GetImage(ctx context.Context, ref string) (Image, error) { + i, err := c.ImageService().Get(ctx, ref) + if err != nil { + return nil, err + } + return &image{ + client: c, + i: i, + }, nil +} + +// ListImages returns all existing images +func (c *Client) ListImages(ctx context.Context) ([]Image, error) { + imgs, err := c.ImageService().List(ctx) + if err != nil { + return nil, err + } + images := make([]Image, len(imgs)) + for i, img := range imgs { + images[i] = &image{ + client: c, + i: img, + } + } + return images, nil +} + // Close closes the clients connection to containerd func (c *Client) Close() error { return c.conn.Close() diff --git a/cmd/dist/fetch.go b/cmd/dist/fetch.go index 507f760e1..2d2abaa05 100644 --- a/cmd/dist/fetch.go +++ b/cmd/dist/fetch.go @@ -9,15 +9,15 @@ import ( "text/tabwriter" "time" - contentapi "github.com/containerd/containerd/api/services/content" + "github.com/containerd/containerd" + "github.com/containerd/containerd/content" "github.com/containerd/containerd/images" "github.com/containerd/containerd/log" "github.com/containerd/containerd/progress" "github.com/containerd/containerd/remotes" - contentservice "github.com/containerd/containerd/services/content" + digest "github.com/opencontainers/go-digest" ocispec "github.com/opencontainers/image-spec/specs-go/v1" "github.com/urfave/cli" - "golang.org/x/sync/errgroup" ) var fetchCommand = cli.Command{ @@ -47,174 +47,210 @@ Most of this is experimental and there are few leaps to make this work.`, ctx, cancel := appContext(clicontext) defer cancel() - conn, err := connectGRPC(clicontext) - if err != nil { - return err - } - - resolver, err := getResolver(ctx, clicontext) - if err != nil { - return err - } - - ongoing := newJobs() - - content := contentservice.NewStoreFromClient(contentapi.NewContentClient(conn)) - - // TODO(stevvooe): Need to replace this with content store client. - cs, err := resolveContentStore(clicontext) - if err != nil { - return err - } - - eg, ctx := errgroup.WithContext(ctx) - - resolved := make(chan struct{}) - eg.Go(func() error { - ongoing.add(ref) - name, desc, err := resolver.Resolve(ctx, ref) - if err != nil { - return err - } - fetcher, err := resolver.Fetcher(ctx, name) - if err != nil { - return err - } - log.G(ctx).WithField("image", name).Debug("fetching") - close(resolved) - - return images.Dispatch(ctx, - images.Handlers(images.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) { - ongoing.add(remotes.MakeRefKey(ctx, desc)) - return nil, nil - }), - remotes.FetchHandler(content, fetcher), - images.ChildrenHandler(content), - ), - desc) - }) - - errs := make(chan error) - go func() { - defer close(errs) - errs <- eg.Wait() - }() - - ticker := time.NewTicker(100 * time.Millisecond) - fw := progress.NewWriter(os.Stdout) - start := time.Now() - defer ticker.Stop() - var done bool - - for { - select { - case <-ticker.C: - fw.Flush() - - tw := tabwriter.NewWriter(fw, 1, 8, 1, ' ', 0) - - statuses := map[string]statusInfo{} - - activeSeen := map[string]struct{}{} - if !done { - active, err := cs.Status(ctx, "") - if err != nil { - log.G(ctx).WithError(err).Error("active check failed") - continue - } - // update status of active entries! - for _, active := range active { - statuses[active.Ref] = statusInfo{ - Ref: active.Ref, - Status: "downloading", - Offset: active.Offset, - Total: active.Total, - StartedAt: active.StartedAt, - UpdatedAt: active.UpdatedAt, - } - activeSeen[active.Ref] = struct{}{} - } - } - - js := ongoing.jobs() - // now, update the items in jobs that are not in active - for _, j := range js { - if _, ok := activeSeen[j]; ok { - continue - } - status := "done" - - if j == ref { - select { - case <-resolved: - status = "resolved" - default: - status = "resolving" - } - } - - statuses[j] = statusInfo{ - Ref: j, - Status: status, // for now! - } - } - - var ordered []statusInfo - for _, j := range js { - ordered = append(ordered, statuses[j]) - } - - display(tw, ordered, start) - tw.Flush() - - if done { - fw.Flush() - return nil - } - case err := <-errs: - if err != nil { - return err - } - done = true - case <-ctx.Done(): - done = true // allow ui to update once more - } - } + _, err := fetch(ctx, ref, clicontext) + return err }, } +func fetch(ctx context.Context, ref string, clicontext *cli.Context) (containerd.Image, error) { + client, err := getClient(clicontext) + if err != nil { + return nil, err + } + + resolver, err := getResolver(ctx, clicontext) + if err != nil { + return nil, err + } + + ongoing := newJobs(ref) + + pctx, stopProgress := context.WithCancel(ctx) + progress := make(chan struct{}) + + go func() { + showProgress(pctx, ongoing, client.ContentStore(), os.Stdout) + close(progress) + }() + + h := images.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) { + ongoing.add(desc) + return nil, nil + }) + + log.G(pctx).WithField("image", ref).Debug("fetching") + + img, err := client.Pull(pctx, ref, containerd.WithResolver(resolver), containerd.WithImageHandler(h)) + stopProgress() + if err != nil { + return nil, err + } + + <-progress + return img, nil +} + +func showProgress(ctx context.Context, ongoing *jobs, cs content.Store, out io.Writer) { + var ( + ticker = time.NewTicker(100 * time.Millisecond) + fw = progress.NewWriter(out) + start = time.Now() + statuses = map[string]statusInfo{} + done bool + ) + defer ticker.Stop() + +outer: + for { + select { + case <-ticker.C: + fw.Flush() + + tw := tabwriter.NewWriter(fw, 1, 8, 1, ' ', 0) + + resolved := "resolved" + if !ongoing.isResolved() { + resolved = "resolving" + } + statuses[ongoing.name] = statusInfo{ + Ref: ongoing.name, + Status: resolved, + } + keys := []string{ongoing.name} + + activeSeen := map[string]struct{}{} + if !done { + active, err := cs.Status(ctx, "") + if err != nil { + log.G(ctx).WithError(err).Error("active check failed") + continue + } + // update status of active entries! + for _, active := range active { + statuses[active.Ref] = statusInfo{ + Ref: active.Ref, + Status: "downloading", + Offset: active.Offset, + Total: active.Total, + StartedAt: active.StartedAt, + UpdatedAt: active.UpdatedAt, + } + activeSeen[active.Ref] = struct{}{} + } + } + + // now, update the items in jobs that are not in active + for _, j := range ongoing.jobs() { + key := remotes.MakeRefKey(ctx, j) + keys = append(keys, key) + if _, ok := activeSeen[key]; ok { + continue + } + + status, ok := statuses[key] + if !done && (!ok || status.Status == "downloading") { + info, err := cs.Info(ctx, j.Digest) + if err != nil { + if !content.IsNotFound(err) { + log.G(ctx).WithError(err).Errorf("failed to get content info") + continue outer + } else { + statuses[key] = statusInfo{ + Ref: key, + Status: "waiting", + } + } + } else if info.CommittedAt.After(start) { + statuses[key] = statusInfo{ + Ref: key, + Status: "done", + Offset: info.Size, + Total: info.Size, + UpdatedAt: info.CommittedAt, + } + } else { + statuses[key] = statusInfo{ + Ref: key, + Status: "exists", + } + } + } else if done { + if ok { + if status.Status != "done" && status.Status != "exists" { + status.Status = "done" + statuses[key] = status + } + } else { + statuses[key] = statusInfo{ + Ref: key, + Status: "done", + } + } + } + } + + var ordered []statusInfo + for _, key := range keys { + ordered = append(ordered, statuses[key]) + } + + display(tw, ordered, start) + tw.Flush() + + if done { + fw.Flush() + return + } + case <-ctx.Done(): + done = true // allow ui to update once more + } + } +} + // jobs provides a way of identifying the download keys for a particular task // encountering during the pull walk. // // This is very minimal and will probably be replaced with something more // featured. type jobs struct { - added map[string]struct{} - refs []string - mu sync.Mutex + name string + added map[digest.Digest]struct{} + descs []ocispec.Descriptor + mu sync.Mutex + resolved bool } -func newJobs() *jobs { - return &jobs{added: make(map[string]struct{})} +func newJobs(name string) *jobs { + return &jobs{ + name: name, + added: map[digest.Digest]struct{}{}, + } } -func (j *jobs) add(ref string) { +func (j *jobs) add(desc ocispec.Descriptor) { j.mu.Lock() defer j.mu.Unlock() + j.resolved = true - if _, ok := j.added[ref]; ok { + if _, ok := j.added[desc.Digest]; ok { return } - j.refs = append(j.refs, ref) - j.added[ref] = struct{}{} + j.descs = append(j.descs, desc) + j.added[desc.Digest] = struct{}{} } -func (j *jobs) jobs() []string { +func (j *jobs) jobs() []ocispec.Descriptor { j.mu.Lock() defer j.mu.Unlock() - var jobs []string - return append(jobs, j.refs...) + var descs []ocispec.Descriptor + return append(descs, j.descs...) +} + +func (j *jobs) isResolved() bool { + j.mu.Lock() + defer j.mu.Unlock() + return j.resolved } type statusInfo struct { diff --git a/cmd/dist/pull.go b/cmd/dist/pull.go index 948b873f4..3f106730e 100644 --- a/cmd/dist/pull.go +++ b/cmd/dist/pull.go @@ -1,23 +1,10 @@ package main import ( - "context" - "os" - "text/tabwriter" - "time" + "fmt" - diffapi "github.com/containerd/containerd/api/services/diff" - snapshotapi "github.com/containerd/containerd/api/services/snapshot" - "github.com/containerd/containerd/images" "github.com/containerd/containerd/log" - "github.com/containerd/containerd/progress" - "github.com/containerd/containerd/remotes" - "github.com/containerd/containerd/rootfs" - diffservice "github.com/containerd/containerd/services/diff" - snapshotservice "github.com/containerd/containerd/services/snapshot" - ocispec "github.com/opencontainers/image-spec/specs-go/v1" "github.com/urfave/cli" - "golang.org/x/sync/errgroup" ) var pullCommand = cli.Command{ @@ -42,175 +29,17 @@ command. As part of this process, we do the following: ctx, cancel := appContext(clicontext) defer cancel() - cs, err := resolveContentStore(clicontext) + img, err := fetch(ctx, ref, clicontext) if err != nil { return err } - imageStore, err := resolveImageStore(clicontext) - if err != nil { - return err - } - - resolver, err := getResolver(ctx, clicontext) - if err != nil { - return err - } - ongoing := newJobs() - - eg, ctx := errgroup.WithContext(ctx) - - var resolvedImageName string - resolved := make(chan struct{}) - eg.Go(func() error { - ongoing.add(ref) - name, desc, err := resolver.Resolve(ctx, ref) - if err != nil { - log.G(ctx).WithError(err).Error("failed to resolve") - return err - } - fetcher, err := resolver.Fetcher(ctx, name) - if err != nil { - return err - } - - log.G(ctx).WithField("image", name).Debug("fetching") - resolvedImageName = name - close(resolved) - - eg.Go(func() error { - return imageStore.Put(ctx, name, desc) - }) - - return images.Dispatch(ctx, - images.Handlers(images.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) { - ongoing.add(remotes.MakeRefKey(ctx, desc)) - return nil, nil - }), - remotes.FetchHandler(cs, fetcher), - images.ChildrenHandler(cs)), - desc) - - }) - - errs := make(chan error) - go func() { - defer close(errs) - errs <- eg.Wait() - }() - - defer func() { - // we need new ctx here, since we run on return. - ctx, cancel := appContext(clicontext) - defer cancel() - image, err := imageStore.Get(ctx, resolvedImageName) - if err != nil { - log.G(ctx).WithError(err).Fatal("failed to get image") - } - - layers, err := getImageLayers(ctx, image, cs) - if err != nil { - log.G(ctx).WithError(err).Fatal("failed to get rootfs layers") - } - - conn, err := connectGRPC(clicontext) - if err != nil { - log.G(ctx).Fatal(err) - } - snapshotter := snapshotservice.NewSnapshotterFromClient(snapshotapi.NewSnapshotClient(conn)) - applier := diffservice.NewDiffServiceFromClient(diffapi.NewDiffClient(conn)) - - log.G(ctx).Info("unpacking rootfs") - - chainID, err := rootfs.ApplyLayers(ctx, layers, snapshotter, applier) - if err != nil { - log.G(ctx).Fatal(err) - } - - log.G(ctx).Infof("Unpacked chain id: %s", chainID) - }() - - var ( - ticker = time.NewTicker(100 * time.Millisecond) - fw = progress.NewWriter(os.Stdout) - start = time.Now() - done bool - ) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - fw.Flush() - - tw := tabwriter.NewWriter(fw, 1, 8, 1, ' ', 0) - js := ongoing.jobs() - statuses := map[string]statusInfo{} - - activeSeen := map[string]struct{}{} - if !done { - active, err := cs.Status(ctx, "") - if err != nil { - log.G(ctx).WithError(err).Error("active check failed") - continue - } - // update status of active entries! - for _, active := range active { - statuses[active.Ref] = statusInfo{ - Ref: active.Ref, - Status: "downloading", - Offset: active.Offset, - Total: active.Total, - StartedAt: active.StartedAt, - UpdatedAt: active.UpdatedAt, - } - activeSeen[active.Ref] = struct{}{} - } - } - - // now, update the items in jobs that are not in active - for _, j := range js { - if _, ok := activeSeen[j]; ok { - continue - } - status := "done" - - if j == ref { - select { - case <-resolved: - status = "resolved" - default: - status = "resolving" - } - } - - statuses[j] = statusInfo{ - Ref: j, - Status: status, // for now! - } - } - - var ordered []statusInfo - for _, j := range js { - ordered = append(ordered, statuses[j]) - } - - display(tw, ordered, start) - tw.Flush() - - if done { - fw.Flush() - return nil - } - case err := <-errs: - if err != nil { - return err - } - done = true - case <-ctx.Done(): - done = true // allow ui to update once more - } - } + log.G(ctx).WithField("image", ref).Debug("unpacking") + // TODO: Show unpack status + fmt.Printf("unpacking %s...", img.Target().Digest) + err = img.Unpack(ctx) + fmt.Println("done") + return err }, } diff --git a/cmd/dist/rootfs.go b/cmd/dist/rootfs.go index ea496f25d..134541c06 100644 --- a/cmd/dist/rootfs.go +++ b/cmd/dist/rootfs.go @@ -1,19 +1,15 @@ package main import ( + "errors" "fmt" "os" "strings" - diffapi "github.com/containerd/containerd/api/services/diff" snapshotapi "github.com/containerd/containerd/api/services/snapshot" - "github.com/containerd/containerd/images" "github.com/containerd/containerd/log" - "github.com/containerd/containerd/rootfs" - diffservice "github.com/containerd/containerd/services/diff" snapshotservice "github.com/containerd/containerd/services/snapshot" digest "github.com/opencontainers/go-digest" - ocispec "github.com/opencontainers/image-spec/specs-go/v1" "github.com/urfave/cli" ) @@ -40,39 +36,39 @@ var rootfsUnpackCommand = cli.Command{ return err } - log.G(ctx).Infof("unpacking layers from manifest %s", dgst.String()) + log.G(ctx).Debugf("unpacking layers from manifest %s", dgst.String()) - cs, err := resolveContentStore(clicontext) + client, err := getClient(clicontext) if err != nil { return err } - image := images.Image{ - Target: ocispec.Descriptor{ - MediaType: ocispec.MediaTypeImageManifest, - Digest: dgst, - }, - } + // TODO: Support unpack by name - layers, err := getImageLayers(ctx, image, cs) - if err != nil { - log.G(ctx).WithError(err).Fatal("Failed to get rootfs layers") - } - - conn, err := connectGRPC(clicontext) - if err != nil { - log.G(ctx).Fatal(err) - } - - snapshotter := snapshotservice.NewSnapshotterFromClient(snapshotapi.NewSnapshotClient(conn)) - applier := diffservice.NewDiffServiceFromClient(diffapi.NewDiffClient(conn)) - - chainID, err := rootfs.ApplyLayers(ctx, layers, snapshotter, applier) + images, err := client.ListImages(ctx) if err != nil { return err } - log.G(ctx).Infof("chain ID: %s", chainID.String()) + var unpacked bool + for _, image := range images { + if image.Target().Digest == dgst { + fmt.Printf("unpacking %s (%s)...", dgst, image.Target().MediaType) + if err := image.Unpack(ctx); err != nil { + fmt.Println() + return err + } + fmt.Println("done") + unpacked = true + break + } + } + if !unpacked { + return errors.New("manifest not found") + } + + // TODO: Get rootfs from Image + //log.G(ctx).Infof("chain ID: %s", chainID.String()) return nil }, diff --git a/image.go b/image.go index 972778daf..ee703432c 100644 --- a/image.go +++ b/image.go @@ -1,9 +1,22 @@ package containerd -import "github.com/containerd/containerd/images" +import ( + "context" + "encoding/json" + + "github.com/containerd/containerd/content" + "github.com/containerd/containerd/images" + "github.com/containerd/containerd/rootfs" + "github.com/opencontainers/image-spec/specs-go/v1" + ocispec "github.com/opencontainers/image-spec/specs-go/v1" + "github.com/pkg/errors" +) type Image interface { Name() string + Target() ocispec.Descriptor + + Unpack(context.Context) error } var _ = (Image)(&image{}) @@ -17,3 +30,47 @@ type image struct { func (i *image) Name() string { return i.i.Name } + +func (i *image) Target() ocispec.Descriptor { + return i.i.Target +} + +func (i *image) Unpack(ctx context.Context) error { + layers, err := i.getLayers(ctx) + if err != nil { + return err + } + if _, err := rootfs.ApplyLayers(ctx, layers, i.client.SnapshotService(), i.client.DiffService()); err != nil { + return err + } + return nil +} + +func (i *image) getLayers(ctx context.Context) ([]rootfs.Layer, error) { + cs := i.client.ContentStore() + p, err := content.ReadBlob(ctx, cs, i.i.Target.Digest) + if err != nil { + return nil, errors.Wrapf(err, "failed to read manifest blob") + } + var manifest v1.Manifest + if err := json.Unmarshal(p, &manifest); err != nil { + return nil, errors.Wrap(err, "failed to unmarshal manifest") + } + diffIDs, err := i.i.RootFS(ctx, cs) + if err != nil { + return nil, errors.Wrap(err, "failed to resolve rootfs") + } + if len(diffIDs) != len(manifest.Layers) { + return nil, errors.Errorf("mismatched image rootfs and manifest layers") + } + layers := make([]rootfs.Layer, len(diffIDs)) + for i := range diffIDs { + layers[i].Diff = v1.Descriptor{ + // TODO: derive media type from compressed type + MediaType: v1.MediaTypeImageLayer, + Digest: diffIDs[i], + } + layers[i].Blob = manifest.Layers[i] + } + return layers, nil +}