Merge pull request #946 from dmcgowan/client-dist-pull

Update dist pull to use client
Michael Crosby 2017-06-06 15:25:22 -07:00 committed by GitHub
commit 95efd45db0
5 changed files with 312 additions and 433 deletions
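The change replaces dist's hand-rolled fetch, resolve, and unpack plumbing with the containerd client API. As a rough caller-side sketch of the flow this enables (a sketch only: the socket path and image ref are illustrative, and the containerd.New constructor is assumed from the client package):

package main

import (
    "context"
    "log"

    "github.com/containerd/containerd"
)

func main() {
    // Connect to the daemon (socket path is illustrative).
    client, err := containerd.New("/run/containerd/containerd.sock")
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    // WithPullUnpack sets RemoteContext.Unpack, so Pull extracts the
    // image into the snapshotter (via Image.Unpack) after fetching.
    img, err := client.Pull(context.Background(), "docker.io/library/redis:latest", containerd.WithPullUnpack)
    if err != nil {
        log.Fatal(err)
    }
    log.Printf("pulled %s (%s)", img.Name(), img.Target().Digest)
}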

client.go (115 changed lines)

@@ -2,7 +2,6 @@ package containerd
import (
"context"
"encoding/json"
"io/ioutil"
"log"
"net/http"
@@ -22,7 +21,6 @@ import (
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/remotes"
"github.com/containerd/containerd/remotes/docker"
"github.com/containerd/containerd/rootfs"
contentservice "github.com/containerd/containerd/services/content"
"github.com/containerd/containerd/services/diff"
diffservice "github.com/containerd/containerd/services/diff"
@@ -31,7 +29,6 @@ import (
"github.com/containerd/containerd/snapshot"
pempty "github.com/golang/protobuf/ptypes/empty"
"github.com/opencontainers/image-spec/identity"
"github.com/opencontainers/image-spec/specs-go/v1"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
"google.golang.org/grpc"
@@ -207,10 +204,10 @@ type RemoteContext struct {
// If no resolver is provided, defaults to Docker registry resolver.
Resolver remotes.Resolver
// Unpacker is used after an image is pulled to extract into a registry.
// Unpack is done after an image is pulled to extract into a snapshotter.
// If an image is not unpacked on pull, it can be unpacked any time
// afterwards. Unpacking is required to run an image.
Unpacker Unpacker
Unpack bool
// PushWrapper allows hooking into the push method. This can be used
// to track content that is being sent to the remote.
@@ -234,11 +231,7 @@ func defaultRemoteContext() *RemoteContext {
// uses the snapshotter, content store, and diff service
// configured for the client.
func WithPullUnpack(client *Client, c *RemoteContext) error {
c.Unpacker = &snapshotUnpacker{
store: client.ContentStore(),
diff: client.DiffService(),
snapshotter: client.SnapshotService(),
}
c.Unpack = true
return nil
}
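WithPullUnpack now only flips a flag; the unpack machinery lives on the image type. Since WithPullUnpack's signature is itself the option shape, a custom option can be written the same way. A hypothetical example (withUnpackIf is not part of this commit, and RemoteOpts is assumed to be the function type that signature implies):

package example

import "github.com/containerd/containerd"

// withUnpackIf enables unpack-on-pull conditionally; same shape as
// WithPullUnpack above. Hypothetical helper, for illustration only.
func withUnpackIf(enable bool) containerd.RemoteOpts {
    return func(_ *containerd.Client, c *containerd.RemoteContext) error {
        c.Unpack = enable
        return nil
    }
}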
@@ -267,55 +260,6 @@ func WithPushWrapper(w func(remotes.Pusher) remotes.Pusher) RemoteOpts {
}
}
type Unpacker interface {
Unpack(context.Context, images.Image) error
}
type snapshotUnpacker struct {
snapshotter snapshot.Snapshotter
store content.Store
diff diff.DiffService
}
func (s *snapshotUnpacker) Unpack(ctx context.Context, image images.Image) error {
layers, err := s.getLayers(ctx, image)
if err != nil {
return err
}
if _, err := rootfs.ApplyLayers(ctx, layers, s.snapshotter, s.diff); err != nil {
return err
}
return nil
}
func (s *snapshotUnpacker) getLayers(ctx context.Context, image images.Image) ([]rootfs.Layer, error) {
p, err := content.ReadBlob(ctx, s.store, image.Target.Digest)
if err != nil {
return nil, errors.Wrapf(err, "failed to read manifest blob")
}
var manifest v1.Manifest
if err := json.Unmarshal(p, &manifest); err != nil {
return nil, errors.Wrap(err, "failed to unmarshal manifest")
}
diffIDs, err := image.RootFS(ctx, s.store)
if err != nil {
return nil, errors.Wrap(err, "failed to resolve rootfs")
}
if len(diffIDs) != len(manifest.Layers) {
return nil, errors.Errorf("mismatched image rootfs and manifest layers")
}
layers := make([]rootfs.Layer, len(diffIDs))
for i := range diffIDs {
layers[i].Diff = v1.Descriptor{
// TODO: derive media type from compressed type
MediaType: v1.MediaTypeImageLayer,
Digest: diffIDs[i],
}
layers[i].Blob = manifest.Layers[i]
}
return layers, nil
}
func (c *Client) Pull(ctx context.Context, ref string, opts ...RemoteOpts) (Image, error) {
pullCtx := defaultRemoteContext()
for _, o := range opts {
@@ -349,27 +293,16 @@ func (c *Client) Pull(ctx context.Context, ref string, opts ...RemoteOpts) (Image, error) {
if err != nil {
return nil, err
}
if pullCtx.Unpacker != nil {
if err := pullCtx.Unpacker.Unpack(ctx, i); err != nil {
img := &image{
client: c,
i: i,
}
if pullCtx.Unpack {
if err := img.Unpack(ctx); err != nil {
return nil, err
}
}
return &image{
client: c,
i: i,
}, nil
}
// GetImage returns an existing image
func (c *Client) GetImage(ctx context.Context, ref string) (Image, error) {
i, err := c.ImageService().Get(ctx, ref)
if err != nil {
return nil, err
}
return &image{
client: c,
i: i,
}, nil
return img, nil
}
func (c *Client) Push(ctx context.Context, ref string, desc ocispec.Descriptor, opts ...RemoteOpts) error {
@@ -428,6 +361,34 @@ func (c *Client) Push(ctx context.Context, ref string, desc ocispec.Descriptor, opts ...RemoteOpts) error {
return nil
}
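Push keeps the descriptor-based signature shown above, so a pulled image can be pushed back using the descriptor recorded at pull time. A minimal sketch (the target ref is illustrative):

package example

import (
    "context"

    "github.com/containerd/containerd"
)

// pushImage pushes a previously pulled image; img.Target() supplies the
// ocispec.Descriptor that Push expects.
func pushImage(ctx context.Context, client *containerd.Client, img containerd.Image) error {
    return client.Push(ctx, "docker.io/myorg/redis:latest", img.Target())
}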
// GetImage returns an existing image
func (c *Client) GetImage(ctx context.Context, ref string) (Image, error) {
i, err := c.ImageService().Get(ctx, ref)
if err != nil {
return nil, err
}
return &image{
client: c,
i: i,
}, nil
}
// ListImages returns all existing images
func (c *Client) ListImages(ctx context.Context) ([]Image, error) {
imgs, err := c.ImageService().List(ctx)
if err != nil {
return nil, err
}
images := make([]Image, len(imgs))
for i, img := range imgs {
images[i] = &image{
client: c,
i: img,
}
}
return images, nil
}
// Close closes the clients connection to containerd
func (c *Client) Close() error {
return c.conn.Close()
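GetImage and ListImages now return the client-backed Image type, so callers can unpack or inspect without touching the services directly. A short sketch of listing images:

package example

import (
    "context"
    "fmt"

    "github.com/containerd/containerd"
)

// printImages lists images known to the daemon and prints each name
// and target digest.
func printImages(ctx context.Context, client *containerd.Client) error {
    imgs, err := client.ListImages(ctx)
    if err != nil {
        return err
    }
    for _, i := range imgs {
        fmt.Printf("%s\t%s\n", i.Name(), i.Target().Digest)
    }
    return nil
}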

cmd/dist/fetch.go (vendored, 332 changed lines)

@@ -9,15 +9,15 @@ import (
"text/tabwriter"
"time"
contentapi "github.com/containerd/containerd/api/services/content"
"github.com/containerd/containerd"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/progress"
"github.com/containerd/containerd/remotes"
contentservice "github.com/containerd/containerd/services/content"
digest "github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/urfave/cli"
"golang.org/x/sync/errgroup"
)
var fetchCommand = cli.Command{
@@ -47,174 +47,210 @@ Most of this is experimental and there are a few leaps to make this work.`,
ctx, cancel := appContext(clicontext)
defer cancel()
conn, err := connectGRPC(clicontext)
if err != nil {
return err
}
resolver, err := getResolver(ctx, clicontext)
if err != nil {
return err
}
ongoing := newJobs()
content := contentservice.NewStoreFromClient(contentapi.NewContentClient(conn))
// TODO(stevvooe): Need to replace this with content store client.
cs, err := resolveContentStore(clicontext)
if err != nil {
return err
}
eg, ctx := errgroup.WithContext(ctx)
resolved := make(chan struct{})
eg.Go(func() error {
ongoing.add(ref)
name, desc, err := resolver.Resolve(ctx, ref)
if err != nil {
return err
}
fetcher, err := resolver.Fetcher(ctx, name)
if err != nil {
return err
}
log.G(ctx).WithField("image", name).Debug("fetching")
close(resolved)
return images.Dispatch(ctx,
images.Handlers(images.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
ongoing.add(remotes.MakeRefKey(ctx, desc))
return nil, nil
}),
remotes.FetchHandler(content, fetcher),
images.ChildrenHandler(content),
),
desc)
})
errs := make(chan error)
go func() {
defer close(errs)
errs <- eg.Wait()
}()
ticker := time.NewTicker(100 * time.Millisecond)
fw := progress.NewWriter(os.Stdout)
start := time.Now()
defer ticker.Stop()
var done bool
for {
select {
case <-ticker.C:
fw.Flush()
tw := tabwriter.NewWriter(fw, 1, 8, 1, ' ', 0)
statuses := map[string]statusInfo{}
activeSeen := map[string]struct{}{}
if !done {
active, err := cs.Status(ctx, "")
if err != nil {
log.G(ctx).WithError(err).Error("active check failed")
continue
}
// update status of active entries!
for _, active := range active {
statuses[active.Ref] = statusInfo{
Ref: active.Ref,
Status: "downloading",
Offset: active.Offset,
Total: active.Total,
StartedAt: active.StartedAt,
UpdatedAt: active.UpdatedAt,
}
activeSeen[active.Ref] = struct{}{}
}
}
js := ongoing.jobs()
// now, update the items in jobs that are not in active
for _, j := range js {
if _, ok := activeSeen[j]; ok {
continue
}
status := "done"
if j == ref {
select {
case <-resolved:
status = "resolved"
default:
status = "resolving"
}
}
statuses[j] = statusInfo{
Ref: j,
Status: status, // for now!
}
}
var ordered []statusInfo
for _, j := range js {
ordered = append(ordered, statuses[j])
}
display(tw, ordered, start)
tw.Flush()
if done {
fw.Flush()
return nil
}
case err := <-errs:
if err != nil {
return err
}
done = true
case <-ctx.Done():
done = true // allow ui to update once more
}
}
_, err := fetch(ctx, ref, clicontext)
return err
},
}
func fetch(ctx context.Context, ref string, clicontext *cli.Context) (containerd.Image, error) {
client, err := getClient(clicontext)
if err != nil {
return nil, err
}
resolver, err := getResolver(ctx, clicontext)
if err != nil {
return nil, err
}
ongoing := newJobs(ref)
pctx, stopProgress := context.WithCancel(ctx)
progress := make(chan struct{})
go func() {
showProgress(pctx, ongoing, client.ContentStore(), os.Stdout)
close(progress)
}()
h := images.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
ongoing.add(desc)
return nil, nil
})
log.G(pctx).WithField("image", ref).Debug("fetching")
img, err := client.Pull(pctx, ref, containerd.WithResolver(resolver), containerd.WithImageHandler(h))
stopProgress()
if err != nil {
return nil, err
}
<-progress
return img, nil
}
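fetch resolves and downloads content but deliberately does not unpack; cmd/dist/pull.go (below) composes the two steps. Schematically (a sketch that would live alongside fetch, with the same imports assumed):

// pullAndUnpack composes fetch with Image.Unpack, mirroring what the
// pull command below does. Illustration only.
func pullAndUnpack(ctx context.Context, ref string, clicontext *cli.Context) (containerd.Image, error) {
    img, err := fetch(ctx, ref, clicontext)
    if err != nil {
        return nil, err
    }
    // Unpack extracts the fetched layers into the snapshotter so the
    // image can be run.
    if err := img.Unpack(ctx); err != nil {
        return nil, err
    }
    return img, nil
}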
func showProgress(ctx context.Context, ongoing *jobs, cs content.Store, out io.Writer) {
var (
ticker = time.NewTicker(100 * time.Millisecond)
fw = progress.NewWriter(out)
start = time.Now()
statuses = map[string]statusInfo{}
done bool
)
defer ticker.Stop()
outer:
for {
select {
case <-ticker.C:
fw.Flush()
tw := tabwriter.NewWriter(fw, 1, 8, 1, ' ', 0)
resolved := "resolved"
if !ongoing.isResolved() {
resolved = "resolving"
}
statuses[ongoing.name] = statusInfo{
Ref: ongoing.name,
Status: resolved,
}
keys := []string{ongoing.name}
activeSeen := map[string]struct{}{}
if !done {
active, err := cs.Status(ctx, "")
if err != nil {
log.G(ctx).WithError(err).Error("active check failed")
continue
}
// update status of active entries!
for _, active := range active {
statuses[active.Ref] = statusInfo{
Ref: active.Ref,
Status: "downloading",
Offset: active.Offset,
Total: active.Total,
StartedAt: active.StartedAt,
UpdatedAt: active.UpdatedAt,
}
activeSeen[active.Ref] = struct{}{}
}
}
// now, update the items in jobs that are not in active
for _, j := range ongoing.jobs() {
key := remotes.MakeRefKey(ctx, j)
keys = append(keys, key)
if _, ok := activeSeen[key]; ok {
continue
}
status, ok := statuses[key]
if !done && (!ok || status.Status == "downloading") {
info, err := cs.Info(ctx, j.Digest)
if err != nil {
if !content.IsNotFound(err) {
log.G(ctx).WithError(err).Errorf("failed to get content info")
continue outer
} else {
statuses[key] = statusInfo{
Ref: key,
Status: "waiting",
}
}
} else if info.CommittedAt.After(start) {
statuses[key] = statusInfo{
Ref: key,
Status: "done",
Offset: info.Size,
Total: info.Size,
UpdatedAt: info.CommittedAt,
}
} else {
statuses[key] = statusInfo{
Ref: key,
Status: "exists",
}
}
} else if done {
if ok {
if status.Status != "done" && status.Status != "exists" {
status.Status = "done"
statuses[key] = status
}
} else {
statuses[key] = statusInfo{
Ref: key,
Status: "done",
}
}
}
}
var ordered []statusInfo
for _, key := range keys {
ordered = append(ordered, statuses[key])
}
display(tw, ordered, start)
tw.Flush()
if done {
fw.Flush()
return
}
case <-ctx.Done():
done = true // allow ui to update once more
}
}
}
// jobs provides a way of identifying the download keys for a particular task
// encountered during the pull walk.
//
// This is very minimal and will probably be replaced with something more
// featured.
type jobs struct {
added map[string]struct{}
refs []string
mu sync.Mutex
name string
added map[digest.Digest]struct{}
descs []ocispec.Descriptor
mu sync.Mutex
resolved bool
}
func newJobs() *jobs {
return &jobs{added: make(map[string]struct{})}
func newJobs(name string) *jobs {
return &jobs{
name: name,
added: map[digest.Digest]struct{}{},
}
}
func (j *jobs) add(ref string) {
func (j *jobs) add(desc ocispec.Descriptor) {
j.mu.Lock()
defer j.mu.Unlock()
j.resolved = true
if _, ok := j.added[ref]; ok {
if _, ok := j.added[desc.Digest]; ok {
return
}
j.refs = append(j.refs, ref)
j.added[ref] = struct{}{}
j.descs = append(j.descs, desc)
j.added[desc.Digest] = struct{}{}
}
func (j *jobs) jobs() []string {
func (j *jobs) jobs() []ocispec.Descriptor {
j.mu.Lock()
defer j.mu.Unlock()
var jobs []string
return append(jobs, j.refs...)
var descs []ocispec.Descriptor
return append(descs, j.descs...)
}
func (j *jobs) isResolved() bool {
j.mu.Lock()
defer j.mu.Unlock()
return j.resolved
}
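The jobs tracker now records OCI descriptors deduplicated by digest instead of raw ref strings; showProgress derives each content-store status key with remotes.MakeRefKey. A schematic of the wiring, mirroring the fetch function above (trackAll is not part of this commit):

// trackAll returns an image handler that records every descriptor the
// pull walks; showProgress later keys statuses off MakeRefKey(ctx, desc).
func trackAll(ongoing *jobs) images.Handler {
    return images.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
        ongoing.add(desc)                 // deduplicated by desc.Digest
        _ = remotes.MakeRefKey(ctx, desc) // the status key used in showProgress
        return nil, nil
    })
}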
type statusInfo struct {

cmd/dist/pull.go (vendored, 187 changed lines)

@@ -1,23 +1,10 @@
package main
import (
"context"
"os"
"text/tabwriter"
"time"
"fmt"
diffapi "github.com/containerd/containerd/api/services/diff"
snapshotapi "github.com/containerd/containerd/api/services/snapshot"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/progress"
"github.com/containerd/containerd/remotes"
"github.com/containerd/containerd/rootfs"
diffservice "github.com/containerd/containerd/services/diff"
snapshotservice "github.com/containerd/containerd/services/snapshot"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/urfave/cli"
"golang.org/x/sync/errgroup"
)
var pullCommand = cli.Command{
@@ -42,175 +29,17 @@ command. As part of this process, we do the following:
ctx, cancel := appContext(clicontext)
defer cancel()
cs, err := resolveContentStore(clicontext)
img, err := fetch(ctx, ref, clicontext)
if err != nil {
return err
}
imageStore, err := resolveImageStore(clicontext)
if err != nil {
return err
}
resolver, err := getResolver(ctx, clicontext)
if err != nil {
return err
}
ongoing := newJobs()
eg, ctx := errgroup.WithContext(ctx)
var resolvedImageName string
resolved := make(chan struct{})
eg.Go(func() error {
ongoing.add(ref)
name, desc, err := resolver.Resolve(ctx, ref)
if err != nil {
log.G(ctx).WithError(err).Error("failed to resolve")
return err
}
fetcher, err := resolver.Fetcher(ctx, name)
if err != nil {
return err
}
log.G(ctx).WithField("image", name).Debug("fetching")
resolvedImageName = name
close(resolved)
eg.Go(func() error {
return imageStore.Put(ctx, name, desc)
})
return images.Dispatch(ctx,
images.Handlers(images.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
ongoing.add(remotes.MakeRefKey(ctx, desc))
return nil, nil
}),
remotes.FetchHandler(cs, fetcher),
images.ChildrenHandler(cs)),
desc)
})
errs := make(chan error)
go func() {
defer close(errs)
errs <- eg.Wait()
}()
defer func() {
// we need new ctx here, since we run on return.
ctx, cancel := appContext(clicontext)
defer cancel()
image, err := imageStore.Get(ctx, resolvedImageName)
if err != nil {
log.G(ctx).WithError(err).Fatal("failed to get image")
}
layers, err := getImageLayers(ctx, image, cs)
if err != nil {
log.G(ctx).WithError(err).Fatal("failed to get rootfs layers")
}
conn, err := connectGRPC(clicontext)
if err != nil {
log.G(ctx).Fatal(err)
}
snapshotter := snapshotservice.NewSnapshotterFromClient(snapshotapi.NewSnapshotClient(conn))
applier := diffservice.NewDiffServiceFromClient(diffapi.NewDiffClient(conn))
log.G(ctx).Info("unpacking rootfs")
chainID, err := rootfs.ApplyLayers(ctx, layers, snapshotter, applier)
if err != nil {
log.G(ctx).Fatal(err)
}
log.G(ctx).Infof("Unpacked chain id: %s", chainID)
}()
var (
ticker = time.NewTicker(100 * time.Millisecond)
fw = progress.NewWriter(os.Stdout)
start = time.Now()
done bool
)
defer ticker.Stop()
for {
select {
case <-ticker.C:
fw.Flush()
tw := tabwriter.NewWriter(fw, 1, 8, 1, ' ', 0)
js := ongoing.jobs()
statuses := map[string]statusInfo{}
activeSeen := map[string]struct{}{}
if !done {
active, err := cs.Status(ctx, "")
if err != nil {
log.G(ctx).WithError(err).Error("active check failed")
continue
}
// update status of active entries!
for _, active := range active {
statuses[active.Ref] = statusInfo{
Ref: active.Ref,
Status: "downloading",
Offset: active.Offset,
Total: active.Total,
StartedAt: active.StartedAt,
UpdatedAt: active.UpdatedAt,
}
activeSeen[active.Ref] = struct{}{}
}
}
// now, update the items in jobs that are not in active
for _, j := range js {
if _, ok := activeSeen[j]; ok {
continue
}
status := "done"
if j == ref {
select {
case <-resolved:
status = "resolved"
default:
status = "resolving"
}
}
statuses[j] = statusInfo{
Ref: j,
Status: status, // for now!
}
}
var ordered []statusInfo
for _, j := range js {
ordered = append(ordered, statuses[j])
}
display(tw, ordered, start)
tw.Flush()
if done {
fw.Flush()
return nil
}
case err := <-errs:
if err != nil {
return err
}
done = true
case <-ctx.Done():
done = true // allow ui to update once more
}
}
log.G(ctx).WithField("image", ref).Debug("unpacking")
// TODO: Show unpack status
fmt.Printf("unpacking %s...", img.Target().Digest)
err = img.Unpack(ctx)
fmt.Println("done")
return err
},
}

cmd/dist/rootfs.go (vendored, 52 changed lines)

@@ -1,19 +1,15 @@
package main
import (
"errors"
"fmt"
"os"
"strings"
diffapi "github.com/containerd/containerd/api/services/diff"
snapshotapi "github.com/containerd/containerd/api/services/snapshot"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/rootfs"
diffservice "github.com/containerd/containerd/services/diff"
snapshotservice "github.com/containerd/containerd/services/snapshot"
digest "github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/urfave/cli"
)
@@ -40,39 +36,39 @@ var rootfsUnpackCommand = cli.Command{
return err
}
log.G(ctx).Infof("unpacking layers from manifest %s", dgst.String())
log.G(ctx).Debugf("unpacking layers from manifest %s", dgst.String())
cs, err := resolveContentStore(clicontext)
client, err := getClient(clicontext)
if err != nil {
return err
}
image := images.Image{
Target: ocispec.Descriptor{
MediaType: ocispec.MediaTypeImageManifest,
Digest: dgst,
},
}
// TODO: Support unpack by name
layers, err := getImageLayers(ctx, image, cs)
if err != nil {
log.G(ctx).WithError(err).Fatal("Failed to get rootfs layers")
}
conn, err := connectGRPC(clicontext)
if err != nil {
log.G(ctx).Fatal(err)
}
snapshotter := snapshotservice.NewSnapshotterFromClient(snapshotapi.NewSnapshotClient(conn))
applier := diffservice.NewDiffServiceFromClient(diffapi.NewDiffClient(conn))
chainID, err := rootfs.ApplyLayers(ctx, layers, snapshotter, applier)
images, err := client.ListImages(ctx)
if err != nil {
return err
}
log.G(ctx).Infof("chain ID: %s", chainID.String())
var unpacked bool
for _, image := range images {
if image.Target().Digest == dgst {
fmt.Printf("unpacking %s (%s)...", dgst, image.Target().MediaType)
if err := image.Unpack(ctx); err != nil {
fmt.Println()
return err
}
fmt.Println("done")
unpacked = true
break
}
}
if !unpacked {
return errors.New("manifest not found")
}
// TODO: Get rootfs from Image
//log.G(ctx).Infof("chain ID: %s", chainID.String())
return nil
},

image.go (59 changed lines)

@@ -1,9 +1,22 @@
package containerd
import "github.com/containerd/containerd/images"
import (
"context"
"encoding/json"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/rootfs"
"github.com/opencontainers/image-spec/specs-go/v1"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
)
type Image interface {
Name() string
Target() ocispec.Descriptor
Unpack(context.Context) error
}
var _ = (Image)(&image{})
@@ -17,3 +30,47 @@ type image struct {
func (i *image) Name() string {
return i.i.Name
}
func (i *image) Target() ocispec.Descriptor {
return i.i.Target
}
func (i *image) Unpack(ctx context.Context) error {
layers, err := i.getLayers(ctx)
if err != nil {
return err
}
if _, err := rootfs.ApplyLayers(ctx, layers, i.client.SnapshotService(), i.client.DiffService()); err != nil {
return err
}
return nil
}
func (i *image) getLayers(ctx context.Context) ([]rootfs.Layer, error) {
cs := i.client.ContentStore()
p, err := content.ReadBlob(ctx, cs, i.i.Target.Digest)
if err != nil {
return nil, errors.Wrapf(err, "failed to read manifest blob")
}
var manifest v1.Manifest
if err := json.Unmarshal(p, &manifest); err != nil {
return nil, errors.Wrap(err, "failed to unmarshal manifest")
}
diffIDs, err := i.i.RootFS(ctx, cs)
if err != nil {
return nil, errors.Wrap(err, "failed to resolve rootfs")
}
if len(diffIDs) != len(manifest.Layers) {
return nil, errors.Errorf("mismatched image rootfs and manifest layers")
}
layers := make([]rootfs.Layer, len(diffIDs))
for i := range diffIDs {
layers[i].Diff = v1.Descriptor{
// TODO: derive media type from compressed type
MediaType: v1.MediaTypeImageLayer,
Digest: diffIDs[i],
}
layers[i].Blob = manifest.Layers[i]
}
return layers, nil
}
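getLayers pairs each uncompressed diff ID from the image config with the matching compressed blob descriptor from the manifest; rootfs.ApplyLayers then builds snapshots keyed by chain IDs derived from those diff IDs. A sketch of that derivation using the image-spec identity helpers (the identity package is already imported by client.go; chainIDFor is illustrative):

package example

import (
    digest "github.com/opencontainers/go-digest"
    "github.com/opencontainers/image-spec/identity"
)

// chainIDFor derives the chain ID of the whole ordered diff-ID sequence;
// the chain ID of each prefix names one snapshot in the unpacked chain
// (identity.ChainIDs computes those per-prefix values).
func chainIDFor(diffIDs []digest.Digest) digest.Digest {
    return identity.ChainID(diffIDs)
}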