Add image pull waiting.
Signed-off-by: Random-Liu <lantaol@google.com>
This commit is contained in:
parent
bc7dfa2650
commit
c3ac5f7533
@ -17,14 +17,18 @@ limitations under the License.
|
|||||||
package server
|
package server
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
gocontext "context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/containerd/containerd/content"
|
"github.com/containerd/containerd/content"
|
||||||
containerdimages "github.com/containerd/containerd/images"
|
containerdimages "github.com/containerd/containerd/images"
|
||||||
"github.com/containerd/containerd/remotes"
|
"github.com/containerd/containerd/remotes"
|
||||||
"github.com/containerd/containerd/remotes/docker"
|
"github.com/containerd/containerd/remotes/docker"
|
||||||
|
rootfsservice "github.com/containerd/containerd/services/rootfs"
|
||||||
"github.com/golang/glog"
|
"github.com/golang/glog"
|
||||||
imagedigest "github.com/opencontainers/go-digest"
|
imagedigest "github.com/opencontainers/go-digest"
|
||||||
imagespec "github.com/opencontainers/image-spec/specs-go/v1"
|
imagespec "github.com/opencontainers/image-spec/specs-go/v1"
|
||||||
@ -152,6 +156,34 @@ func (c *criContainerdService) PullImage(ctx context.Context, r *runtime.PullIma
|
|||||||
return &runtime.PullImageResponse{ImageRef: imageID}, err
|
return &runtime.PullImageResponse{ImageRef: imageID}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// resourceSet is the helper struct to help tracking all resources associated
|
||||||
|
// with an image.
|
||||||
|
type resourceSet struct {
|
||||||
|
sync.Mutex
|
||||||
|
resources map[string]struct{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func newResourceSet() *resourceSet {
|
||||||
|
return &resourceSet{resources: make(map[string]struct{})}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *resourceSet) add(resource string) {
|
||||||
|
r.Lock()
|
||||||
|
defer r.Unlock()
|
||||||
|
r.resources[resource] = struct{}{}
|
||||||
|
}
|
||||||
|
|
||||||
|
// all returns an array of all resources added.
|
||||||
|
func (r *resourceSet) all() map[string]struct{} {
|
||||||
|
r.Lock()
|
||||||
|
defer r.Unlock()
|
||||||
|
resources := make(map[string]struct{})
|
||||||
|
for resource := range r.resources {
|
||||||
|
resources[resource] = struct{}{}
|
||||||
|
}
|
||||||
|
return resources
|
||||||
|
}
|
||||||
|
|
||||||
// pullImage pulls image and returns image id (config digest) and manifest digest.
|
// pullImage pulls image and returns image id (config digest) and manifest digest.
|
||||||
// The ref should be normalized image reference.
|
// The ref should be normalized image reference.
|
||||||
// TODO(random-liu): [P0] Wait for all downloadings to be done before return.
|
// TODO(random-liu): [P0] Wait for all downloadings to be done before return.
|
||||||
@ -183,19 +215,37 @@ func (c *criContainerdService) pullImage(ctx context.Context, ref string) (
|
|||||||
}
|
}
|
||||||
// TODO(random-liu): What if following operations fail? Do we need to do cleanup?
|
// TODO(random-liu): What if following operations fail? Do we need to do cleanup?
|
||||||
|
|
||||||
|
resources := newResourceSet()
|
||||||
|
|
||||||
|
glog.V(4).Infof("Start downloading resources for image %q", ref)
|
||||||
// Fetch all image resources into content store.
|
// Fetch all image resources into content store.
|
||||||
// Dispatch a handler which will run a sequence of handlers to:
|
// Dispatch a handler which will run a sequence of handlers to:
|
||||||
// 1) fetch the object using a FetchHandler;
|
// 1) track all resources associated using a customized handler;
|
||||||
// 2) recurse through any sub-layers via a ChildrenHandler.
|
// 2) fetch the object using a FetchHandler;
|
||||||
|
// 3) recurse through any sub-layers via a ChildrenHandler.
|
||||||
err = containerdimages.Dispatch(
|
err = containerdimages.Dispatch(
|
||||||
ctx,
|
ctx,
|
||||||
containerdimages.Handlers(
|
containerdimages.Handlers(
|
||||||
|
containerdimages.HandlerFunc(func(ctx gocontext.Context, desc imagespec.Descriptor) (
|
||||||
|
[]imagespec.Descriptor, error) {
|
||||||
|
resources.add(remotes.MakeRefKey(ctx, desc))
|
||||||
|
return nil, nil
|
||||||
|
}),
|
||||||
remotes.FetchHandler(c.contentStoreService, fetcher),
|
remotes.FetchHandler(c.contentStoreService, fetcher),
|
||||||
containerdimages.ChildrenHandler(c.contentStoreService)),
|
containerdimages.ChildrenHandler(c.contentStoreService)),
|
||||||
desc)
|
desc)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", "", fmt.Errorf("failed to fetch image %q desc %+v: %v", ref, desc, err)
|
// Dispatch returns error when requested resources are locked.
|
||||||
|
// In that case, we should start waiting and checking the pulling
|
||||||
|
// progress.
|
||||||
|
// TODO(random-liu): Check specific resource locked error type.
|
||||||
|
glog.V(5).Infof("Dispatch for %q returns error: %v", ref, err)
|
||||||
}
|
}
|
||||||
|
// Wait for the image pulling to finish
|
||||||
|
if err := c.waitForResourcesDownloading(ctx, resources.all()); err != nil {
|
||||||
|
return "", "", fmt.Errorf("failed to wait for image %q downloading: %v", ref, err)
|
||||||
|
}
|
||||||
|
glog.V(4).Infof("Finished downloading resources for image %q", ref)
|
||||||
|
|
||||||
image, err := c.imageStoreService.Get(ctx, ref)
|
image, err := c.imageStoreService.Get(ctx, ref)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -214,7 +264,8 @@ func (c *criContainerdService) pullImage(ctx context.Context, ref string) (
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Unpack the image layers into snapshots.
|
// Unpack the image layers into snapshots.
|
||||||
if _, err = c.rootfsUnpacker.Unpack(ctx, manifest.Layers); err != nil {
|
rootfsUnpacker := rootfsservice.NewUnpackerFromClient(c.rootfsService)
|
||||||
|
if _, err = rootfsUnpacker.Unpack(ctx, manifest.Layers); err != nil {
|
||||||
return "", "", fmt.Errorf("unpack failed for manifest layers %+v: %v", manifest.Layers, err)
|
return "", "", fmt.Errorf("unpack failed for manifest layers %+v: %v", manifest.Layers, err)
|
||||||
}
|
}
|
||||||
// TODO(random-liu): Considering how to deal with the disk usage of content.
|
// TODO(random-liu): Considering how to deal with the disk usage of content.
|
||||||
@ -226,6 +277,43 @@ func (c *criContainerdService) pullImage(ctx context.Context, ref string) (
|
|||||||
return configDesc.Digest, manifestDigest, nil
|
return configDesc.Digest, manifestDigest, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// waitDownloadingPollInterval is the interval to check resource downloading progress.
|
||||||
|
const waitDownloadingPollInterval = 200 * time.Millisecond
|
||||||
|
|
||||||
|
// waitForResourcesDownloading waits for all resource downloading to finish.
|
||||||
|
func (c *criContainerdService) waitForResourcesDownloading(ctx context.Context, resources map[string]struct{}) error {
|
||||||
|
ticker := time.NewTicker(waitDownloadingPollInterval)
|
||||||
|
defer ticker.Stop()
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ticker.C:
|
||||||
|
// TODO(random-liu): Use better regexp when containerd `MakeRefKey` contains more
|
||||||
|
// information.
|
||||||
|
statuses, err := c.contentStoreService.Status(ctx, "")
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to get content status: %v", err)
|
||||||
|
}
|
||||||
|
pulling := false
|
||||||
|
// TODO(random-liu): Move Dispatch into a separate goroutine, so that we could report
|
||||||
|
// image pulling progress concurrently.
|
||||||
|
for _, status := range statuses {
|
||||||
|
_, ok := resources[status.Ref]
|
||||||
|
if ok {
|
||||||
|
glog.V(5).Infof("Pulling resource %q with progress %d/%d",
|
||||||
|
status.Ref, status.Offset, status.Total)
|
||||||
|
pulling = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !pulling {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
case <-ctx.Done():
|
||||||
|
// TODO(random-liu): Abort ongoing pulling if cancelled.
|
||||||
|
return fmt.Errorf("image resources pulling is cancelled")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// insertToStringSlice is a helper function to insert a string into the string slice
|
// insertToStringSlice is a helper function to insert a string into the string slice
|
||||||
// if the string is not in the slice yet.
|
// if the string is not in the slice yet.
|
||||||
func insertToStringSlice(ss []string, s string) []string {
|
func insertToStringSlice(ss []string, s string) []string {
|
||||||
|
@ -17,6 +17,8 @@ limitations under the License.
|
|||||||
package server
|
package server
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
@ -72,3 +74,22 @@ func TestUpdateImageMetadata(t *testing.T) {
|
|||||||
assert.Equal(t, test.expectedRepoDigests, m.RepoDigests)
|
assert.Equal(t, test.expectedRepoDigests, m.RepoDigests)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestResources(t *testing.T) {
|
||||||
|
const threads = 10
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
r := newResourceSet()
|
||||||
|
for i := 0; i < threads; i++ {
|
||||||
|
wg.Add(1)
|
||||||
|
go func(ref string) {
|
||||||
|
r.add(ref)
|
||||||
|
wg.Done()
|
||||||
|
}(fmt.Sprintf("sha256:%d", i))
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
refs := r.all()
|
||||||
|
for i := 0; i < threads; i++ {
|
||||||
|
_, ok := refs[fmt.Sprintf("sha256:%d", i)]
|
||||||
|
assert.True(t, ok)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -29,10 +29,8 @@ import (
|
|||||||
rootfsapi "github.com/containerd/containerd/api/services/rootfs"
|
rootfsapi "github.com/containerd/containerd/api/services/rootfs"
|
||||||
"github.com/containerd/containerd/content"
|
"github.com/containerd/containerd/content"
|
||||||
"github.com/containerd/containerd/images"
|
"github.com/containerd/containerd/images"
|
||||||
"github.com/containerd/containerd/rootfs"
|
|
||||||
contentservice "github.com/containerd/containerd/services/content"
|
contentservice "github.com/containerd/containerd/services/content"
|
||||||
imagesservice "github.com/containerd/containerd/services/images"
|
imagesservice "github.com/containerd/containerd/services/images"
|
||||||
rootfsservice "github.com/containerd/containerd/services/rootfs"
|
|
||||||
|
|
||||||
"github.com/kubernetes-incubator/cri-containerd/pkg/metadata"
|
"github.com/kubernetes-incubator/cri-containerd/pkg/metadata"
|
||||||
"github.com/kubernetes-incubator/cri-containerd/pkg/metadata/store"
|
"github.com/kubernetes-incubator/cri-containerd/pkg/metadata/store"
|
||||||
@ -87,9 +85,8 @@ type criContainerdService struct {
|
|||||||
containerService execution.ContainerServiceClient
|
containerService execution.ContainerServiceClient
|
||||||
// contentStoreService is the containerd content service client..
|
// contentStoreService is the containerd content service client..
|
||||||
contentStoreService content.Store
|
contentStoreService content.Store
|
||||||
// rootfsUnpacker is the containerd service to unpack image content
|
// rootfsService is the containerd rootfs service client.
|
||||||
// into snapshots.
|
rootfsService rootfsapi.RootFSClient
|
||||||
rootfsUnpacker rootfs.Unpacker
|
|
||||||
// imageStoreService is the containerd service to store and track
|
// imageStoreService is the containerd service to store and track
|
||||||
// image metadata.
|
// image metadata.
|
||||||
imageStoreService images.Store
|
imageStoreService images.Store
|
||||||
@ -99,7 +96,6 @@ type criContainerdService struct {
|
|||||||
|
|
||||||
// NewCRIContainerdService returns a new instance of CRIContainerdService
|
// NewCRIContainerdService returns a new instance of CRIContainerdService
|
||||||
func NewCRIContainerdService(conn *grpc.ClientConn, rootDir, networkPluginBinDir, networkPluginConfDir string) (CRIContainerdService, error) {
|
func NewCRIContainerdService(conn *grpc.ClientConn, rootDir, networkPluginBinDir, networkPluginConfDir string) (CRIContainerdService, error) {
|
||||||
// TODO: Initialize different containerd clients.
|
|
||||||
// TODO(random-liu): [P2] Recover from runtime state and metadata store.
|
// TODO(random-liu): [P2] Recover from runtime state and metadata store.
|
||||||
c := &criContainerdService{
|
c := &criContainerdService{
|
||||||
os: osinterface.RealOS{},
|
os: osinterface.RealOS{},
|
||||||
@ -116,7 +112,7 @@ func NewCRIContainerdService(conn *grpc.ClientConn, rootDir, networkPluginBinDir
|
|||||||
containerService: execution.NewContainerServiceClient(conn),
|
containerService: execution.NewContainerServiceClient(conn),
|
||||||
imageStoreService: imagesservice.NewStoreFromClient(imagesapi.NewImagesClient(conn)),
|
imageStoreService: imagesservice.NewStoreFromClient(imagesapi.NewImagesClient(conn)),
|
||||||
contentStoreService: contentservice.NewStoreFromClient(contentapi.NewContentClient(conn)),
|
contentStoreService: contentservice.NewStoreFromClient(contentapi.NewContentClient(conn)),
|
||||||
rootfsUnpacker: rootfsservice.NewUnpackerFromClient(rootfsapi.NewRootFSClient(conn)),
|
rootfsService: rootfsapi.NewRootFSClient(conn),
|
||||||
}
|
}
|
||||||
|
|
||||||
netPlugin, err := ocicni.InitCNI(networkPluginBinDir, networkPluginConfDir)
|
netPlugin, err := ocicni.InitCNI(networkPluginBinDir, networkPluginConfDir)
|
||||||
|
Loading…
Reference in New Issue
Block a user