Merge pull request #438 from yanxuean/import-lease
add lease for importer
This commit is contained in:
commit
200ba370a3
@ -22,8 +22,8 @@ import (
|
|||||||
"io"
|
"io"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
|
||||||
|
|
||||||
|
"github.com/containerd/containerd"
|
||||||
"github.com/containerd/containerd/content"
|
"github.com/containerd/containerd/content"
|
||||||
"github.com/containerd/containerd/errdefs"
|
"github.com/containerd/containerd/errdefs"
|
||||||
"github.com/containerd/containerd/images"
|
"github.com/containerd/containerd/images"
|
||||||
@ -72,29 +72,22 @@ type imageConfig struct {
|
|||||||
// Also, the current implementation assumes the implicit file name convention,
|
// Also, the current implementation assumes the implicit file name convention,
|
||||||
// which is not explicitly documented in the spec. (e.g. deadbeef/layer.tar)
|
// which is not explicitly documented in the spec. (e.g. deadbeef/layer.tar)
|
||||||
// It returns a group of image references successfully loaded.
|
// It returns a group of image references successfully loaded.
|
||||||
func Import(ctx context.Context, cs content.Store, is images.Store, reader io.Reader) (_ []string, retErr error) {
|
func Import(ctx context.Context, client *containerd.Client, reader io.Reader) (_ []string, retErr error) {
|
||||||
|
ctx, done, err := client.WithLease(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer done() // nolint: errcheck
|
||||||
|
|
||||||
|
cs := client.ContentStore()
|
||||||
|
is := client.ImageService()
|
||||||
|
|
||||||
tr := tar.NewReader(reader)
|
tr := tar.NewReader(reader)
|
||||||
var (
|
var (
|
||||||
mfsts []manifestDotJSON
|
mfsts []manifestDotJSON
|
||||||
layers = make(map[string]ocispec.Descriptor) // key: filename (deadbeeddeadbeef/layer.tar)
|
layers = make(map[string]ocispec.Descriptor) // key: filename (deadbeeddeadbeef/layer.tar)
|
||||||
configs = make(map[string]imageConfig) // key: filename (deadbeeddeadbeef.json)
|
configs = make(map[string]imageConfig) // key: filename (deadbeeddeadbeef.json)
|
||||||
)
|
)
|
||||||
// Either image is successfully imported or not, we should cleanup gc.root
|
|
||||||
// for all image layers.
|
|
||||||
defer func() {
|
|
||||||
for _, desc := range layers {
|
|
||||||
// Remove root tag from layers now that manifest refers to it
|
|
||||||
if _, err := cs.Update(ctx, content.Info{Digest: desc.Digest}, "labels.containerd.io/gc.root"); err != nil {
|
|
||||||
log.G(ctx).WithError(err).Errorf("Failed to remove layer %q root tag", desc.Digest)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
for _, cfg := range configs {
|
|
||||||
// Remove root tag from config now that manifest refers to it
|
|
||||||
if _, err := cs.Update(ctx, content.Info{Digest: cfg.desc.Digest}, "labels.containerd.io/gc.root"); err != nil {
|
|
||||||
log.G(ctx).WithError(err).Errorf("Failed to remove config %q root tag", cfg.desc.Digest)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
for {
|
for {
|
||||||
hdr, err := tr.Next()
|
hdr, err := tr.Next()
|
||||||
if err == io.EOF {
|
if err == io.EOF {
|
||||||
@ -156,12 +149,6 @@ func Import(ctx context.Context, cs content.Store, is images.Store, reader io.Re
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return refs, errors.Wrap(err, "write docker manifest")
|
return refs, errors.Wrap(err, "write docker manifest")
|
||||||
}
|
}
|
||||||
defer func() {
|
|
||||||
// Remove root tag from manifest.
|
|
||||||
if _, err := cs.Update(ctx, content.Info{Digest: desc.Digest}, "labels.containerd.io/gc.root"); err != nil {
|
|
||||||
log.G(ctx).WithError(err).Error("Failed to remove manifest root tag")
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
for _, ref := range mfst.RepoTags {
|
for _, ref := range mfst.RepoTags {
|
||||||
normalized, err := util.NormalizeImageRef(ref)
|
normalized, err := util.NormalizeImageRef(ref)
|
||||||
@ -214,7 +201,6 @@ func writeDockerSchema2Manifest(ctx context.Context, cs content.Ingester, manife
|
|||||||
manifestBytesR := bytes.NewReader(manifestBytes)
|
manifestBytesR := bytes.NewReader(manifestBytes)
|
||||||
manifestDigest := digest.FromBytes(manifestBytes)
|
manifestDigest := digest.FromBytes(manifestBytes)
|
||||||
labels := map[string]string{}
|
labels := map[string]string{}
|
||||||
labels["containerd.io/gc.root"] = time.Now().UTC().Format(time.RFC3339)
|
|
||||||
labels["containerd.io/gc.ref.content.0"] = manifest.Config.Digest.String()
|
labels["containerd.io/gc.ref.content.0"] = manifest.Config.Digest.String()
|
||||||
for i, ch := range manifest.Layers {
|
for i, ch := range manifest.Layers {
|
||||||
labels[fmt.Sprintf("containerd.io/gc.ref.content.%d", i+1)] = ch.Digest.String()
|
labels[fmt.Sprintf("containerd.io/gc.ref.content.%d", i+1)] = ch.Digest.String()
|
||||||
@ -260,9 +246,7 @@ func onUntarLayerTar(ctx context.Context, r io.Reader, cs content.Ingester, name
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
defer cw.Close()
|
defer cw.Close()
|
||||||
if err := content.Copy(ctx, cw, r, size, "", content.WithLabels(map[string]string{
|
if err := content.Copy(ctx, cw, r, size, ""); err != nil {
|
||||||
"containerd.io/gc.root": time.Now().UTC().Format(time.RFC3339),
|
|
||||||
})); err != nil {
|
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return &ocispec.Descriptor{
|
return &ocispec.Descriptor{
|
||||||
@ -285,9 +269,7 @@ func onUntarDotJSON(ctx context.Context, r io.Reader, cs content.Ingester, name
|
|||||||
defer cw.Close()
|
defer cw.Close()
|
||||||
var buf bytes.Buffer
|
var buf bytes.Buffer
|
||||||
tr := io.TeeReader(r, &buf)
|
tr := io.TeeReader(r, &buf)
|
||||||
if err := content.Copy(ctx, cw, tr, size, "", content.WithLabels(map[string]string{
|
if err := content.Copy(ctx, cw, tr, size, ""); err != nil {
|
||||||
"containerd.io/gc.root": time.Now().UTC().Format(time.RFC3339),
|
|
||||||
})); err != nil {
|
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
config.desc.Digest = cw.Digest()
|
config.desc.Digest = cw.Digest()
|
||||||
|
@ -38,7 +38,7 @@ func (c *criContainerdService) LoadImage(ctx context.Context, r *api.LoadImageRe
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to open file: %v", err)
|
return nil, fmt.Errorf("failed to open file: %v", err)
|
||||||
}
|
}
|
||||||
repoTags, err := importer.Import(ctx, c.client.ContentStore(), c.client.ImageService(), f)
|
repoTags, err := importer.Import(ctx, c.client, f)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to import image: %v", err)
|
return nil, fmt.Errorf("failed to import image: %v", err)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user