From f0a500a39056de1b862a99ad2fba5022c9ab8e67 Mon Sep 17 00:00:00 2001 From: Lantao Liu Date: Tue, 13 Mar 2018 04:41:21 +0000 Subject: [PATCH] Use direct function call. Signed-off-by: Lantao Liu --- Makefile | 3 +- cmd/containerd/containerd.go | 3 - cri.go | 95 +++++++++++++++++++++++++----- integration/test_utils.go | 3 +- pkg/constants/constants.go | 26 ++++++++ pkg/containerd/util/util.go | 17 +++++- pkg/server/container_create.go | 3 +- pkg/server/events.go | 26 ++++---- pkg/server/instrumented_service.go | 59 ++++++++++--------- pkg/server/sandbox_portforward.go | 3 +- pkg/server/service.go | 22 ++----- pkg/server/snapshots.go | 6 +- pkg/server/status.go | 20 ++----- pkg/server/streaming.go | 7 ++- pkg/server/version.go | 18 +++--- pkg/version/version_test.go | 2 - 16 files changed, 195 insertions(+), 118 deletions(-) create mode 100644 pkg/constants/constants.go diff --git a/Makefile b/Makefile index 2da939e8f..6f115132b 100644 --- a/Makefile +++ b/Makefile @@ -28,7 +28,8 @@ VERSION := $(VERSION:v%=%) TARBALL_PREFIX := cri-containerd TARBALL := $(TARBALL_PREFIX)-$(VERSION).$(GOOS)-$(GOARCH).tar.gz BUILD_TAGS := seccomp apparmor -GO_LDFLAGS := -X $(PROJECT)/pkg/version.CRIContainerdVersion=$(VERSION) +GO_LDFLAGS := -X $(PROJECT)/pkg/version.CRIContainerdVersion=$(VERSION) \ + -X $(PROJECT)/vendor/github.com/containerd/containerd/version.Version=$(VERSION)-TEST SOURCES := $(shell find cmd/ pkg/ vendor/ -name '*.go') PLUGIN_SOURCES := $(shell ls *.go) INTEGRATION_SOURCES := $(shell find integration/ -name '*.go') diff --git a/cmd/containerd/containerd.go b/cmd/containerd/containerd.go index bea512838..5aaa12411 100644 --- a/cmd/containerd/containerd.go +++ b/cmd/containerd/containerd.go @@ -41,13 +41,10 @@ import ( "github.com/containerd/containerd/cmd/containerd/command" "github.com/sirupsen/logrus" - - "github.com/containerd/cri-containerd/pkg/version" ) func main() { app := command.App() - app.Version = version.CRIContainerdVersion + "-TEST-with-cri-plugin" logrus.Warn("This customized containerd is only for CI test, DO NOT use it for distribution.") if err := app.Run(os.Args); err != nil { fmt.Fprintf(os.Stderr, "containerd: %s\n", err) diff --git a/cri.go b/cri.go index 4da21e910..bc1d22ce0 100644 --- a/cri.go +++ b/cri.go @@ -20,20 +20,28 @@ import ( "flag" "path/filepath" + "github.com/containerd/containerd" + "github.com/containerd/containerd/api/services/containers/v1" + "github.com/containerd/containerd/api/services/diff/v1" + "github.com/containerd/containerd/api/services/images/v1" + "github.com/containerd/containerd/api/services/leases/v1" + "github.com/containerd/containerd/api/services/namespaces/v1" + "github.com/containerd/containerd/api/services/tasks/v1" + "github.com/containerd/containerd/content" "github.com/containerd/containerd/log" "github.com/containerd/containerd/platforms" "github.com/containerd/containerd/plugin" + "github.com/containerd/containerd/services" + "github.com/containerd/containerd/snapshots" imagespec "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" "github.com/sirupsen/logrus" criconfig "github.com/containerd/cri-containerd/pkg/config" + "github.com/containerd/cri-containerd/pkg/constants" "github.com/containerd/cri-containerd/pkg/server" ) -// criVersion is the CRI version supported by the CRI plugin. -const criVersion = "v1alpha2" - // TODO(random-liu): Use github.com/pkg/errors for our errors. // Register CRI service plugin func init() { @@ -43,13 +51,7 @@ func init() { ID: "cri", Config: &config, Requires: []plugin.Type{ - plugin.RuntimePlugin, - plugin.SnapshotPlugin, - plugin.TaskMonitorPlugin, - plugin.DiffPlugin, - plugin.MetadataPlugin, - plugin.ContentPlugin, - plugin.GCPlugin, + plugin.ServicePlugin, }, InitFn: initCRIService, }) @@ -57,7 +59,7 @@ func init() { func initCRIService(ic *plugin.InitContext) (interface{}, error) { ic.Meta.Platforms = []imagespec.Platform{platforms.DefaultSpec()} - ic.Meta.Exports = map[string]string{"CRIVersion": criVersion} + ic.Meta.Exports = map[string]string{"CRIVersion": constants.CRIVersion} ctx := ic.Context pluginConfig := ic.Config.(*criconfig.PluginConfig) c := criconfig.Config{ @@ -75,13 +77,26 @@ func initCRIService(ic *plugin.InitContext) (interface{}, error) { return nil, errors.Wrap(err, "failed to set glog level") } - s, err := server.NewCRIContainerdService(c) + servicesOpts, err := getServicesOpts(ic) + if err != nil { + return nil, errors.Wrap(err, "failed to get services") + } + + log.G(ctx).Info("Connect containerd service") + client, err := containerd.New( + "", + containerd.WithDefaultNamespace(constants.K8sContainerdNamespace), + containerd.WithServices(servicesOpts...), + ) + if err != nil { + return nil, errors.Wrap(err, "failed to create containerd client") + } + + s, err := server.NewCRIContainerdService(c, client) if err != nil { return nil, errors.Wrap(err, "failed to create CRI service") } - // Use a goroutine to initialize cri service. The reason is that currently - // cri service requires containerd to be initialize. go func() { if err := s.Run(); err != nil { log.G(ctx).WithError(err).Fatal("Failed to run CRI service") @@ -91,6 +106,58 @@ func initCRIService(ic *plugin.InitContext) (interface{}, error) { return s, nil } +// getServicesOpts get service options from plugin context. +func getServicesOpts(ic *plugin.InitContext) ([]containerd.ServicesOpt, error) { + plugins, err := ic.GetByType(plugin.ServicePlugin) + if err != nil { + return nil, errors.Wrap(err, "failed to get service plugin") + } + + opts := []containerd.ServicesOpt{ + containerd.WithEventService(ic.Events), + } + for s, fn := range map[string]func(interface{}) containerd.ServicesOpt{ + services.ContentService: func(s interface{}) containerd.ServicesOpt { + return containerd.WithContentStore(s.(content.Store)) + }, + services.ImagesService: func(s interface{}) containerd.ServicesOpt { + return containerd.WithImageService(s.(images.ImagesClient)) + }, + services.SnapshotsService: func(s interface{}) containerd.ServicesOpt { + return containerd.WithSnapshotters(s.(map[string]snapshots.Snapshotter)) + }, + services.ContainersService: func(s interface{}) containerd.ServicesOpt { + return containerd.WithContainerService(s.(containers.ContainersClient)) + }, + services.TasksService: func(s interface{}) containerd.ServicesOpt { + return containerd.WithTaskService(s.(tasks.TasksClient)) + }, + services.DiffService: func(s interface{}) containerd.ServicesOpt { + return containerd.WithDiffService(s.(diff.DiffClient)) + }, + services.NamespacesService: func(s interface{}) containerd.ServicesOpt { + return containerd.WithNamespaceService(s.(namespaces.NamespacesClient)) + }, + services.LeasesService: func(s interface{}) containerd.ServicesOpt { + return containerd.WithLeasesService(s.(leases.LeasesClient)) + }, + } { + p := plugins[s] + if p == nil { + return nil, errors.Errorf("service %q not found", s) + } + i, err := p.Instance() + if err != nil { + return nil, errors.Wrapf(err, "failed to get instance of service %q", s) + } + if i == nil { + return nil, errors.Errorf("instance of service %q not found", s) + } + opts = append(opts, fn(i)) + } + return opts, nil +} + // Set glog level. func setGLogLevel() error { l := logrus.GetLevel() diff --git a/integration/test_utils.go b/integration/test_utils.go index 1f29f4ef4..4bb10e9bf 100644 --- a/integration/test_utils.go +++ b/integration/test_utils.go @@ -33,13 +33,14 @@ import ( api "github.com/containerd/cri-containerd/pkg/api/v1" "github.com/containerd/cri-containerd/pkg/client" + "github.com/containerd/cri-containerd/pkg/constants" "github.com/containerd/cri-containerd/pkg/util" ) const ( timeout = 1 * time.Minute pauseImage = "gcr.io/google_containers/pause:3.0" // This is the same with default sandbox image. - k8sNamespace = "k8s.io" // This is the same with server.k8sContainerdNamespace. + k8sNamespace = constants.K8sContainerdNamespace containerdEndpoint = "/run/containerd/containerd.sock" ) diff --git a/pkg/constants/constants.go b/pkg/constants/constants.go new file mode 100644 index 000000000..7b71b6f9a --- /dev/null +++ b/pkg/constants/constants.go @@ -0,0 +1,26 @@ +/* +Copyright 2018 The Containerd Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package constants + +// TODO(random-liu): Merge annotations package into this package. + +const ( + // K8sContainerdNamespace is the namespace we use to connect containerd. + K8sContainerdNamespace = "k8s.io" + // CRIVersion is the CRI version supported by the CRI plugin. + CRIVersion = "v1alpha2" +) diff --git a/pkg/containerd/util/util.go b/pkg/containerd/util/util.go index 70f5f2edf..591c9187f 100644 --- a/pkg/containerd/util/util.go +++ b/pkg/containerd/util/util.go @@ -19,7 +19,10 @@ package util import ( "time" + "github.com/containerd/containerd/namespaces" "golang.org/x/net/context" + + "github.com/containerd/cri-containerd/pkg/constants" ) // deferCleanupTimeout is the default timeout for containerd cleanup operations @@ -28,8 +31,16 @@ const deferCleanupTimeout = 1 * time.Minute // DeferContext returns a context for containerd cleanup operations in defer. // A default timeout is applied to avoid cleanup operation pending forever. -// TODO(random-liu): Add namespace after local services are used. -// (containerd/containerd#2183) func DeferContext() (context.Context, context.CancelFunc) { - return context.WithTimeout(context.Background(), deferCleanupTimeout) + return context.WithTimeout(NamespacedContext(), deferCleanupTimeout) +} + +// NamespacedContext returns a context with kubernetes namespace set. +func NamespacedContext() context.Context { + return WithNamespace(context.Background()) +} + +// WithNamespace adds kubernetes namespace to the context. +func WithNamespace(ctx context.Context) context.Context { + return namespaces.WithNamespace(ctx, constants.K8sContainerdNamespace) } diff --git a/pkg/server/container_create.go b/pkg/server/container_create.go index 174188a84..5b4ddb857 100644 --- a/pkg/server/container_create.go +++ b/pkg/server/container_create.go @@ -29,7 +29,6 @@ import ( "github.com/containerd/containerd/contrib/seccomp" "github.com/containerd/containerd/linux/runctypes" "github.com/containerd/containerd/mount" - "github.com/containerd/containerd/namespaces" "github.com/containerd/containerd/oci" "github.com/containerd/typeurl" "github.com/davecgh/go-spew/spew" @@ -730,7 +729,7 @@ func setOCINamespaces(g *generate.Generator, namespaces *runtime.NamespaceOption // defaultRuntimeSpec returns a default runtime spec used in cri-containerd. func defaultRuntimeSpec(id string) (*runtimespec.Spec, error) { // GenerateSpec needs namespace. - ctx := namespaces.WithNamespace(context.Background(), k8sContainerdNamespace) + ctx := ctrdutil.NamespacedContext() spec, err := oci.GenerateSpec(ctx, nil, &containers.Container{ID: id}) if err != nil { return nil, err diff --git a/pkg/server/events.go b/pkg/server/events.go index 42d0fe8a1..1114aa1e0 100644 --- a/pkg/server/events.go +++ b/pkg/server/events.go @@ -19,15 +19,15 @@ package server import ( "errors" - "github.com/containerd/containerd" eventtypes "github.com/containerd/containerd/api/events" - "github.com/containerd/containerd/api/services/events/v1" containerdio "github.com/containerd/containerd/cio" "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/events" "github.com/containerd/typeurl" "github.com/sirupsen/logrus" "golang.org/x/net/context" + ctrdutil "github.com/containerd/cri-containerd/pkg/containerd/util" "github.com/containerd/cri-containerd/pkg/store" containerstore "github.com/containerd/cri-containerd/pkg/store/container" sandboxstore "github.com/containerd/cri-containerd/pkg/store/sandbox" @@ -48,6 +48,7 @@ type eventMonitor struct { // Create new event monitor. New event monitor will start subscribing containerd event. All events // happen after it should be monitored. func newEventMonitor(c *containerstore.Store, s *sandboxstore.Store) *eventMonitor { + // event subscribe doesn't need namespace. ctx, cancel := context.WithCancel(context.Background()) return &eventMonitor{ containerStore: c, @@ -58,12 +59,12 @@ func newEventMonitor(c *containerstore.Store, s *sandboxstore.Store) *eventMonit } // subscribe starts to subscribe containerd events. -func (em *eventMonitor) subscribe(client *containerd.Client) { +func (em *eventMonitor) subscribe(subscriber events.Subscriber) { filters := []string{ `topic=="/tasks/exit"`, `topic=="/tasks/oom"`, } - em.ch, em.errCh = client.Subscribe(em.ctx, filters...) + em.ch, em.errCh = subscriber.Subscribe(em.ctx, filters...) } // start starts the event monitor which monitors and handles all container events. It returns @@ -98,6 +99,7 @@ func (em *eventMonitor) stop() { // handleEvent handles a containerd event. func (em *eventMonitor) handleEvent(evt *events.Envelope) { + ctx := ctrdutil.NamespacedContext() any, err := typeurl.UnmarshalAny(evt.Event) if err != nil { logrus.WithError(err).Errorf("Failed to convert event envelope %+v", evt) @@ -113,7 +115,7 @@ func (em *eventMonitor) handleEvent(evt *events.Envelope) { logrus.Infof("TaskExit event %+v", e) cntr, err := em.containerStore.Get(e.ContainerID) if err == nil { - handleContainerExit(e, cntr) + handleContainerExit(ctx, e, cntr) return } else if err != store.ErrNotExist { logrus.WithError(err).Errorf("Failed to get container %q", e.ContainerID) @@ -122,7 +124,7 @@ func (em *eventMonitor) handleEvent(evt *events.Envelope) { // Use GetAll to include sandbox in unknown state. sb, err := em.sandboxStore.GetAll(e.ContainerID) if err == nil { - handleSandboxExit(e, sb) + handleSandboxExit(ctx, e, sb) return } else if err != store.ErrNotExist { logrus.WithError(err).Errorf("Failed to get sandbox %q", e.ContainerID) @@ -151,13 +153,13 @@ func (em *eventMonitor) handleEvent(evt *events.Envelope) { } // handleContainerExit handles TaskExit event for container. -func handleContainerExit(e *eventtypes.TaskExit, cntr containerstore.Container) { +func handleContainerExit(ctx context.Context, e *eventtypes.TaskExit, cntr containerstore.Container) { if e.Pid != cntr.Status.Get().Pid { // Non-init process died, ignore the event. return } // Attach container IO so that `Delete` could cleanup the stream properly. - task, err := cntr.Container.Task(context.Background(), + task, err := cntr.Container.Task(ctx, func(*containerdio.FIFOSet) (containerdio.IO, error) { return cntr.IO, nil }, @@ -169,7 +171,7 @@ func handleContainerExit(e *eventtypes.TaskExit, cntr containerstore.Container) } } else { // TODO(random-liu): [P1] This may block the loop, we may want to spawn a worker - if _, err = task.Delete(context.Background()); err != nil { + if _, err = task.Delete(ctx); err != nil { // TODO(random-liu): [P0] Enqueue the event and retry. if !errdefs.IsNotFound(err) { logrus.WithError(err).Errorf("failed to stop container %q", e.ContainerID) @@ -199,13 +201,13 @@ func handleContainerExit(e *eventtypes.TaskExit, cntr containerstore.Container) } // handleSandboxExit handles TaskExit event for sandbox. -func handleSandboxExit(e *eventtypes.TaskExit, sb sandboxstore.Sandbox) { +func handleSandboxExit(ctx context.Context, e *eventtypes.TaskExit, sb sandboxstore.Sandbox) { if e.Pid != sb.Status.Get().Pid { // Non-init process died, ignore the event. return } // No stream attached to sandbox container. - task, err := sb.Container.Task(context.Background(), nil) + task, err := sb.Container.Task(ctx, nil) if err != nil { if !errdefs.IsNotFound(err) { logrus.WithError(err).Errorf("failed to load task for sandbox %q", e.ContainerID) @@ -213,7 +215,7 @@ func handleSandboxExit(e *eventtypes.TaskExit, sb sandboxstore.Sandbox) { } } else { // TODO(random-liu): [P1] This may block the loop, we may want to spawn a worker - if _, err = task.Delete(context.Background()); err != nil { + if _, err = task.Delete(ctx); err != nil { // TODO(random-liu): [P0] Enqueue the event and retry. if !errdefs.IsNotFound(err) { logrus.WithError(err).Errorf("failed to stop sandbox %q", e.ContainerID) diff --git a/pkg/server/instrumented_service.go b/pkg/server/instrumented_service.go index 37f57221d..8e33c40e9 100644 --- a/pkg/server/instrumented_service.go +++ b/pkg/server/instrumented_service.go @@ -24,10 +24,11 @@ import ( runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2" api "github.com/containerd/cri-containerd/pkg/api/v1" + ctrdutil "github.com/containerd/cri-containerd/pkg/containerd/util" "github.com/containerd/cri-containerd/pkg/log" ) -// instrumentedService wraps service and logs each operation. +// instrumentedService wraps service with containerd namespace and logs. type instrumentedService struct { c *criContainerdService } @@ -59,7 +60,7 @@ func (in *instrumentedService) RunPodSandbox(ctx context.Context, r *runtime.Run logrus.Infof("RunPodSandbox for %+v returns sandbox id %q", r.GetConfig().GetMetadata(), res.GetPodSandboxId()) } }() - return in.c.RunPodSandbox(ctx, r) + return in.c.RunPodSandbox(ctrdutil.WithNamespace(ctx), r) } func (in *instrumentedService) ListPodSandbox(ctx context.Context, r *runtime.ListPodSandboxRequest) (res *runtime.ListPodSandboxResponse, err error) { @@ -74,7 +75,7 @@ func (in *instrumentedService) ListPodSandbox(ctx context.Context, r *runtime.Li log.Tracef("ListPodSandbox returns pod sandboxes %+v", res.GetItems()) } }() - return in.c.ListPodSandbox(ctx, r) + return in.c.ListPodSandbox(ctrdutil.WithNamespace(ctx), r) } func (in *instrumentedService) PodSandboxStatus(ctx context.Context, r *runtime.PodSandboxStatusRequest) (res *runtime.PodSandboxStatusResponse, err error) { @@ -89,7 +90,7 @@ func (in *instrumentedService) PodSandboxStatus(ctx context.Context, r *runtime. log.Tracef("PodSandboxStatus for %q returns status %+v", r.GetPodSandboxId(), res.GetStatus()) } }() - return in.c.PodSandboxStatus(ctx, r) + return in.c.PodSandboxStatus(ctrdutil.WithNamespace(ctx), r) } func (in *instrumentedService) StopPodSandbox(ctx context.Context, r *runtime.StopPodSandboxRequest) (_ *runtime.StopPodSandboxResponse, err error) { @@ -104,7 +105,7 @@ func (in *instrumentedService) StopPodSandbox(ctx context.Context, r *runtime.St logrus.Infof("StopPodSandbox for %q returns successfully", r.GetPodSandboxId()) } }() - return in.c.StopPodSandbox(ctx, r) + return in.c.StopPodSandbox(ctrdutil.WithNamespace(ctx), r) } func (in *instrumentedService) RemovePodSandbox(ctx context.Context, r *runtime.RemovePodSandboxRequest) (_ *runtime.RemovePodSandboxResponse, err error) { @@ -119,7 +120,7 @@ func (in *instrumentedService) RemovePodSandbox(ctx context.Context, r *runtime. logrus.Infof("RemovePodSandbox %q returns successfully", r.GetPodSandboxId()) } }() - return in.c.RemovePodSandbox(ctx, r) + return in.c.RemovePodSandbox(ctrdutil.WithNamespace(ctx), r) } func (in *instrumentedService) PortForward(ctx context.Context, r *runtime.PortForwardRequest) (res *runtime.PortForwardResponse, err error) { @@ -134,7 +135,7 @@ func (in *instrumentedService) PortForward(ctx context.Context, r *runtime.PortF logrus.Infof("Portforward for %q returns URL %q", r.GetPodSandboxId(), res.GetUrl()) } }() - return in.c.PortForward(ctx, r) + return in.c.PortForward(ctrdutil.WithNamespace(ctx), r) } func (in *instrumentedService) CreateContainer(ctx context.Context, r *runtime.CreateContainerRequest) (res *runtime.CreateContainerResponse, err error) { @@ -152,7 +153,7 @@ func (in *instrumentedService) CreateContainer(ctx context.Context, r *runtime.C r.GetPodSandboxId(), r.GetConfig().GetMetadata(), res.GetContainerId()) } }() - return in.c.CreateContainer(ctx, r) + return in.c.CreateContainer(ctrdutil.WithNamespace(ctx), r) } func (in *instrumentedService) StartContainer(ctx context.Context, r *runtime.StartContainerRequest) (_ *runtime.StartContainerResponse, err error) { @@ -167,7 +168,7 @@ func (in *instrumentedService) StartContainer(ctx context.Context, r *runtime.St logrus.Infof("StartContainer for %q returns successfully", r.GetContainerId()) } }() - return in.c.StartContainer(ctx, r) + return in.c.StartContainer(ctrdutil.WithNamespace(ctx), r) } func (in *instrumentedService) ListContainers(ctx context.Context, r *runtime.ListContainersRequest) (res *runtime.ListContainersResponse, err error) { @@ -183,7 +184,7 @@ func (in *instrumentedService) ListContainers(ctx context.Context, r *runtime.Li r.GetFilter(), res.GetContainers()) } }() - return in.c.ListContainers(ctx, r) + return in.c.ListContainers(ctrdutil.WithNamespace(ctx), r) } func (in *instrumentedService) ContainerStatus(ctx context.Context, r *runtime.ContainerStatusRequest) (res *runtime.ContainerStatusResponse, err error) { @@ -198,7 +199,7 @@ func (in *instrumentedService) ContainerStatus(ctx context.Context, r *runtime.C log.Tracef("ContainerStatus for %q returns status %+v", r.GetContainerId(), res.GetStatus()) } }() - return in.c.ContainerStatus(ctx, r) + return in.c.ContainerStatus(ctrdutil.WithNamespace(ctx), r) } func (in *instrumentedService) StopContainer(ctx context.Context, r *runtime.StopContainerRequest) (res *runtime.StopContainerResponse, err error) { @@ -213,7 +214,7 @@ func (in *instrumentedService) StopContainer(ctx context.Context, r *runtime.Sto logrus.Infof("StopContainer for %q returns successfully", r.GetContainerId()) } }() - return in.c.StopContainer(ctx, r) + return in.c.StopContainer(ctrdutil.WithNamespace(ctx), r) } func (in *instrumentedService) RemoveContainer(ctx context.Context, r *runtime.RemoveContainerRequest) (res *runtime.RemoveContainerResponse, err error) { @@ -228,7 +229,7 @@ func (in *instrumentedService) RemoveContainer(ctx context.Context, r *runtime.R logrus.Infof("RemoveContainer for %q returns successfully", r.GetContainerId()) } }() - return in.c.RemoveContainer(ctx, r) + return in.c.RemoveContainer(ctrdutil.WithNamespace(ctx), r) } func (in *instrumentedService) ExecSync(ctx context.Context, r *runtime.ExecSyncRequest) (res *runtime.ExecSyncResponse, err error) { @@ -245,7 +246,7 @@ func (in *instrumentedService) ExecSync(ctx context.Context, r *runtime.ExecSync res.GetStdout(), res.GetStderr()) } }() - return in.c.ExecSync(ctx, r) + return in.c.ExecSync(ctrdutil.WithNamespace(ctx), r) } func (in *instrumentedService) Exec(ctx context.Context, r *runtime.ExecRequest) (res *runtime.ExecResponse, err error) { @@ -261,7 +262,7 @@ func (in *instrumentedService) Exec(ctx context.Context, r *runtime.ExecRequest) logrus.Infof("Exec for %q returns URL %q", r.GetContainerId(), res.GetUrl()) } }() - return in.c.Exec(ctx, r) + return in.c.Exec(ctrdutil.WithNamespace(ctx), r) } func (in *instrumentedService) Attach(ctx context.Context, r *runtime.AttachRequest) (res *runtime.AttachResponse, err error) { @@ -276,7 +277,7 @@ func (in *instrumentedService) Attach(ctx context.Context, r *runtime.AttachRequ logrus.Infof("Attach for %q returns URL %q", r.GetContainerId(), res.Url) } }() - return in.c.Attach(ctx, r) + return in.c.Attach(ctrdutil.WithNamespace(ctx), r) } func (in *instrumentedService) UpdateContainerResources(ctx context.Context, r *runtime.UpdateContainerResourcesRequest) (res *runtime.UpdateContainerResourcesResponse, err error) { @@ -291,7 +292,7 @@ func (in *instrumentedService) UpdateContainerResources(ctx context.Context, r * logrus.Infof("UpdateContainerResources for %q returns successfully", r.GetContainerId()) } }() - return in.c.UpdateContainerResources(ctx, r) + return in.c.UpdateContainerResources(ctrdutil.WithNamespace(ctx), r) } func (in *instrumentedService) PullImage(ctx context.Context, r *runtime.PullImageRequest) (res *runtime.PullImageResponse, err error) { @@ -307,7 +308,7 @@ func (in *instrumentedService) PullImage(ctx context.Context, r *runtime.PullIma r.GetImage().GetImage(), res.GetImageRef()) } }() - return in.c.PullImage(ctx, r) + return in.c.PullImage(ctrdutil.WithNamespace(ctx), r) } func (in *instrumentedService) ListImages(ctx context.Context, r *runtime.ListImagesRequest) (res *runtime.ListImagesResponse, err error) { @@ -323,7 +324,7 @@ func (in *instrumentedService) ListImages(ctx context.Context, r *runtime.ListIm r.GetFilter(), res.GetImages()) } }() - return in.c.ListImages(ctx, r) + return in.c.ListImages(ctrdutil.WithNamespace(ctx), r) } func (in *instrumentedService) ImageStatus(ctx context.Context, r *runtime.ImageStatusRequest) (res *runtime.ImageStatusResponse, err error) { @@ -339,7 +340,7 @@ func (in *instrumentedService) ImageStatus(ctx context.Context, r *runtime.Image r.GetImage().GetImage(), res.GetImage()) } }() - return in.c.ImageStatus(ctx, r) + return in.c.ImageStatus(ctrdutil.WithNamespace(ctx), r) } func (in *instrumentedService) RemoveImage(ctx context.Context, r *runtime.RemoveImageRequest) (_ *runtime.RemoveImageResponse, err error) { @@ -354,7 +355,7 @@ func (in *instrumentedService) RemoveImage(ctx context.Context, r *runtime.Remov logrus.Infof("RemoveImage %q returns successfully", r.GetImage().GetImage()) } }() - return in.c.RemoveImage(ctx, r) + return in.c.RemoveImage(ctrdutil.WithNamespace(ctx), r) } func (in *instrumentedService) ImageFsInfo(ctx context.Context, r *runtime.ImageFsInfoRequest) (res *runtime.ImageFsInfoResponse, err error) { @@ -369,7 +370,7 @@ func (in *instrumentedService) ImageFsInfo(ctx context.Context, r *runtime.Image logrus.Debugf("ImageFsInfo returns filesystem info %+v", res.ImageFilesystems) } }() - return in.c.ImageFsInfo(ctx, r) + return in.c.ImageFsInfo(ctrdutil.WithNamespace(ctx), r) } func (in *instrumentedService) ContainerStats(ctx context.Context, r *runtime.ContainerStatsRequest) (res *runtime.ContainerStatsResponse, err error) { @@ -384,7 +385,7 @@ func (in *instrumentedService) ContainerStats(ctx context.Context, r *runtime.Co logrus.Debugf("ContainerStats for %q returns stats %+v", r.GetContainerId(), res.GetStats()) } }() - return in.c.ContainerStats(ctx, r) + return in.c.ContainerStats(ctrdutil.WithNamespace(ctx), r) } func (in *instrumentedService) ListContainerStats(ctx context.Context, r *runtime.ListContainerStatsRequest) (res *runtime.ListContainerStatsResponse, err error) { @@ -399,7 +400,7 @@ func (in *instrumentedService) ListContainerStats(ctx context.Context, r *runtim log.Tracef("ListContainerStats returns stats %+v", res.GetStats()) } }() - return in.c.ListContainerStats(ctx, r) + return in.c.ListContainerStats(ctrdutil.WithNamespace(ctx), r) } func (in *instrumentedService) Status(ctx context.Context, r *runtime.StatusRequest) (res *runtime.StatusResponse, err error) { @@ -414,7 +415,7 @@ func (in *instrumentedService) Status(ctx context.Context, r *runtime.StatusRequ log.Tracef("Status returns status %+v", res.GetStatus()) } }() - return in.c.Status(ctx, r) + return in.c.Status(ctrdutil.WithNamespace(ctx), r) } func (in *instrumentedService) Version(ctx context.Context, r *runtime.VersionRequest) (res *runtime.VersionResponse, err error) { @@ -429,7 +430,7 @@ func (in *instrumentedService) Version(ctx context.Context, r *runtime.VersionRe log.Tracef("Version returns %+v", res) } }() - return in.c.Version(ctx, r) + return in.c.Version(ctrdutil.WithNamespace(ctx), r) } func (in *instrumentedService) UpdateRuntimeConfig(ctx context.Context, r *runtime.UpdateRuntimeConfigRequest) (res *runtime.UpdateRuntimeConfigResponse, err error) { @@ -444,7 +445,7 @@ func (in *instrumentedService) UpdateRuntimeConfig(ctx context.Context, r *runti logrus.Debug("UpdateRuntimeConfig returns returns successfully") } }() - return in.c.UpdateRuntimeConfig(ctx, r) + return in.c.UpdateRuntimeConfig(ctrdutil.WithNamespace(ctx), r) } func (in *instrumentedService) LoadImage(ctx context.Context, r *api.LoadImageRequest) (res *api.LoadImageResponse, err error) { @@ -459,7 +460,7 @@ func (in *instrumentedService) LoadImage(ctx context.Context, r *api.LoadImageRe logrus.Debugf("LoadImage returns images %+v", res.GetImages()) } }() - return in.c.LoadImage(ctx, r) + return in.c.LoadImage(ctrdutil.WithNamespace(ctx), r) } func (in *instrumentedService) ReopenContainerLog(ctx context.Context, r *runtime.ReopenContainerLogRequest) (res *runtime.ReopenContainerLogResponse, err error) { @@ -474,5 +475,5 @@ func (in *instrumentedService) ReopenContainerLog(ctx context.Context, r *runtim logrus.Debugf("ReopenContainerLog for %q returns successfully", r.GetContainerId()) } }() - return in.c.ReopenContainerLog(ctx, r) + return in.c.ReopenContainerLog(ctrdutil.WithNamespace(ctx), r) } diff --git a/pkg/server/sandbox_portforward.go b/pkg/server/sandbox_portforward.go index 23ed9ecc0..ab78f328a 100644 --- a/pkg/server/sandbox_portforward.go +++ b/pkg/server/sandbox_portforward.go @@ -28,6 +28,7 @@ import ( "golang.org/x/net/context" runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2" + ctrdutil "github.com/containerd/cri-containerd/pkg/containerd/util" sandboxstore "github.com/containerd/cri-containerd/pkg/store/sandbox" ) @@ -53,7 +54,7 @@ func (c *criContainerdService) portForward(id string, port int32, stream io.Read if err != nil { return fmt.Errorf("failed to find sandbox %q in store: %v", id, err) } - t, err := s.Container.Task(context.Background(), nil) + t, err := s.Container.Task(ctrdutil.NamespacedContext(), nil) if err != nil { return fmt.Errorf("failed to get sandbox container task: %v", err) } diff --git a/pkg/server/service.go b/pkg/server/service.go index f5930800e..fe9a67c09 100644 --- a/pkg/server/service.go +++ b/pkg/server/service.go @@ -29,7 +29,6 @@ import ( runcseccomp "github.com/opencontainers/runc/libcontainer/seccomp" "github.com/opencontainers/selinux/go-selinux" "github.com/sirupsen/logrus" - "golang.org/x/net/context" "google.golang.org/grpc" runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2" "k8s.io/kubernetes/pkg/kubelet/server/streaming" @@ -37,6 +36,7 @@ import ( api "github.com/containerd/cri-containerd/pkg/api/v1" "github.com/containerd/cri-containerd/pkg/atomic" criconfig "github.com/containerd/cri-containerd/pkg/config" + ctrdutil "github.com/containerd/cri-containerd/pkg/containerd/util" osinterface "github.com/containerd/cri-containerd/pkg/os" "github.com/containerd/cri-containerd/pkg/registrar" containerstore "github.com/containerd/cri-containerd/pkg/store/container" @@ -45,9 +45,6 @@ import ( snapshotstore "github.com/containerd/cri-containerd/pkg/store/snapshot" ) -// k8sContainerdNamespace is the namespace we use to connect containerd. -const k8sContainerdNamespace = "k8s.io" - // grpcServices are all the grpc services provided by cri containerd. type grpcServices interface { runtime.RuntimeServiceServer @@ -104,10 +101,11 @@ type criContainerdService struct { } // NewCRIContainerdService returns a new instance of CRIContainerdService -func NewCRIContainerdService(config criconfig.Config) (CRIContainerdService, error) { +func NewCRIContainerdService(config criconfig.Config, client *containerd.Client) (CRIContainerdService, error) { var err error c := &criContainerdService{ config: config, + client: client, apparmorEnabled: runcapparmor.IsEnabled(), seccompEnabled: runcseccomp.IsEnabled(), os: osinterface.RealOS{}, @@ -159,23 +157,11 @@ func (c *criContainerdService) Register(s *grpc.Server) error { // Run starts the cri-containerd service. func (c *criContainerdService) Run() error { - logrus.Info("Start cri-containerd service") - - // Connect containerd service here, to get rid of the containerd dependency - // in `NewCRIContainerdService`. This is required for plugin mode bootstrapping. - logrus.Info("Connect containerd service") - client, err := containerd.New(c.config.ContainerdEndpoint, containerd.WithDefaultNamespace(k8sContainerdNamespace)) - if err != nil { - return fmt.Errorf("failed to initialize containerd client with endpoint %q: %v", - c.config.ContainerdEndpoint, err) - } - c.client = client - logrus.Info("Start subscribing containerd event") c.eventMonitor.subscribe(c.client) logrus.Infof("Start recovering state") - if err := c.recover(context.Background()); err != nil { + if err := c.recover(ctrdutil.NamespacedContext()); err != nil { return fmt.Errorf("failed to recover state: %v", err) } diff --git a/pkg/server/snapshots.go b/pkg/server/snapshots.go index 846f1d75f..4719e3e1a 100644 --- a/pkg/server/snapshots.go +++ b/pkg/server/snapshots.go @@ -25,6 +25,7 @@ import ( snapshot "github.com/containerd/containerd/snapshots" "github.com/sirupsen/logrus" + ctrdutil "github.com/containerd/cri-containerd/pkg/containerd/util" snapshotstore "github.com/containerd/cri-containerd/pkg/store/snapshot" ) @@ -68,13 +69,14 @@ func (s *snapshotsSyncer) start() { // sync updates all snapshots stats. func (s *snapshotsSyncer) sync() error { + ctx := ctrdutil.NamespacedContext() start := time.Now().UnixNano() var snapshots []snapshot.Info // Do not call `Usage` directly in collect function, because // `Usage` takes time, we don't want `Walk` to hold read lock // of snapshot metadata store for too long time. // TODO(random-liu): Set timeout for the following 2 contexts. - if err := s.snapshotter.Walk(context.Background(), func(ctx context.Context, info snapshot.Info) error { + if err := s.snapshotter.Walk(ctx, func(ctx context.Context, info snapshot.Info) error { snapshots = append(snapshots, info) return nil }); err != nil { @@ -96,7 +98,7 @@ func (s *snapshotsSyncer) sync() error { Kind: info.Kind, Timestamp: time.Now().UnixNano(), } - usage, err := s.snapshotter.Usage(context.Background(), info.Name) + usage, err := s.snapshotter.Usage(ctx, info.Name) if err != nil { if !errdefs.IsNotFound(err) { logrus.WithError(err).Errorf("Failed to get usage for snapshot %q", info.Name) diff --git a/pkg/server/status.go b/pkg/server/status.go index 018c92b12..b52842a22 100644 --- a/pkg/server/status.go +++ b/pkg/server/status.go @@ -25,29 +25,17 @@ import ( runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2" ) -const ( - // runtimeNotReadyReason is the reason reported when runtime is not ready. - runtimeNotReadyReason = "ContainerdNotReady" - // networkNotReadyReason is the reason reported when network is not ready. - networkNotReadyReason = "NetworkPluginNotReady" -) +// networkNotReadyReason is the reason reported when network is not ready. +const networkNotReadyReason = "NetworkPluginNotReady" // Status returns the status of the runtime. func (c *criContainerdService) Status(ctx context.Context, r *runtime.StatusRequest) (*runtime.StatusResponse, error) { + // As a containerd plugin, if CRI plugin is serving request, + // containerd must be ready. runtimeCondition := &runtime.RuntimeCondition{ Type: runtime.RuntimeReady, Status: true, } - serving, err := c.client.IsServing(ctx) - if err != nil || !serving { - runtimeCondition.Status = false - runtimeCondition.Reason = runtimeNotReadyReason - if err != nil { - runtimeCondition.Message = fmt.Sprintf("Containerd healthcheck returns error: %v", err) - } else { - runtimeCondition.Message = "Containerd grpc server is not serving" - } - } networkCondition := &runtime.RuntimeCondition{ Type: runtime.NetworkReady, Status: true, diff --git a/pkg/server/streaming.go b/pkg/server/streaming.go index d089db848..e721f6532 100644 --- a/pkg/server/streaming.go +++ b/pkg/server/streaming.go @@ -22,12 +22,13 @@ import ( "math" "net" - "golang.org/x/net/context" k8snet "k8s.io/apimachinery/pkg/util/net" "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/tools/remotecommand" "k8s.io/kubernetes/pkg/kubelet/server/streaming" "k8s.io/utils/exec" + + ctrdutil "github.com/containerd/cri-containerd/pkg/containerd/util" ) func newStreamServer(c *criContainerdService, addr, port string) (streaming.Server, error) { @@ -56,7 +57,7 @@ func newStreamRuntime(c *criContainerdService) streaming.Runtime { // returns non-zero exit code. func (s *streamRuntime) Exec(containerID string, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error { - exitCode, err := s.c.execInContainer(context.Background(), containerID, execOptions{ + exitCode, err := s.c.execInContainer(ctrdutil.NamespacedContext(), containerID, execOptions{ cmd: cmd, stdin: stdin, stdout: stdout, @@ -78,7 +79,7 @@ func (s *streamRuntime) Exec(containerID string, cmd []string, stdin io.Reader, func (s *streamRuntime) Attach(containerID string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error { - return s.c.attachContainer(context.Background(), containerID, in, out, err, tty, resize) + return s.c.attachContainer(ctrdutil.NamespacedContext(), containerID, in, out, err, tty, resize) } func (s *streamRuntime) PortForward(podSandboxID string, port int32, stream io.ReadWriteCloser) error { diff --git a/pkg/server/version.go b/pkg/server/version.go index 365c522cc..921e238ef 100644 --- a/pkg/server/version.go +++ b/pkg/server/version.go @@ -17,10 +17,11 @@ limitations under the License. package server import ( - "fmt" - + "github.com/containerd/containerd/version" "golang.org/x/net/context" runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2" + + "github.com/containerd/cri-containerd/pkg/constants" ) const ( @@ -32,15 +33,10 @@ const ( // Version returns the runtime name, runtime version and runtime API version. func (c *criContainerdService) Version(ctx context.Context, r *runtime.VersionRequest) (*runtime.VersionResponse, error) { - resp, err := c.client.Version(ctx) - if err != nil { - return nil, fmt.Errorf("failed to get containerd version: %v", err) - } return &runtime.VersionResponse{ - Version: kubeAPIVersion, - RuntimeName: containerName, - RuntimeVersion: resp.Version, - // Containerd doesn't have an api version use version instead. - RuntimeApiVersion: resp.Version, + Version: kubeAPIVersion, + RuntimeName: containerName, + RuntimeVersion: version.Version, + RuntimeApiVersion: constants.CRIVersion, }, nil } diff --git a/pkg/version/version_test.go b/pkg/version/version_test.go index 0a40304b6..a494a8841 100644 --- a/pkg/version/version_test.go +++ b/pkg/version/version_test.go @@ -28,6 +28,4 @@ func TestValidateSemver(t *testing.T) { assert.NotNil(err) err = validateSemver("0.0.0-1-gdf6a1cc-dirty") assert.Nil(err) - err = validateSemver(CRIContainerdVersion) - assert.Nil(err) }