diff --git a/cmd/dist/common.go b/cmd/dist/common.go index 501a37ab9..7fdd187ae 100644 --- a/cmd/dist/common.go +++ b/cmd/dist/common.go @@ -120,6 +120,7 @@ func getResolver(ctx context.Context, clicontext *cli.Context) (remotes.Resolver } options := docker.ResolverOptions{ PlainHTTP: clicontext.Bool("plain-http"), + Tracker: pushTracker, } if username != "" { if secret == "" { diff --git a/cmd/dist/push.go b/cmd/dist/push.go index 44a450392..ef6dbe36c 100644 --- a/cmd/dist/push.go +++ b/cmd/dist/push.go @@ -2,7 +2,6 @@ package main import ( "context" - "io" "os" "sync" "text/tabwriter" @@ -13,6 +12,7 @@ import ( "github.com/containerd/containerd/log" "github.com/containerd/containerd/progress" "github.com/containerd/containerd/remotes" + "github.com/containerd/containerd/remotes/docker" digest "github.com/opencontainers/go-digest" ocispec "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" @@ -20,6 +20,10 @@ import ( "golang.org/x/sync/errgroup" ) +var ( + pushTracker = docker.NewInMemoryTracker() +) + var pushCommand = cli.Command{ Name: "push", Usage: "push an image to a remote", @@ -77,7 +81,7 @@ var pushCommand = cli.Command{ if err != nil { return err } - ongoing := newPushJobs() + ongoing := newPushJobs(pushTracker) eg, ctx := errgroup.WithContext(ctx) @@ -135,37 +139,6 @@ var pushCommand = cli.Command{ }, } -type pushTracker struct { - closed bool - started time.Time - updated time.Time - written int64 - total int64 -} - -func (pt *pushTracker) Write(p []byte) (int, error) { - pt.written += int64(len(p)) - pt.updated = time.Now() - return len(p), nil -} - -func (pt *pushTracker) Close() error { - pt.closed = true - pt.updated = time.Now() - return nil -} - -//type pushWrapper struct { -// jobs *pushjobs -// pusher remotes.Pusher -//} -// -//func (pw pushWrapper) Push(ctx context.Context, desc ocispec.Descriptor, r io.Reader) error { -// tr := pw.jobs.track(remotes.MakeRefKey(ctx, desc), desc.Size) -// defer tr.Close() -// return pw.pusher.Push(ctx, desc, io.TeeReader(r, tr)) -//} - type pushStatus struct { name string started bool @@ -174,22 +147,19 @@ type pushStatus struct { } type pushjobs struct { - jobs map[string]*pushTracker + jobs map[string]struct{} ordered []string + tracker docker.StatusTracker mu sync.Mutex } -func newPushJobs() *pushjobs { - return &pushjobs{jobs: make(map[string]*pushTracker)} +func newPushJobs(tracker docker.StatusTracker) *pushjobs { + return &pushjobs{ + jobs: make(map[string]struct{}), + tracker: tracker, + } } -//func (j *pushjobs) wrapPusher(p remotes.Pusher) remotes.Pusher { -// return pushWrapper{ -// jobs: j, -// pusher: p, -// } -//} - func (j *pushjobs) add(ref string) { j.mu.Lock() defer j.mu.Unlock() @@ -198,52 +168,39 @@ func (j *pushjobs) add(ref string) { return } j.ordered = append(j.ordered, ref) - j.jobs[ref] = nil -} - -func (j *pushjobs) track(ref string, size int64) io.WriteCloser { - j.mu.Lock() - defer j.mu.Unlock() - - if _, ok := j.jobs[ref]; !ok { - j.ordered = append(j.ordered, ref) - } - - pt := &pushTracker{ - started: time.Now(), - total: size, - } - j.jobs[ref] = pt - return pt + j.jobs[ref] = struct{}{} } func (j *pushjobs) status() []statusInfo { j.mu.Lock() defer j.mu.Unlock() - status := make([]statusInfo, 0, len(j.jobs)) + statuses := make([]statusInfo, 0, len(j.jobs)) for _, name := range j.ordered { - tracker := j.jobs[name] si := statusInfo{ Ref: name, } - if tracker != nil { - si.Offset = tracker.written - si.Total = tracker.total - si.StartedAt = tracker.started - si.UpdatedAt = tracker.updated - if tracker.closed { - si.Status = "done" - } else if tracker.written >= tracker.total { - si.Status = "committing" + + status, err := j.tracker.GetStatus(name) + if err != nil { + si.Status = "waiting" + } else { + si.Offset = status.Offset + si.Total = status.Total + si.StartedAt = status.StartedAt + si.UpdatedAt = status.UpdatedAt + if status.Offset >= status.Total { + if status.UploadUUID == "" { + si.Status = "done" + } else { + si.Status = "committing" + } } else { si.Status = "uploading" } - } else { - si.Status = "waiting" } - status = append(status, si) + statuses = append(statuses, si) } - return status + return statuses } diff --git a/remotes/docker/pusher.go b/remotes/docker/pusher.go index 0b87828f0..f99701820 100644 --- a/remotes/docker/pusher.go +++ b/remotes/docker/pusher.go @@ -12,7 +12,7 @@ import ( "github.com/containerd/containerd/content" "github.com/containerd/containerd/images" "github.com/containerd/containerd/log" - "github.com/containerd/containerd/reference" + "github.com/containerd/containerd/remotes" digest "github.com/opencontainers/go-digest" ocispec "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" @@ -21,13 +21,24 @@ import ( type dockerPusher struct { *dockerBase tag string + + // TODO: namespace tracker + tracker StatusTracker } func (p dockerPusher) Push(ctx context.Context, desc ocispec.Descriptor) (content.Writer, error) { - // TODO: Check status, return already exists + ref := remotes.MakeRefKey(ctx, desc) + status, err := p.tracker.GetStatus(ref) + if err == nil { + if status.Offset == status.Total { + return nil, content.ErrExists + } + // TODO: Handle incomplete status + } else if !content.IsNotFound(err) { + return nil, errors.Wrap(err, "failed to get status") + } var ( - ref string isManifest bool existCheck string ) @@ -55,8 +66,13 @@ func (p dockerPusher) Push(ctx context.Context, desc ocispec.Descriptor) (conten log.G(ctx).WithError(err).Debugf("Unable to check existence, continuing with push") } else { if resp.StatusCode == http.StatusOK { + p.tracker.SetStatus(ref, Status{ + Status: content.Status{ + Ref: ref, + // TODO: Set updated time? + }, + }) return nil, content.ErrExists - // TODO: Update status with total 0 } if resp.StatusCode != http.StatusNotFound { // TODO: log error @@ -68,15 +84,11 @@ func (p dockerPusher) Push(ctx context.Context, desc ocispec.Descriptor) (conten if isManifest { var putPath string - refspec := reference.Spec{Locator: p.locator} if p.tag != "" { putPath = path.Join("manifests", p.tag) - refspec.Object = p.tag } else { putPath = path.Join("manifests", desc.Digest.String()) - refspec.Object = "@" + desc.Digest.String() } - ref = refspec.String() req, err = http.NewRequest(http.MethodPut, p.url(putPath), nil) if err != nil { @@ -100,7 +112,6 @@ func (p dockerPusher) Push(ctx context.Context, desc ocispec.Descriptor) (conten // TODO: log error return nil, errors.Errorf("unexpected response: %s", resp.Status) } - ref = resp.Header.Get("Docker-Upload-Uuid") location := resp.Header.Get("Location") // Support paths without host in location @@ -119,9 +130,16 @@ func (p dockerPusher) Push(ctx context.Context, desc ocispec.Descriptor) (conten req.URL.RawQuery = q.Encode() } - // TODO: Support chunked upload + p.tracker.SetStatus(ref, Status{ + Status: content.Status{ + Ref: ref, + Total: desc.Size, + Expected: desc.Digest, + StartedAt: time.Now(), + }, + }) - // TODO: Set status + // TODO: Support chunked upload pr, pw := io.Pipe() respC := make(chan *http.Response, 1) @@ -149,14 +167,8 @@ func (p dockerPusher) Push(ctx context.Context, desc ocispec.Descriptor) (conten pipe: pw, responseC: respC, isManifest: isManifest, - - // TODO: Move this - status: content.Status{ - Ref: ref, - Total: desc.Size, - Expected: desc.Digest, - StartedAt: time.Now(), - }, + expected: desc.Digest, + tracker: p.tracker, }, nil } @@ -168,14 +180,19 @@ type pushWriter struct { responseC <-chan *http.Response isManifest bool - // TODO: Move this to lookup from base - status content.Status + expected digest.Digest + tracker StatusTracker } func (pw *pushWriter) Write(p []byte) (n int, err error) { + status, err := pw.tracker.GetStatus(pw.ref) + if err != nil { + return n, err + } n, err = pw.pipe.Write(p) - pw.status.Offset += int64(n) - pw.status.UpdatedAt = time.Now() + status.Offset += int64(n) + status.UpdatedAt = time.Now() + pw.tracker.SetStatus(pw.ref, status) return } @@ -184,14 +201,17 @@ func (pw *pushWriter) Close() error { } func (pw *pushWriter) Status() (content.Status, error) { - // TODO: Lookup status from base tracker - return pw.status, nil + status, err := pw.tracker.GetStatus(pw.ref) + if err != nil { + return content.Status{}, err + } + return status.Status, nil } func (pw *pushWriter) Digest() digest.Digest { // TODO: Get rid of this function? - return pw.status.Expected + return pw.expected } func (pw *pushWriter) Commit(size int64, expected digest.Digest) error { @@ -211,10 +231,17 @@ func (pw *pushWriter) Commit(size int64, expected digest.Digest) error { return errors.New("no response") } - // TODO: Get status for size check + status, err := pw.tracker.GetStatus(pw.ref) + if err != nil { + return errors.Wrap(err, "failed to get status") + } + + if size > 0 && size != status.Offset { + return errors.Errorf("unxpected size %d, expected %d", status.Offset, size) + } if expected == "" { - expected = pw.status.Expected + expected = status.Expected } actual, err := digest.Parse(resp.Header.Get("Docker-Content-Digest")) diff --git a/remotes/docker/resolver.go b/remotes/docker/resolver.go index 2dbd5bed3..fe9626858 100644 --- a/remotes/docker/resolver.go +++ b/remotes/docker/resolver.go @@ -38,6 +38,7 @@ type dockerResolver struct { credentials func(string) (string, string, error) plainHTTP bool client *http.Client + tracker StatusTracker } // ResolverOptions are used to configured a new Docker register resolver @@ -53,15 +54,23 @@ type ResolverOptions struct { // Client is the http client to used when making registry requests Client *http.Client - // TODO: Add upload status tracker + // Tracker is used to track uploads to the registry. This is used + // since the registry does not have upload tracking and the existing + // mechanism for getting blob upload status is expensive. + Tracker StatusTracker } // NewResolver returns a new resolver to a Docker registry func NewResolver(options ResolverOptions) remotes.Resolver { + tracker := options.Tracker + if tracker == nil { + tracker = NewInMemoryTracker() + } return &dockerResolver{ credentials: options.Credentials, plainHTTP: options.PlainHTTP, client: options.Client, + tracker: tracker, } } @@ -214,13 +223,13 @@ func (r *dockerResolver) Pusher(ctx context.Context, ref string) (remotes.Pusher return dockerPusher{ dockerBase: base, tag: refspec.Object, + tracker: r.tracker, }, nil } type dockerBase struct { - base url.URL - locator string - token string + base url.URL + token string client *http.Client useBasic bool @@ -260,7 +269,6 @@ func (r *dockerResolver) base(refspec reference.Spec) (*dockerBase, error) { return &dockerBase{ base: base, - locator: refspec.Locator, client: r.client, username: username, secret: secret, diff --git a/remotes/docker/resolver_test.go b/remotes/docker/resolver_test.go index cf6f9fffe..32e79e4a6 100644 --- a/remotes/docker/resolver_test.go +++ b/remotes/docker/resolver_test.go @@ -158,7 +158,7 @@ func TestBadTokenResolver(t *testing.T) { } ctx := context.Background() - h := content(ocispec.MediaTypeImageManifest, []byte("not anything parse-able")) + h := newContent(ocispec.MediaTypeImageManifest, []byte("not anything parse-able")) base, ro, close := withTokenServer(th, creds)(logHandler{t, h}) defer close() @@ -247,10 +247,10 @@ func runBasicTest(t *testing.T, name string, sf func(h http.Handler) (string, Re ) m := newManifest( - content(ocispec.MediaTypeImageConfig, []byte("1")), - content(ocispec.MediaTypeImageLayerGzip, []byte("2")), + newContent(ocispec.MediaTypeImageConfig, []byte("1")), + newContent(ocispec.MediaTypeImageLayerGzip, []byte("2")), ) - mc := content(ocispec.MediaTypeImageManifest, m.OCIManifest()) + mc := newContent(ocispec.MediaTypeImageManifest, m.OCIManifest()) m.RegisterHandler(r, name) r.Handle(fmt.Sprintf("/v2/%s/manifests/%s", name, tag), mc) r.Handle(fmt.Sprintf("/v2/%s/manifests/%s", name, mc.Digest()), mc) @@ -331,7 +331,7 @@ type testContent struct { content []byte } -func content(mediaType string, b []byte) testContent { +func newContent(mediaType string, b []byte) testContent { return testContent{ mediaType: mediaType, content: b, diff --git a/remotes/docker/status.go b/remotes/docker/status.go new file mode 100644 index 000000000..34280f1d1 --- /dev/null +++ b/remotes/docker/status.go @@ -0,0 +1,46 @@ +package docker + +import ( + "sync" + + "github.com/containerd/containerd/content" +) + +type Status struct { + content.Status + + // UploadUUID is used by the Docker registry to reference blob uploads + UploadUUID string +} + +type StatusTracker interface { + GetStatus(string) (Status, error) + SetStatus(string, Status) +} + +type memoryStatusTracker struct { + statuses map[string]Status + m sync.Mutex +} + +func NewInMemoryTracker() StatusTracker { + return &memoryStatusTracker{ + statuses: map[string]Status{}, + } +} + +func (t *memoryStatusTracker) GetStatus(ref string) (Status, error) { + t.m.Lock() + defer t.m.Unlock() + status, ok := t.statuses[ref] + if !ok { + return Status{}, content.ErrNotFound + } + return status, nil +} + +func (t *memoryStatusTracker) SetStatus(ref string, status Status) { + t.m.Lock() + t.statuses[ref] = status + t.m.Unlock() +}