commit
cbda4256cd
@ -32,7 +32,7 @@ func (c *criContainerdService) ContainerStats(ctx context.Context, in *runtime.C
|
||||
return nil, fmt.Errorf("failed to find container: %v", err)
|
||||
}
|
||||
request := &tasks.MetricsRequest{Filters: []string{"id==" + cntr.ID}}
|
||||
resp, err := c.taskService.Metrics(ctx, request)
|
||||
resp, err := c.client.TaskService().Metrics(ctx, request)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to fetch metrics for task: %v", err)
|
||||
}
|
||||
|
@ -38,7 +38,7 @@ func (c *criContainerdService) ListContainerStats(
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to build metrics request: %v", err)
|
||||
}
|
||||
resp, err := c.taskService.Metrics(ctx, &request)
|
||||
resp, err := c.client.TaskService().Metrics(ctx, &request)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to fetch metrics for tasks: %v", err)
|
||||
}
|
||||
|
@ -315,7 +315,7 @@ type imageInfo struct {
|
||||
}
|
||||
|
||||
// getImageInfo gets image info from containerd.
|
||||
func getImageInfo(ctx context.Context, image containerd.Image, provider content.Provider) (*imageInfo, error) {
|
||||
func getImageInfo(ctx context.Context, image containerd.Image) (*imageInfo, error) {
|
||||
// Get image information.
|
||||
diffIDs, err := image.RootFS(ctx)
|
||||
if err != nil {
|
||||
@ -334,7 +334,7 @@ func getImageInfo(ctx context.Context, image containerd.Image, provider content.
|
||||
}
|
||||
id := desc.Digest.String()
|
||||
|
||||
rb, err := content.ReadBlob(ctx, provider, desc.Digest)
|
||||
rb, err := content.ReadBlob(ctx, image.ContentStore(), desc.Digest)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to read image config from content store: %v", err)
|
||||
}
|
||||
|
@ -52,7 +52,7 @@ func (c *criContainerdService) LoadImage(ctx context.Context, r *api.LoadImageRe
|
||||
glog.Warningf("Failed to unpack image %q: %v", repoTag, err)
|
||||
// Do not fail image importing. Unpack will be retried when container creation.
|
||||
}
|
||||
info, err := getImageInfo(ctx, image, c.client.ContentStore())
|
||||
info, err := getImageInfo(ctx, image)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get image %q info: %v", repoTag, err)
|
||||
}
|
||||
|
@ -59,6 +59,9 @@ import (
|
||||
// if we saw an image without snapshots or with in-complete contents during startup,
|
||||
// should we re-pull the image? Or should we remove the entry?
|
||||
//
|
||||
// yanxuean: We cann't delete image directly, because we don't know if the image
|
||||
// is pulled by us. There are resource leakage.
|
||||
//
|
||||
// 2) Containerd suggests user to add entry before pulling the image. However if
|
||||
// an error occurrs during the pulling, should we remove the entry from metadata
|
||||
// store? Or should we leave it there until next startup (resource leakage)?
|
||||
@ -114,8 +117,15 @@ func (c *criContainerdService) PullImage(ctx context.Context, r *runtime.PullIma
|
||||
// Do not fail image pulling. Unpack will be retried before container creation.
|
||||
}
|
||||
|
||||
// Get image information.
|
||||
info, err := getImageInfo(ctx, image)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get image information: %v", err)
|
||||
}
|
||||
imageID := info.id
|
||||
|
||||
repoDigest, repoTag := getRepoDigestAndTag(namedRef, image.Target().Digest, isSchema1)
|
||||
for _, r := range []string{repoTag, repoDigest} {
|
||||
for _, r := range []string{repoTag, repoDigest, imageID} {
|
||||
if r == "" {
|
||||
continue
|
||||
}
|
||||
@ -123,16 +133,7 @@ func (c *criContainerdService) PullImage(ctx context.Context, r *runtime.PullIma
|
||||
return nil, fmt.Errorf("failed to update image reference %q: %v", r, err)
|
||||
}
|
||||
}
|
||||
// Get image information.
|
||||
info, err := getImageInfo(ctx, image, c.client.ContentStore())
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get image information: %v", err)
|
||||
}
|
||||
imageID := info.id
|
||||
|
||||
if err := c.createImageReference(ctx, imageID, image.Target()); err != nil {
|
||||
return nil, fmt.Errorf("failed to update image reference %q: %v", imageID, err)
|
||||
}
|
||||
glog.V(4).Infof("Pulled image %q with image id %q, repo tag %q, repo digest %q", imageRef, imageID,
|
||||
repoTag, repoDigest)
|
||||
img := imagestore.Image{
|
||||
@ -158,7 +159,7 @@ func (c *criContainerdService) PullImage(ctx context.Context, r *runtime.PullIma
|
||||
// by someone else anytime, before/during/after we create the metadata. We should always
|
||||
// check the actual state in containerd before using the image or returning status of the
|
||||
// image.
|
||||
return &runtime.PullImageResponse{ImageRef: img.ID}, err
|
||||
return &runtime.PullImageResponse{ImageRef: img.ID}, nil
|
||||
}
|
||||
|
||||
// ParseAuth parses AuthConfig and returns username and password/secret required by containerd.
|
||||
@ -201,13 +202,13 @@ func (c *criContainerdService) createImageReference(ctx context.Context, name st
|
||||
}
|
||||
// TODO(random-liu): Figure out which is the more performant sequence create then update or
|
||||
// update then create.
|
||||
_, err := c.imageStoreService.Create(ctx, img)
|
||||
_, err := c.client.ImageService().Create(ctx, img)
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
if err != nil && !errdefs.IsAlreadyExists(err) {
|
||||
return err
|
||||
}
|
||||
_, err = c.imageStoreService.Update(ctx, img, "target")
|
||||
_, err = c.client.ImageService().Update(ctx, img, "target")
|
||||
return err
|
||||
}
|
||||
|
@ -42,7 +42,7 @@ func (c *criContainerdService) RemoveImage(ctx context.Context, r *runtime.Remov
|
||||
return &runtime.RemoveImageResponse{}, nil
|
||||
}
|
||||
|
||||
// Exclude out dated image tag.
|
||||
// Exclude outdated image tag.
|
||||
for i, tag := range image.RepoTags {
|
||||
cImage, err := c.client.GetImage(ctx, tag)
|
||||
if err != nil {
|
||||
@ -53,26 +53,39 @@ func (c *criContainerdService) RemoveImage(ctx context.Context, r *runtime.Remov
|
||||
}
|
||||
desc, err := cImage.Config(ctx)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get image %q config descriptor: %v", tag, err)
|
||||
}
|
||||
cID := desc.Digest.String()
|
||||
if cID == image.ID {
|
||||
// We can only get image id by reading Config from content.
|
||||
// If the config is missing, we will fail to get image id,
|
||||
// So we will can't remove the image forever,
|
||||
// and cri-containerd always report the image is ok.
|
||||
// But we also don't check it by manifest,
|
||||
// It's possible that two manifest digests have the same image ID in theory.
|
||||
// In theory it's possible that an image is compressed with different algorithms,
|
||||
// then they'll have the same uncompressed id - image id,
|
||||
// but different ids generated from compressed contents - manifest digest.
|
||||
// So we decide to leave it.
|
||||
// After all, the user can override the repoTag by pulling image again.
|
||||
glog.Errorf("can't remove image,failed to get config for Image tag %q,id %q: %v", tag, image.ID, err)
|
||||
image.RepoTags = append(image.RepoTags[:i], image.RepoTags[i+1:]...)
|
||||
continue
|
||||
}
|
||||
glog.V(4).Infof("Image tag %q for %q is out dated, it's currently used by %q", tag, image.ID, cID)
|
||||
cID := desc.Digest.String()
|
||||
if cID != image.ID {
|
||||
glog.V(4).Infof("Image tag %q for %q is outdated, it's currently used by %q", tag, image.ID, cID)
|
||||
image.RepoTags = append(image.RepoTags[:i], image.RepoTags[i+1:]...)
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
// Include all image references, including RepoTag, RepoDigest and id.
|
||||
for _, ref := range append(image.RepoTags, image.RepoDigests...) {
|
||||
err = c.imageStoreService.Delete(ctx, ref)
|
||||
err = c.client.ImageService().Delete(ctx, ref)
|
||||
if err == nil || errdefs.IsNotFound(err) {
|
||||
continue
|
||||
}
|
||||
return nil, fmt.Errorf("failed to delete image reference %q for image %q: %v", ref, image.ID, err)
|
||||
}
|
||||
// Delete image id synchronously to trigger garbage collection.
|
||||
err = c.imageStoreService.Delete(ctx, image.ID, images.SynchronousDelete())
|
||||
err = c.client.ImageService().Delete(ctx, image.ID, images.SynchronousDelete())
|
||||
if err != nil && !errdefs.IsNotFound(err) {
|
||||
return nil, fmt.Errorf("failed to delete image id %q: %v", image.ID, err)
|
||||
}
|
||||
|
@ -25,7 +25,6 @@ import (
|
||||
|
||||
"github.com/containerd/containerd"
|
||||
containerdio "github.com/containerd/containerd/cio"
|
||||
"github.com/containerd/containerd/content"
|
||||
"github.com/containerd/containerd/errdefs"
|
||||
containerdimages "github.com/containerd/containerd/images"
|
||||
"github.com/containerd/containerd/platforms"
|
||||
@ -99,7 +98,7 @@ func (c *criContainerdService) recover(ctx context.Context) error {
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to list images: %v", err)
|
||||
}
|
||||
images, err := loadImages(ctx, cImages, c.client.ContentStore(), c.config.ContainerdConfig.Snapshotter)
|
||||
images, err := loadImages(ctx, cImages, c.config.ContainerdConfig.Snapshotter)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to load images: %v", err)
|
||||
}
|
||||
@ -322,7 +321,7 @@ func loadSandbox(ctx context.Context, cntr containerd.Container) (sandboxstore.S
|
||||
// loadImages loads images from containerd.
|
||||
// TODO(random-liu): Check whether image is unpacked, because containerd put image reference
|
||||
// into store before image is unpacked.
|
||||
func loadImages(ctx context.Context, cImages []containerd.Image, provider content.Provider,
|
||||
func loadImages(ctx context.Context, cImages []containerd.Image,
|
||||
snapshotter string) ([]imagestore.Image, error) {
|
||||
// Group images by image id.
|
||||
imageMap := make(map[string][]containerd.Image)
|
||||
@ -340,7 +339,7 @@ func loadImages(ctx context.Context, cImages []containerd.Image, provider conten
|
||||
// imgs len must be > 0, or else the entry will not be created in
|
||||
// previous loop.
|
||||
i := imgs[0]
|
||||
ok, _, _, _, err := containerdimages.Check(ctx, provider, i.Target(), platforms.Default())
|
||||
ok, _, _, _, err := containerdimages.Check(ctx, i.ContentStore(), i.Target(), platforms.Default())
|
||||
if err != nil {
|
||||
glog.Errorf("Failed to check image content readiness for %q: %v", i.Name(), err)
|
||||
continue
|
||||
@ -360,7 +359,7 @@ func loadImages(ctx context.Context, cImages []containerd.Image, provider conten
|
||||
// TODO(random-liu): Consider whether we should try unpack here.
|
||||
}
|
||||
|
||||
info, err := getImageInfo(ctx, i, provider)
|
||||
info, err := getImageInfo(ctx, i)
|
||||
if err != nil {
|
||||
glog.Warningf("Failed to get image info for %q: %v", i.Name(), err)
|
||||
continue
|
||||
|
@ -34,7 +34,7 @@ func (c *criContainerdService) ListPodSandbox(ctx context.Context, r *runtime.Li
|
||||
// List all sandboxes from store.
|
||||
sandboxesInStore := c.sandboxStore.List()
|
||||
|
||||
response, err := c.taskService.List(ctx, &tasks.ListTasksRequest{})
|
||||
response, err := c.client.TaskService().List(ctx, &tasks.ListTasksRequest{})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to list sandbox containers: %v", err)
|
||||
}
|
||||
|
@ -25,9 +25,6 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/containerd/containerd"
|
||||
"github.com/containerd/containerd/api/services/tasks/v1"
|
||||
"github.com/containerd/containerd/content"
|
||||
"github.com/containerd/containerd/images"
|
||||
"github.com/containerd/containerd/plugin"
|
||||
"github.com/containerd/containerd/sys"
|
||||
"github.com/cri-o/ocicni/pkg/ocicni"
|
||||
@ -94,13 +91,6 @@ type criContainerdService struct {
|
||||
imageStore *imagestore.Store
|
||||
// snapshotStore stores information of all snapshots.
|
||||
snapshotStore *snapshotstore.Store
|
||||
// taskService is containerd tasks client.
|
||||
taskService tasks.TasksClient
|
||||
// contentStoreService is the containerd content service client.
|
||||
contentStoreService content.Store
|
||||
// imageStoreService is the containerd service to store and track
|
||||
// image metadata.
|
||||
imageStoreService images.Store
|
||||
// netPlugin is used to setup and teardown network when run/stop pod sandbox.
|
||||
netPlugin ocicni.CNIPlugin
|
||||
// client is an instance of the containerd client
|
||||
@ -141,9 +131,6 @@ func NewCRIContainerdService(config options.Config) (CRIContainerdService, error
|
||||
snapshotStore: snapshotstore.NewStore(),
|
||||
sandboxNameIndex: registrar.NewRegistrar(),
|
||||
containerNameIndex: registrar.NewRegistrar(),
|
||||
taskService: client.TaskService(),
|
||||
imageStoreService: client.ImageService(),
|
||||
contentStoreService: client.ContentStore(),
|
||||
client: client,
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user