diff --git a/client.go b/client.go index eb8e9942a..3cf7ff3ee 100644 --- a/client.go +++ b/client.go @@ -338,38 +338,43 @@ func (c *Client) Pull(ctx context.Context, ref string, opts ...RemoteOpt) (Image } } - imgrec := images.Image{ - Name: name, - Target: desc, - Labels: pullCtx.Labels, - } - - is := c.ImageService() - if created, err := is.Create(ctx, imgrec); err != nil { - if !errdefs.IsAlreadyExists(err) { - return nil, err - } - - updated, err := is.Update(ctx, imgrec) - if err != nil { - return nil, err - } - - imgrec = updated - } else { - imgrec = created - } - img := &image{ client: c, - i: imgrec, + i: images.Image{ + Name: name, + Target: desc, + Labels: pullCtx.Labels, + }, } + if pullCtx.Unpack { if err := img.Unpack(ctx, pullCtx.Snapshotter); err != nil { return nil, errors.Wrapf(err, "failed to unpack image on snapshotter %s", pullCtx.Snapshotter) } } - return img, nil + + is := c.ImageService() + for { + if created, err := is.Create(ctx, img.i); err != nil { + if !errdefs.IsAlreadyExists(err) { + return nil, err + } + + updated, err := is.Update(ctx, img.i) + if err != nil { + // if image was removed, try create again + if errdefs.IsNotFound(err) { + continue + } + return nil, err + } + + img.i = updated + } else { + img.i = created + } + return img, nil + } } // Push uploads the provided content to a remote resource diff --git a/cmd/ctr/commands/content/fetch.go b/cmd/ctr/commands/content/fetch.go index 96933eb9a..01abb4cad 100644 --- a/cmd/ctr/commands/content/fetch.go +++ b/cmd/ctr/commands/content/fetch.go @@ -61,19 +61,19 @@ Most of this is experimental and there are few leaps to make this work.`, var ( ref = clicontext.Args().First() ) - _, err := Fetch(ref, clicontext) + client, ctx, cancel, err := commands.NewClient(clicontext) + if err != nil { + return err + } + defer cancel() + + _, err = Fetch(ctx, client, ref, clicontext) return err }, } // Fetch loads all resources into the content store and returns the image -func Fetch(ref string, cliContext *cli.Context) (containerd.Image, error) { - client, ctx, cancel, err := commands.NewClient(cliContext) - if err != nil { - return nil, err - } - defer cancel() - +func Fetch(ctx context.Context, client *containerd.Client, ref string, cliContext *cli.Context) (containerd.Image, error) { resolver, err := commands.GetResolver(ctx, cliContext) if err != nil { return nil, err diff --git a/cmd/ctr/commands/images/pull.go b/cmd/ctr/commands/images/pull.go index 4ea72b6bf..10e4f1d52 100644 --- a/cmd/ctr/commands/images/pull.go +++ b/cmd/ctr/commands/images/pull.go @@ -57,10 +57,20 @@ command. As part of this process, we do the following: if ref == "" { return fmt.Errorf("please provide an image reference to pull") } - ctx, cancel := commands.AppContext(context) + + client, ctx, cancel, err := commands.NewClient(context) + if err != nil { + return err + } defer cancel() - img, err := content.Fetch(ref, context) + ctx, done, err := client.WithLease(ctx) + if err != nil { + return err + } + defer done(ctx) + + img, err := content.Fetch(ctx, client, ref, context) if err != nil { return err } diff --git a/rootfs/apply.go b/rootfs/apply.go index 408b4f3a9..a1a2db938 100644 --- a/rootfs/apply.go +++ b/rootfs/apply.go @@ -26,6 +26,7 @@ import ( "github.com/containerd/containerd/diff" "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/log" + "github.com/containerd/containerd/mount" "github.com/containerd/containerd/snapshots" "github.com/opencontainers/go-digest" "github.com/opencontainers/image-spec/identity" @@ -47,16 +48,27 @@ type Layer struct { // Layers are applied in order they are given, making the first layer the // bottom-most layer in the layer chain. func ApplyLayers(ctx context.Context, layers []Layer, sn snapshots.Snapshotter, a diff.Applier) (digest.Digest, error) { - var chain []digest.Digest - for _, layer := range layers { - if _, err := ApplyLayer(ctx, layer, chain, sn, a); err != nil { - // TODO: possibly wait and retry if extraction of same chain id was in progress - return "", err + chain := make([]digest.Digest, len(layers)) + for i, layer := range layers { + chain[i] = layer.Diff.Digest + } + chainID := identity.ChainID(chain) + + // Just stat top layer, remaining layers will have their existence checked + // on prepare. Calling prepare on upper layers first guarantees that upper + // layers are not removed while calling stat on lower layers + _, err := sn.Stat(ctx, chainID.String()) + if err != nil { + if !errdefs.IsNotFound(err) { + return "", errors.Wrapf(err, "failed to stat snapshot %s", chainID) } - chain = append(chain, layer.Diff.Digest) + if err := applyLayers(ctx, layers, chain, sn, a); err != nil && !errdefs.IsAlreadyExists(err) { + return "", err + } } - return identity.ChainID(chain), nil + + return chainID, nil } // ApplyLayer applies a single layer on top of the given provided layer chain, @@ -64,59 +76,90 @@ func ApplyLayers(ctx context.Context, layers []Layer, sn snapshots.Snapshotter, // is returned, if the layer already exists false is returned. func ApplyLayer(ctx context.Context, layer Layer, chain []digest.Digest, sn snapshots.Snapshotter, a diff.Applier, opts ...snapshots.Opt) (bool, error) { var ( - parent = identity.ChainID(chain) - chainID = identity.ChainID(append(chain, layer.Diff.Digest)) + chainID = identity.ChainID(append(chain, layer.Diff.Digest)).String() + applied bool + ) + if _, err := sn.Stat(ctx, chainID); err != nil { + if !errdefs.IsNotFound(err) { + return false, errors.Wrapf(err, "failed to stat snapshot %s", chainID) + } + + if err := applyLayers(ctx, []Layer{layer}, append(chain, layer.Diff.Digest), sn, a, opts...); err != nil { + if !errdefs.IsAlreadyExists(err) { + return false, err + } + } else { + applied = true + } + } + return applied, nil +} + +func applyLayers(ctx context.Context, layers []Layer, chain []digest.Digest, sn snapshots.Snapshotter, a diff.Applier, opts ...snapshots.Opt) error { + var ( + parent = identity.ChainID(chain[:len(chain)-1]) + chainID = identity.ChainID(chain) + layer = layers[len(layers)-1] diff ocispec.Descriptor + key string + mounts []mount.Mount + err error ) - _, err := sn.Stat(ctx, chainID.String()) - if err == nil { - log.G(ctx).Debugf("Extraction not needed, layer snapshot %s exists", chainID) - return false, nil - } else if !errdefs.IsNotFound(err) { - return false, errors.Wrapf(err, "failed to stat snapshot %s", chainID) - } + for { + key = fmt.Sprintf("extract-%s %s", uniquePart(), chainID) - key := fmt.Sprintf("extract-%s %s", uniquePart(), chainID) + // Prepare snapshot with from parent, label as root + mounts, err = sn.Prepare(ctx, key, parent.String(), opts...) + if err != nil { + if errdefs.IsNotFound(err) && len(layers) > 1 { + if err := applyLayers(ctx, layers[:len(layers)-1], chain[:len(chain)-1], sn, a); err != nil { + if !errdefs.IsAlreadyExists(err) { + return err + } + } + // Do no try applying layers again + layers = nil + continue + } else if errdefs.IsAlreadyExists(err) { + // Try a different key + continue + } - // Prepare snapshot with from parent, label as root - mounts, err := sn.Prepare(ctx, key, parent.String(), opts...) - if err != nil { - //TODO: If is snapshot exists error, retry - return false, errors.Wrapf(err, "failed to prepare extraction snapshot %q", key) + // Already exists should have the caller retry + return errors.Wrapf(err, "failed to prepare extraction snapshot %q", key) + + } + break } defer func() { if err != nil { - log.G(ctx).WithError(err).WithField("key", key).Infof("Apply failure, attempting cleanup") + if !errdefs.IsAlreadyExists(err) { + log.G(ctx).WithError(err).WithField("key", key).Infof("apply failure, attempting cleanup") + } + if rerr := sn.Remove(ctx, key); rerr != nil { - log.G(ctx).WithError(rerr).Warnf("Extraction snapshot %q removal failed", key) + log.G(ctx).WithError(rerr).WithField("key", key).Warnf("extraction snapshot removal failed") } } }() diff, err = a.Apply(ctx, layer.Blob, mounts) if err != nil { - return false, errors.Wrapf(err, "failed to extract layer %s", layer.Diff.Digest) + err = errors.Wrapf(err, "failed to extract layer %s", layer.Diff.Digest) + return err } if diff.Digest != layer.Diff.Digest { err = errors.Errorf("wrong diff id calculated on extraction %q", diff.Digest) - return false, err + return err } if err = sn.Commit(ctx, chainID.String(), key, opts...); err != nil { - if !errdefs.IsAlreadyExists(err) { - return false, errors.Wrapf(err, "failed to commit snapshot %s", key) - } - - // Destination already exists, cleanup key and return without error - err = nil - if err := sn.Remove(ctx, key); err != nil { - return false, errors.Wrapf(err, "failed to cleanup aborted apply %s", key) - } - return false, nil + err = errors.Wrapf(err, "failed to commit snapshot %s", key) + return err } - return true, nil + return nil } func uniquePart() string {