From f701b3b96086622f792b7a6d8b00d39b19d18ba6 Mon Sep 17 00:00:00 2001 From: Derek McGowan Date: Tue, 1 May 2018 16:59:27 -0700 Subject: [PATCH 1/4] Fix race in ctr pull Signed-off-by: Derek McGowan --- cmd/ctr/commands/content/fetch.go | 16 ++++++++-------- cmd/ctr/commands/images/pull.go | 14 ++++++++++++-- 2 files changed, 20 insertions(+), 10 deletions(-) diff --git a/cmd/ctr/commands/content/fetch.go b/cmd/ctr/commands/content/fetch.go index 96933eb9a..01abb4cad 100644 --- a/cmd/ctr/commands/content/fetch.go +++ b/cmd/ctr/commands/content/fetch.go @@ -61,19 +61,19 @@ Most of this is experimental and there are few leaps to make this work.`, var ( ref = clicontext.Args().First() ) - _, err := Fetch(ref, clicontext) + client, ctx, cancel, err := commands.NewClient(clicontext) + if err != nil { + return err + } + defer cancel() + + _, err = Fetch(ctx, client, ref, clicontext) return err }, } // Fetch loads all resources into the content store and returns the image -func Fetch(ref string, cliContext *cli.Context) (containerd.Image, error) { - client, ctx, cancel, err := commands.NewClient(cliContext) - if err != nil { - return nil, err - } - defer cancel() - +func Fetch(ctx context.Context, client *containerd.Client, ref string, cliContext *cli.Context) (containerd.Image, error) { resolver, err := commands.GetResolver(ctx, cliContext) if err != nil { return nil, err diff --git a/cmd/ctr/commands/images/pull.go b/cmd/ctr/commands/images/pull.go index 4ea72b6bf..10e4f1d52 100644 --- a/cmd/ctr/commands/images/pull.go +++ b/cmd/ctr/commands/images/pull.go @@ -57,10 +57,20 @@ command. As part of this process, we do the following: if ref == "" { return fmt.Errorf("please provide an image reference to pull") } - ctx, cancel := commands.AppContext(context) + + client, ctx, cancel, err := commands.NewClient(context) + if err != nil { + return err + } defer cancel() - img, err := content.Fetch(ref, context) + ctx, done, err := client.WithLease(ctx) + if err != nil { + return err + } + defer done(ctx) + + img, err := content.Fetch(ctx, client, ref, context) if err != nil { return err } From 28caf9027ee02812a94de6b9a2656f2036fb409a Mon Sep 17 00:00:00 2001 From: Derek McGowan Date: Fri, 4 May 2018 14:30:57 -0700 Subject: [PATCH 2/4] Add recursive apply layer function Update apply layers to recursive from the top layer. Update apply layer to check for exists and apply single layer. Signed-off-by: Derek McGowan --- rootfs/apply.go | 117 +++++++++++++++++++++++++++++++++--------------- 1 file changed, 80 insertions(+), 37 deletions(-) diff --git a/rootfs/apply.go b/rootfs/apply.go index 408b4f3a9..a1a2db938 100644 --- a/rootfs/apply.go +++ b/rootfs/apply.go @@ -26,6 +26,7 @@ import ( "github.com/containerd/containerd/diff" "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/log" + "github.com/containerd/containerd/mount" "github.com/containerd/containerd/snapshots" "github.com/opencontainers/go-digest" "github.com/opencontainers/image-spec/identity" @@ -47,16 +48,27 @@ type Layer struct { // Layers are applied in order they are given, making the first layer the // bottom-most layer in the layer chain. func ApplyLayers(ctx context.Context, layers []Layer, sn snapshots.Snapshotter, a diff.Applier) (digest.Digest, error) { - var chain []digest.Digest - for _, layer := range layers { - if _, err := ApplyLayer(ctx, layer, chain, sn, a); err != nil { - // TODO: possibly wait and retry if extraction of same chain id was in progress - return "", err + chain := make([]digest.Digest, len(layers)) + for i, layer := range layers { + chain[i] = layer.Diff.Digest + } + chainID := identity.ChainID(chain) + + // Just stat top layer, remaining layers will have their existence checked + // on prepare. Calling prepare on upper layers first guarantees that upper + // layers are not removed while calling stat on lower layers + _, err := sn.Stat(ctx, chainID.String()) + if err != nil { + if !errdefs.IsNotFound(err) { + return "", errors.Wrapf(err, "failed to stat snapshot %s", chainID) } - chain = append(chain, layer.Diff.Digest) + if err := applyLayers(ctx, layers, chain, sn, a); err != nil && !errdefs.IsAlreadyExists(err) { + return "", err + } } - return identity.ChainID(chain), nil + + return chainID, nil } // ApplyLayer applies a single layer on top of the given provided layer chain, @@ -64,59 +76,90 @@ func ApplyLayers(ctx context.Context, layers []Layer, sn snapshots.Snapshotter, // is returned, if the layer already exists false is returned. func ApplyLayer(ctx context.Context, layer Layer, chain []digest.Digest, sn snapshots.Snapshotter, a diff.Applier, opts ...snapshots.Opt) (bool, error) { var ( - parent = identity.ChainID(chain) - chainID = identity.ChainID(append(chain, layer.Diff.Digest)) + chainID = identity.ChainID(append(chain, layer.Diff.Digest)).String() + applied bool + ) + if _, err := sn.Stat(ctx, chainID); err != nil { + if !errdefs.IsNotFound(err) { + return false, errors.Wrapf(err, "failed to stat snapshot %s", chainID) + } + + if err := applyLayers(ctx, []Layer{layer}, append(chain, layer.Diff.Digest), sn, a, opts...); err != nil { + if !errdefs.IsAlreadyExists(err) { + return false, err + } + } else { + applied = true + } + } + return applied, nil +} + +func applyLayers(ctx context.Context, layers []Layer, chain []digest.Digest, sn snapshots.Snapshotter, a diff.Applier, opts ...snapshots.Opt) error { + var ( + parent = identity.ChainID(chain[:len(chain)-1]) + chainID = identity.ChainID(chain) + layer = layers[len(layers)-1] diff ocispec.Descriptor + key string + mounts []mount.Mount + err error ) - _, err := sn.Stat(ctx, chainID.String()) - if err == nil { - log.G(ctx).Debugf("Extraction not needed, layer snapshot %s exists", chainID) - return false, nil - } else if !errdefs.IsNotFound(err) { - return false, errors.Wrapf(err, "failed to stat snapshot %s", chainID) - } + for { + key = fmt.Sprintf("extract-%s %s", uniquePart(), chainID) - key := fmt.Sprintf("extract-%s %s", uniquePart(), chainID) + // Prepare snapshot with from parent, label as root + mounts, err = sn.Prepare(ctx, key, parent.String(), opts...) + if err != nil { + if errdefs.IsNotFound(err) && len(layers) > 1 { + if err := applyLayers(ctx, layers[:len(layers)-1], chain[:len(chain)-1], sn, a); err != nil { + if !errdefs.IsAlreadyExists(err) { + return err + } + } + // Do no try applying layers again + layers = nil + continue + } else if errdefs.IsAlreadyExists(err) { + // Try a different key + continue + } - // Prepare snapshot with from parent, label as root - mounts, err := sn.Prepare(ctx, key, parent.String(), opts...) - if err != nil { - //TODO: If is snapshot exists error, retry - return false, errors.Wrapf(err, "failed to prepare extraction snapshot %q", key) + // Already exists should have the caller retry + return errors.Wrapf(err, "failed to prepare extraction snapshot %q", key) + + } + break } defer func() { if err != nil { - log.G(ctx).WithError(err).WithField("key", key).Infof("Apply failure, attempting cleanup") + if !errdefs.IsAlreadyExists(err) { + log.G(ctx).WithError(err).WithField("key", key).Infof("apply failure, attempting cleanup") + } + if rerr := sn.Remove(ctx, key); rerr != nil { - log.G(ctx).WithError(rerr).Warnf("Extraction snapshot %q removal failed", key) + log.G(ctx).WithError(rerr).WithField("key", key).Warnf("extraction snapshot removal failed") } } }() diff, err = a.Apply(ctx, layer.Blob, mounts) if err != nil { - return false, errors.Wrapf(err, "failed to extract layer %s", layer.Diff.Digest) + err = errors.Wrapf(err, "failed to extract layer %s", layer.Diff.Digest) + return err } if diff.Digest != layer.Diff.Digest { err = errors.Errorf("wrong diff id calculated on extraction %q", diff.Digest) - return false, err + return err } if err = sn.Commit(ctx, chainID.String(), key, opts...); err != nil { - if !errdefs.IsAlreadyExists(err) { - return false, errors.Wrapf(err, "failed to commit snapshot %s", key) - } - - // Destination already exists, cleanup key and return without error - err = nil - if err := sn.Remove(ctx, key); err != nil { - return false, errors.Wrapf(err, "failed to cleanup aborted apply %s", key) - } - return false, nil + err = errors.Wrapf(err, "failed to commit snapshot %s", key) + return err } - return true, nil + return nil } func uniquePart() string { From 2bc9f49ffdcd8ee348917c7e625159dca7d129f9 Mon Sep 17 00:00:00 2001 From: Derek McGowan Date: Fri, 4 May 2018 14:31:27 -0700 Subject: [PATCH 3/4] Retry image creation after update not found Signed-off-by: Derek McGowan --- client.go | 29 ++++++++++++++++++----------- 1 file changed, 18 insertions(+), 11 deletions(-) diff --git a/client.go b/client.go index 340e41efc..6163a611b 100644 --- a/client.go +++ b/client.go @@ -345,19 +345,26 @@ func (c *Client) Pull(ctx context.Context, ref string, opts ...RemoteOpt) (Image } is := c.ImageService() - if created, err := is.Create(ctx, imgrec); err != nil { - if !errdefs.IsAlreadyExists(err) { - return nil, err - } + for { + if created, err := is.Create(ctx, imgrec); err != nil { + if !errdefs.IsAlreadyExists(err) { + return nil, err + } - updated, err := is.Update(ctx, imgrec) - if err != nil { - return nil, err - } + updated, err := is.Update(ctx, imgrec) + if err != nil { + // if image was removed, try create again + if errdefs.IsNotFound(err) { + continue + } + return nil, err + } - imgrec = updated - } else { - imgrec = created + imgrec = updated + } else { + imgrec = created + } + break } img := &image{ From f0b3d5a2c5f2e867cbdfe26ff7c8690707135eeb Mon Sep 17 00:00:00 2001 From: Derek McGowan Date: Mon, 7 May 2018 11:17:30 -0700 Subject: [PATCH 4/4] Move image creation after unpack Signed-off-by: Derek McGowan --- client.go | 38 ++++++++++++++++++-------------------- 1 file changed, 18 insertions(+), 20 deletions(-) diff --git a/client.go b/client.go index 6163a611b..cebdf89d6 100644 --- a/client.go +++ b/client.go @@ -338,20 +338,29 @@ func (c *Client) Pull(ctx context.Context, ref string, opts ...RemoteOpt) (Image } } - imgrec := images.Image{ - Name: name, - Target: desc, - Labels: pullCtx.Labels, + img := &image{ + client: c, + i: images.Image{ + Name: name, + Target: desc, + Labels: pullCtx.Labels, + }, + } + + if pullCtx.Unpack { + if err := img.Unpack(ctx, pullCtx.Snapshotter); err != nil { + errors.Wrapf(err, "failed to unpack image on snapshotter %s", pullCtx.Snapshotter) + } } is := c.ImageService() for { - if created, err := is.Create(ctx, imgrec); err != nil { + if created, err := is.Create(ctx, img.i); err != nil { if !errdefs.IsAlreadyExists(err) { return nil, err } - updated, err := is.Update(ctx, imgrec) + updated, err := is.Update(ctx, img.i) if err != nil { // if image was removed, try create again if errdefs.IsNotFound(err) { @@ -360,23 +369,12 @@ func (c *Client) Pull(ctx context.Context, ref string, opts ...RemoteOpt) (Image return nil, err } - imgrec = updated + img.i = updated } else { - imgrec = created + img.i = created } - break + return img, nil } - - img := &image{ - client: c, - i: imgrec, - } - if pullCtx.Unpack { - if err := img.Unpack(ctx, pullCtx.Snapshotter); err != nil { - errors.Wrapf(err, "failed to unpack image on snapshotter %s", pullCtx.Snapshotter) - } - } - return img, nil } // Push uploads the provided content to a remote resource