From 8524a4ef30fb0a743d946350ba7140497fbf644b Mon Sep 17 00:00:00 2001 From: Lantao Liu Date: Sat, 17 Jun 2017 00:22:02 +0000 Subject: [PATCH] Add schema1 support, and use namespace `k8s.io`. Signed-off-by: Lantao Liu --- pkg/server/image_pull.go | 64 ++++++++++++++------- pkg/server/server.go | 33 ++++++++++- pkg/server/testing/fake_execution_client.go | 6 ++ 3 files changed, 79 insertions(+), 24 deletions(-) diff --git a/pkg/server/image_pull.go b/pkg/server/image_pull.go index eab5eb5eb..bb38af391 100644 --- a/pkg/server/image_pull.go +++ b/pkg/server/image_pull.go @@ -28,6 +28,7 @@ import ( containerdimages "github.com/containerd/containerd/images" "github.com/containerd/containerd/remotes" "github.com/containerd/containerd/remotes/docker" + "github.com/containerd/containerd/remotes/docker/schema1" containerdrootfs "github.com/containerd/containerd/rootfs" "github.com/golang/glog" imagedigest "github.com/opencontainers/go-digest" @@ -207,35 +208,37 @@ func (c *criContainerdService) pullImage(ctx context.Context, ref string) ( // TODO(random-liu): Always resolve image reference and use resolved image name in // the system. - // Put the image information into containerd image store. - // In the future, containerd will rely on the information in the image store to perform image - // garbage collection. - // For now, we simply use it to store and retrieve information required for pulling an image. - if err = c.imageStoreService.Put(ctx, ref, desc); err != nil { - return "", "", fmt.Errorf("failed to put image %q desc %v into containerd image store: %v", - ref, desc, err) - } - // Do not cleanup if following operations fail so as to make resumable download possible. - glog.V(4).Infof("Start downloading resources for image %q", ref) resources := newResourceSet() + resourceTrackHandler := containerdimages.HandlerFunc(func(ctx gocontext.Context, desc imagespec.Descriptor) ( + []imagespec.Descriptor, error) { + resources.add(remotes.MakeRefKey(ctx, desc)) + return nil, nil + }) // Fetch all image resources into content store. // Dispatch a handler which will run a sequence of handlers to: // 1) track all resources associated using a customized handler; // 2) fetch the object using a FetchHandler; // 3) recurse through any sub-layers via a ChildrenHandler. - err = containerdimages.Dispatch( - ctx, - containerdimages.Handlers( - containerdimages.HandlerFunc(func(ctx gocontext.Context, desc imagespec.Descriptor) ( - []imagespec.Descriptor, error) { - resources.add(remotes.MakeRefKey(ctx, desc)) - return nil, nil - }), + // Support schema1 image. + var ( + schema1Converter *schema1.Converter + handler containerdimages.Handler + ) + if desc.MediaType == containerdimages.MediaTypeDockerSchema1Manifest { + schema1Converter = schema1.NewConverter(c.contentStoreService, fetcher) + handler = containerdimages.Handlers( + resourceTrackHandler, + schema1Converter, + ) + } else { + handler = containerdimages.Handlers( + resourceTrackHandler, remotes.FetchHandler(c.contentStoreService, fetcher), - containerdimages.ChildrenHandler(c.contentStoreService)), - desc) - if err != nil { + containerdimages.ChildrenHandler(c.contentStoreService), + ) + } + if err = containerdimages.Dispatch(ctx, handler, desc); err != nil { // Dispatch returns error when requested resources are locked. // In that case, we should start waiting and checking the pulling // progress. @@ -247,7 +250,26 @@ func (c *criContainerdService) pullImage(ctx context.Context, ref string) ( return "", "", fmt.Errorf("failed to wait for image %q downloading: %v", ref, err) } glog.V(4).Infof("Finished downloading resources for image %q", ref) + if schema1Converter != nil { + desc, err = schema1Converter.Convert(ctx) + if err != nil { + return "", "", fmt.Errorf("failed to convert schema 1 image %q: %v", ref, err) + } + } + // In the future, containerd will rely on the information in the image store to perform image + // garbage collection. + // For now, we simply use it to store and retrieve information required for pulling an image. + // @stevvooe said we should `Put` before downloading content, However: + // 1) Containerd client put image metadata after downloading; + // 2) We need desc returned by schema1 converter. + // So just put the image metadata after downloading now. + // TODO(random-liu): Fix the potential garbage collection race. + if err = c.imageStoreService.Put(ctx, ref, desc); err != nil { + return "", "", fmt.Errorf("failed to put image %q desc %v into containerd image store: %v", + ref, desc, err) + } + // Do not cleanup if following operations fail so as to make resumable download possible. // TODO(random-liu): Replace with image.Unpack. // Unpack the image layers into snapshots. image, err := c.imageStoreService.Get(ctx, ref) diff --git a/pkg/server/server.go b/pkg/server/server.go index 7158fee70..bf15bd7b8 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -23,15 +23,20 @@ import ( "syscall" "time" + "github.com/containerd/containerd/namespaces" "github.com/golang/glog" + "golang.org/x/net/context" "google.golang.org/grpc" - runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1" "k8s.io/kubernetes/pkg/util/interrupt" ) -// unixProtocol is the network protocol of unix socket. -const unixProtocol = "unix" +const ( + // unixProtocol is the network protocol of unix socket. + unixProtocol = "unix" + // k8sContainerdNamespace is the namespace we use to connect containerd. + k8sContainerdNamespace = "k8s.io" +) // CRIContainerdServer is the grpc server of cri-containerd. type CRIContainerdServer struct { @@ -84,6 +89,28 @@ func ConnectToContainerd(path string, connectionTimeout time.Duration) (*grpc.Cl grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) { return net.DialTimeout(unixProtocol, path, timeout) }), + grpc.WithUnaryInterceptor(grpc.UnaryClientInterceptor(unary)), + grpc.WithStreamInterceptor(grpc.StreamClientInterceptor(stream)), } return grpc.Dial(fmt.Sprintf("%s://%s", unixProtocol, path), dialOpts...) } + +// TODO(random-liu): Get rid of following functions after switching to containerd client. +// unary is a wrapper to apply kubernetes namespace in each grpc unary call. +func unary(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { + _, ok := namespaces.Namespace(ctx) + if !ok { + ctx = namespaces.WithNamespace(ctx, k8sContainerdNamespace) + } + return invoker(ctx, method, req, reply, cc, opts...) +} + +// stream is a wrapper to apply kubernetes namespace in each grpc stream call. +func stream(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { + _, ok := namespaces.Namespace(ctx) + if !ok { + ctx = namespaces.WithNamespace(ctx, k8sContainerdNamespace) + } + + return streamer(ctx, desc, cc, method, opts...) +} diff --git a/pkg/server/testing/fake_execution_client.go b/pkg/server/testing/fake_execution_client.go index 5e9dfa416..76821372b 100644 --- a/pkg/server/testing/fake_execution_client.go +++ b/pkg/server/testing/fake_execution_client.go @@ -384,3 +384,9 @@ func (f *FakeExecutionClient) Processes(ctx context.Context, in *execution.Proce // TODO: implement Processes() return nil, nil } + +// DeleteProcess is a test implementation of execution.DeleteProcess +func (f *FakeExecutionClient) DeleteProcess(ctx context.Context, in *execution.DeleteProcessRequest, opts ...grpc.CallOption) (*execution.DeleteResponse, error) { + // TODO: implement DeleteProcess() + return nil, nil +}