Merge pull request #6680 from linxiulei/fast_load
Optimize loading performance for cri recover
This commit is contained in:
commit
40a16a02b6
@ -22,6 +22,7 @@ import (
|
|||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
goruntime "runtime"
|
goruntime "runtime"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/containerd/containerd"
|
"github.com/containerd/containerd"
|
||||||
@ -31,6 +32,7 @@ import (
|
|||||||
"github.com/containerd/containerd/log"
|
"github.com/containerd/containerd/log"
|
||||||
"github.com/containerd/containerd/platforms"
|
"github.com/containerd/containerd/platforms"
|
||||||
"github.com/containerd/typeurl"
|
"github.com/containerd/typeurl"
|
||||||
|
"golang.org/x/sync/errgroup"
|
||||||
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
|
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
|
||||||
|
|
||||||
cio "github.com/containerd/containerd/pkg/cri/io"
|
cio "github.com/containerd/containerd/pkg/cri/io"
|
||||||
@ -56,19 +58,28 @@ func (c *criService) recover(ctx context.Context) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to list sandbox containers: %w", err)
|
return fmt.Errorf("failed to list sandbox containers: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
eg, ctx2 := errgroup.WithContext(ctx)
|
||||||
for _, sandbox := range sandboxes {
|
for _, sandbox := range sandboxes {
|
||||||
sb, err := c.loadSandbox(ctx, sandbox)
|
sandbox := sandbox
|
||||||
if err != nil {
|
eg.Go(func() error {
|
||||||
log.G(ctx).WithError(err).Errorf("Failed to load sandbox %q", sandbox.ID())
|
sb, err := c.loadSandbox(ctx2, sandbox)
|
||||||
continue
|
if err != nil {
|
||||||
}
|
log.G(ctx2).WithError(err).Errorf("Failed to load sandbox %q", sandbox.ID())
|
||||||
log.G(ctx).Debugf("Loaded sandbox %+v", sb)
|
return nil
|
||||||
if err := c.sandboxStore.Add(sb); err != nil {
|
}
|
||||||
return fmt.Errorf("failed to add sandbox %q to store: %w", sandbox.ID(), err)
|
log.G(ctx2).Debugf("Loaded sandbox %+v", sb)
|
||||||
}
|
if err := c.sandboxStore.Add(sb); err != nil {
|
||||||
if err := c.sandboxNameIndex.Reserve(sb.Name, sb.ID); err != nil {
|
return fmt.Errorf("failed to add sandbox %q to store: %w", sandbox.ID(), err)
|
||||||
return fmt.Errorf("failed to reserve sandbox name %q: %w", sb.Name, err)
|
}
|
||||||
}
|
if err := c.sandboxNameIndex.Reserve(sb.Name, sb.ID); err != nil {
|
||||||
|
return fmt.Errorf("failed to reserve sandbox name %q: %w", sb.Name, err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
if err := eg.Wait(); err != nil {
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Recover all containers.
|
// Recover all containers.
|
||||||
@ -76,19 +87,27 @@ func (c *criService) recover(ctx context.Context) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to list containers: %w", err)
|
return fmt.Errorf("failed to list containers: %w", err)
|
||||||
}
|
}
|
||||||
|
eg, ctx2 = errgroup.WithContext(ctx)
|
||||||
for _, container := range containers {
|
for _, container := range containers {
|
||||||
cntr, err := c.loadContainer(ctx, container)
|
container := container
|
||||||
if err != nil {
|
eg.Go(func() error {
|
||||||
log.G(ctx).WithError(err).Errorf("Failed to load container %q", container.ID())
|
cntr, err := c.loadContainer(ctx2, container)
|
||||||
continue
|
if err != nil {
|
||||||
}
|
log.G(ctx2).WithError(err).Errorf("Failed to load container %q", container.ID())
|
||||||
log.G(ctx).Debugf("Loaded container %+v", cntr)
|
return nil
|
||||||
if err := c.containerStore.Add(cntr); err != nil {
|
}
|
||||||
return fmt.Errorf("failed to add container %q to store: %w", container.ID(), err)
|
log.G(ctx2).Debugf("Loaded container %+v", cntr)
|
||||||
}
|
if err := c.containerStore.Add(cntr); err != nil {
|
||||||
if err := c.containerNameIndex.Reserve(cntr.Name, cntr.ID); err != nil {
|
return fmt.Errorf("failed to add container %q to store: %w", container.ID(), err)
|
||||||
return fmt.Errorf("failed to reserve container name %q: %w", cntr.Name, err)
|
}
|
||||||
}
|
if err := c.containerNameIndex.Reserve(cntr.Name, cntr.ID); err != nil {
|
||||||
|
return fmt.Errorf("failed to reserve container name %q: %w", cntr.Name, err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
if err := eg.Wait(); err != nil {
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Recover all images.
|
// Recover all images.
|
||||||
@ -427,32 +446,39 @@ func (c *criService) loadSandbox(ctx context.Context, cntr containerd.Container)
|
|||||||
// loadImages loads images from containerd.
|
// loadImages loads images from containerd.
|
||||||
func (c *criService) loadImages(ctx context.Context, cImages []containerd.Image) {
|
func (c *criService) loadImages(ctx context.Context, cImages []containerd.Image) {
|
||||||
snapshotter := c.config.ContainerdConfig.Snapshotter
|
snapshotter := c.config.ContainerdConfig.Snapshotter
|
||||||
|
var wg sync.WaitGroup
|
||||||
for _, i := range cImages {
|
for _, i := range cImages {
|
||||||
ok, _, _, _, err := containerdimages.Check(ctx, i.ContentStore(), i.Target(), platforms.Default())
|
wg.Add(1)
|
||||||
if err != nil {
|
i := i
|
||||||
log.G(ctx).WithError(err).Errorf("Failed to check image content readiness for %q", i.Name())
|
go func() {
|
||||||
continue
|
defer wg.Done()
|
||||||
}
|
ok, _, _, _, err := containerdimages.Check(ctx, i.ContentStore(), i.Target(), platforms.Default())
|
||||||
if !ok {
|
if err != nil {
|
||||||
log.G(ctx).Warnf("The image content readiness for %q is not ok", i.Name())
|
log.G(ctx).WithError(err).Errorf("Failed to check image content readiness for %q", i.Name())
|
||||||
continue
|
return
|
||||||
}
|
}
|
||||||
// Checking existence of top-level snapshot for each image being recovered.
|
if !ok {
|
||||||
unpacked, err := i.IsUnpacked(ctx, snapshotter)
|
log.G(ctx).Warnf("The image content readiness for %q is not ok", i.Name())
|
||||||
if err != nil {
|
return
|
||||||
log.G(ctx).WithError(err).Warnf("Failed to check whether image is unpacked for image %s", i.Name())
|
}
|
||||||
continue
|
// Checking existence of top-level snapshot for each image being recovered.
|
||||||
}
|
unpacked, err := i.IsUnpacked(ctx, snapshotter)
|
||||||
if !unpacked {
|
if err != nil {
|
||||||
log.G(ctx).Warnf("The image %s is not unpacked.", i.Name())
|
log.G(ctx).WithError(err).Warnf("Failed to check whether image is unpacked for image %s", i.Name())
|
||||||
// TODO(random-liu): Consider whether we should try unpack here.
|
return
|
||||||
}
|
}
|
||||||
if err := c.updateImage(ctx, i.Name()); err != nil {
|
if !unpacked {
|
||||||
log.G(ctx).WithError(err).Warnf("Failed to update reference for image %q", i.Name())
|
log.G(ctx).Warnf("The image %s is not unpacked.", i.Name())
|
||||||
continue
|
// TODO(random-liu): Consider whether we should try unpack here.
|
||||||
}
|
}
|
||||||
log.G(ctx).Debugf("Loaded image %q", i.Name())
|
if err := c.updateImage(ctx, i.Name()); err != nil {
|
||||||
|
log.G(ctx).WithError(err).Warnf("Failed to update reference for image %q", i.Name())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
log.G(ctx).Debugf("Loaded image %q", i.Name())
|
||||||
|
}()
|
||||||
}
|
}
|
||||||
|
wg.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
func cleanupOrphanedIDDirs(ctx context.Context, cntrs []containerd.Container, base string) error {
|
func cleanupOrphanedIDDirs(ctx context.Context, cntrs []containerd.Container, base string) error {
|
||||||
|
Loading…
Reference in New Issue
Block a user