Merge pull request #41 from Random-Liu/finish-image-management

Finish image management.
This commit is contained in:
Lantao Liu 2017-05-26 17:04:41 -07:00 committed by GitHub
commit bdc443a77c
12 changed files with 803 additions and 223 deletions

View File

@ -19,6 +19,8 @@ package metadata
import ( import (
"encoding/json" "encoding/json"
imagespec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/kubernetes-incubator/cri-containerd/pkg/metadata/store" "github.com/kubernetes-incubator/cri-containerd/pkg/metadata/store"
) )
@ -41,14 +43,18 @@ type versionedImageMetadata struct {
// ImageMetadata is the unversioned image metadata. // ImageMetadata is the unversioned image metadata.
type ImageMetadata struct { type ImageMetadata struct {
// Id of the image. Normally the Digest // Id of the image. Normally the digest of image config.
ID string `json:"id,omitempty"` ID string `json:"id,omitempty"`
// ChainID is the chainID of the image.
ChainID string `json:"chain_id,omitempty"`
// Other names by which this image is known. // Other names by which this image is known.
RepoTags []string `json:"repo_tags,omitempty"` RepoTags []string `json:"repo_tags,omitempty"`
// Digests by which this image is known. // Digests by which this image is known.
RepoDigests []string `json:"repo_digests,omitempty"` RepoDigests []string `json:"repo_digests,omitempty"`
// Size of the image in bytes. Must be > 0. // Size is the compressed size of the image.
Size uint64 `json:"size,omitempty"` Size int64 `json:"size,omitempty"`
// Config is the oci image config of the image.
Config *imagespec.ImageConfig `json:"config,omitempty"`
} }
// ImageMetadataUpdateFunc is the function used to update ImageMetadata. // ImageMetadataUpdateFunc is the function used to update ImageMetadata.

View File

@ -19,6 +19,7 @@ package metadata
import ( import (
"testing" "testing"
imagespec "github.com/opencontainers/image-spec/specs-go/v1"
assertlib "github.com/stretchr/testify/assert" assertlib "github.com/stretchr/testify/assert"
"github.com/kubernetes-incubator/cri-containerd/pkg/metadata/store" "github.com/kubernetes-incubator/cri-containerd/pkg/metadata/store"
@ -27,16 +28,28 @@ import (
func TestImageMetadataStore(t *testing.T) { func TestImageMetadataStore(t *testing.T) {
imageMetadataMap := map[string]*ImageMetadata{ imageMetadataMap := map[string]*ImageMetadata{
"1": { "1": {
ID: "1", // TODO(mikebrow): fill ID: "1",
ChainID: "test-chain-id-1",
RepoTags: []string{"tag-1"},
RepoDigests: []string{"digest-1"},
Size: 10, Size: 10,
Config: &imagespec.ImageConfig{},
}, },
"2": { "2": {
ID: "2", ID: "2",
ChainID: "test-chain-id-2",
RepoTags: []string{"tag-2"},
RepoDigests: []string{"digest-2"},
Size: 20, Size: 20,
Config: &imagespec.ImageConfig{},
}, },
"3": { "3": {
ID: "3", ID: "3",
RepoTags: []string{"tag-3"},
RepoDigests: []string{"digest-3"},
ChainID: "test-chain-id-3",
Size: 30, Size: 30,
Config: &imagespec.ImageConfig{},
}, },
} }
assert := assertlib.New(t) assert := assertlib.New(t)
@ -62,7 +75,7 @@ func TestImageMetadataStore(t *testing.T) {
t.Logf("should be able to update image metadata") t.Logf("should be able to update image metadata")
testID := "2" testID := "2"
newSize := uint64(200) newSize := int64(200)
expectMeta := *imageMetadataMap[testID] expectMeta := *imageMetadataMap[testID]
expectMeta.Size = newSize expectMeta.Size = newSize
err = s.Update(testID, func(o ImageMetadata) (ImageMetadata, error) { err = s.Update(testID, func(o ImageMetadata) (ImageMetadata, error) {

View File

@ -17,18 +17,25 @@ limitations under the License.
package server package server
import ( import (
"encoding/json"
"fmt" "fmt"
"io" "io"
"path/filepath" "path/filepath"
"strconv"
"strings" "strings"
"syscall" "syscall"
"github.com/docker/distribution/reference"
"github.com/docker/docker/pkg/stringid" "github.com/docker/docker/pkg/stringid"
"github.com/docker/docker/pkg/truncindex" "github.com/docker/docker/pkg/truncindex"
imagedigest "github.com/opencontainers/go-digest"
"github.com/opencontainers/image-spec/identity"
imagespec "github.com/opencontainers/image-spec/specs-go/v1"
"golang.org/x/net/context" "golang.org/x/net/context"
"google.golang.org/grpc" "google.golang.org/grpc"
"github.com/containerd/containerd" "github.com/containerd/containerd"
"github.com/containerd/containerd/images"
"github.com/kubernetes-incubator/cri-containerd/pkg/metadata" "github.com/kubernetes-incubator/cri-containerd/pkg/metadata"
@ -216,3 +223,130 @@ func (c *criContainerdService) getSandbox(id string) (*metadata.SandboxMetadata,
func criContainerStateToString(state runtime.ContainerState) string { func criContainerStateToString(state runtime.ContainerState) string {
return runtime.ContainerState_name[int32(state)] return runtime.ContainerState_name[int32(state)]
} }
// normalizeImageRef normalizes the image reference following the docker convention. This is added
// mainly for backward compatibility.
// The reference returned can only be either tagged or digested. For reference contains both tag
// and digest, the function returns digested reference, e.g. docker.io/library/busybox:latest@
// sha256:7cc4b5aefd1d0cadf8d97d4350462ba51c694ebca145b08d7d41b41acc8db5aa will be returned as
// docker.io/library/busybox@sha256:7cc4b5aefd1d0cadf8d97d4350462ba51c694ebca145b08d7d41b41acc8db5aa.
func normalizeImageRef(ref string) (reference.Named, error) {
named, err := reference.ParseNormalizedNamed(ref)
if err != nil {
return nil, err
}
if _, ok := named.(reference.NamedTagged); ok {
if canonical, ok := named.(reference.Canonical); ok {
// The reference is both tagged and digested, only
// return digested.
newNamed, err := reference.WithName(canonical.Name())
if err != nil {
return nil, err
}
newCanonical, err := reference.WithDigest(newNamed, canonical.Digest())
if err != nil {
return nil, err
}
return newCanonical, nil
}
}
return reference.TagNameOnly(named), nil
}
// getImageInfo returns image chainID, compressed size and oci config. Note that getImageInfo
// assumes that the image has been pulled or it will return an error.
func (c *criContainerdService) getImageInfo(ctx context.Context, ref string) (
imagedigest.Digest, int64, *imagespec.ImageConfig, error) {
normalized, err := normalizeImageRef(ref)
if err != nil {
return "", 0, nil, fmt.Errorf("failed to normalize image reference %q: %v", ref, err)
}
normalizedRef := normalized.String()
image, err := c.imageStoreService.Get(ctx, normalizedRef)
if err != nil {
return "", 0, nil, fmt.Errorf("failed to get image %q from containerd image store: %v",
normalizedRef, err)
}
// Get image config
desc, err := image.Config(ctx, c.contentProvider)
if err != nil {
return "", 0, nil, fmt.Errorf("failed to get image config descriptor: %v", err)
}
rc, err := c.contentProvider.Reader(ctx, desc.Digest)
if err != nil {
return "", 0, nil, fmt.Errorf("failed to get image config reader: %v", err)
}
defer rc.Close()
var imageConfig imagespec.Image
if err = json.NewDecoder(rc).Decode(&imageConfig); err != nil {
return "", 0, nil, fmt.Errorf("failed to decode image config: %v", err)
}
// Get image chainID
diffIDs, err := image.RootFS(ctx, c.contentProvider)
if err != nil {
return "", 0, nil, fmt.Errorf("failed to get image diff ids: %v", err)
}
chainID := identity.ChainID(diffIDs)
// Get image size
size, err := image.Size(ctx, c.contentProvider)
if err != nil {
return "", 0, nil, fmt.Errorf("failed to get image size: %v", err)
}
return chainID, size, &imageConfig.Config, nil
}
// getRepoDigestAngTag returns image repoDigest and repoTag of the named image reference.
func getRepoDigestAndTag(namedRef reference.Named, digest imagedigest.Digest) (string, string) {
var repoTag string
if _, ok := namedRef.(reference.NamedTagged); ok {
repoTag = namedRef.String()
}
repoDigest := namedRef.Name() + "@" + digest.String()
return repoDigest, repoTag
}
// localResolve resolves image reference to image id locally. It returns empty string
// without error if the reference doesn't exist.
func (c *criContainerdService) localResolve(ctx context.Context, ref string) (string, error) {
_, err := imagedigest.Parse(ref)
if err == nil {
return ref, nil
}
// ref is not image id, try to resolve it locally.
normalized, err := normalizeImageRef(ref)
if err != nil {
return "", fmt.Errorf("invalid image reference %q: %v", ref, err)
}
image, err := c.imageStoreService.Get(ctx, normalized.String())
if err != nil {
if images.IsNotFound(err) {
return "", nil
}
return "", fmt.Errorf("an error occurred when getting image %q from containerd image store: %v",
normalized.String(), err)
}
desc, err := image.Config(ctx, c.contentProvider)
if err != nil {
return "", fmt.Errorf("failed to get image config descriptor: %v", err)
}
return desc.Digest.String(), nil
}
// getUserFromImage gets uid or user name of the image user.
// If user is numeric, it will be treated as uid; or else, it is treated as user name.
func getUserFromImage(user string) (*int64, string) {
// return both empty if user is not specified in the image.
if user == "" {
return nil, ""
}
// split instances where the id may contain user:group
user = strings.Split(user, ":")[0]
// user could be either uid or user name. Try to interpret as numeric uid.
uid, err := strconv.ParseInt(user, 10, 64)
if err != nil {
// If user is non numeric, assume it's user name.
return nil, user
}
// If user is a numeric uid.
return &uid, ""
}

View File

@ -23,6 +23,8 @@ import (
"syscall" "syscall"
"testing" "testing"
"github.com/containerd/containerd/reference"
imagedigest "github.com/opencontainers/go-digest"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"golang.org/x/net/context" "golang.org/x/net/context"
@ -159,3 +161,128 @@ func TestGetSandbox(t *testing.T) {
assert.Equal(t, test.expected, sb) assert.Equal(t, test.expected, sb)
} }
} }
func TestNormalizeImageRef(t *testing.T) {
for _, test := range []struct {
input string
expect string
}{
{ // has nothing
input: "busybox",
expect: "docker.io/library/busybox:latest",
},
{ // only has tag
input: "busybox:latest",
expect: "docker.io/library/busybox:latest",
},
{ // only has digest
input: "busybox@sha256:e6693c20186f837fc393390135d8a598a96a833917917789d63766cab6c59582",
expect: "docker.io/library/busybox@sha256:e6693c20186f837fc393390135d8a598a96a833917917789d63766cab6c59582",
},
{ // only has path
input: "library/busybox",
expect: "docker.io/library/busybox:latest",
},
{ // only has hostname
input: "docker.io/busybox",
expect: "docker.io/library/busybox:latest",
},
{ // has no tag
input: "docker.io/library/busybox",
expect: "docker.io/library/busybox:latest",
},
{ // has no path
input: "docker.io/busybox:latest",
expect: "docker.io/library/busybox:latest",
},
{ // has no hostname
input: "library/busybox:latest",
expect: "docker.io/library/busybox:latest",
},
{ // full reference
input: "docker.io/library/busybox:latest",
expect: "docker.io/library/busybox:latest",
},
{ // gcr reference
input: "gcr.io/library/busybox",
expect: "gcr.io/library/busybox:latest",
},
{ // both tag and digest
input: "gcr.io/library/busybox:latest@sha256:e6693c20186f837fc393390135d8a598a96a833917917789d63766cab6c59582",
expect: "gcr.io/library/busybox@sha256:e6693c20186f837fc393390135d8a598a96a833917917789d63766cab6c59582",
},
} {
t.Logf("TestCase %q", test.input)
normalized, err := normalizeImageRef(test.input)
assert.NoError(t, err)
output := normalized.String()
assert.Equal(t, test.expect, output)
_, err = reference.Parse(output)
assert.NoError(t, err, "%q should be containerd supported reference", output)
}
}
// TestGetUserFromImage tests the logic of getting image uid or user name of image user.
func TestGetUserFromImage(t *testing.T) {
newI64 := func(i int64) *int64 { return &i }
for c, test := range map[string]struct {
user string
uid *int64
name string
}{
"no gid": {
user: "0",
uid: newI64(0),
},
"uid/gid": {
user: "0:1",
uid: newI64(0),
},
"empty user": {
user: "",
},
"multiple spearators": {
user: "1:2:3",
uid: newI64(1),
},
"root username": {
user: "root:root",
name: "root",
},
"username": {
user: "test:test",
name: "test",
},
} {
t.Logf("TestCase - %q", c)
actualUID, actualName := getUserFromImage(test.user)
assert.Equal(t, test.uid, actualUID)
assert.Equal(t, test.name, actualName)
}
}
func TestGetRepoDigestAndTag(t *testing.T) {
digest := imagedigest.Digest("sha256:e6693c20186f837fc393390135d8a598a96a833917917789d63766cab6c59582")
for desc, test := range map[string]struct {
ref string
expectedRepoDigest string
expectedRepoTag string
}{
"repo tag should be empty if original ref has no tag": {
ref: "gcr.io/library/busybox@" + digest.String(),
expectedRepoDigest: "gcr.io/library/busybox@" + digest.String(),
},
"repo tag should not be empty if original ref has tag": {
ref: "gcr.io/library/busybox:latest",
expectedRepoDigest: "gcr.io/library/busybox@" + digest.String(),
expectedRepoTag: "gcr.io/library/busybox:latest",
},
} {
t.Logf("TestCase %q", desc)
named, err := normalizeImageRef(test.ref)
assert.NoError(t, err)
repoDigest, repoTag := getRepoDigestAndTag(named, digest)
assert.Equal(t, test.expectedRepoDigest, repoDigest)
assert.Equal(t, test.expectedRepoTag, repoTag)
}
}

View File

@ -18,33 +18,51 @@ package server
import ( import (
"fmt" "fmt"
"golang.org/x/net/context"
"github.com/golang/glog"
"golang.org/x/net/context"
runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1" runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1"
"github.com/kubernetes-incubator/cri-containerd/pkg/metadata"
) )
// ListImages lists existing images. // ListImages lists existing images.
// TODO(mikebrow): add filters // TODO(random-liu): Add image list filters after CRI defines this more clear, and kubelet
// TODO(mikebrow): harden api // actually needs it.
func (c *criContainerdService) ListImages(ctx context.Context, r *runtime.ListImagesRequest) (*runtime.ListImagesResponse, error) { func (c *criContainerdService) ListImages(ctx context.Context, r *runtime.ListImagesRequest) (retRes *runtime.ListImagesResponse, retErr error) {
glog.V(4).Infof("ListImages with filter %+v", r.GetFilter())
defer func() {
if retErr == nil {
glog.V(4).Infof("ListImages returns image list %+v", retRes.GetImages())
}
}()
imageMetadataA, err := c.imageMetadataStore.List() imageMetadataA, err := c.imageMetadataStore.List()
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to list image metadata from store %v", err) return nil, fmt.Errorf("failed to list image metadata from store: %v", err)
} }
// TODO(mikebrow): Get the id->tags, and id->digest mapping from our checkpoint;
// Get other information from containerd image/content store
var images []*runtime.Image var images []*runtime.Image
for _, image := range imageMetadataA { // TODO(mikebrow): write a ImageMetadata to runtime.Image converter for _, image := range imageMetadataA {
ri := &runtime.Image{ // TODO(random-liu): [P0] Make sure corresponding snapshot exists. What if snapshot
Id: image.ID, // doesn't exist?
RepoTags: image.RepoTags, images = append(images, toCRIImage(image))
RepoDigests: image.RepoDigests,
Size_: image.Size,
// TODO(mikebrow): Uid and Username?
}
images = append(images, ri)
} }
return &runtime.ListImagesResponse{Images: images}, nil return &runtime.ListImagesResponse{Images: images}, nil
} }
// toCRIImage converts image metadata to CRI image type.
func toCRIImage(image *metadata.ImageMetadata) *runtime.Image {
runtimeImage := &runtime.Image{
Id: image.ID,
RepoTags: image.RepoTags,
RepoDigests: image.RepoDigests,
Size_: uint64(image.Size),
}
uid, username := getUserFromImage(image.Config.User)
if uid != nil {
runtimeImage.Uid = &runtime.Int64Value{Value: *uid}
}
runtimeImage.Username = username
return runtimeImage
}

View File

@ -0,0 +1,101 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package server
import (
"testing"
imagespec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/net/context"
runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1"
"github.com/kubernetes-incubator/cri-containerd/pkg/metadata"
)
func TestListImages(t *testing.T) {
c := newTestCRIContainerdService()
imagesInStore := []metadata.ImageMetadata{
{
ID: "test-id-1",
ChainID: "test-chainid-1",
RepoTags: []string{"tag-a-1", "tag-b-1"},
RepoDigests: []string{"digest-a-1", "digest-b-1"},
Size: 1000,
Config: &imagespec.ImageConfig{
User: "root",
},
},
{
ID: "test-id-2",
ChainID: "test-chainid-2",
RepoTags: []string{"tag-a-2", "tag-b-2"},
RepoDigests: []string{"digest-a-2", "digest-b-2"},
Size: 2000,
Config: &imagespec.ImageConfig{
User: "1234:1234",
},
},
{
ID: "test-id-3",
ChainID: "test-chainid-3",
RepoTags: []string{"tag-a-3", "tag-b-3"},
RepoDigests: []string{"digest-a-3", "digest-b-3"},
Size: 3000,
Config: &imagespec.ImageConfig{
User: "nobody",
},
},
}
expect := []*runtime.Image{
{
Id: "test-id-1",
RepoTags: []string{"tag-a-1", "tag-b-1"},
RepoDigests: []string{"digest-a-1", "digest-b-1"},
Size_: uint64(1000),
Username: "root",
},
{
Id: "test-id-2",
RepoTags: []string{"tag-a-2", "tag-b-2"},
RepoDigests: []string{"digest-a-2", "digest-b-2"},
Size_: uint64(2000),
Uid: &runtime.Int64Value{Value: 1234},
},
{
Id: "test-id-3",
RepoTags: []string{"tag-a-3", "tag-b-3"},
RepoDigests: []string{"digest-a-3", "digest-b-3"},
Size_: uint64(3000),
Username: "nobody",
},
}
for _, i := range imagesInStore {
assert.NoError(t, c.imageMetadataStore.Create(i))
}
resp, err := c.ListImages(context.Background(), &runtime.ListImagesRequest{})
assert.NoError(t, err)
require.NotNil(t, resp)
images := resp.GetImages()
assert.Len(t, images, len(expect))
for _, i := range expect {
assert.Contains(t, images, i)
}
}

View File

@ -24,8 +24,8 @@ import (
containerdimages "github.com/containerd/containerd/images" containerdimages "github.com/containerd/containerd/images"
"github.com/containerd/containerd/remotes" "github.com/containerd/containerd/remotes"
"github.com/containerd/containerd/remotes/docker" "github.com/containerd/containerd/remotes/docker"
"github.com/docker/distribution/reference"
"github.com/golang/glog" "github.com/golang/glog"
imagedigest "github.com/opencontainers/go-digest"
imagespec "github.com/opencontainers/image-spec/specs-go/v1" imagespec "github.com/opencontainers/image-spec/specs-go/v1"
"golang.org/x/net/context" "golang.org/x/net/context"
runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1" runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1"
@ -33,6 +33,44 @@ import (
"github.com/kubernetes-incubator/cri-containerd/pkg/metadata" "github.com/kubernetes-incubator/cri-containerd/pkg/metadata"
) )
// For image management:
// 1) We have an in-memory metadata index to:
// a. Maintain ImageID -> RepoTags, ImageID -> RepoDigset relationships; ImageID
// is the digest of image config, which conforms to oci image spec.
// b. Cache constant and useful information such as image chainID, config etc.
// c. An image will be added into the in-memory metadata only when it's successfully
// pulled and unpacked.
//
// 2) We use containerd image metadata store and content store:
// a. To resolve image reference (digest/tag) locally. During pulling image, we
// normalize the image reference provided by user, and put it into image metadata
// store with resolved descriptor. For the other operations, if image id is provided,
// we'll access the in-memory metadata index directly; if image reference is
// provided, we'll normalize it, resolve it in containerd image metadata store
// to get the image id.
// b. As the backup of in-memory metadata in 1). During startup, the in-memory
// metadata could be re-constructed from image metadata store + content store.
//
// Several problems with current approach:
// 1) An entry in containerd image metadata store doesn't mean a "READY" (successfully
// pulled and unpacked) image. E.g. during pulling, the client gets killed. In that case,
// if we saw an image without snapshots or with in-complete contents during startup,
// should we re-pull the image? Or should we remove the entry?
//
// 2) Containerd suggests user to add entry before pulling the image. However if
// an error occurrs during the pulling, should we remove the entry from metadata
// store? Or should we leave it there until next startup (resource leakage)?
//
// 3) CRI-containerd only exposes "READY" (successfully pulled and unpacked) images
// to the user, which are maintained in the in-memory metadata index. However, it's
// still possible that someone else removes the content or snapshot by-pass cri-containerd,
// how do we detect that and update the in-memory metadata correspondingly? Always
// check whether corresponding snapshot is ready when reporting image status?
//
// 4) Is the content important if we cached necessary information in-memory
// after we pull the image? How to manage the disk usage of contents? If some
// contents are missing but snapshots are ready, is the image still "READY"?
// PullImage pulls an image with authentication config. // PullImage pulls an image with authentication config.
// TODO(mikebrow): add authentication // TODO(mikebrow): add authentication
// TODO(mikebrow): harden api (including figuring out at what layer we should be blocking on duplicate requests.) // TODO(mikebrow): harden api (including figuring out at what layer we should be blocking on duplicate requests.)
@ -40,146 +78,110 @@ func (c *criContainerdService) PullImage(ctx context.Context, r *runtime.PullIma
glog.V(2).Infof("PullImage %q with auth config %+v", r.GetImage().GetImage(), r.GetAuth()) glog.V(2).Infof("PullImage %q with auth config %+v", r.GetImage().GetImage(), r.GetAuth())
defer func() { defer func() {
if retErr == nil { if retErr == nil {
glog.V(2).Infof("PullImage returns image reference %q", retRes.GetImageRef()) glog.V(2).Infof("PullImage %q returns image reference %q",
r.GetImage().GetImage(), retRes.GetImageRef())
} }
}() }()
var ( namedRef, err := normalizeImageRef(r.GetImage().GetImage())
size int64
desc imagespec.Descriptor
)
image, err := normalizeImageRef(r.GetImage().GetImage())
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to parse image reference %q: %v", r.GetImage().GetImage(), err) return nil, fmt.Errorf("failed to parse image reference %q: %v", r.GetImage().GetImage(), err)
} }
// TODO(random-liu): [P0] Avoid concurrent pulling/removing on the same image reference.
image := namedRef.String()
if r.GetImage().GetImage() != image { if r.GetImage().GetImage() != image {
glog.V(4).Info("PullImage using normalized image ref: %q", image) glog.V(4).Infof("PullImage using normalized image ref: %q", image)
} }
if desc, size, err = c.pullImage(ctx, image); err != nil { // TODO(random-liu): [P1] Schema 1 image is not supported in containerd now, we need to support
// it for backward compatiblity.
cfgDigest, manifestDigest, err := c.pullImage(ctx, image)
if err != nil {
return nil, fmt.Errorf("failed to pull image %q: %v", image, err) return nil, fmt.Errorf("failed to pull image %q: %v", image, err)
} }
digest := desc.Digest.String() // TODO(mikebrow): add truncIndex for image id // Use config digest as imageID to conform to oci image spec.
// TODO(mikebrow): add truncIndex for image id
imageID := cfgDigest.String()
glog.V(4).Infof("Pulled image %q with image id %q, manifest digest %q", image, imageID, manifestDigest)
// TODO(mikebrow): pass a metadata struct to pullimage and fill in the tags/digests repoDigest, repoTag := getRepoDigestAndTag(namedRef, manifestDigest)
// store the image metadata _, err = c.imageMetadataStore.Get(imageID)
// TODO(mikebrow): consider what to do if pullimage was called and metadata already exists (error? udpate?) if err != nil && !metadata.IsNotExistError(err) {
meta := &metadata.ImageMetadata{ return nil, fmt.Errorf("failed to get image %q metadata: %v", imageID, err)
ID: digest,
RepoTags: []string{image},
RepoDigests: []string{digest},
Size: uint64(size), // TODO(mikebrow): compressed or uncompressed size? using compressed
} }
if err = c.imageMetadataStore.Create(*meta); err != nil { // There is a known race here because the image metadata could be created after `Get`.
return &runtime.PullImageResponse{ImageRef: digest}, // TODO(random-liu): [P1] Do not use metadata store. Use simple in-memory data structure to
fmt.Errorf("pulled image `%q` but failed to store metadata for digest: %s err: %v", image, digest, err) // maintain the id -> information index. And use the container image store as backup and
// recover in-memory state during startup.
if err == nil {
// Update existing image metadata.
if err := c.imageMetadataStore.Update(imageID, func(m metadata.ImageMetadata) (metadata.ImageMetadata, error) {
updateImageMetadata(&m, repoTag, repoDigest)
return m, nil
}); err != nil {
return nil, fmt.Errorf("failed to update image %q metadata: %v", imageID, err)
}
return &runtime.PullImageResponse{ImageRef: imageID}, err
} }
// Return the image digest // Get image information.
return &runtime.PullImageResponse{ImageRef: digest}, err chainID, size, config, err := c.getImageInfo(ctx, image)
}
// normalizeImageRef normalizes the image reference following the docker convention. This is added
// mainly for backward compatibility.
func normalizeImageRef(ref string) (string, error) {
named, err := reference.ParseNormalizedNamed(ref)
if err != nil { if err != nil {
return "", err return nil, fmt.Errorf("failed to get image %q information: %v", image, err)
} }
named = reference.TagNameOnly(named)
return named.String(), nil // NOTE(random-liu): the actual state in containerd is the source of truth, even we maintain
// in-memory image metadata, it's only for in-memory indexing. The image could be removed
// by someone else anytime, before/during/after we create the metadata. We should always
// check the actual state in containerd before using the image or returning status of the
// image.
// Create corresponding image metadata.
newMeta := metadata.ImageMetadata{
ID: imageID,
ChainID: chainID.String(),
Size: size,
Config: config,
}
// Add the image reference used into repo tags. Note if the image is pulled with
// repo digest, it will also be added in to repo tags, which is fine.
updateImageMetadata(&newMeta, repoTag, repoDigest)
if err = c.imageMetadataStore.Create(newMeta); err != nil {
return nil, fmt.Errorf("failed to create image %q metadata: %v", imageID, err)
}
return &runtime.PullImageResponse{ImageRef: imageID}, err
} }
// imageReferenceResolver attempts to resolve the image reference into a name // pullImage pulls image and returns image id (config digest) and manifest digest.
// and manifest via the containerd library call.. // The ref should be normalized image reference.
// // TODO(random-liu): [P0] Wait for all downloadings to be done before return.
// The argument `ref` should be a scheme-less URI representing the remote. func (c *criContainerdService) pullImage(ctx context.Context, ref string) (
// Structurally, it has a host and path. The "host" can be used to directly imagedigest.Digest, imagedigest.Digest, error) {
// reference a specific host or be matched against a specific handler. // Resolve the image reference to get descriptor and fetcher.
//
// The returned name should be used to identify the referenced entity.
// Dependending on the remote namespace, this may be immutable or mutable.
// While the name may differ from ref, it should itself be a valid ref.
//
// If the resolution fails, an error will be returned.
// TODO(mikebrow) add config.json based image.Config as an additional return value from this resolver fn()
func (c *criContainerdService) imageReferenceResolver(ctx context.Context, ref string) (resolvedImageName string, manifest imagespec.Manifest, compressedSize uint64, err error) {
var (
size int64
desc imagespec.Descriptor
fetcher remotes.Fetcher
)
// Resolve the image name; place that in the image store; then dispatch
// a handler to fetch the object for the manifest
resolver := docker.NewResolver() resolver := docker.NewResolver()
resolvedImageName, desc, fetcher, err = resolver.Resolve(ctx, ref) _, desc, fetcher, err := resolver.Resolve(ctx, ref)
if err != nil { if err != nil {
return resolvedImageName, manifest, compressedSize, fmt.Errorf("failed to resolve ref %q: err: %v", ref, err) return "", "", fmt.Errorf("failed to resolve ref %q: %v", ref, err)
} }
// Currently, the resolved image name is the same with ref in docker resolver,
// but they may be different in the future.
// TODO(random-liu): Always resolve image reference and use resolved image name in
// the system.
err = c.imageStoreService.Put(ctx, resolvedImageName, desc) // Put the image information into containerd image store.
if err != nil { // In the future, containerd will rely on the information in the image store to perform image
return resolvedImageName, manifest, compressedSize, fmt.Errorf("failed to put %q: desc: %v err: %v", resolvedImageName, desc, err) // garbage collection.
} // For now, we simply use it to store and retrieve information required for pulling an image.
if putErr := c.imageStoreService.Put(ctx, ref, desc); putErr != nil {
err = containerdimages.Dispatch( return "", "", fmt.Errorf("failed to put image %q desc %v into containerd image store: %v",
ctx, ref, desc, putErr)
remotes.FetchHandler(c.contentIngester, fetcher),
desc)
if err != nil {
return resolvedImageName, manifest, compressedSize, fmt.Errorf("failed to fetch %q: desc: %v err: %v", resolvedImageName, desc, err)
}
image, err := c.imageStoreService.Get(ctx, resolvedImageName)
if err != nil {
return resolvedImageName, manifest, compressedSize,
fmt.Errorf("get failed for image:%q err: %v", resolvedImageName, err)
}
p, err := content.ReadBlob(ctx, c.contentProvider, image.Target.Digest)
if err != nil {
return resolvedImageName, manifest, compressedSize,
fmt.Errorf("readblob failed for digest:%q err: %v", image.Target.Digest, err)
}
err = json.Unmarshal(p, &manifest)
if err != nil {
return resolvedImageName, manifest, compressedSize,
fmt.Errorf("unmarshal blob to manifest failed for digest:%q err: %v", image.Target.Digest, err)
}
size, err = image.Size(ctx, c.contentProvider)
if err != nil {
return resolvedImageName, manifest, compressedSize,
fmt.Errorf("size failed for image:%q %v", image.Target.Digest, err)
}
compressedSize = uint64(size)
return resolvedImageName, manifest, compressedSize, nil
}
func (c *criContainerdService) pullImage(ctx context.Context, ref string) (imagespec.Descriptor, int64, error) {
var (
err error
size int64
desc imagespec.Descriptor
resolvedImageName string
fetcher remotes.Fetcher
)
// Resolve the image name; place that in the image store; then dispatch
// a handler for a sequence of handlers which: 1) fetch the object using a
// FetchHandler; and 3) recurse through any sub-layers via a ChildrenHandler
resolver := docker.NewResolver()
resolvedImageName, desc, fetcher, err = resolver.Resolve(ctx, ref)
if err != nil {
return desc, size, fmt.Errorf("failed to resolve ref %q: err: %v", ref, err)
}
err = c.imageStoreService.Put(ctx, resolvedImageName, desc)
if err != nil {
return desc, size, fmt.Errorf("failed to put %q: desc: %v err: %v", resolvedImageName, desc, err)
} }
// TODO(random-liu): What if following operations fail? Do we need to do cleanup?
// Fetch all image resources into content store.
// Dispatch a handler which will run a sequence of handlers to:
// 1) fetch the object using a FetchHandler;
// 2) recurse through any sub-layers via a ChildrenHandler.
err = containerdimages.Dispatch( err = containerdimages.Dispatch(
ctx, ctx,
containerdimages.Handlers( containerdimages.Handlers(
@ -187,34 +189,60 @@ func (c *criContainerdService) pullImage(ctx context.Context, ref string) (image
containerdimages.ChildrenHandler(c.contentProvider)), containerdimages.ChildrenHandler(c.contentProvider)),
desc) desc)
if err != nil { if err != nil {
return desc, size, fmt.Errorf("failed to fetch %q: desc: %v err: %v", resolvedImageName, desc, err) return "", "", fmt.Errorf("failed to fetch image %q desc %+v: %v", ref, desc, err)
} }
image, err := c.imageStoreService.Get(ctx, resolvedImageName) image, err := c.imageStoreService.Get(ctx, ref)
if err != nil { if err != nil {
return desc, size, return "", "", fmt.Errorf("failed to get image %q from containerd image store: %v", ref, err)
fmt.Errorf("get failed for image:%q err: %v", resolvedImageName, err)
} }
p, err := content.ReadBlob(ctx, c.contentProvider, image.Target.Digest) // Read the image manifest from content store.
manifestDigest := image.Target.Digest
p, err := content.ReadBlob(ctx, c.contentProvider, manifestDigest)
if err != nil { if err != nil {
return desc, size, return "", "", fmt.Errorf("readblob failed for manifest digest %q: %v", manifestDigest, err)
fmt.Errorf("readblob failed for digest:%q err: %v", image.Target.Digest, err)
} }
var manifest imagespec.Manifest var manifest imagespec.Manifest
err = json.Unmarshal(p, &manifest) if err := json.Unmarshal(p, &manifest); err != nil {
if err != nil { return "", "", fmt.Errorf("unmarshal blob to manifest failed for manifest digest %q: %v",
return desc, size, manifestDigest, err)
fmt.Errorf("unmarshal blob to manifest failed for digest:%q %v", image.Target.Digest, err)
} }
_, err = c.rootfsUnpacker.Unpack(ctx, manifest.Layers) // ignoring returned chainID for now
if err != nil { // Unpack the image layers into snapshots.
return desc, size, if _, err = c.rootfsUnpacker.Unpack(ctx, manifest.Layers); err != nil {
fmt.Errorf("unpack failed for manifest layers:%v %v", manifest.Layers, err) return "", "", fmt.Errorf("unpack failed for manifest layers %+v: %v", manifest.Layers, err)
} }
size, err = image.Size(ctx, c.contentProvider) // TODO(random-liu): Considering how to deal with the disk usage of content.
configDesc, err := image.Config(ctx, c.contentProvider)
if err != nil { if err != nil {
return desc, size, return "", "", fmt.Errorf("failed to get config descriptor for image %q: %v", ref, err)
fmt.Errorf("size failed for image:%q %v", image.Target.Digest, err) }
return configDesc.Digest, manifestDigest, nil
}
// insertToStringSlice is a helper function to insert a string into the string slice
// if the string is not in the slice yet.
func insertToStringSlice(ss []string, s string) []string {
found := false
for _, str := range ss {
if s == str {
found = true
break
}
}
if !found {
ss = append(ss, s)
}
return ss
}
// updateImageMetadata updates existing image meta with new repoTag and repoDigest.
func updateImageMetadata(meta *metadata.ImageMetadata, repoTag, repoDigest string) {
if repoTag != "" {
meta.RepoTags = insertToStringSlice(meta.RepoTags, repoTag)
}
if repoDigest != "" {
meta.RepoDigests = insertToStringSlice(meta.RepoDigests, repoDigest)
} }
return desc, size, nil
} }

View File

@ -19,27 +19,56 @@ package server
import ( import (
"testing" "testing"
"github.com/containerd/containerd/reference"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/kubernetes-incubator/cri-containerd/pkg/metadata"
) )
func TestNormalizeImageRef(t *testing.T) { func TestUpdateImageMetadata(t *testing.T) {
for _, ref := range []string{ meta := metadata.ImageMetadata{
"busybox", // has nothing ID: "test-id",
"busybox:latest", // only has tag ChainID: "test-chain-id",
"busybox@sha256:e6693c20186f837fc393390135d8a598a96a833917917789d63766cab6c59582", // only has digest Size: 1234,
"library/busybox", // only has path }
"docker.io/busybox", // only has hostname for desc, test := range map[string]struct {
"docker.io/library/busybox", // has no tag repoTags []string
"docker.io/busybox:latest", // has no path repoDigests []string
"library/busybox:latest", // has no hostname repoTag string
"docker.io/library/busybox:latest", // full reference repoDigest string
"gcr.io/library/busybox", // gcr reference expectedRepoTags []string
expectedRepoDigests []string
}{
"Add duplicated repo tag and digest": {
repoTags: []string{"a", "b"},
repoDigests: []string{"c", "d"},
repoTag: "a",
repoDigest: "c",
expectedRepoTags: []string{"a", "b"},
expectedRepoDigests: []string{"c", "d"},
},
"Add new repo tag and digest": {
repoTags: []string{"a", "b"},
repoDigests: []string{"c", "d"},
repoTag: "e",
repoDigest: "f",
expectedRepoTags: []string{"a", "b", "e"},
expectedRepoDigests: []string{"c", "d", "f"},
},
"Add empty repo tag and digest": {
repoTags: []string{"a", "b"},
repoDigests: []string{"c", "d"},
repoTag: "",
repoDigest: "",
expectedRepoTags: []string{"a", "b"},
expectedRepoDigests: []string{"c", "d"},
},
} { } {
t.Logf("TestCase %q", ref) t.Logf("TestCase %q", desc)
normalized, err := normalizeImageRef(ref) m := meta
assert.NoError(t, err) m.RepoTags = test.repoTags
_, err = reference.Parse(normalized) m.RepoDigests = test.repoDigests
assert.NoError(t, err, "%q should be containerd supported reference", normalized) updateImageMetadata(&m, test.repoTag, test.repoDigest)
assert.Equal(t, test.expectedRepoTags, m.RepoTags)
assert.Equal(t, test.expectedRepoDigests, m.RepoDigests)
} }
} }

View File

@ -17,17 +17,62 @@ limitations under the License.
package server package server
import ( import (
"golang.org/x/net/context" "fmt"
"github.com/containerd/containerd/images"
"github.com/golang/glog"
"golang.org/x/net/context"
runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1" runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1"
"github.com/kubernetes-incubator/cri-containerd/pkg/metadata"
) )
// RemoveImage removes the image. // RemoveImage removes the image.
// TODO(mikebrow): harden api // TODO(mikebrow): harden api
func (c *criContainerdService) RemoveImage(ctx context.Context, r *runtime.RemoveImageRequest) (*runtime.RemoveImageResponse, error) { // TODO(random-liu): Update CRI to pass image reference instead of ImageSpec. (See
// Only remove image from the internal metadata store for now. // kubernetes/kubernetes#46255)
// Note that the image must be digest here in current implementation. // TODO(random-liu): We should change CRI to distinguish image id and image spec.
// TODO(mikebrow): remove the image via containerd // Remove the whole image no matter the it's image id or reference. This is the
err := c.imageMetadataStore.Delete(r.GetImage().GetImage()) // semantic defined in CRI now.
return &runtime.RemoveImageResponse{}, err func (c *criContainerdService) RemoveImage(ctx context.Context, r *runtime.RemoveImageRequest) (retRes *runtime.RemoveImageResponse, retErr error) {
glog.V(2).Infof("RemoveImage %q", r.GetImage().GetImage())
defer func() {
if retErr == nil {
glog.V(2).Infof("RemoveImage %q returns successfully", r.GetImage().GetImage())
}
}()
imageID, err := c.localResolve(ctx, r.GetImage().GetImage())
if err != nil {
return nil, fmt.Errorf("can not resolve %q locally: %v", r.GetImage().GetImage(), err)
}
if imageID == "" {
// return empty without error when image not found.
return &runtime.RemoveImageResponse{}, nil
}
meta, err := c.imageMetadataStore.Get(imageID)
if err != nil {
if metadata.IsNotExistError(err) {
return &runtime.RemoveImageResponse{}, nil
}
return nil, fmt.Errorf("an error occurred when get image %q metadata: %v", imageID, err)
}
// Also include repo digest, because if user pull image with digest,
// there will also be a corresponding repo digest reference.
for _, ref := range append(meta.RepoTags, meta.RepoDigests...) {
// TODO(random-liu): Containerd should schedule a garbage collection immediately,
// and we may want to wait for the garbage collection to be over here.
err = c.imageStoreService.Delete(ctx, ref)
if err == nil || images.IsNotFound(err) {
continue
}
return nil, fmt.Errorf("failed to delete image reference %q for image %q: %v", ref, imageID, err)
}
err = c.imageMetadataStore.Delete(imageID)
if err != nil {
if metadata.IsNotExistError(err) {
return &runtime.RemoveImageResponse{}, nil
}
return nil, fmt.Errorf("an error occurred when delete image %q matadata: %v", imageID, err)
}
return &runtime.RemoveImageResponse{}, nil
} }

View File

@ -17,48 +17,57 @@ limitations under the License.
package server package server
import ( import (
"golang.org/x/net/context" "fmt"
"github.com/golang/glog"
"golang.org/x/net/context"
runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1" runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1"
"github.com/kubernetes-incubator/cri-containerd/pkg/metadata"
) )
// ImageStatus returns the status of the image, returns nil if the image isn't present. // ImageStatus returns the status of the image, returns nil if the image isn't present.
func (c *criContainerdService) ImageStatus(ctx context.Context, r *runtime.ImageStatusRequest) (*runtime.ImageStatusResponse, error) { // TODO(random-liu): We should change CRI to distinguish image id and image spec. (See
ref := r.GetImage().GetImage() // kubernetes/kubernetes#46255)
// TODO(mikebrow): Get the id->tags, and id->digest mapping from our checkpoint; func (c *criContainerdService) ImageStatus(ctx context.Context, r *runtime.ImageStatusRequest) (retRes *runtime.ImageStatusResponse, retErr error) {
// Get other information from containerd image/content store glog.V(4).Infof("ImageStatus for image %q", r.GetImage().GetImage())
defer func() {
if retErr == nil {
glog.V(4).Infof("ImageStatus for %q returns image status %+v",
r.GetImage().GetImage(), retRes.GetImage())
}
}()
imageID, err := c.localResolve(ctx, r.GetImage().GetImage())
if err != nil {
return nil, fmt.Errorf("can not resolve %q locally: %v", r.GetImage().GetImage(), err)
}
if imageID == "" {
// return empty without error when image not found.
return &runtime.ImageStatusResponse{}, nil
}
// if the passed ref is a digest.. and is stored the following get should work meta, err := c.imageMetadataStore.Get(imageID)
// note: get returns nil with no err if err != nil {
meta, _ := c.imageMetadataStore.Get(ref) if metadata.IsNotExistError(err) {
if meta != nil { return &runtime.ImageStatusResponse{}, nil
return &runtime.ImageStatusResponse{Image: &runtime.Image{ // TODO(mikebrow): write a ImageMetadata to runtim.Image converter }
return nil, fmt.Errorf("an error occurred during get image %q metadata: %v",
imageID, err)
}
// TODO(random-liu): [P0] Make sure corresponding snapshot exists. What if snapshot
// doesn't exist?
runtimeImage := &runtime.Image{
Id: meta.ID, Id: meta.ID,
RepoTags: meta.RepoTags, RepoTags: meta.RepoTags,
RepoDigests: meta.RepoDigests, RepoDigests: meta.RepoDigests,
Size_: meta.Size, Size_: uint64(meta.Size),
// TODO(mikebrow): Uid and Username?
}}, nil
} }
uid, username := getUserFromImage(meta.Config.User)
if uid != nil {
runtimeImage.Uid = &runtime.Int64Value{Value: *uid}
}
runtimeImage.Username = username
// Search for image by ref in repo tags if found the ID matching ref // TODO(mikebrow): write a ImageMetadata to runtim.Image converter
// is our digest. return &runtime.ImageStatusResponse{Image: runtimeImage}, nil
imageMetadataA, err := c.imageMetadataStore.List()
if err == nil {
for _, meta := range imageMetadataA {
for _, tag := range meta.RepoTags {
if ref == tag {
return &runtime.ImageStatusResponse{Image: &runtime.Image{ // TODO(mikebrow): write a ImageMetadata to runtim.Image converter
Id: meta.ID,
RepoTags: meta.RepoTags,
RepoDigests: meta.RepoDigests,
Size_: meta.Size,
// TODO(mikebrow): Uid and Username?
}}, nil
}
}
}
}
return nil, nil
} }

View File

@ -0,0 +1,69 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package server
import (
"testing"
imagespec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/net/context"
runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1"
"github.com/kubernetes-incubator/cri-containerd/pkg/metadata"
)
func TestImageStatus(t *testing.T) {
testID := "sha256:d848ce12891bf78792cda4a23c58984033b0c397a55e93a1556202222ecc5ed4"
meta := metadata.ImageMetadata{
ID: testID,
ChainID: "test-chain-id",
RepoTags: []string{"a", "b"},
RepoDigests: []string{"c", "d"},
Size: 1234,
Config: &imagespec.ImageConfig{
User: "user:group",
},
}
expected := &runtime.Image{
Id: testID,
RepoTags: []string{"a", "b"},
RepoDigests: []string{"c", "d"},
Size_: uint64(1234),
Username: "user",
}
c := newTestCRIContainerdService()
t.Logf("should return nil image spec without error for non-exist image")
resp, err := c.ImageStatus(context.Background(), &runtime.ImageStatusRequest{
Image: &runtime.ImageSpec{Image: testID},
})
assert.NoError(t, err)
require.NotNil(t, resp)
assert.Nil(t, resp.GetImage())
assert.NoError(t, c.imageMetadataStore.Create(meta))
t.Logf("should return correct image status for exist image")
resp, err = c.ImageStatus(context.Background(), &runtime.ImageStatusRequest{
Image: &runtime.ImageSpec{Image: testID},
})
assert.NoError(t, err)
assert.NotNil(t, resp)
assert.Equal(t, expected, resp.GetImage())
}

View File

@ -52,6 +52,7 @@ func newTestCRIContainerdService() *criContainerdService {
rootDir: testRootDir, rootDir: testRootDir,
containerService: servertesting.NewFakeExecutionClient(), containerService: servertesting.NewFakeExecutionClient(),
sandboxStore: metadata.NewSandboxStore(store.NewMetadataStore()), sandboxStore: metadata.NewSandboxStore(store.NewMetadataStore()),
imageMetadataStore: metadata.NewImageMetadataStore(store.NewMetadataStore()),
sandboxNameIndex: registrar.NewRegistrar(), sandboxNameIndex: registrar.NewRegistrar(),
sandboxIDIndex: truncindex.NewTruncIndex(nil), sandboxIDIndex: truncindex.NewTruncIndex(nil),
containerStore: metadata.NewContainerStore(store.NewMetadataStore()), containerStore: metadata.NewContainerStore(store.NewMetadataStore()),