Update dist pull to use client

Replaced pull unpacker with boolean to call unpack.
Added unpack and target to image type.
Updated progress logic for pull.
Added list images to client.
Updated rootfs unpacker to use client.

Signed-off-by: Derek McGowan <derek@mcgstyle.net>
This commit is contained in:
Derek McGowan 2017-05-31 13:06:41 -07:00
parent bdf9f5f738
commit ca25c0408e
No known key found for this signature in database
GPG Key ID: F58C5D0A4405ACDB
5 changed files with 312 additions and 433 deletions

115
client.go
View File

@ -2,7 +2,6 @@ package containerd
import (
"context"
"encoding/json"
"io/ioutil"
"log"
"net/http"
@ -21,7 +20,6 @@ import (
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/remotes"
"github.com/containerd/containerd/remotes/docker"
"github.com/containerd/containerd/rootfs"
contentservice "github.com/containerd/containerd/services/content"
"github.com/containerd/containerd/services/diff"
diffservice "github.com/containerd/containerd/services/diff"
@ -29,7 +27,6 @@ import (
snapshotservice "github.com/containerd/containerd/services/snapshot"
"github.com/containerd/containerd/snapshot"
"github.com/opencontainers/image-spec/identity"
"github.com/opencontainers/image-spec/specs-go/v1"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
"google.golang.org/grpc"
@ -205,10 +202,10 @@ type RemoteContext struct {
// If no resolver is provided, defaults to Docker registry resolver.
Resolver remotes.Resolver
// Unpacker is used after an image is pulled to extract into a registry.
// Unpack is done after an image is pulled to extract into a snapshotter.
// If an image is not unpacked on pull, it can be unpacked any time
// afterwards. Unpacking is required to run an image.
Unpacker Unpacker
Unpack bool
// PushWrapper allows hooking into the push method. This can be used
// track content that is being sent to the remote.
@ -232,11 +229,7 @@ func defaultRemoteContext() *RemoteContext {
// uses the snapshotter, content store, and diff service
// configured for the client.
func WithPullUnpack(client *Client, c *RemoteContext) error {
c.Unpacker = &snapshotUnpacker{
store: client.ContentStore(),
diff: client.DiffService(),
snapshotter: client.SnapshotService(),
}
c.Unpack = true
return nil
}
@ -265,55 +258,6 @@ func WithPushWrapper(w func(remotes.Pusher) remotes.Pusher) RemoteOpts {
}
}
type Unpacker interface {
Unpack(context.Context, images.Image) error
}
type snapshotUnpacker struct {
snapshotter snapshot.Snapshotter
store content.Store
diff diff.DiffService
}
func (s *snapshotUnpacker) Unpack(ctx context.Context, image images.Image) error {
layers, err := s.getLayers(ctx, image)
if err != nil {
return err
}
if _, err := rootfs.ApplyLayers(ctx, layers, s.snapshotter, s.diff); err != nil {
return err
}
return nil
}
func (s *snapshotUnpacker) getLayers(ctx context.Context, image images.Image) ([]rootfs.Layer, error) {
p, err := content.ReadBlob(ctx, s.store, image.Target.Digest)
if err != nil {
return nil, errors.Wrapf(err, "failed to read manifest blob")
}
var manifest v1.Manifest
if err := json.Unmarshal(p, &manifest); err != nil {
return nil, errors.Wrap(err, "failed to unmarshal manifest")
}
diffIDs, err := image.RootFS(ctx, s.store)
if err != nil {
return nil, errors.Wrap(err, "failed to resolve rootfs")
}
if len(diffIDs) != len(manifest.Layers) {
return nil, errors.Errorf("mismatched image rootfs and manifest layers")
}
layers := make([]rootfs.Layer, len(diffIDs))
for i := range diffIDs {
layers[i].Diff = v1.Descriptor{
// TODO: derive media type from compressed type
MediaType: v1.MediaTypeImageLayer,
Digest: diffIDs[i],
}
layers[i].Blob = manifest.Layers[i]
}
return layers, nil
}
func (c *Client) Pull(ctx context.Context, ref string, opts ...RemoteOpts) (Image, error) {
pullCtx := defaultRemoteContext()
for _, o := range opts {
@ -347,27 +291,16 @@ func (c *Client) Pull(ctx context.Context, ref string, opts ...RemoteOpts) (Imag
if err != nil {
return nil, err
}
if pullCtx.Unpacker != nil {
if err := pullCtx.Unpacker.Unpack(ctx, i); err != nil {
img := &image{
client: c,
i: i,
}
if pullCtx.Unpack {
if err := img.Unpack(ctx); err != nil {
return nil, err
}
}
return &image{
client: c,
i: i,
}, nil
}
// GetImage returns an existing image
func (c *Client) GetImage(ctx context.Context, ref string) (Image, error) {
i, err := c.ImageService().Get(ctx, ref)
if err != nil {
return nil, err
}
return &image{
client: c,
i: i,
}, nil
return img, nil
}
func (c *Client) Push(ctx context.Context, ref string, desc ocispec.Descriptor, opts ...RemoteOpts) error {
@ -426,6 +359,34 @@ func (c *Client) Push(ctx context.Context, ref string, desc ocispec.Descriptor,
return nil
}
// GetImage returns an existing image
func (c *Client) GetImage(ctx context.Context, ref string) (Image, error) {
i, err := c.ImageService().Get(ctx, ref)
if err != nil {
return nil, err
}
return &image{
client: c,
i: i,
}, nil
}
// ListImages returns all existing images
func (c *Client) ListImages(ctx context.Context) ([]Image, error) {
imgs, err := c.ImageService().List(ctx)
if err != nil {
return nil, err
}
images := make([]Image, len(imgs))
for i, img := range imgs {
images[i] = &image{
client: c,
i: img,
}
}
return images, nil
}
// Close closes the clients connection to containerd
func (c *Client) Close() error {
return c.conn.Close()

332
cmd/dist/fetch.go vendored
View File

@ -9,15 +9,15 @@ import (
"text/tabwriter"
"time"
contentapi "github.com/containerd/containerd/api/services/content"
"github.com/containerd/containerd"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/progress"
"github.com/containerd/containerd/remotes"
contentservice "github.com/containerd/containerd/services/content"
digest "github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/urfave/cli"
"golang.org/x/sync/errgroup"
)
var fetchCommand = cli.Command{
@ -47,174 +47,210 @@ Most of this is experimental and there are few leaps to make this work.`,
ctx, cancel := appContext(clicontext)
defer cancel()
conn, err := connectGRPC(clicontext)
if err != nil {
return err
}
resolver, err := getResolver(ctx, clicontext)
if err != nil {
return err
}
ongoing := newJobs()
content := contentservice.NewStoreFromClient(contentapi.NewContentClient(conn))
// TODO(stevvooe): Need to replace this with content store client.
cs, err := resolveContentStore(clicontext)
if err != nil {
return err
}
eg, ctx := errgroup.WithContext(ctx)
resolved := make(chan struct{})
eg.Go(func() error {
ongoing.add(ref)
name, desc, err := resolver.Resolve(ctx, ref)
if err != nil {
return err
}
fetcher, err := resolver.Fetcher(ctx, name)
if err != nil {
return err
}
log.G(ctx).WithField("image", name).Debug("fetching")
close(resolved)
return images.Dispatch(ctx,
images.Handlers(images.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
ongoing.add(remotes.MakeRefKey(ctx, desc))
return nil, nil
}),
remotes.FetchHandler(content, fetcher),
images.ChildrenHandler(content),
),
desc)
})
errs := make(chan error)
go func() {
defer close(errs)
errs <- eg.Wait()
}()
ticker := time.NewTicker(100 * time.Millisecond)
fw := progress.NewWriter(os.Stdout)
start := time.Now()
defer ticker.Stop()
var done bool
for {
select {
case <-ticker.C:
fw.Flush()
tw := tabwriter.NewWriter(fw, 1, 8, 1, ' ', 0)
statuses := map[string]statusInfo{}
activeSeen := map[string]struct{}{}
if !done {
active, err := cs.Status(ctx, "")
if err != nil {
log.G(ctx).WithError(err).Error("active check failed")
continue
}
// update status of active entries!
for _, active := range active {
statuses[active.Ref] = statusInfo{
Ref: active.Ref,
Status: "downloading",
Offset: active.Offset,
Total: active.Total,
StartedAt: active.StartedAt,
UpdatedAt: active.UpdatedAt,
}
activeSeen[active.Ref] = struct{}{}
}
}
js := ongoing.jobs()
// now, update the items in jobs that are not in active
for _, j := range js {
if _, ok := activeSeen[j]; ok {
continue
}
status := "done"
if j == ref {
select {
case <-resolved:
status = "resolved"
default:
status = "resolving"
}
}
statuses[j] = statusInfo{
Ref: j,
Status: status, // for now!
}
}
var ordered []statusInfo
for _, j := range js {
ordered = append(ordered, statuses[j])
}
display(tw, ordered, start)
tw.Flush()
if done {
fw.Flush()
return nil
}
case err := <-errs:
if err != nil {
return err
}
done = true
case <-ctx.Done():
done = true // allow ui to update once more
}
}
_, err := fetch(ctx, ref, clicontext)
return err
},
}
func fetch(ctx context.Context, ref string, clicontext *cli.Context) (containerd.Image, error) {
client, err := getClient(clicontext)
if err != nil {
return nil, err
}
resolver, err := getResolver(ctx, clicontext)
if err != nil {
return nil, err
}
ongoing := newJobs(ref)
pctx, stopProgress := context.WithCancel(ctx)
progress := make(chan struct{})
go func() {
showProgress(pctx, ongoing, client.ContentStore(), os.Stdout)
close(progress)
}()
h := images.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
ongoing.add(desc)
return nil, nil
})
log.G(pctx).WithField("image", ref).Debug("fetching")
img, err := client.Pull(pctx, ref, containerd.WithResolver(resolver), containerd.WithImageHandler(h))
stopProgress()
if err != nil {
return nil, err
}
<-progress
return img, nil
}
func showProgress(ctx context.Context, ongoing *jobs, cs content.Store, out io.Writer) {
var (
ticker = time.NewTicker(100 * time.Millisecond)
fw = progress.NewWriter(out)
start = time.Now()
statuses = map[string]statusInfo{}
done bool
)
defer ticker.Stop()
outer:
for {
select {
case <-ticker.C:
fw.Flush()
tw := tabwriter.NewWriter(fw, 1, 8, 1, ' ', 0)
resolved := "resolved"
if !ongoing.isResolved() {
resolved = "resolving"
}
statuses[ongoing.name] = statusInfo{
Ref: ongoing.name,
Status: resolved,
}
keys := []string{ongoing.name}
activeSeen := map[string]struct{}{}
if !done {
active, err := cs.Status(ctx, "")
if err != nil {
log.G(ctx).WithError(err).Error("active check failed")
continue
}
// update status of active entries!
for _, active := range active {
statuses[active.Ref] = statusInfo{
Ref: active.Ref,
Status: "downloading",
Offset: active.Offset,
Total: active.Total,
StartedAt: active.StartedAt,
UpdatedAt: active.UpdatedAt,
}
activeSeen[active.Ref] = struct{}{}
}
}
// now, update the items in jobs that are not in active
for _, j := range ongoing.jobs() {
key := remotes.MakeRefKey(ctx, j)
keys = append(keys, key)
if _, ok := activeSeen[key]; ok {
continue
}
status, ok := statuses[key]
if !done && (!ok || status.Status == "downloading") {
info, err := cs.Info(ctx, j.Digest)
if err != nil {
if !content.IsNotFound(err) {
log.G(ctx).WithError(err).Errorf("failed to get content info")
continue outer
} else {
statuses[key] = statusInfo{
Ref: key,
Status: "waiting",
}
}
} else if info.CommittedAt.After(start) {
statuses[key] = statusInfo{
Ref: key,
Status: "done",
Offset: info.Size,
Total: info.Size,
UpdatedAt: info.CommittedAt,
}
} else {
statuses[key] = statusInfo{
Ref: key,
Status: "exists",
}
}
} else if done {
if ok {
if status.Status != "done" && status.Status != "exists" {
status.Status = "done"
statuses[key] = status
}
} else {
statuses[key] = statusInfo{
Ref: key,
Status: "done",
}
}
}
}
var ordered []statusInfo
for _, key := range keys {
ordered = append(ordered, statuses[key])
}
display(tw, ordered, start)
tw.Flush()
if done {
fw.Flush()
return
}
case <-ctx.Done():
done = true // allow ui to update once more
}
}
}
// jobs provides a way of identifying the download keys for a particular task
// encountering during the pull walk.
//
// This is very minimal and will probably be replaced with something more
// featured.
type jobs struct {
added map[string]struct{}
refs []string
mu sync.Mutex
name string
added map[digest.Digest]struct{}
descs []ocispec.Descriptor
mu sync.Mutex
resolved bool
}
func newJobs() *jobs {
return &jobs{added: make(map[string]struct{})}
func newJobs(name string) *jobs {
return &jobs{
name: name,
added: map[digest.Digest]struct{}{},
}
}
func (j *jobs) add(ref string) {
func (j *jobs) add(desc ocispec.Descriptor) {
j.mu.Lock()
defer j.mu.Unlock()
j.resolved = true
if _, ok := j.added[ref]; ok {
if _, ok := j.added[desc.Digest]; ok {
return
}
j.refs = append(j.refs, ref)
j.added[ref] = struct{}{}
j.descs = append(j.descs, desc)
j.added[desc.Digest] = struct{}{}
}
func (j *jobs) jobs() []string {
func (j *jobs) jobs() []ocispec.Descriptor {
j.mu.Lock()
defer j.mu.Unlock()
var jobs []string
return append(jobs, j.refs...)
var descs []ocispec.Descriptor
return append(descs, j.descs...)
}
func (j *jobs) isResolved() bool {
j.mu.Lock()
defer j.mu.Unlock()
return j.resolved
}
type statusInfo struct {

187
cmd/dist/pull.go vendored
View File

@ -1,23 +1,10 @@
package main
import (
"context"
"os"
"text/tabwriter"
"time"
"fmt"
diffapi "github.com/containerd/containerd/api/services/diff"
snapshotapi "github.com/containerd/containerd/api/services/snapshot"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/progress"
"github.com/containerd/containerd/remotes"
"github.com/containerd/containerd/rootfs"
diffservice "github.com/containerd/containerd/services/diff"
snapshotservice "github.com/containerd/containerd/services/snapshot"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/urfave/cli"
"golang.org/x/sync/errgroup"
)
var pullCommand = cli.Command{
@ -42,175 +29,17 @@ command. As part of this process, we do the following:
ctx, cancel := appContext(clicontext)
defer cancel()
cs, err := resolveContentStore(clicontext)
img, err := fetch(ctx, ref, clicontext)
if err != nil {
return err
}
imageStore, err := resolveImageStore(clicontext)
if err != nil {
return err
}
resolver, err := getResolver(ctx, clicontext)
if err != nil {
return err
}
ongoing := newJobs()
eg, ctx := errgroup.WithContext(ctx)
var resolvedImageName string
resolved := make(chan struct{})
eg.Go(func() error {
ongoing.add(ref)
name, desc, err := resolver.Resolve(ctx, ref)
if err != nil {
log.G(ctx).WithError(err).Error("failed to resolve")
return err
}
fetcher, err := resolver.Fetcher(ctx, name)
if err != nil {
return err
}
log.G(ctx).WithField("image", name).Debug("fetching")
resolvedImageName = name
close(resolved)
eg.Go(func() error {
return imageStore.Put(ctx, name, desc)
})
return images.Dispatch(ctx,
images.Handlers(images.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
ongoing.add(remotes.MakeRefKey(ctx, desc))
return nil, nil
}),
remotes.FetchHandler(cs, fetcher),
images.ChildrenHandler(cs)),
desc)
})
errs := make(chan error)
go func() {
defer close(errs)
errs <- eg.Wait()
}()
defer func() {
// we need new ctx here, since we run on return.
ctx, cancel := appContext(clicontext)
defer cancel()
image, err := imageStore.Get(ctx, resolvedImageName)
if err != nil {
log.G(ctx).WithError(err).Fatal("failed to get image")
}
layers, err := getImageLayers(ctx, image, cs)
if err != nil {
log.G(ctx).WithError(err).Fatal("failed to get rootfs layers")
}
conn, err := connectGRPC(clicontext)
if err != nil {
log.G(ctx).Fatal(err)
}
snapshotter := snapshotservice.NewSnapshotterFromClient(snapshotapi.NewSnapshotClient(conn))
applier := diffservice.NewDiffServiceFromClient(diffapi.NewDiffClient(conn))
log.G(ctx).Info("unpacking rootfs")
chainID, err := rootfs.ApplyLayers(ctx, layers, snapshotter, applier)
if err != nil {
log.G(ctx).Fatal(err)
}
log.G(ctx).Infof("Unpacked chain id: %s", chainID)
}()
var (
ticker = time.NewTicker(100 * time.Millisecond)
fw = progress.NewWriter(os.Stdout)
start = time.Now()
done bool
)
defer ticker.Stop()
for {
select {
case <-ticker.C:
fw.Flush()
tw := tabwriter.NewWriter(fw, 1, 8, 1, ' ', 0)
js := ongoing.jobs()
statuses := map[string]statusInfo{}
activeSeen := map[string]struct{}{}
if !done {
active, err := cs.Status(ctx, "")
if err != nil {
log.G(ctx).WithError(err).Error("active check failed")
continue
}
// update status of active entries!
for _, active := range active {
statuses[active.Ref] = statusInfo{
Ref: active.Ref,
Status: "downloading",
Offset: active.Offset,
Total: active.Total,
StartedAt: active.StartedAt,
UpdatedAt: active.UpdatedAt,
}
activeSeen[active.Ref] = struct{}{}
}
}
// now, update the items in jobs that are not in active
for _, j := range js {
if _, ok := activeSeen[j]; ok {
continue
}
status := "done"
if j == ref {
select {
case <-resolved:
status = "resolved"
default:
status = "resolving"
}
}
statuses[j] = statusInfo{
Ref: j,
Status: status, // for now!
}
}
var ordered []statusInfo
for _, j := range js {
ordered = append(ordered, statuses[j])
}
display(tw, ordered, start)
tw.Flush()
if done {
fw.Flush()
return nil
}
case err := <-errs:
if err != nil {
return err
}
done = true
case <-ctx.Done():
done = true // allow ui to update once more
}
}
log.G(ctx).WithField("image", ref).Debug("unpacking")
// TODO: Show unpack status
fmt.Printf("unpacking %s...", img.Target().Digest)
err = img.Unpack(ctx)
fmt.Println("done")
return err
},
}

52
cmd/dist/rootfs.go vendored
View File

@ -1,19 +1,15 @@
package main
import (
"errors"
"fmt"
"os"
"strings"
diffapi "github.com/containerd/containerd/api/services/diff"
snapshotapi "github.com/containerd/containerd/api/services/snapshot"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/rootfs"
diffservice "github.com/containerd/containerd/services/diff"
snapshotservice "github.com/containerd/containerd/services/snapshot"
digest "github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/urfave/cli"
)
@ -40,39 +36,39 @@ var rootfsUnpackCommand = cli.Command{
return err
}
log.G(ctx).Infof("unpacking layers from manifest %s", dgst.String())
log.G(ctx).Debugf("unpacking layers from manifest %s", dgst.String())
cs, err := resolveContentStore(clicontext)
client, err := getClient(clicontext)
if err != nil {
return err
}
image := images.Image{
Target: ocispec.Descriptor{
MediaType: ocispec.MediaTypeImageManifest,
Digest: dgst,
},
}
// TODO: Support unpack by name
layers, err := getImageLayers(ctx, image, cs)
if err != nil {
log.G(ctx).WithError(err).Fatal("Failed to get rootfs layers")
}
conn, err := connectGRPC(clicontext)
if err != nil {
log.G(ctx).Fatal(err)
}
snapshotter := snapshotservice.NewSnapshotterFromClient(snapshotapi.NewSnapshotClient(conn))
applier := diffservice.NewDiffServiceFromClient(diffapi.NewDiffClient(conn))
chainID, err := rootfs.ApplyLayers(ctx, layers, snapshotter, applier)
images, err := client.ListImages(ctx)
if err != nil {
return err
}
log.G(ctx).Infof("chain ID: %s", chainID.String())
var unpacked bool
for _, image := range images {
if image.Target().Digest == dgst {
fmt.Printf("unpacking %s (%s)...", dgst, image.Target().MediaType)
if err := image.Unpack(ctx); err != nil {
fmt.Println()
return err
}
fmt.Println("done")
unpacked = true
break
}
}
if !unpacked {
return errors.New("manifest not found")
}
// TODO: Get rootfs from Image
//log.G(ctx).Infof("chain ID: %s", chainID.String())
return nil
},

View File

@ -1,9 +1,22 @@
package containerd
import "github.com/containerd/containerd/images"
import (
"context"
"encoding/json"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/rootfs"
"github.com/opencontainers/image-spec/specs-go/v1"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
)
type Image interface {
Name() string
Target() ocispec.Descriptor
Unpack(context.Context) error
}
var _ = (Image)(&image{})
@ -17,3 +30,47 @@ type image struct {
func (i *image) Name() string {
return i.i.Name
}
func (i *image) Target() ocispec.Descriptor {
return i.i.Target
}
func (i *image) Unpack(ctx context.Context) error {
layers, err := i.getLayers(ctx)
if err != nil {
return err
}
if _, err := rootfs.ApplyLayers(ctx, layers, i.client.SnapshotService(), i.client.DiffService()); err != nil {
return err
}
return nil
}
func (i *image) getLayers(ctx context.Context) ([]rootfs.Layer, error) {
cs := i.client.ContentStore()
p, err := content.ReadBlob(ctx, cs, i.i.Target.Digest)
if err != nil {
return nil, errors.Wrapf(err, "failed to read manifest blob")
}
var manifest v1.Manifest
if err := json.Unmarshal(p, &manifest); err != nil {
return nil, errors.Wrap(err, "failed to unmarshal manifest")
}
diffIDs, err := i.i.RootFS(ctx, cs)
if err != nil {
return nil, errors.Wrap(err, "failed to resolve rootfs")
}
if len(diffIDs) != len(manifest.Layers) {
return nil, errors.Errorf("mismatched image rootfs and manifest layers")
}
layers := make([]rootfs.Layer, len(diffIDs))
for i := range diffIDs {
layers[i].Diff = v1.Descriptor{
// TODO: derive media type from compressed type
MediaType: v1.MediaTypeImageLayer,
Digest: diffIDs[i],
}
layers[i].Blob = manifest.Layers[i]
}
return layers, nil
}