From a5dfbfcf5acaa1692b266b7e81ce05bada003b2b Mon Sep 17 00:00:00 2001 From: Eric Lin Date: Mon, 14 Mar 2022 21:54:31 +0000 Subject: [PATCH] cri: load sandboxes/containers/images in parallel Parallelizing them decreases loading duration. Time to complete recover(): * Without competing IOs + without opt: 21s * Without competing IOs + with opt: 14s * Competing IOs + without opt: 3m44s * Competing IOs + with opt: 33s Signed-off-by: Eric Lin --- pkg/cri/server/restart.go | 122 +++++++++++++++++++++++--------------- 1 file changed, 74 insertions(+), 48 deletions(-) diff --git a/pkg/cri/server/restart.go b/pkg/cri/server/restart.go index 66247c74e..55a725e49 100644 --- a/pkg/cri/server/restart.go +++ b/pkg/cri/server/restart.go @@ -22,6 +22,7 @@ import ( "os" "path/filepath" goruntime "runtime" + "sync" "time" "github.com/containerd/containerd" @@ -31,6 +32,7 @@ import ( "github.com/containerd/containerd/log" "github.com/containerd/containerd/platforms" "github.com/containerd/typeurl" + "golang.org/x/sync/errgroup" runtime "k8s.io/cri-api/pkg/apis/runtime/v1" cio "github.com/containerd/containerd/pkg/cri/io" @@ -56,19 +58,28 @@ func (c *criService) recover(ctx context.Context) error { if err != nil { return fmt.Errorf("failed to list sandbox containers: %w", err) } + + eg, ctx2 := errgroup.WithContext(ctx) for _, sandbox := range sandboxes { - sb, err := c.loadSandbox(ctx, sandbox) - if err != nil { - log.G(ctx).WithError(err).Errorf("Failed to load sandbox %q", sandbox.ID()) - continue - } - log.G(ctx).Debugf("Loaded sandbox %+v", sb) - if err := c.sandboxStore.Add(sb); err != nil { - return fmt.Errorf("failed to add sandbox %q to store: %w", sandbox.ID(), err) - } - if err := c.sandboxNameIndex.Reserve(sb.Name, sb.ID); err != nil { - return fmt.Errorf("failed to reserve sandbox name %q: %w", sb.Name, err) - } + sandbox := sandbox + eg.Go(func() error { + sb, err := c.loadSandbox(ctx2, sandbox) + if err != nil { + log.G(ctx2).WithError(err).Errorf("Failed to load sandbox %q", sandbox.ID()) + return nil + } + log.G(ctx2).Debugf("Loaded sandbox %+v", sb) + if err := c.sandboxStore.Add(sb); err != nil { + return fmt.Errorf("failed to add sandbox %q to store: %w", sandbox.ID(), err) + } + if err := c.sandboxNameIndex.Reserve(sb.Name, sb.ID); err != nil { + return fmt.Errorf("failed to reserve sandbox name %q: %w", sb.Name, err) + } + return nil + }) + } + if err := eg.Wait(); err != nil { + return err } // Recover all containers. @@ -76,19 +87,27 @@ func (c *criService) recover(ctx context.Context) error { if err != nil { return fmt.Errorf("failed to list containers: %w", err) } + eg, ctx2 = errgroup.WithContext(ctx) for _, container := range containers { - cntr, err := c.loadContainer(ctx, container) - if err != nil { - log.G(ctx).WithError(err).Errorf("Failed to load container %q", container.ID()) - continue - } - log.G(ctx).Debugf("Loaded container %+v", cntr) - if err := c.containerStore.Add(cntr); err != nil { - return fmt.Errorf("failed to add container %q to store: %w", container.ID(), err) - } - if err := c.containerNameIndex.Reserve(cntr.Name, cntr.ID); err != nil { - return fmt.Errorf("failed to reserve container name %q: %w", cntr.Name, err) - } + container := container + eg.Go(func() error { + cntr, err := c.loadContainer(ctx2, container) + if err != nil { + log.G(ctx2).WithError(err).Errorf("Failed to load container %q", container.ID()) + return nil + } + log.G(ctx2).Debugf("Loaded container %+v", cntr) + if err := c.containerStore.Add(cntr); err != nil { + return fmt.Errorf("failed to add container %q to store: %w", container.ID(), err) + } + if err := c.containerNameIndex.Reserve(cntr.Name, cntr.ID); err != nil { + return fmt.Errorf("failed to reserve container name %q: %w", cntr.Name, err) + } + return nil + }) + } + if err := eg.Wait(); err != nil { + return err } // Recover all images. @@ -427,32 +446,39 @@ func (c *criService) loadSandbox(ctx context.Context, cntr containerd.Container) // loadImages loads images from containerd. func (c *criService) loadImages(ctx context.Context, cImages []containerd.Image) { snapshotter := c.config.ContainerdConfig.Snapshotter + var wg sync.WaitGroup for _, i := range cImages { - ok, _, _, _, err := containerdimages.Check(ctx, i.ContentStore(), i.Target(), platforms.Default()) - if err != nil { - log.G(ctx).WithError(err).Errorf("Failed to check image content readiness for %q", i.Name()) - continue - } - if !ok { - log.G(ctx).Warnf("The image content readiness for %q is not ok", i.Name()) - continue - } - // Checking existence of top-level snapshot for each image being recovered. - unpacked, err := i.IsUnpacked(ctx, snapshotter) - if err != nil { - log.G(ctx).WithError(err).Warnf("Failed to check whether image is unpacked for image %s", i.Name()) - continue - } - if !unpacked { - log.G(ctx).Warnf("The image %s is not unpacked.", i.Name()) - // TODO(random-liu): Consider whether we should try unpack here. - } - if err := c.updateImage(ctx, i.Name()); err != nil { - log.G(ctx).WithError(err).Warnf("Failed to update reference for image %q", i.Name()) - continue - } - log.G(ctx).Debugf("Loaded image %q", i.Name()) + wg.Add(1) + i := i + go func() { + defer wg.Done() + ok, _, _, _, err := containerdimages.Check(ctx, i.ContentStore(), i.Target(), platforms.Default()) + if err != nil { + log.G(ctx).WithError(err).Errorf("Failed to check image content readiness for %q", i.Name()) + return + } + if !ok { + log.G(ctx).Warnf("The image content readiness for %q is not ok", i.Name()) + return + } + // Checking existence of top-level snapshot for each image being recovered. + unpacked, err := i.IsUnpacked(ctx, snapshotter) + if err != nil { + log.G(ctx).WithError(err).Warnf("Failed to check whether image is unpacked for image %s", i.Name()) + return + } + if !unpacked { + log.G(ctx).Warnf("The image %s is not unpacked.", i.Name()) + // TODO(random-liu): Consider whether we should try unpack here. + } + if err := c.updateImage(ctx, i.Name()); err != nil { + log.G(ctx).WithError(err).Warnf("Failed to update reference for image %q", i.Name()) + return + } + log.G(ctx).Debugf("Loaded image %q", i.Name()) + }() } + wg.Wait() } func cleanupOrphanedIDDirs(ctx context.Context, cntrs []containerd.Container, base string) error {