Merge pull request #2331 from dmcgowan/fix-image-remove-race
Fix image pull and remove race
This commit is contained in:
commit
e63768ea09
53
client.go
53
client.go
@ -338,38 +338,43 @@ func (c *Client) Pull(ctx context.Context, ref string, opts ...RemoteOpt) (Image
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
imgrec := images.Image{
|
|
||||||
Name: name,
|
|
||||||
Target: desc,
|
|
||||||
Labels: pullCtx.Labels,
|
|
||||||
}
|
|
||||||
|
|
||||||
is := c.ImageService()
|
|
||||||
if created, err := is.Create(ctx, imgrec); err != nil {
|
|
||||||
if !errdefs.IsAlreadyExists(err) {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
updated, err := is.Update(ctx, imgrec)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
imgrec = updated
|
|
||||||
} else {
|
|
||||||
imgrec = created
|
|
||||||
}
|
|
||||||
|
|
||||||
img := &image{
|
img := &image{
|
||||||
client: c,
|
client: c,
|
||||||
i: imgrec,
|
i: images.Image{
|
||||||
|
Name: name,
|
||||||
|
Target: desc,
|
||||||
|
Labels: pullCtx.Labels,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
if pullCtx.Unpack {
|
if pullCtx.Unpack {
|
||||||
if err := img.Unpack(ctx, pullCtx.Snapshotter); err != nil {
|
if err := img.Unpack(ctx, pullCtx.Snapshotter); err != nil {
|
||||||
return nil, errors.Wrapf(err, "failed to unpack image on snapshotter %s", pullCtx.Snapshotter)
|
return nil, errors.Wrapf(err, "failed to unpack image on snapshotter %s", pullCtx.Snapshotter)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return img, nil
|
|
||||||
|
is := c.ImageService()
|
||||||
|
for {
|
||||||
|
if created, err := is.Create(ctx, img.i); err != nil {
|
||||||
|
if !errdefs.IsAlreadyExists(err) {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
updated, err := is.Update(ctx, img.i)
|
||||||
|
if err != nil {
|
||||||
|
// if image was removed, try create again
|
||||||
|
if errdefs.IsNotFound(err) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
img.i = updated
|
||||||
|
} else {
|
||||||
|
img.i = created
|
||||||
|
}
|
||||||
|
return img, nil
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Push uploads the provided content to a remote resource
|
// Push uploads the provided content to a remote resource
|
||||||
|
@ -61,19 +61,19 @@ Most of this is experimental and there are few leaps to make this work.`,
|
|||||||
var (
|
var (
|
||||||
ref = clicontext.Args().First()
|
ref = clicontext.Args().First()
|
||||||
)
|
)
|
||||||
_, err := Fetch(ref, clicontext)
|
client, ctx, cancel, err := commands.NewClient(clicontext)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
_, err = Fetch(ctx, client, ref, clicontext)
|
||||||
return err
|
return err
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
// Fetch loads all resources into the content store and returns the image
|
// Fetch loads all resources into the content store and returns the image
|
||||||
func Fetch(ref string, cliContext *cli.Context) (containerd.Image, error) {
|
func Fetch(ctx context.Context, client *containerd.Client, ref string, cliContext *cli.Context) (containerd.Image, error) {
|
||||||
client, ctx, cancel, err := commands.NewClient(cliContext)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
resolver, err := commands.GetResolver(ctx, cliContext)
|
resolver, err := commands.GetResolver(ctx, cliContext)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -57,10 +57,20 @@ command. As part of this process, we do the following:
|
|||||||
if ref == "" {
|
if ref == "" {
|
||||||
return fmt.Errorf("please provide an image reference to pull")
|
return fmt.Errorf("please provide an image reference to pull")
|
||||||
}
|
}
|
||||||
ctx, cancel := commands.AppContext(context)
|
|
||||||
|
client, ctx, cancel, err := commands.NewClient(context)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
img, err := content.Fetch(ref, context)
|
ctx, done, err := client.WithLease(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer done(ctx)
|
||||||
|
|
||||||
|
img, err := content.Fetch(ctx, client, ref, context)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
117
rootfs/apply.go
117
rootfs/apply.go
@ -26,6 +26,7 @@ import (
|
|||||||
"github.com/containerd/containerd/diff"
|
"github.com/containerd/containerd/diff"
|
||||||
"github.com/containerd/containerd/errdefs"
|
"github.com/containerd/containerd/errdefs"
|
||||||
"github.com/containerd/containerd/log"
|
"github.com/containerd/containerd/log"
|
||||||
|
"github.com/containerd/containerd/mount"
|
||||||
"github.com/containerd/containerd/snapshots"
|
"github.com/containerd/containerd/snapshots"
|
||||||
"github.com/opencontainers/go-digest"
|
"github.com/opencontainers/go-digest"
|
||||||
"github.com/opencontainers/image-spec/identity"
|
"github.com/opencontainers/image-spec/identity"
|
||||||
@ -47,16 +48,27 @@ type Layer struct {
|
|||||||
// Layers are applied in order they are given, making the first layer the
|
// Layers are applied in order they are given, making the first layer the
|
||||||
// bottom-most layer in the layer chain.
|
// bottom-most layer in the layer chain.
|
||||||
func ApplyLayers(ctx context.Context, layers []Layer, sn snapshots.Snapshotter, a diff.Applier) (digest.Digest, error) {
|
func ApplyLayers(ctx context.Context, layers []Layer, sn snapshots.Snapshotter, a diff.Applier) (digest.Digest, error) {
|
||||||
var chain []digest.Digest
|
chain := make([]digest.Digest, len(layers))
|
||||||
for _, layer := range layers {
|
for i, layer := range layers {
|
||||||
if _, err := ApplyLayer(ctx, layer, chain, sn, a); err != nil {
|
chain[i] = layer.Diff.Digest
|
||||||
// TODO: possibly wait and retry if extraction of same chain id was in progress
|
}
|
||||||
return "", err
|
chainID := identity.ChainID(chain)
|
||||||
|
|
||||||
|
// Just stat top layer, remaining layers will have their existence checked
|
||||||
|
// on prepare. Calling prepare on upper layers first guarantees that upper
|
||||||
|
// layers are not removed while calling stat on lower layers
|
||||||
|
_, err := sn.Stat(ctx, chainID.String())
|
||||||
|
if err != nil {
|
||||||
|
if !errdefs.IsNotFound(err) {
|
||||||
|
return "", errors.Wrapf(err, "failed to stat snapshot %s", chainID)
|
||||||
}
|
}
|
||||||
|
|
||||||
chain = append(chain, layer.Diff.Digest)
|
if err := applyLayers(ctx, layers, chain, sn, a); err != nil && !errdefs.IsAlreadyExists(err) {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return identity.ChainID(chain), nil
|
|
||||||
|
return chainID, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// ApplyLayer applies a single layer on top of the given provided layer chain,
|
// ApplyLayer applies a single layer on top of the given provided layer chain,
|
||||||
@ -64,59 +76,90 @@ func ApplyLayers(ctx context.Context, layers []Layer, sn snapshots.Snapshotter,
|
|||||||
// is returned, if the layer already exists false is returned.
|
// is returned, if the layer already exists false is returned.
|
||||||
func ApplyLayer(ctx context.Context, layer Layer, chain []digest.Digest, sn snapshots.Snapshotter, a diff.Applier, opts ...snapshots.Opt) (bool, error) {
|
func ApplyLayer(ctx context.Context, layer Layer, chain []digest.Digest, sn snapshots.Snapshotter, a diff.Applier, opts ...snapshots.Opt) (bool, error) {
|
||||||
var (
|
var (
|
||||||
parent = identity.ChainID(chain)
|
chainID = identity.ChainID(append(chain, layer.Diff.Digest)).String()
|
||||||
chainID = identity.ChainID(append(chain, layer.Diff.Digest))
|
applied bool
|
||||||
|
)
|
||||||
|
if _, err := sn.Stat(ctx, chainID); err != nil {
|
||||||
|
if !errdefs.IsNotFound(err) {
|
||||||
|
return false, errors.Wrapf(err, "failed to stat snapshot %s", chainID)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := applyLayers(ctx, []Layer{layer}, append(chain, layer.Diff.Digest), sn, a, opts...); err != nil {
|
||||||
|
if !errdefs.IsAlreadyExists(err) {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
applied = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return applied, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func applyLayers(ctx context.Context, layers []Layer, chain []digest.Digest, sn snapshots.Snapshotter, a diff.Applier, opts ...snapshots.Opt) error {
|
||||||
|
var (
|
||||||
|
parent = identity.ChainID(chain[:len(chain)-1])
|
||||||
|
chainID = identity.ChainID(chain)
|
||||||
|
layer = layers[len(layers)-1]
|
||||||
diff ocispec.Descriptor
|
diff ocispec.Descriptor
|
||||||
|
key string
|
||||||
|
mounts []mount.Mount
|
||||||
|
err error
|
||||||
)
|
)
|
||||||
|
|
||||||
_, err := sn.Stat(ctx, chainID.String())
|
for {
|
||||||
if err == nil {
|
key = fmt.Sprintf("extract-%s %s", uniquePart(), chainID)
|
||||||
log.G(ctx).Debugf("Extraction not needed, layer snapshot %s exists", chainID)
|
|
||||||
return false, nil
|
|
||||||
} else if !errdefs.IsNotFound(err) {
|
|
||||||
return false, errors.Wrapf(err, "failed to stat snapshot %s", chainID)
|
|
||||||
}
|
|
||||||
|
|
||||||
key := fmt.Sprintf("extract-%s %s", uniquePart(), chainID)
|
// Prepare snapshot with from parent, label as root
|
||||||
|
mounts, err = sn.Prepare(ctx, key, parent.String(), opts...)
|
||||||
|
if err != nil {
|
||||||
|
if errdefs.IsNotFound(err) && len(layers) > 1 {
|
||||||
|
if err := applyLayers(ctx, layers[:len(layers)-1], chain[:len(chain)-1], sn, a); err != nil {
|
||||||
|
if !errdefs.IsAlreadyExists(err) {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Do no try applying layers again
|
||||||
|
layers = nil
|
||||||
|
continue
|
||||||
|
} else if errdefs.IsAlreadyExists(err) {
|
||||||
|
// Try a different key
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
// Prepare snapshot with from parent, label as root
|
// Already exists should have the caller retry
|
||||||
mounts, err := sn.Prepare(ctx, key, parent.String(), opts...)
|
return errors.Wrapf(err, "failed to prepare extraction snapshot %q", key)
|
||||||
if err != nil {
|
|
||||||
//TODO: If is snapshot exists error, retry
|
}
|
||||||
return false, errors.Wrapf(err, "failed to prepare extraction snapshot %q", key)
|
break
|
||||||
}
|
}
|
||||||
defer func() {
|
defer func() {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.G(ctx).WithError(err).WithField("key", key).Infof("Apply failure, attempting cleanup")
|
if !errdefs.IsAlreadyExists(err) {
|
||||||
|
log.G(ctx).WithError(err).WithField("key", key).Infof("apply failure, attempting cleanup")
|
||||||
|
}
|
||||||
|
|
||||||
if rerr := sn.Remove(ctx, key); rerr != nil {
|
if rerr := sn.Remove(ctx, key); rerr != nil {
|
||||||
log.G(ctx).WithError(rerr).Warnf("Extraction snapshot %q removal failed", key)
|
log.G(ctx).WithError(rerr).WithField("key", key).Warnf("extraction snapshot removal failed")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
diff, err = a.Apply(ctx, layer.Blob, mounts)
|
diff, err = a.Apply(ctx, layer.Blob, mounts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, errors.Wrapf(err, "failed to extract layer %s", layer.Diff.Digest)
|
err = errors.Wrapf(err, "failed to extract layer %s", layer.Diff.Digest)
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
if diff.Digest != layer.Diff.Digest {
|
if diff.Digest != layer.Diff.Digest {
|
||||||
err = errors.Errorf("wrong diff id calculated on extraction %q", diff.Digest)
|
err = errors.Errorf("wrong diff id calculated on extraction %q", diff.Digest)
|
||||||
return false, err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = sn.Commit(ctx, chainID.String(), key, opts...); err != nil {
|
if err = sn.Commit(ctx, chainID.String(), key, opts...); err != nil {
|
||||||
if !errdefs.IsAlreadyExists(err) {
|
err = errors.Wrapf(err, "failed to commit snapshot %s", key)
|
||||||
return false, errors.Wrapf(err, "failed to commit snapshot %s", key)
|
return err
|
||||||
}
|
|
||||||
|
|
||||||
// Destination already exists, cleanup key and return without error
|
|
||||||
err = nil
|
|
||||||
if err := sn.Remove(ctx, key); err != nil {
|
|
||||||
return false, errors.Wrapf(err, "failed to cleanup aborted apply %s", key)
|
|
||||||
}
|
|
||||||
return false, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return true, nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func uniquePart() string {
|
func uniquePart() string {
|
||||||
|
Loading…
Reference in New Issue
Block a user