From c3ac5f75331d158412da2257375306b4464b3d38 Mon Sep 17 00:00:00 2001 From: Random-Liu Date: Mon, 22 May 2017 17:21:17 -0700 Subject: [PATCH] Add image pull waiting. Signed-off-by: Random-Liu --- pkg/server/image_pull.go | 96 +++++++++++++++++++++++++++++++++-- pkg/server/image_pull_test.go | 21 ++++++++ pkg/server/service.go | 10 ++-- 3 files changed, 116 insertions(+), 11 deletions(-) diff --git a/pkg/server/image_pull.go b/pkg/server/image_pull.go index 37abbdad6..698ab8054 100644 --- a/pkg/server/image_pull.go +++ b/pkg/server/image_pull.go @@ -17,14 +17,18 @@ limitations under the License. package server import ( + gocontext "context" "encoding/json" "fmt" "net/http" + "sync" + "time" "github.com/containerd/containerd/content" containerdimages "github.com/containerd/containerd/images" "github.com/containerd/containerd/remotes" "github.com/containerd/containerd/remotes/docker" + rootfsservice "github.com/containerd/containerd/services/rootfs" "github.com/golang/glog" imagedigest "github.com/opencontainers/go-digest" imagespec "github.com/opencontainers/image-spec/specs-go/v1" @@ -152,6 +156,34 @@ func (c *criContainerdService) PullImage(ctx context.Context, r *runtime.PullIma return &runtime.PullImageResponse{ImageRef: imageID}, err } +// resourceSet is the helper struct to help tracking all resources associated +// with an image. +type resourceSet struct { + sync.Mutex + resources map[string]struct{} +} + +func newResourceSet() *resourceSet { + return &resourceSet{resources: make(map[string]struct{})} +} + +func (r *resourceSet) add(resource string) { + r.Lock() + defer r.Unlock() + r.resources[resource] = struct{}{} +} + +// all returns an array of all resources added. +func (r *resourceSet) all() map[string]struct{} { + r.Lock() + defer r.Unlock() + resources := make(map[string]struct{}) + for resource := range r.resources { + resources[resource] = struct{}{} + } + return resources +} + // pullImage pulls image and returns image id (config digest) and manifest digest. // The ref should be normalized image reference. // TODO(random-liu): [P0] Wait for all downloadings to be done before return. @@ -183,19 +215,37 @@ func (c *criContainerdService) pullImage(ctx context.Context, ref string) ( } // TODO(random-liu): What if following operations fail? Do we need to do cleanup? + resources := newResourceSet() + + glog.V(4).Infof("Start downloading resources for image %q", ref) // Fetch all image resources into content store. // Dispatch a handler which will run a sequence of handlers to: - // 1) fetch the object using a FetchHandler; - // 2) recurse through any sub-layers via a ChildrenHandler. + // 1) track all resources associated using a customized handler; + // 2) fetch the object using a FetchHandler; + // 3) recurse through any sub-layers via a ChildrenHandler. err = containerdimages.Dispatch( ctx, containerdimages.Handlers( + containerdimages.HandlerFunc(func(ctx gocontext.Context, desc imagespec.Descriptor) ( + []imagespec.Descriptor, error) { + resources.add(remotes.MakeRefKey(ctx, desc)) + return nil, nil + }), remotes.FetchHandler(c.contentStoreService, fetcher), containerdimages.ChildrenHandler(c.contentStoreService)), desc) if err != nil { - return "", "", fmt.Errorf("failed to fetch image %q desc %+v: %v", ref, desc, err) + // Dispatch returns error when requested resources are locked. + // In that case, we should start waiting and checking the pulling + // progress. + // TODO(random-liu): Check specific resource locked error type. + glog.V(5).Infof("Dispatch for %q returns error: %v", ref, err) } + // Wait for the image pulling to finish + if err := c.waitForResourcesDownloading(ctx, resources.all()); err != nil { + return "", "", fmt.Errorf("failed to wait for image %q downloading: %v", ref, err) + } + glog.V(4).Infof("Finished downloading resources for image %q", ref) image, err := c.imageStoreService.Get(ctx, ref) if err != nil { @@ -214,7 +264,8 @@ func (c *criContainerdService) pullImage(ctx context.Context, ref string) ( } // Unpack the image layers into snapshots. - if _, err = c.rootfsUnpacker.Unpack(ctx, manifest.Layers); err != nil { + rootfsUnpacker := rootfsservice.NewUnpackerFromClient(c.rootfsService) + if _, err = rootfsUnpacker.Unpack(ctx, manifest.Layers); err != nil { return "", "", fmt.Errorf("unpack failed for manifest layers %+v: %v", manifest.Layers, err) } // TODO(random-liu): Considering how to deal with the disk usage of content. @@ -226,6 +277,43 @@ func (c *criContainerdService) pullImage(ctx context.Context, ref string) ( return configDesc.Digest, manifestDigest, nil } +// waitDownloadingPollInterval is the interval to check resource downloading progress. +const waitDownloadingPollInterval = 200 * time.Millisecond + +// waitForResourcesDownloading waits for all resource downloading to finish. +func (c *criContainerdService) waitForResourcesDownloading(ctx context.Context, resources map[string]struct{}) error { + ticker := time.NewTicker(waitDownloadingPollInterval) + defer ticker.Stop() + for { + select { + case <-ticker.C: + // TODO(random-liu): Use better regexp when containerd `MakeRefKey` contains more + // information. + statuses, err := c.contentStoreService.Status(ctx, "") + if err != nil { + return fmt.Errorf("failed to get content status: %v", err) + } + pulling := false + // TODO(random-liu): Move Dispatch into a separate goroutine, so that we could report + // image pulling progress concurrently. + for _, status := range statuses { + _, ok := resources[status.Ref] + if ok { + glog.V(5).Infof("Pulling resource %q with progress %d/%d", + status.Ref, status.Offset, status.Total) + pulling = true + } + } + if !pulling { + return nil + } + case <-ctx.Done(): + // TODO(random-liu): Abort ongoing pulling if cancelled. + return fmt.Errorf("image resources pulling is cancelled") + } + } +} + // insertToStringSlice is a helper function to insert a string into the string slice // if the string is not in the slice yet. func insertToStringSlice(ss []string, s string) []string { diff --git a/pkg/server/image_pull_test.go b/pkg/server/image_pull_test.go index b7f1e242f..256855ddc 100644 --- a/pkg/server/image_pull_test.go +++ b/pkg/server/image_pull_test.go @@ -17,6 +17,8 @@ limitations under the License. package server import ( + "fmt" + "sync" "testing" "github.com/stretchr/testify/assert" @@ -72,3 +74,22 @@ func TestUpdateImageMetadata(t *testing.T) { assert.Equal(t, test.expectedRepoDigests, m.RepoDigests) } } + +func TestResources(t *testing.T) { + const threads = 10 + var wg sync.WaitGroup + r := newResourceSet() + for i := 0; i < threads; i++ { + wg.Add(1) + go func(ref string) { + r.add(ref) + wg.Done() + }(fmt.Sprintf("sha256:%d", i)) + } + wg.Wait() + refs := r.all() + for i := 0; i < threads; i++ { + _, ok := refs[fmt.Sprintf("sha256:%d", i)] + assert.True(t, ok) + } +} diff --git a/pkg/server/service.go b/pkg/server/service.go index 81ab53eda..77ff5a144 100644 --- a/pkg/server/service.go +++ b/pkg/server/service.go @@ -29,10 +29,8 @@ import ( rootfsapi "github.com/containerd/containerd/api/services/rootfs" "github.com/containerd/containerd/content" "github.com/containerd/containerd/images" - "github.com/containerd/containerd/rootfs" contentservice "github.com/containerd/containerd/services/content" imagesservice "github.com/containerd/containerd/services/images" - rootfsservice "github.com/containerd/containerd/services/rootfs" "github.com/kubernetes-incubator/cri-containerd/pkg/metadata" "github.com/kubernetes-incubator/cri-containerd/pkg/metadata/store" @@ -87,9 +85,8 @@ type criContainerdService struct { containerService execution.ContainerServiceClient // contentStoreService is the containerd content service client.. contentStoreService content.Store - // rootfsUnpacker is the containerd service to unpack image content - // into snapshots. - rootfsUnpacker rootfs.Unpacker + // rootfsService is the containerd rootfs service client. + rootfsService rootfsapi.RootFSClient // imageStoreService is the containerd service to store and track // image metadata. imageStoreService images.Store @@ -99,7 +96,6 @@ type criContainerdService struct { // NewCRIContainerdService returns a new instance of CRIContainerdService func NewCRIContainerdService(conn *grpc.ClientConn, rootDir, networkPluginBinDir, networkPluginConfDir string) (CRIContainerdService, error) { - // TODO: Initialize different containerd clients. // TODO(random-liu): [P2] Recover from runtime state and metadata store. c := &criContainerdService{ os: osinterface.RealOS{}, @@ -116,7 +112,7 @@ func NewCRIContainerdService(conn *grpc.ClientConn, rootDir, networkPluginBinDir containerService: execution.NewContainerServiceClient(conn), imageStoreService: imagesservice.NewStoreFromClient(imagesapi.NewImagesClient(conn)), contentStoreService: contentservice.NewStoreFromClient(contentapi.NewContentClient(conn)), - rootfsUnpacker: rootfsservice.NewUnpackerFromClient(rootfsapi.NewRootFSClient(conn)), + rootfsService: rootfsapi.NewRootFSClient(conn), } netPlugin, err := ocicni.InitCNI(networkPluginBinDir, networkPluginConfDir)