Add image load.

Signed-off-by: Lantao Liu <lantaol@google.com>
This commit is contained in:
Lantao Liu
2017-10-26 05:59:15 +00:00
parent c6fd18ddc3
commit 25fdf72692
20 changed files with 1316 additions and 104 deletions

View File

@@ -0,0 +1,298 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package importer
import (
"archive/tar"
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"strings"
"time"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/log"
"github.com/opencontainers/go-digest"
"github.com/opencontainers/image-spec/specs-go"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
"github.com/kubernetes-incubator/cri-containerd/pkg/util"
)
// This code reuses the docker import code from containerd/containerd#1602.
// It has been simplified a bit and garbage collection support was added.
// If a library/helper is added to containerd in the future, we should switch to it.
// manifestDotJSON is an entry in manifest.json.
type manifestDotJSON struct {
Config string
RepoTags []string
Layers []string
// Parent is unsupported
Parent string
}
// isLayerTar returns true if name is like "deadbeeddeadbeef/layer.tar"
func isLayerTar(name string) bool {
slashes := len(strings.Split(name, "/"))
return slashes == 2 && strings.HasSuffix(name, "/layer.tar")
}
// isDotJSON returns true if name is like "deadbeefdeadbeef.json"
func isDotJSON(name string) bool {
slashes := len(strings.Split(name, "/"))
return slashes == 1 && strings.HasSuffix(name, ".json")
}
type imageConfig struct {
desc ocispec.Descriptor
img ocispec.Image
}
// Import implements Docker Image Spec v1.1.
// An image MUST have `manifest.json`.
// `repositories` file in Docker Image Spec v1.0 is not supported (yet).
// Also, the current implementation assumes the implicit file name convention,
// which is not explicitly documented in the spec. (e.g. deadbeef/layer.tar)
// It returns a group of image references successfully loaded.
func Import(ctx context.Context, cs content.Store, is images.Store, reader io.Reader) (_ []string, retErr error) {
tr := tar.NewReader(reader)
var (
mfsts []manifestDotJSON
layers = make(map[string]ocispec.Descriptor) // key: filename (deadbeeddeadbeef/layer.tar)
configs = make(map[string]imageConfig) // key: filename (deadbeeddeadbeef.json)
)
// Either image is successfully imported or not, we should cleanup gc.root
// for all image layers.
defer func() {
for _, desc := range layers {
// Remove root tag from layers now that manifest refers to it
if _, err := cs.Update(ctx, content.Info{Digest: desc.Digest}, "labels.containerd.io/gc.root"); err != nil {
log.G(ctx).WithError(err).Error("Failed to remove layer %q root tag", desc.Digest)
}
}
for _, cfg := range configs {
// Remove root tag from config now that manifest refers to it
if _, err := cs.Update(ctx, content.Info{Digest: cfg.desc.Digest}, "labels.containerd.io/gc.root"); err != nil {
log.G(ctx).WithError(err).Error("Failed to remove config %q root tag", cfg.desc.Digest)
}
}
}()
for {
hdr, err := tr.Next()
if err == io.EOF {
break
}
if err != nil {
return nil, errors.Wrap(err, "get next file")
}
if hdr.Typeflag != tar.TypeReg && hdr.Typeflag != tar.TypeRegA {
continue
}
if hdr.Name == "manifest.json" {
mfsts, err = onUntarManifestJSON(tr)
if err != nil {
return nil, errors.Wrapf(err, "untar manifest %q", hdr.Name)
}
continue
}
if isLayerTar(hdr.Name) {
desc, err := onUntarLayerTar(ctx, tr, cs, hdr.Name, hdr.Size)
if err != nil {
return nil, errors.Wrapf(err, "untar layer %q", hdr.Name)
}
layers[hdr.Name] = *desc
continue
}
if isDotJSON(hdr.Name) {
c, err := onUntarDotJSON(ctx, tr, cs, hdr.Name, hdr.Size)
if err != nil {
return nil, errors.Wrapf(err, "untar config %q", hdr.Name)
}
configs[hdr.Name] = *c
continue
}
}
var refs []string
defer func() {
if retErr == nil {
return
}
// TODO(random-liu): Consider whether we should keep images already imported
// even when there is an error.
for _, ref := range refs {
if err := is.Delete(ctx, ref); err != nil {
log.G(ctx).WithError(err).Errorf("Failed to remove image %q", ref)
}
}
}()
for _, mfst := range mfsts {
config, ok := configs[mfst.Config]
if !ok {
return refs, errors.Errorf("image config %q not found", mfst.Config)
}
schema2Manifest, err := makeDockerSchema2Manifest(mfst, config, layers)
if err != nil {
return refs, errors.Wrap(err, "create docker manifest")
}
desc, err := writeDockerSchema2Manifest(ctx, cs, *schema2Manifest, config.img.Architecture, config.img.OS)
if err != nil {
return refs, errors.Wrap(err, "write docker manifest")
}
defer func() {
// Remove root tag from manifest.
if _, err := cs.Update(ctx, content.Info{Digest: desc.Digest}, "labels.containerd.io/gc.root"); err != nil {
log.G(ctx).WithError(err).Error("Failed to remove manifest root tag")
}
}()
for _, ref := range mfst.RepoTags {
normalized, err := util.NormalizeImageRef(ref)
if err != nil {
return refs, errors.Wrapf(err, "normalize image ref %q", ref)
}
ref = normalized.String()
imgrec := images.Image{
Name: ref,
Target: *desc,
}
if _, err := is.Create(ctx, imgrec); err != nil {
if !errdefs.IsAlreadyExists(err) {
return refs, errors.Wrapf(err, "create image ref %+v", imgrec)
}
_, err := is.Update(ctx, imgrec)
if err != nil {
return refs, errors.Wrapf(err, "update image ref %+v", imgrec)
}
}
refs = append(refs, ref)
}
}
return refs, nil
}
func makeDockerSchema2Manifest(mfst manifestDotJSON, config imageConfig, layers map[string]ocispec.Descriptor) (*ocispec.Manifest, error) {
manifest := ocispec.Manifest{
Versioned: specs.Versioned{
SchemaVersion: 2,
},
Config: config.desc,
}
for _, f := range mfst.Layers {
desc, ok := layers[f]
if !ok {
return nil, errors.Errorf("layer %q not found", f)
}
manifest.Layers = append(manifest.Layers, desc)
}
return &manifest, nil
}
func writeDockerSchema2Manifest(ctx context.Context, cs content.Ingester, manifest ocispec.Manifest, arch, os string) (*ocispec.Descriptor, error) {
manifestBytes, err := json.Marshal(manifest)
if err != nil {
return nil, err
}
manifestBytesR := bytes.NewReader(manifestBytes)
manifestDigest := digest.FromBytes(manifestBytes)
labels := map[string]string{}
labels["containerd.io/gc.root"] = time.Now().UTC().Format(time.RFC3339)
labels["containerd.io/gc.ref.content.0"] = manifest.Config.Digest.String()
for i, ch := range manifest.Layers {
labels[fmt.Sprintf("containerd.io/gc.ref.content.%d", i+1)] = ch.Digest.String()
}
if err := content.WriteBlob(ctx, cs, "manifest-"+manifestDigest.String(), manifestBytesR,
int64(len(manifestBytes)), manifestDigest, content.WithLabels(labels)); err != nil {
return nil, err
}
desc := &ocispec.Descriptor{
MediaType: images.MediaTypeDockerSchema2Manifest,
Digest: manifestDigest,
Size: int64(len(manifestBytes)),
}
if arch != "" || os != "" {
desc.Platform = &ocispec.Platform{
Architecture: arch,
OS: os,
}
}
return desc, nil
}
func onUntarManifestJSON(r io.Reader) ([]manifestDotJSON, error) {
// name: "manifest.json"
b, err := ioutil.ReadAll(r)
if err != nil {
return nil, err
}
var mfsts []manifestDotJSON
if err := json.Unmarshal(b, &mfsts); err != nil {
return nil, err
}
return mfsts, nil
}
func onUntarLayerTar(ctx context.Context, r io.Reader, cs content.Ingester, name string, size int64) (*ocispec.Descriptor, error) {
// name is like "deadbeeddeadbeef/layer.tar" ( guaranteed by isLayerTar() )
split := strings.Split(name, "/")
// note: split[0] is not expected digest here
cw, err := cs.Writer(ctx, "layer-"+split[0], size, "")
if err != nil {
return nil, err
}
defer cw.Close()
if err := content.Copy(ctx, cw, r, size, "", content.WithLabels(map[string]string{
"containerd.io/gc.root": time.Now().UTC().Format(time.RFC3339),
})); err != nil {
return nil, err
}
return &ocispec.Descriptor{
MediaType: images.MediaTypeDockerSchema2Layer,
Size: size,
Digest: cw.Digest(),
}, nil
}
func onUntarDotJSON(ctx context.Context, r io.Reader, cs content.Ingester, name string, size int64) (*imageConfig, error) {
config := imageConfig{}
config.desc.MediaType = images.MediaTypeDockerSchema2Config
config.desc.Size = size
// name is like "deadbeeddeadbeef.json" ( guaranteed by is DotJSON() )
split := strings.Split(name, ".")
cw, err := cs.Writer(ctx, "config-"+split[0], size, "")
if err != nil {
return nil, err
}
defer cw.Close()
var buf bytes.Buffer
tr := io.TeeReader(r, &buf)
if err := content.Copy(ctx, cw, tr, size, "", content.WithLabels(map[string]string{
"containerd.io/gc.root": time.Now().UTC().Format(time.RFC3339),
})); err != nil {
return nil, err
}
config.desc.Digest = cw.Digest()
if err := json.Unmarshal(buf.Bytes(), &config.img); err != nil {
return nil, err
}
return &config, nil
}

View File

@@ -0,0 +1,156 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package opts
import (
"context"
"io/ioutil"
"os"
"path/filepath"
"github.com/containerd/containerd"
"github.com/containerd/containerd/containers"
"github.com/containerd/containerd/errdefs"
"github.com/docker/docker/pkg/chrootarchive"
"github.com/docker/docker/pkg/system"
"github.com/opencontainers/image-spec/identity"
"github.com/pkg/errors"
"golang.org/x/sys/unix"
)
// WithImageUnpack guarantees that the image used by the container is unpacked.
func WithImageUnpack(i containerd.Image) containerd.NewContainerOpts {
return func(ctx context.Context, client *containerd.Client, c *containers.Container) error {
if c.Snapshotter == "" {
return errors.New("no snapshotter set for container")
}
snapshotter := client.SnapshotService(c.Snapshotter)
diffIDs, err := i.RootFS(ctx)
if err != nil {
return errors.Wrap(err, "get image diff IDs")
}
chainID := identity.ChainID(diffIDs)
_, err = snapshotter.Stat(ctx, chainID.String())
if err == nil {
return nil
}
if !errdefs.IsNotFound(err) {
return errors.Wrap(err, "stat snapshot")
}
// Unpack the snapshot.
if err := i.Unpack(ctx, c.Snapshotter); err != nil {
return errors.Wrap(err, "unpack snapshot")
}
return nil
}
}
// WithVolumes copies ownership of volume in rootfs to its corresponding host path.
// It doesn't update runtime spec.
// The passed in map is a host path to container path map for all volumes.
// TODO(random-liu): Figure out whether we need to copy volume content.
func WithVolumes(volumeMounts map[string]string) containerd.NewContainerOpts {
return func(ctx context.Context, client *containerd.Client, c *containers.Container) error {
if c.Snapshotter == "" {
return errors.New("no snapshotter set for container")
}
if c.SnapshotKey == "" {
return errors.New("rootfs not created for container")
}
snapshotter := client.SnapshotService(c.Snapshotter)
mounts, err := snapshotter.Mounts(ctx, c.SnapshotKey)
if err != nil {
return err
}
root, err := ioutil.TempDir("", "ctd-volume")
if err != nil {
return err
}
defer os.RemoveAll(root) // nolint: errcheck
for _, m := range mounts {
if err := m.Mount(root); err != nil {
return err
}
}
defer unix.Unmount(root, 0) // nolint: errcheck
for host, volume := range volumeMounts {
src := filepath.Join(root, volume)
if _, err := os.Stat(src); err != nil {
if os.IsNotExist(err) {
// Skip copying directory if it does not exist.
continue
}
return errors.Wrap(err, "stat volume in rootfs")
}
if err := copyExistingContents(src, host); err != nil {
return errors.Wrap(err, "taking runtime copy of volume")
}
}
return nil
}
}
// copyExistingContents copies from the source to the destination and
// ensures the ownership is appropriately set.
func copyExistingContents(source, destination string) error {
srcList, err := ioutil.ReadDir(source)
if err != nil {
return err
}
if len(srcList) > 0 {
dstList, err := ioutil.ReadDir(destination)
if err != nil {
return err
}
if len(dstList) != 0 {
return errors.Errorf("volume at %q is not initially empty", destination)
}
if err := chrootarchive.NewArchiver(nil).CopyWithTar(source, destination); err != nil {
return err
}
}
return copyOwnership(source, destination)
}
// copyOwnership copies the permissions and uid:gid of the src file
// to the dst file
func copyOwnership(src, dst string) error {
stat, err := system.Stat(src)
if err != nil {
return err
}
dstStat, err := system.Stat(dst)
if err != nil {
return err
}
// In some cases, even though UID/GID match and it would effectively be a no-op,
// this can return a permission denied error... for example if this is an NFS
// mount.
// Since it's not really an error that we can't chown to the same UID/GID, don't
// even bother trying in such cases.
if stat.UID() != dstStat.UID() || stat.GID() != dstStat.GID() {
if err := os.Chown(dst, int(stat.UID()), int(stat.GID())); err != nil {
return err
}
}
if stat.Mode() != dstStat.Mode() {
return os.Chmod(dst, os.FileMode(stat.Mode()))
}
return nil
}