Merge pull request #10376 from linxiulei/concurrent_podstats
cri: optimize ListPodSandboxStats with parallelism
This commit is contained in:
commit
78d3e205a5
@@ -20,6 +20,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"sync"
|
||||||
|
|
||||||
sandboxstore "github.com/containerd/containerd/v2/internal/cri/store/sandbox"
|
sandboxstore "github.com/containerd/containerd/v2/internal/cri/store/sandbox"
|
||||||
"github.com/containerd/errdefs"
|
"github.com/containerd/errdefs"
|
||||||
@@ -34,20 +35,33 @@ func (c *criService) ListPodSandboxStats(
|
|||||||
r *runtime.ListPodSandboxStatsRequest,
|
r *runtime.ListPodSandboxStatsRequest,
|
||||||
) (*runtime.ListPodSandboxStatsResponse, error) {
|
) (*runtime.ListPodSandboxStatsResponse, error) {
|
||||||
sandboxes := c.sandboxesForListPodSandboxStatsRequest(r)
|
sandboxes := c.sandboxesForListPodSandboxStatsRequest(r)
|
||||||
|
stats, errs := make([]*runtime.PodSandboxStats, len(sandboxes)), make([]error, len(sandboxes))
|
||||||
|
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
for i, sandbox := range sandboxes {
|
||||||
|
i := i
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
sandboxStats, err := c.podSandboxStats(ctx, sandbox)
|
||||||
|
switch {
|
||||||
|
case errdefs.IsUnavailable(err), errdefs.IsNotFound(err):
|
||||||
|
log.G(ctx).WithField("podsandboxid", sandbox.ID).WithError(err).Debug("failed to get pod sandbox stats, this is likely a transient error")
|
||||||
|
case errors.Is(err, ttrpc.ErrClosed):
|
||||||
|
log.G(ctx).WithField("podsandboxid", sandbox.ID).WithError(err).Debug("failed to get pod sandbox stats, connection closed")
|
||||||
|
case err != nil:
|
||||||
|
errs[i] = fmt.Errorf("failed to decode sandbox container metrics for sandbox %q: %w", sandbox.ID, err)
|
||||||
|
default:
|
||||||
|
stats[i] = sandboxStats
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
var errs []error
|
|
||||||
podSandboxStats := new(runtime.ListPodSandboxStatsResponse)
|
podSandboxStats := new(runtime.ListPodSandboxStatsResponse)
|
||||||
for _, sandbox := range sandboxes {
|
for _, stat := range stats {
|
||||||
sandboxStats, err := c.podSandboxStats(ctx, sandbox)
|
if stat != nil {
|
||||||
switch {
|
podSandboxStats.Stats = append(podSandboxStats.Stats, stat)
|
||||||
case errdefs.IsUnavailable(err), errdefs.IsNotFound(err):
|
|
||||||
log.G(ctx).WithField("podsandboxid", sandbox.ID).Debugf("failed to get pod sandbox stats, this is likely a transient error: %v", err)
|
|
||||||
case errors.Is(err, ttrpc.ErrClosed):
|
|
||||||
log.G(ctx).WithField("podsandboxid", sandbox.ID).Debugf("failed to get pod sandbox stats, connection closed: %v", err)
|
|
||||||
case err != nil:
|
|
||||||
errs = append(errs, fmt.Errorf("failed to decode sandbox container metrics for sandbox %q: %w", sandbox.ID, err))
|
|
||||||
default:
|
|
||||||
podSandboxStats.Stats = append(podSandboxStats.Stats, sandboxStats)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user