Finish image management.

Signed-off-by: Random-Liu <lantaol@google.com>
This commit is contained in:
Random-Liu
2017-05-18 16:45:40 -07:00
committed by Lantao Liu
parent 751f119cbc
commit b112418e7b
9 changed files with 501 additions and 245 deletions

View File

@@ -24,8 +24,8 @@ import (
containerdimages "github.com/containerd/containerd/images"
"github.com/containerd/containerd/remotes"
"github.com/containerd/containerd/remotes/docker"
"github.com/docker/distribution/reference"
"github.com/golang/glog"
imagedigest "github.com/opencontainers/go-digest"
imagespec "github.com/opencontainers/image-spec/specs-go/v1"
"golang.org/x/net/context"
runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1"
@@ -33,6 +33,44 @@ import (
"github.com/kubernetes-incubator/cri-containerd/pkg/metadata"
)
// For image management:
// 1) We have an in-memory metadata index to:
// a. Maintain ImageID -> RepoTags, ImageID -> RepoDigset relationships; ImageID
// is the digest of image config, which conforms to oci image spec.
// b. Cache constant and useful information such as image chainID, config etc.
// c. An image will be added into the in-memory metadata only when it's successfully
// pulled and unpacked.
//
// 2) We use containerd image metadata store and content store:
// a. To resolve image reference (digest/tag) locally. During pulling image, we
// normalize the image reference provided by user, and put it into image metadata
// store with resolved descriptor. For the other operations, if image id is provided,
// we'll access the in-memory metadata index directly; if image reference is
// provided, we'll normalize it, resolve it in containerd image metadata store
// to get the image id.
// b. As the backup of in-memory metadata in 1). During startup, the in-memory
// metadata could be re-constructed from image metadata store + content store.
//
// Several problems with current approach:
// 1) An entry in containerd image metadata store doesn't mean a "READY" (successfully
// pulled and unpacked) image. E.g. during pulling, the client gets killed. In that case,
// if we saw an image without snapshots or with in-complete contents during startup,
// should we re-pull the image? Or should we remove the entry?
//
// 2) Containerd suggests user to add entry before pulling the image. However if
// an error occurrs during the pulling, should we remove the entry from metadata
// store? Or should we leave it there until next startup (resource leakage)?
//
// 3) CRI-containerd only exposes "READY" (successfully pulled and unpacked) images
// to the user, which are maintained in the in-memory metadata index. However, it's
// still possible that someone else removes the content or snapshot by-pass cri-containerd,
// how do we detect that and update the in-memory metadata correspondingly? Always
// check whether corresponding snapshot is ready when reporting image status?
//
// 4) Is the content important if we cached necessary information in-memory
// after we pull the image? How to manage the disk usage of contents? If some
// contents are missing but snapshots are ready, is the image still "READY"?
// PullImage pulls an image with authentication config.
// TODO(mikebrow): add authentication
// TODO(mikebrow): harden api (including figuring out at what layer we should be blocking on duplicate requests.)
@@ -40,146 +78,110 @@ func (c *criContainerdService) PullImage(ctx context.Context, r *runtime.PullIma
glog.V(2).Infof("PullImage %q with auth config %+v", r.GetImage().GetImage(), r.GetAuth())
defer func() {
if retErr == nil {
glog.V(2).Infof("PullImage returns image reference %q", retRes.GetImageRef())
glog.V(2).Infof("PullImage %q returns image reference %q",
r.GetImage().GetImage(), retRes.GetImageRef())
}
}()
var (
size int64
desc imagespec.Descriptor
)
image, err := normalizeImageRef(r.GetImage().GetImage())
namedRef, err := normalizeImageRef(r.GetImage().GetImage())
if err != nil {
return nil, fmt.Errorf("failed to parse image reference %q: %v", r.GetImage().GetImage(), err)
}
// TODO(random-liu): [P0] Avoid concurrent pulling/removing on the same image reference.
image := namedRef.String()
if r.GetImage().GetImage() != image {
glog.V(4).Info("PullImage using normalized image ref: %q", image)
glog.V(4).Infof("PullImage using normalized image ref: %q", image)
}
if desc, size, err = c.pullImage(ctx, image); err != nil {
// TODO(random-liu): [P1] Schema 1 image is not supported in containerd now, we need to support
// it for backward compatiblity.
cfgDigest, manifestDigest, err := c.pullImage(ctx, image)
if err != nil {
return nil, fmt.Errorf("failed to pull image %q: %v", image, err)
}
digest := desc.Digest.String() // TODO(mikebrow): add truncIndex for image id
// Use config digest as imageID to conform to oci image spec.
// TODO(mikebrow): add truncIndex for image id
imageID := cfgDigest.String()
glog.V(4).Infof("Pulled image %q with image id %q, manifest digest %q", image, imageID, manifestDigest)
// TODO(mikebrow): pass a metadata struct to pullimage and fill in the tags/digests
// store the image metadata
// TODO(mikebrow): consider what to do if pullimage was called and metadata already exists (error? udpate?)
meta := &metadata.ImageMetadata{
ID: digest,
RepoTags: []string{image},
RepoDigests: []string{digest},
Size: uint64(size), // TODO(mikebrow): compressed or uncompressed size? using compressed
repoDigest, repoTag := getRepoDigestAndTag(namedRef, manifestDigest)
_, err = c.imageMetadataStore.Get(imageID)
if err != nil && !metadata.IsNotExistError(err) {
return nil, fmt.Errorf("failed to get image %q metadata: %v", imageID, err)
}
if err = c.imageMetadataStore.Create(*meta); err != nil {
return &runtime.PullImageResponse{ImageRef: digest},
fmt.Errorf("pulled image `%q` but failed to store metadata for digest: %s err: %v", image, digest, err)
// There is a known race here because the image metadata could be created after `Get`.
// TODO(random-liu): [P1] Do not use metadata store. Use simple in-memory data structure to
// maintain the id -> information index. And use the container image store as backup and
// recover in-memory state during startup.
if err == nil {
// Update existing image metadata.
if err := c.imageMetadataStore.Update(imageID, func(m metadata.ImageMetadata) (metadata.ImageMetadata, error) {
updateImageMetadata(&m, repoTag, repoDigest)
return m, nil
}); err != nil {
return nil, fmt.Errorf("failed to update image %q metadata: %v", imageID, err)
}
return &runtime.PullImageResponse{ImageRef: imageID}, err
}
// Return the image digest
return &runtime.PullImageResponse{ImageRef: digest}, err
}
// normalizeImageRef normalizes the image reference following the docker convention. This is added
// mainly for backward compatibility.
func normalizeImageRef(ref string) (string, error) {
named, err := reference.ParseNormalizedNamed(ref)
// Get image information.
chainID, size, config, err := c.getImageInfo(ctx, image)
if err != nil {
return "", err
return nil, fmt.Errorf("failed to get image %q information: %v", image, err)
}
named = reference.TagNameOnly(named)
return named.String(), nil
// NOTE(random-liu): the actual state in containerd is the source of truth, even we maintain
// in-memory image metadata, it's only for in-memory indexing. The image could be removed
// by someone else anytime, before/during/after we create the metadata. We should always
// check the actual state in containerd before using the image or returning status of the
// image.
// Create corresponding image metadata.
newMeta := metadata.ImageMetadata{
ID: imageID,
ChainID: chainID.String(),
Size: size,
Config: config,
}
// Add the image reference used into repo tags. Note if the image is pulled with
// repo digest, it will also be added in to repo tags, which is fine.
updateImageMetadata(&newMeta, repoTag, repoDigest)
if err = c.imageMetadataStore.Create(newMeta); err != nil {
return nil, fmt.Errorf("failed to create image %q metadata: %v", imageID, err)
}
return &runtime.PullImageResponse{ImageRef: imageID}, err
}
// imageReferenceResolver attempts to resolve the image reference into a name
// and manifest via the containerd library call..
//
// The argument `ref` should be a scheme-less URI representing the remote.
// Structurally, it has a host and path. The "host" can be used to directly
// reference a specific host or be matched against a specific handler.
//
// The returned name should be used to identify the referenced entity.
// Dependending on the remote namespace, this may be immutable or mutable.
// While the name may differ from ref, it should itself be a valid ref.
//
// If the resolution fails, an error will be returned.
// TODO(mikebrow) add config.json based image.Config as an additional return value from this resolver fn()
func (c *criContainerdService) imageReferenceResolver(ctx context.Context, ref string) (resolvedImageName string, manifest imagespec.Manifest, compressedSize uint64, err error) {
var (
size int64
desc imagespec.Descriptor
fetcher remotes.Fetcher
)
// Resolve the image name; place that in the image store; then dispatch
// a handler to fetch the object for the manifest
// pullImage pulls image and returns image id (config digest) and manifest digest.
// The ref should be normalized image reference.
// TODO(random-liu): [P0] Wait for all downloadings to be done before return.
func (c *criContainerdService) pullImage(ctx context.Context, ref string) (
imagedigest.Digest, imagedigest.Digest, error) {
// Resolve the image reference to get descriptor and fetcher.
resolver := docker.NewResolver()
resolvedImageName, desc, fetcher, err = resolver.Resolve(ctx, ref)
_, desc, fetcher, err := resolver.Resolve(ctx, ref)
if err != nil {
return resolvedImageName, manifest, compressedSize, fmt.Errorf("failed to resolve ref %q: err: %v", ref, err)
return "", "", fmt.Errorf("failed to resolve ref %q: %v", ref, err)
}
// Currently, the resolved image name is the same with ref in docker resolver,
// but they may be different in the future.
// TODO(random-liu): Always resolve image reference and use resolved image name in
// the system.
err = c.imageStoreService.Put(ctx, resolvedImageName, desc)
if err != nil {
return resolvedImageName, manifest, compressedSize, fmt.Errorf("failed to put %q: desc: %v err: %v", resolvedImageName, desc, err)
}
err = containerdimages.Dispatch(
ctx,
remotes.FetchHandler(c.contentIngester, fetcher),
desc)
if err != nil {
return resolvedImageName, manifest, compressedSize, fmt.Errorf("failed to fetch %q: desc: %v err: %v", resolvedImageName, desc, err)
}
image, err := c.imageStoreService.Get(ctx, resolvedImageName)
if err != nil {
return resolvedImageName, manifest, compressedSize,
fmt.Errorf("get failed for image:%q err: %v", resolvedImageName, err)
}
p, err := content.ReadBlob(ctx, c.contentProvider, image.Target.Digest)
if err != nil {
return resolvedImageName, manifest, compressedSize,
fmt.Errorf("readblob failed for digest:%q err: %v", image.Target.Digest, err)
}
err = json.Unmarshal(p, &manifest)
if err != nil {
return resolvedImageName, manifest, compressedSize,
fmt.Errorf("unmarshal blob to manifest failed for digest:%q err: %v", image.Target.Digest, err)
}
size, err = image.Size(ctx, c.contentProvider)
if err != nil {
return resolvedImageName, manifest, compressedSize,
fmt.Errorf("size failed for image:%q %v", image.Target.Digest, err)
}
compressedSize = uint64(size)
return resolvedImageName, manifest, compressedSize, nil
}
func (c *criContainerdService) pullImage(ctx context.Context, ref string) (imagespec.Descriptor, int64, error) {
var (
err error
size int64
desc imagespec.Descriptor
resolvedImageName string
fetcher remotes.Fetcher
)
// Resolve the image name; place that in the image store; then dispatch
// a handler for a sequence of handlers which: 1) fetch the object using a
// FetchHandler; and 3) recurse through any sub-layers via a ChildrenHandler
resolver := docker.NewResolver()
resolvedImageName, desc, fetcher, err = resolver.Resolve(ctx, ref)
if err != nil {
return desc, size, fmt.Errorf("failed to resolve ref %q: err: %v", ref, err)
}
err = c.imageStoreService.Put(ctx, resolvedImageName, desc)
if err != nil {
return desc, size, fmt.Errorf("failed to put %q: desc: %v err: %v", resolvedImageName, desc, err)
// Put the image information into containerd image store.
// In the future, containerd will rely on the information in the image store to perform image
// garbage collection.
// For now, we simply use it to store and retrieve information required for pulling an image.
if putErr := c.imageStoreService.Put(ctx, ref, desc); putErr != nil {
return "", "", fmt.Errorf("failed to put image %q desc %v into containerd image store: %v",
ref, desc, putErr)
}
// TODO(random-liu): What if following operations fail? Do we need to do cleanup?
// Fetch all image resources into content store.
// Dispatch a handler which will run a sequence of handlers to:
// 1) fetch the object using a FetchHandler;
// 2) recurse through any sub-layers via a ChildrenHandler.
err = containerdimages.Dispatch(
ctx,
containerdimages.Handlers(
@@ -187,34 +189,60 @@ func (c *criContainerdService) pullImage(ctx context.Context, ref string) (image
containerdimages.ChildrenHandler(c.contentProvider)),
desc)
if err != nil {
return desc, size, fmt.Errorf("failed to fetch %q: desc: %v err: %v", resolvedImageName, desc, err)
return "", "", fmt.Errorf("failed to fetch image %q desc %+v: %v", ref, desc, err)
}
image, err := c.imageStoreService.Get(ctx, resolvedImageName)
image, err := c.imageStoreService.Get(ctx, ref)
if err != nil {
return desc, size,
fmt.Errorf("get failed for image:%q err: %v", resolvedImageName, err)
return "", "", fmt.Errorf("failed to get image %q from containerd image store: %v", ref, err)
}
p, err := content.ReadBlob(ctx, c.contentProvider, image.Target.Digest)
// Read the image manifest from content store.
manifestDigest := image.Target.Digest
p, err := content.ReadBlob(ctx, c.contentProvider, manifestDigest)
if err != nil {
return desc, size,
fmt.Errorf("readblob failed for digest:%q err: %v", image.Target.Digest, err)
return "", "", fmt.Errorf("readblob failed for manifest digest %q: %v", manifestDigest, err)
}
var manifest imagespec.Manifest
err = json.Unmarshal(p, &manifest)
if err != nil {
return desc, size,
fmt.Errorf("unmarshal blob to manifest failed for digest:%q %v", image.Target.Digest, err)
if err := json.Unmarshal(p, &manifest); err != nil {
return "", "", fmt.Errorf("unmarshal blob to manifest failed for manifest digest %q: %v",
manifestDigest, err)
}
_, err = c.rootfsUnpacker.Unpack(ctx, manifest.Layers) // ignoring returned chainID for now
if err != nil {
return desc, size,
fmt.Errorf("unpack failed for manifest layers:%v %v", manifest.Layers, err)
// Unpack the image layers into snapshots.
if _, err = c.rootfsUnpacker.Unpack(ctx, manifest.Layers); err != nil {
return "", "", fmt.Errorf("unpack failed for manifest layers %+v: %v", manifest.Layers, err)
}
size, err = image.Size(ctx, c.contentProvider)
// TODO(random-liu): Considering how to deal with the disk usage of content.
configDesc, err := image.Config(ctx, c.contentProvider)
if err != nil {
return desc, size,
fmt.Errorf("size failed for image:%q %v", image.Target.Digest, err)
return "", "", fmt.Errorf("failed to get config descriptor for image %q: %v", ref, err)
}
return configDesc.Digest, manifestDigest, nil
}
// insertToStringSlice is a helper function to insert a string into the string slice
// if the string is not in the slice yet.
func insertToStringSlice(ss []string, s string) []string {
found := false
for _, str := range ss {
if s == str {
found = true
break
}
}
if !found {
ss = append(ss, s)
}
return ss
}
// updateImageMetadata updates existing image meta with new repoTag and repoDigest.
func updateImageMetadata(meta *metadata.ImageMetadata, repoTag, repoDigest string) {
if repoTag != "" {
meta.RepoTags = insertToStringSlice(meta.RepoTags, repoTag)
}
if repoDigest != "" {
meta.RepoDigests = insertToStringSlice(meta.RepoDigests, repoDigest)
}
return desc, size, nil
}