Merge pull request #80 from Random-Liu/fix-image-pull
Register all possible repo tags and repo digests.
This commit is contained in:
commit
4b53d843bc
@ -330,12 +330,17 @@ func (c *criContainerdService) getImageInfo(ctx context.Context, ref string) (
|
||||
}
|
||||
|
||||
// getRepoDigestAngTag returns image repoDigest and repoTag of the named image reference.
|
||||
func getRepoDigestAndTag(namedRef reference.Named, digest imagedigest.Digest) (string, string) {
|
||||
var repoTag string
|
||||
func getRepoDigestAndTag(namedRef reference.Named, digest imagedigest.Digest, schema1 bool) (string, string) {
|
||||
var repoTag, repoDigest string
|
||||
if _, ok := namedRef.(reference.NamedTagged); ok {
|
||||
repoTag = namedRef.String()
|
||||
}
|
||||
repoDigest := namedRef.Name() + "@" + digest.String()
|
||||
if _, ok := namedRef.(reference.Canonical); ok {
|
||||
repoDigest = namedRef.String()
|
||||
} else if !schema1 {
|
||||
// digest is not actual repo digest for schema1 image.
|
||||
repoDigest = namedRef.Name() + "@" + digest.String()
|
||||
}
|
||||
return repoDigest, repoTag
|
||||
}
|
||||
|
||||
|
@ -265,6 +265,7 @@ func TestGetRepoDigestAndTag(t *testing.T) {
|
||||
digest := imagedigest.Digest("sha256:e6693c20186f837fc393390135d8a598a96a833917917789d63766cab6c59582")
|
||||
for desc, test := range map[string]struct {
|
||||
ref string
|
||||
schema1 bool
|
||||
expectedRepoDigest string
|
||||
expectedRepoTag string
|
||||
}{
|
||||
@ -277,11 +278,23 @@ func TestGetRepoDigestAndTag(t *testing.T) {
|
||||
expectedRepoDigest: "gcr.io/library/busybox@" + digest.String(),
|
||||
expectedRepoTag: "gcr.io/library/busybox:latest",
|
||||
},
|
||||
"repo digest should be empty if original ref is schema1 and has no digest": {
|
||||
ref: "gcr.io/library/busybox:latest",
|
||||
schema1: true,
|
||||
expectedRepoDigest: "",
|
||||
expectedRepoTag: "gcr.io/library/busybox:latest",
|
||||
},
|
||||
"repo digest should not be empty if orignal ref is schema1 but has digest": {
|
||||
ref: "gcr.io/library/busybox@sha256:e6693c20186f837fc393390135d8a598a96a833917917789d63766cab6c59594",
|
||||
schema1: true,
|
||||
expectedRepoDigest: "gcr.io/library/busybox@sha256:e6693c20186f837fc393390135d8a598a96a833917917789d63766cab6c59594",
|
||||
expectedRepoTag: "",
|
||||
},
|
||||
} {
|
||||
t.Logf("TestCase %q", desc)
|
||||
named, err := normalizeImageRef(test.ref)
|
||||
assert.NoError(t, err)
|
||||
repoDigest, repoTag := getRepoDigestAndTag(named, digest)
|
||||
repoDigest, repoTag := getRepoDigestAndTag(named, digest, test.schema1)
|
||||
assert.Equal(t, test.expectedRepoDigest, repoDigest)
|
||||
assert.Equal(t, test.expectedRepoTag, repoTag)
|
||||
}
|
||||
|
@ -31,7 +31,6 @@ import (
|
||||
"github.com/containerd/containerd/remotes/docker/schema1"
|
||||
containerdrootfs "github.com/containerd/containerd/rootfs"
|
||||
"github.com/golang/glog"
|
||||
imagedigest "github.com/opencontainers/go-digest"
|
||||
imagespec "github.com/opencontainers/image-spec/specs-go/v1"
|
||||
"golang.org/x/net/context"
|
||||
"k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
|
||||
@ -87,27 +86,18 @@ func (c *criContainerdService) PullImage(ctx context.Context, r *runtime.PullIma
|
||||
r.GetImage().GetImage(), retRes.GetImageRef())
|
||||
}
|
||||
}()
|
||||
|
||||
namedRef, err := normalizeImageRef(r.GetImage().GetImage())
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to parse image reference %q: %v", r.GetImage().GetImage(), err)
|
||||
}
|
||||
// TODO(random-liu): [P0] Avoid concurrent pulling/removing on the same image reference.
|
||||
image := namedRef.String()
|
||||
if r.GetImage().GetImage() != image {
|
||||
glog.V(4).Infof("PullImage using normalized image ref: %q", image)
|
||||
}
|
||||
image := r.GetImage().GetImage()
|
||||
|
||||
// TODO(random-liu): [P1] Schema 1 image is not supported in containerd now, we need to support
|
||||
// it for backward compatiblity.
|
||||
// TODO(mikebrow): add truncIndex for image id
|
||||
imageID, manifestDigest, err := c.pullImage(ctx, image)
|
||||
imageID, repoTag, repoDigest, err := c.pullImage(ctx, image)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to pull image %q: %v", image, err)
|
||||
}
|
||||
glog.V(4).Infof("Pulled image %q with image id %q, manifest digest %q", image, imageID, manifestDigest)
|
||||
glog.V(4).Infof("Pulled image %q with image id %q, repo tag %q, repo digest %q", image, imageID,
|
||||
repoTag, repoDigest)
|
||||
|
||||
repoDigest, repoTag := getRepoDigestAndTag(namedRef, manifestDigest)
|
||||
_, err = c.imageMetadataStore.Get(imageID)
|
||||
if err != nil && !metadata.IsNotExistError(err) {
|
||||
return nil, fmt.Errorf("failed to get image %q metadata: %v", imageID, err)
|
||||
@ -149,7 +139,7 @@ func (c *criContainerdService) PullImage(ctx context.Context, r *runtime.PullIma
|
||||
// Add the image reference used into repo tags. Note if the image is pulled with
|
||||
// repo digest, it will also be added in to repo tags, which is fine.
|
||||
updateImageMetadata(&newMeta, repoTag, repoDigest)
|
||||
if err = c.imageMetadataStore.Create(newMeta); err != nil {
|
||||
if err := c.imageMetadataStore.Create(newMeta); err != nil {
|
||||
return nil, fmt.Errorf("failed to create image %q metadata: %v", imageID, err)
|
||||
}
|
||||
return &runtime.PullImageResponse{ImageRef: imageID}, err
|
||||
@ -183,11 +173,20 @@ func (r *resourceSet) all() map[string]struct{} {
|
||||
return resources
|
||||
}
|
||||
|
||||
// pullImage pulls image and returns image id (config digest) and manifest digest.
|
||||
// The ref should be normalized image reference.
|
||||
func (c *criContainerdService) pullImage(ctx context.Context, ref string) (
|
||||
// pullImage pulls image and returns image id (config digest), repoTag and repoDigest.
|
||||
func (c *criContainerdService) pullImage(ctx context.Context, rawRef string) (
|
||||
// TODO(random-liu): Replace with client.Pull.
|
||||
string, imagedigest.Digest, error) {
|
||||
string, string, string, error) {
|
||||
namedRef, err := normalizeImageRef(rawRef)
|
||||
if err != nil {
|
||||
return "", "", "", fmt.Errorf("failed to parse image reference %q: %v", rawRef, err)
|
||||
}
|
||||
// TODO(random-liu): [P0] Avoid concurrent pulling/removing on the same image reference.
|
||||
ref := namedRef.String()
|
||||
if ref != rawRef {
|
||||
glog.V(4).Infof("PullImage using normalized image ref: %q", ref)
|
||||
}
|
||||
|
||||
// Resolve the image reference to get descriptor and fetcher.
|
||||
resolver := docker.NewResolver(docker.ResolverOptions{
|
||||
// TODO(random-liu): Add authentication by setting credentials.
|
||||
@ -197,11 +196,11 @@ func (c *criContainerdService) pullImage(ctx context.Context, ref string) (
|
||||
})
|
||||
_, desc, err := resolver.Resolve(ctx, ref)
|
||||
if err != nil {
|
||||
return "", "", fmt.Errorf("failed to resolve ref %q: %v", ref, err)
|
||||
return "", "", "", fmt.Errorf("failed to resolve ref %q: %v", ref, err)
|
||||
}
|
||||
fetcher, err := resolver.Fetcher(ctx, ref)
|
||||
if err != nil {
|
||||
return "", "", fmt.Errorf("failed to get fetcher for ref %q: %v", ref, err)
|
||||
return "", "", "", fmt.Errorf("failed to get fetcher for ref %q: %v", ref, err)
|
||||
}
|
||||
// Currently, the resolved image name is the same with ref in docker resolver,
|
||||
// but they may be different in the future.
|
||||
@ -238,7 +237,7 @@ func (c *criContainerdService) pullImage(ctx context.Context, ref string) (
|
||||
containerdimages.ChildrenHandler(c.contentStoreService),
|
||||
)
|
||||
}
|
||||
if err = containerdimages.Dispatch(ctx, handler, desc); err != nil {
|
||||
if err := containerdimages.Dispatch(ctx, handler, desc); err != nil {
|
||||
// Dispatch returns error when requested resources are locked.
|
||||
// In that case, we should start waiting and checking the pulling
|
||||
// progress.
|
||||
@ -247,13 +246,13 @@ func (c *criContainerdService) pullImage(ctx context.Context, ref string) (
|
||||
}
|
||||
// Wait for the image pulling to finish
|
||||
if err := c.waitForResourcesDownloading(ctx, resources.all()); err != nil {
|
||||
return "", "", fmt.Errorf("failed to wait for image %q downloading: %v", ref, err)
|
||||
return "", "", "", fmt.Errorf("failed to wait for image %q downloading: %v", ref, err)
|
||||
}
|
||||
glog.V(4).Infof("Finished downloading resources for image %q", ref)
|
||||
if schema1Converter != nil {
|
||||
desc, err = schema1Converter.Convert(ctx)
|
||||
if err != nil {
|
||||
return "", "", fmt.Errorf("failed to convert schema 1 image %q: %v", ref, err)
|
||||
return "", "", "", fmt.Errorf("failed to convert schema 1 image %q: %v", ref, err)
|
||||
}
|
||||
}
|
||||
|
||||
@ -265,34 +264,43 @@ func (c *criContainerdService) pullImage(ctx context.Context, ref string) (
|
||||
// 2) We need desc returned by schema1 converter.
|
||||
// So just put the image metadata after downloading now.
|
||||
// TODO(random-liu): Fix the potential garbage collection race.
|
||||
if err = c.imageStoreService.Put(ctx, ref, desc); err != nil {
|
||||
return "", "", fmt.Errorf("failed to put image %q desc %v into containerd image store: %v",
|
||||
ref, desc, err)
|
||||
repoDigest, repoTag := getRepoDigestAndTag(namedRef, desc.Digest, schema1Converter != nil)
|
||||
if ref != repoTag && ref != repoDigest {
|
||||
return "", "", "", fmt.Errorf("unexpected repo tag %q and repo digest %q for %q", repoTag, repoDigest, ref)
|
||||
}
|
||||
for _, r := range []string{repoTag, repoDigest} {
|
||||
if r == "" {
|
||||
continue
|
||||
}
|
||||
if err := c.imageStoreService.Put(ctx, r, desc); err != nil {
|
||||
return "", "", "", fmt.Errorf("failed to put image reference %q desc %v into containerd image store: %v",
|
||||
r, desc, err)
|
||||
}
|
||||
}
|
||||
// Do not cleanup if following operations fail so as to make resumable download possible.
|
||||
// TODO(random-liu): Replace with image.Unpack.
|
||||
// Unpack the image layers into snapshots.
|
||||
image, err := c.imageStoreService.Get(ctx, ref)
|
||||
if err != nil {
|
||||
return "", "", fmt.Errorf("failed to get image %q from containerd image store: %v", ref, err)
|
||||
return "", "", "", fmt.Errorf("failed to get image %q from containerd image store: %v", ref, err)
|
||||
}
|
||||
// Read the image manifest from content store.
|
||||
manifestDigest := image.Target.Digest
|
||||
p, err := content.ReadBlob(ctx, c.contentStoreService, manifestDigest)
|
||||
if err != nil {
|
||||
return "", "", fmt.Errorf("readblob failed for manifest digest %q: %v", manifestDigest, err)
|
||||
return "", "", "", fmt.Errorf("readblob failed for manifest digest %q: %v", manifestDigest, err)
|
||||
}
|
||||
var manifest imagespec.Manifest
|
||||
if err := json.Unmarshal(p, &manifest); err != nil {
|
||||
return "", "", fmt.Errorf("unmarshal blob to manifest failed for manifest digest %q: %v",
|
||||
return "", "", "", fmt.Errorf("unmarshal blob to manifest failed for manifest digest %q: %v",
|
||||
manifestDigest, err)
|
||||
}
|
||||
diffIDs, err := image.RootFS(ctx, c.contentStoreService)
|
||||
if err != nil {
|
||||
return "", "", err
|
||||
return "", "", "", fmt.Errorf("failed to get image rootfs: %v", err)
|
||||
}
|
||||
if len(diffIDs) != len(manifest.Layers) {
|
||||
return "", "", fmt.Errorf("mismatched image rootfs and manifest layers")
|
||||
return "", "", "", fmt.Errorf("mismatched image rootfs and manifest layers")
|
||||
}
|
||||
layers := make([]containerdrootfs.Layer, len(diffIDs))
|
||||
for i := range diffIDs {
|
||||
@ -303,24 +311,24 @@ func (c *criContainerdService) pullImage(ctx context.Context, ref string) (
|
||||
}
|
||||
layers[i].Blob = manifest.Layers[i]
|
||||
}
|
||||
if _, err = containerdrootfs.ApplyLayers(ctx, layers, c.snapshotService, c.diffService); err != nil {
|
||||
return "", "", fmt.Errorf("failed to apply layers %+v: %v", layers, err)
|
||||
if _, err := containerdrootfs.ApplyLayers(ctx, layers, c.snapshotService, c.diffService); err != nil {
|
||||
return "", "", "", fmt.Errorf("failed to apply layers %+v: %v", layers, err)
|
||||
}
|
||||
|
||||
// TODO(random-liu): Considering how to deal with the disk usage of content.
|
||||
|
||||
configDesc, err := image.Config(ctx, c.contentStoreService)
|
||||
if err != nil {
|
||||
return "", "", fmt.Errorf("failed to get config descriptor for image %q: %v", ref, err)
|
||||
return "", "", "", fmt.Errorf("failed to get config descriptor for image %q: %v", ref, err)
|
||||
}
|
||||
// Use config digest as imageID to conform to oci image spec, and also add image id as
|
||||
// image reference.
|
||||
imageID := configDesc.Digest.String()
|
||||
if err = c.imageStoreService.Put(ctx, imageID, desc); err != nil {
|
||||
return "", "", fmt.Errorf("failed to put image id %q into containerd image store: %v",
|
||||
if err := c.imageStoreService.Put(ctx, imageID, desc); err != nil {
|
||||
return "", "", "", fmt.Errorf("failed to put image id %q into containerd image store: %v",
|
||||
imageID, err)
|
||||
}
|
||||
return imageID, manifestDigest, nil
|
||||
return imageID, repoTag, repoDigest, nil
|
||||
}
|
||||
|
||||
// waitDownloadingPollInterval is the interval to check resource downloading progress.
|
||||
|
Loading…
Reference in New Issue
Block a user