|
|
|
|
@@ -22,12 +22,15 @@ import (
|
|
|
|
|
"crypto/x509"
|
|
|
|
|
"encoding/base64"
|
|
|
|
|
"fmt"
|
|
|
|
|
"io"
|
|
|
|
|
"net"
|
|
|
|
|
"net/http"
|
|
|
|
|
"net/url"
|
|
|
|
|
"os"
|
|
|
|
|
"path/filepath"
|
|
|
|
|
"strings"
|
|
|
|
|
"sync"
|
|
|
|
|
"sync/atomic"
|
|
|
|
|
"time"
|
|
|
|
|
|
|
|
|
|
"github.com/containerd/containerd"
|
|
|
|
|
@@ -98,10 +101,20 @@ func (c *criService) PullImage(ctx context.Context, r *runtime.PullImageRequest)
|
|
|
|
|
if ref != imageRef {
|
|
|
|
|
log.G(ctx).Debugf("PullImage using normalized image ref: %q", ref)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
imagePullProgressTimeout, err := time.ParseDuration(c.config.ImagePullProgressTimeout)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, fmt.Errorf("failed to parse image_pull_progress_timeout %q: %w", c.config.ImagePullProgressTimeout, err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
var (
|
|
|
|
|
pctx, pcancel = context.WithCancel(ctx)
|
|
|
|
|
|
|
|
|
|
pullReporter = newPullProgressReporter(ref, pcancel, imagePullProgressTimeout)
|
|
|
|
|
|
|
|
|
|
resolver = docker.NewResolver(docker.ResolverOptions{
|
|
|
|
|
Headers: c.config.Registry.Headers,
|
|
|
|
|
Hosts: c.registryHosts(ctx, r.GetAuth()),
|
|
|
|
|
Hosts: c.registryHosts(ctx, r.GetAuth(), pullReporter.optionUpdateClient),
|
|
|
|
|
})
|
|
|
|
|
isSchema1 bool
|
|
|
|
|
imageHandler containerdimages.HandlerFunc = func(_ context.Context,
|
|
|
|
|
@@ -138,7 +151,9 @@ func (c *criService) PullImage(ctx context.Context, r *runtime.PullImageRequest)
|
|
|
|
|
containerd.WithChildLabelMap(containerdimages.ChildGCLabelsFilterLayers))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
image, err := c.client.Pull(ctx, ref, pullOpts...)
|
|
|
|
|
pullReporter.start(pctx)
|
|
|
|
|
image, err := c.client.Pull(pctx, ref, pullOpts...)
|
|
|
|
|
pcancel()
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, fmt.Errorf("failed to pull and unpack image %q: %w", ref, err)
|
|
|
|
|
}
|
|
|
|
|
@@ -332,10 +347,12 @@ func hostDirFromRoots(roots []string) func(string) (string, error) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// registryHosts is the registry hosts to be used by the resolver.
|
|
|
|
|
func (c *criService) registryHosts(ctx context.Context, auth *runtime.AuthConfig) docker.RegistryHosts {
|
|
|
|
|
func (c *criService) registryHosts(ctx context.Context, auth *runtime.AuthConfig, updateClientFn config.UpdateClientFunc) docker.RegistryHosts {
|
|
|
|
|
paths := filepath.SplitList(c.config.Registry.ConfigPath)
|
|
|
|
|
if len(paths) > 0 {
|
|
|
|
|
hostOptions := config.HostOptions{}
|
|
|
|
|
hostOptions := config.HostOptions{
|
|
|
|
|
UpdateClient: updateClientFn,
|
|
|
|
|
}
|
|
|
|
|
hostOptions.Credentials = func(host string) (string, string, error) {
|
|
|
|
|
hostauth := auth
|
|
|
|
|
if hostauth == nil {
|
|
|
|
|
@@ -388,6 +405,13 @@ func (c *criService) registryHosts(ctx context.Context, auth *runtime.AuthConfig
|
|
|
|
|
if auth == nil && config.Auth != nil {
|
|
|
|
|
auth = toRuntimeAuthConfig(*config.Auth)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if updateClientFn != nil {
|
|
|
|
|
if err := updateClientFn(client); err != nil {
|
|
|
|
|
return nil, fmt.Errorf("failed to update http client: %w", err)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
authorizer := docker.NewDockerAuthorizer(
|
|
|
|
|
docker.WithAuthClient(client),
|
|
|
|
|
docker.WithAuthCreds(func(host string) (string, string, error) {
|
|
|
|
|
@@ -579,3 +603,186 @@ func getLayers(ctx context.Context, key string, descs []imagespec.Descriptor, va
|
|
|
|
|
}
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
const (
|
|
|
|
|
// minPullProgressReportInternal is used to prevent the reporter from
|
|
|
|
|
// eating more CPU resources
|
|
|
|
|
minPullProgressReportInternal = 5 * time.Second
|
|
|
|
|
// defaultPullProgressReportInterval represents that how often the
|
|
|
|
|
// reporter checks that pull progress.
|
|
|
|
|
defaultPullProgressReportInterval = 10 * time.Second
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
// pullProgressReporter is used to check single PullImage progress.
|
|
|
|
|
type pullProgressReporter struct {
|
|
|
|
|
ref string
|
|
|
|
|
cancel context.CancelFunc
|
|
|
|
|
reqReporter pullRequestReporter
|
|
|
|
|
timeout time.Duration
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func newPullProgressReporter(ref string, cancel context.CancelFunc, timeout time.Duration) *pullProgressReporter {
|
|
|
|
|
return &pullProgressReporter{
|
|
|
|
|
ref: ref,
|
|
|
|
|
cancel: cancel,
|
|
|
|
|
reqReporter: pullRequestReporter{},
|
|
|
|
|
timeout: timeout,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (reporter *pullProgressReporter) optionUpdateClient(client *http.Client) error {
|
|
|
|
|
client.Transport = &pullRequestReporterRoundTripper{
|
|
|
|
|
rt: client.Transport,
|
|
|
|
|
reqReporter: &reporter.reqReporter,
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (reporter *pullProgressReporter) start(ctx context.Context) {
|
|
|
|
|
if reporter.timeout == 0 {
|
|
|
|
|
log.G(ctx).Infof("no timeout and will not start pulling image %s reporter", reporter.ref)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
go func() {
|
|
|
|
|
var (
|
|
|
|
|
reportInterval = defaultPullProgressReportInterval
|
|
|
|
|
|
|
|
|
|
lastSeenBytesRead = uint64(0)
|
|
|
|
|
lastSeenTimestamp = time.Now()
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
// check progress more frequently if timeout < default internal
|
|
|
|
|
if reporter.timeout < reportInterval {
|
|
|
|
|
reportInterval = reporter.timeout / 2
|
|
|
|
|
|
|
|
|
|
if reportInterval < minPullProgressReportInternal {
|
|
|
|
|
reportInterval = minPullProgressReportInternal
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
var ticker = time.NewTicker(reportInterval)
|
|
|
|
|
defer ticker.Stop()
|
|
|
|
|
|
|
|
|
|
for {
|
|
|
|
|
select {
|
|
|
|
|
case <-ticker.C:
|
|
|
|
|
activeReqs, bytesRead := reporter.reqReporter.status()
|
|
|
|
|
|
|
|
|
|
log.G(ctx).WithField("ref", reporter.ref).
|
|
|
|
|
WithField("activeReqs", activeReqs).
|
|
|
|
|
WithField("totalBytesRead", bytesRead).
|
|
|
|
|
WithField("lastSeenBytesRead", lastSeenBytesRead).
|
|
|
|
|
WithField("lastSeenTimestamp", lastSeenTimestamp).
|
|
|
|
|
WithField("reportInterval", reportInterval).
|
|
|
|
|
Tracef("progress for image pull")
|
|
|
|
|
|
|
|
|
|
if activeReqs == 0 || bytesRead > lastSeenBytesRead {
|
|
|
|
|
lastSeenBytesRead = bytesRead
|
|
|
|
|
lastSeenTimestamp = time.Now()
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if time.Since(lastSeenTimestamp) > reporter.timeout {
|
|
|
|
|
log.G(ctx).Errorf("cancel pulling image %s because of no progress in %v", reporter.ref, reporter.timeout)
|
|
|
|
|
reporter.cancel()
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
case <-ctx.Done():
|
|
|
|
|
activeReqs, bytesRead := reporter.reqReporter.status()
|
|
|
|
|
log.G(ctx).Infof("stop pulling image %s: active requests=%v, bytes read=%v", reporter.ref, activeReqs, bytesRead)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// countingReadCloser wraps http.Response.Body with pull request reporter,
|
|
|
|
|
// which is used by pullRequestReporterRoundTripper.
|
|
|
|
|
type countingReadCloser struct {
|
|
|
|
|
once sync.Once
|
|
|
|
|
|
|
|
|
|
rc io.ReadCloser
|
|
|
|
|
reqReporter *pullRequestReporter
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Read reads bytes from original io.ReadCloser and increases bytes in
|
|
|
|
|
// pull request reporter.
|
|
|
|
|
func (r *countingReadCloser) Read(p []byte) (int, error) {
|
|
|
|
|
n, err := r.rc.Read(p)
|
|
|
|
|
r.reqReporter.incByteRead(uint64(n))
|
|
|
|
|
return n, err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Close closes the original io.ReadCloser and only decreases the number of
|
|
|
|
|
// active pull requests once.
|
|
|
|
|
func (r *countingReadCloser) Close() error {
|
|
|
|
|
err := r.rc.Close()
|
|
|
|
|
r.once.Do(r.reqReporter.decRequest)
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// pullRequestReporter is used to track the progress per each criapi.PullImage.
|
|
|
|
|
type pullRequestReporter struct {
|
|
|
|
|
// activeReqs indicates that current number of active pulling requests,
|
|
|
|
|
// including auth requests.
|
|
|
|
|
activeReqs int32
|
|
|
|
|
// totalBytesRead indicates that the total bytes has been read from
|
|
|
|
|
// remote registry.
|
|
|
|
|
totalBytesRead uint64
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (reporter *pullRequestReporter) incRequest() {
|
|
|
|
|
atomic.AddInt32(&reporter.activeReqs, 1)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (reporter *pullRequestReporter) decRequest() {
|
|
|
|
|
atomic.AddInt32(&reporter.activeReqs, -1)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (reporter *pullRequestReporter) incByteRead(nr uint64) {
|
|
|
|
|
atomic.AddUint64(&reporter.totalBytesRead, nr)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (reporter *pullRequestReporter) status() (currentReqs int32, totalBytesRead uint64) {
|
|
|
|
|
currentReqs = atomic.LoadInt32(&reporter.activeReqs)
|
|
|
|
|
totalBytesRead = atomic.LoadUint64(&reporter.totalBytesRead)
|
|
|
|
|
return currentReqs, totalBytesRead
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// pullRequestReporterRoundTripper wraps http.RoundTripper with pull request
|
|
|
|
|
// reporter which is used to track the progress of active http request with
|
|
|
|
|
// counting readable http.Response.Body.
|
|
|
|
|
//
|
|
|
|
|
// NOTE:
|
|
|
|
|
//
|
|
|
|
|
// Although containerd provides ingester manager to track the progress
|
|
|
|
|
// of pulling request, for example `ctr image pull` shows the console progress
|
|
|
|
|
// bar, it needs more CPU resources to open/read the ingested files with
|
|
|
|
|
// acquiring containerd metadata plugin's boltdb lock.
|
|
|
|
|
//
|
|
|
|
|
// Before sending HTTP request to registry, the containerd.Client.Pull library
|
|
|
|
|
// will open writer by containerd ingester manager. Based on this, the
|
|
|
|
|
// http.RoundTripper wrapper can track the active progress with lower overhead
|
|
|
|
|
// even if the ref has been locked in ingester manager by other Pull request.
|
|
|
|
|
type pullRequestReporterRoundTripper struct {
|
|
|
|
|
rt http.RoundTripper
|
|
|
|
|
|
|
|
|
|
reqReporter *pullRequestReporter
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (rt *pullRequestReporterRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
|
|
|
|
|
rt.reqReporter.incRequest()
|
|
|
|
|
|
|
|
|
|
resp, err := rt.rt.RoundTrip(req)
|
|
|
|
|
if err != nil {
|
|
|
|
|
rt.reqReporter.decRequest()
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
resp.Body = &countingReadCloser{
|
|
|
|
|
rc: resp.Body,
|
|
|
|
|
reqReporter: rt.reqReporter,
|
|
|
|
|
}
|
|
|
|
|
return resp, err
|
|
|
|
|
}
|
|
|
|
|
|