From 621760f10aa2960363646f3748b2f41efd91cbee Mon Sep 17 00:00:00 2001 From: Derek McGowan Date: Mon, 5 Jun 2017 13:52:36 -0700 Subject: [PATCH 1/3] Add ReaderAt support to content store Signed-off-by: Derek McGowan --- content/content.go | 1 + content/readerat.go | 26 ++++++++++++++++++++++++++ content/store.go | 11 ++++++----- services/content/reader.go | 37 +++++++++++++++++++++++++++++++++++-- services/content/store.go | 8 ++++++++ 5 files changed, 76 insertions(+), 7 deletions(-) create mode 100644 content/readerat.go diff --git a/content/content.go b/content/content.go index 7c02ded53..55c20a9ba 100644 --- a/content/content.go +++ b/content/content.go @@ -30,6 +30,7 @@ var ( type Provider interface { Reader(ctx context.Context, dgst digest.Digest) (io.ReadCloser, error) + ReaderAt(ctx context.Context, dgst digest.Digest) (io.ReaderAt, error) } type Ingester interface { diff --git a/content/readerat.go b/content/readerat.go new file mode 100644 index 000000000..74c11bd1f --- /dev/null +++ b/content/readerat.go @@ -0,0 +1,26 @@ +package content + +import ( + "io" + "os" +) + +// readerat implements io.ReaderAt in a completely stateless manner by opening +// the referenced file for each call to ReadAt. +type readerAt struct { + f string +} + +func (ra readerAt) ReadAt(p []byte, offset int64) (int, error) { + fp, err := os.Open(ra.f) + if err != nil { + return 0, err + } + defer fp.Close() + + if _, err := fp.Seek(offset, io.SeekStart); err != nil { + return 0, err + } + + return fp.Read(p) +} diff --git a/content/store.go b/content/store.go index f1d59caf3..58a2c2933 100644 --- a/content/store.go +++ b/content/store.go @@ -58,11 +58,7 @@ func (s *store) info(dgst digest.Digest, fi os.FileInfo) Info { } } -// Open returns an io.ReadCloser for the blob. -// -// TODO(stevvooe): This would work much better as an io.ReaderAt in practice. -// Right now, we are doing type assertion to tease that out, but it won't scale -// well. +// Reader returns an io.ReadCloser for the blob. func (s *store) Reader(ctx context.Context, dgst digest.Digest) (io.ReadCloser, error) { fp, err := os.Open(s.blobPath(dgst)) if err != nil { @@ -75,6 +71,11 @@ func (s *store) Reader(ctx context.Context, dgst digest.Digest) (io.ReadCloser, return fp, nil } +// ReaderAt returns an io.ReaderAt for the blob. +func (s *store) ReaderAt(ctx context.Context, dgst digest.Digest) (io.ReaderAt, error) { + return readerAt{f: s.blobPath(dgst)}, nil +} + // Delete removes a blob by its digest. // // While this is safe to do concurrently, safe exist-removal logic must hold diff --git a/services/content/reader.go b/services/content/reader.go index fd4200e50..7c22db987 100644 --- a/services/content/reader.go +++ b/services/content/reader.go @@ -1,7 +1,10 @@ package content import ( + "context" + contentapi "github.com/containerd/containerd/api/services/content" + digest "github.com/opencontainers/go-digest" ) type remoteReader struct { @@ -42,8 +45,38 @@ func (rr *remoteReader) Read(p []byte) (n int, err error) { return } -// TODO(stevvooe): Implemente io.ReaderAt. - func (rr *remoteReader) Close() error { return rr.client.CloseSend() } + +type remoteReaderAt struct { + ctx context.Context + digest digest.Digest + client contentapi.ContentClient +} + +func (ra *remoteReaderAt) ReadAt(p []byte, off int64) (n int, err error) { + rr := &contentapi.ReadRequest{ + Digest: ra.digest, + Offset: off, + Size_: int64(len(p)), + } + rc, err := ra.client.Read(ra.ctx, rr) + if err != nil { + return 0, err + } + + for len(p) > 0 { + var resp *contentapi.ReadResponse + // fill our buffer up until we can fill p. + resp, err = rc.Recv() + if err != nil { + return n, err + } + + copied := copy(p, resp.Data) + n += copied + p = p[copied:] + } + return n, nil +} diff --git a/services/content/store.go b/services/content/store.go index e5c19e458..c715b29a0 100644 --- a/services/content/store.go +++ b/services/content/store.go @@ -85,6 +85,14 @@ func (rs *remoteStore) Reader(ctx context.Context, dgst digest.Digest) (io.ReadC }, nil } +func (rs *remoteStore) ReaderAt(ctx context.Context, dgst digest.Digest) (io.ReaderAt, error) { + return &remoteReaderAt{ + ctx: ctx, + digest: dgst, + client: rs.client, + }, nil +} + func (rs *remoteStore) Status(ctx context.Context, re string) ([]content.Status, error) { resp, err := rs.client.Status(ctx, &contentapi.StatusRequest{ Regexp: re, From 5615b68f0697dfe5cbb50d58c1f403de224d3a2f Mon Sep 17 00:00:00 2001 From: Derek McGowan Date: Mon, 5 Jun 2017 17:30:17 -0700 Subject: [PATCH 2/3] Update pusher to use content writer Signed-off-by: Derek McGowan --- client.go | 17 ---- cmd/dist/push.go | 33 ++++--- cmd/dist/pushobject.go | 8 +- remotes/docker/pusher.go | 170 ++++++++++++++++++++++++++++--------- remotes/docker/resolver.go | 8 +- remotes/handlers.go | 34 ++++++-- remotes/resolver.go | 7 +- 7 files changed, 190 insertions(+), 87 deletions(-) diff --git a/client.go b/client.go index 08095d5df..6f2f09155 100644 --- a/client.go +++ b/client.go @@ -209,10 +209,6 @@ type RemoteContext struct { // afterwards. Unpacking is required to run an image. Unpack bool - // PushWrapper allows hooking into the push method. This can be used - // track content that is being sent to the remote. - PushWrapper func(remotes.Pusher) remotes.Pusher - // BaseHandlers are a set of handlers which get are called on dispatch. // These handlers always get called before any operation specific // handlers. @@ -251,15 +247,6 @@ func WithImageHandler(h images.Handler) RemoteOpts { } } -// WithPushWrapper is used to wrap a pusher to hook into -// the push content as it is sent to a remote. -func WithPushWrapper(w func(remotes.Pusher) remotes.Pusher) RemoteOpts { - return func(client *Client, c *RemoteContext) error { - c.PushWrapper = w - return nil - } -} - func (c *Client) Pull(ctx context.Context, ref string, opts ...RemoteOpts) (Image, error) { pullCtx := defaultRemoteContext() for _, o := range opts { @@ -318,10 +305,6 @@ func (c *Client) Push(ctx context.Context, ref string, desc ocispec.Descriptor, return err } - if pushCtx.PushWrapper != nil { - pusher = pushCtx.PushWrapper(pusher) - } - var m sync.Mutex manifestStack := []ocispec.Descriptor{} diff --git a/cmd/dist/push.go b/cmd/dist/push.go index 439b33521..44a450392 100644 --- a/cmd/dist/push.go +++ b/cmd/dist/push.go @@ -92,7 +92,6 @@ var pushCommand = cli.Command{ return client.Push(ctx, ref, desc, containerd.WithResolver(resolver), containerd.WithImageHandler(jobHandler), - containerd.WithPushWrapper(ongoing.wrapPusher), ) }) @@ -156,16 +155,16 @@ func (pt *pushTracker) Close() error { return nil } -type pushWrapper struct { - jobs *pushjobs - pusher remotes.Pusher -} - -func (pw pushWrapper) Push(ctx context.Context, desc ocispec.Descriptor, r io.Reader) error { - tr := pw.jobs.track(remotes.MakeRefKey(ctx, desc), desc.Size) - defer tr.Close() - return pw.pusher.Push(ctx, desc, io.TeeReader(r, tr)) -} +//type pushWrapper struct { +// jobs *pushjobs +// pusher remotes.Pusher +//} +// +//func (pw pushWrapper) Push(ctx context.Context, desc ocispec.Descriptor, r io.Reader) error { +// tr := pw.jobs.track(remotes.MakeRefKey(ctx, desc), desc.Size) +// defer tr.Close() +// return pw.pusher.Push(ctx, desc, io.TeeReader(r, tr)) +//} type pushStatus struct { name string @@ -184,12 +183,12 @@ func newPushJobs() *pushjobs { return &pushjobs{jobs: make(map[string]*pushTracker)} } -func (j *pushjobs) wrapPusher(p remotes.Pusher) remotes.Pusher { - return pushWrapper{ - jobs: j, - pusher: p, - } -} +//func (j *pushjobs) wrapPusher(p remotes.Pusher) remotes.Pusher { +// return pushWrapper{ +// jobs: j, +// pusher: p, +// } +//} func (j *pushjobs) add(ref string) { j.mu.Lock() diff --git a/cmd/dist/pushobject.go b/cmd/dist/pushobject.go index 196036a88..fab26b8fa 100644 --- a/cmd/dist/pushobject.go +++ b/cmd/dist/pushobject.go @@ -3,6 +3,7 @@ package main import ( "fmt" + "github.com/containerd/containerd/content" "github.com/containerd/containerd/log" digest "github.com/opencontainers/go-digest" ocispec "github.com/opencontainers/image-spec/specs-go/v1" @@ -63,8 +64,13 @@ var pushObjectCommand = cli.Command{ } defer rc.Close() + cw, err := pusher.Push(ctx, desc) + if err != nil { + return err + } + // TODO: Progress reader - if err = pusher.Push(ctx, desc, rc); err != nil { + if err := content.Copy(cw, rc, desc.Size, desc.Digest); err != nil { return err } diff --git a/remotes/docker/pusher.go b/remotes/docker/pusher.go index faeaa09ba..0b87828f0 100644 --- a/remotes/docker/pusher.go +++ b/remotes/docker/pusher.go @@ -1,16 +1,19 @@ package docker import ( - "bytes" "context" "io" "io/ioutil" "net/http" "path" "strings" + "time" + "github.com/containerd/containerd/content" "github.com/containerd/containerd/images" "github.com/containerd/containerd/log" + "github.com/containerd/containerd/reference" + digest "github.com/opencontainers/go-digest" ocispec "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" ) @@ -20,8 +23,11 @@ type dockerPusher struct { tag string } -func (p dockerPusher) Push(ctx context.Context, desc ocispec.Descriptor, r io.Reader) error { +func (p dockerPusher) Push(ctx context.Context, desc ocispec.Descriptor) (content.Writer, error) { + // TODO: Check status, return already exists + var ( + ref string isManifest bool existCheck string ) @@ -37,79 +43,64 @@ func (p dockerPusher) Push(ctx context.Context, desc ocispec.Descriptor, r io.Re req, err := http.NewRequest(http.MethodHead, p.url(existCheck), nil) if err != nil { - return err + return nil, err } req.Header.Set("Accept", strings.Join([]string{desc.MediaType, `*`}, ", ")) resp, err := p.doRequestWithRetries(ctx, req, nil) if err != nil { if errors.Cause(err) != ErrInvalidAuthorization { - return err + return nil, err } log.G(ctx).WithError(err).Debugf("Unable to check existence, continuing with push") } else { if resp.StatusCode == http.StatusOK { - return nil + return nil, content.ErrExists + // TODO: Update status with total 0 } if resp.StatusCode != http.StatusNotFound { // TODO: log error - return errors.Errorf("unexpected response: %s", resp.Status) + return nil, errors.Errorf("unexpected response: %s", resp.Status) } } // TODO: Lookup related objects for cross repository push if isManifest { - // Read all to use bytes.Reader for using GetBody - b, err := ioutil.ReadAll(r) - if err != nil { - return errors.Wrap(err, "failed to read manifest") - } var putPath string + refspec := reference.Spec{Locator: p.locator} if p.tag != "" { putPath = path.Join("manifests", p.tag) + refspec.Object = p.tag } else { putPath = path.Join("manifests", desc.Digest.String()) + refspec.Object = "@" + desc.Digest.String() } + ref = refspec.String() - req, err := http.NewRequest(http.MethodPut, p.url(putPath), nil) + req, err = http.NewRequest(http.MethodPut, p.url(putPath), nil) if err != nil { - return err - } - req.ContentLength = int64(len(b)) - req.Body = ioutil.NopCloser(bytes.NewReader(b)) - req.GetBody = func() (io.ReadCloser, error) { - return ioutil.NopCloser(bytes.NewReader(b)), nil + return nil, err } req.Header.Add("Content-Type", desc.MediaType) - - resp, err := p.doRequestWithRetries(ctx, req, nil) - if err != nil { - return err - } - if resp.StatusCode != http.StatusCreated { - // TODO: log error - return errors.Errorf("unexpected response: %s", resp.Status) - } } else { // TODO: Do monolithic upload if size is small - // TODO: Turn multi-request blob uploader into ingester - // Start upload request - req, err := http.NewRequest(http.MethodPost, p.url("blobs", "uploads")+"/", nil) + req, err = http.NewRequest(http.MethodPost, p.url("blobs", "uploads")+"/", nil) if err != nil { - return err + return nil, err } resp, err := p.doRequestWithRetries(ctx, req, nil) if err != nil { - return err + return nil, err } if resp.StatusCode != http.StatusAccepted { // TODO: log error - return errors.Errorf("unexpected response: %s", resp.Status) + return nil, errors.Errorf("unexpected response: %s", resp.Status) } + ref = resp.Header.Get("Docker-Upload-Uuid") location := resp.Header.Get("Location") // Support paths without host in location @@ -119,26 +110,127 @@ func (p dockerPusher) Push(ctx context.Context, desc ocispec.Descriptor, r io.Re location = u.String() } - // TODO: Support chunked upload - req, err = http.NewRequest(http.MethodPut, location, r) + req, err = http.NewRequest(http.MethodPut, location, nil) if err != nil { - return err + return nil, err } q := req.URL.Query() q.Add("digest", desc.Digest.String()) req.URL.RawQuery = q.Encode() - req.ContentLength = desc.Size + } + // TODO: Support chunked upload + + // TODO: Set status + + pr, pw := io.Pipe() + respC := make(chan *http.Response, 1) + + req.Body = ioutil.NopCloser(pr) + req.ContentLength = desc.Size + + go func() { + defer close(respC) resp, err = p.doRequest(ctx, req) if err != nil { - return err + pr.CloseWithError(err) + return } if resp.StatusCode != http.StatusCreated { // TODO: log error - return errors.Errorf("unexpected response: %s", resp.Status) + pr.CloseWithError(errors.Errorf("unexpected response: %s", resp.Status)) } + respC <- resp + }() + return &pushWriter{ + base: p.dockerBase, + ref: ref, + pipe: pw, + responseC: respC, + isManifest: isManifest, + + // TODO: Move this + status: content.Status{ + Ref: ref, + Total: desc.Size, + Expected: desc.Digest, + StartedAt: time.Now(), + }, + }, nil +} + +type pushWriter struct { + base *dockerBase + ref string + + pipe *io.PipeWriter + responseC <-chan *http.Response + isManifest bool + + // TODO: Move this to lookup from base + status content.Status +} + +func (pw *pushWriter) Write(p []byte) (n int, err error) { + n, err = pw.pipe.Write(p) + pw.status.Offset += int64(n) + pw.status.UpdatedAt = time.Now() + return +} + +func (pw *pushWriter) Close() error { + return pw.pipe.Close() +} + +func (pw *pushWriter) Status() (content.Status, error) { + // TODO: Lookup status from base tracker + return pw.status, nil + +} + +func (pw *pushWriter) Digest() digest.Digest { + // TODO: Get rid of this function? + return pw.status.Expected +} + +func (pw *pushWriter) Commit(size int64, expected digest.Digest) error { + // Check whether read has already thrown an error + if _, err := pw.pipe.Write([]byte{}); err != nil && err != io.ErrClosedPipe { + return errors.Wrap(err, "pipe error before commit") + } + + if err := pw.pipe.Close(); err != nil { + return err + } + // TODO: Update status to determine committing + + // TODO: timeout waiting for response + resp := <-pw.responseC + if resp == nil { + return errors.New("no response") + } + + // TODO: Get status for size check + + if expected == "" { + expected = pw.status.Expected + } + + actual, err := digest.Parse(resp.Header.Get("Docker-Content-Digest")) + if err != nil { + return errors.Wrap(err, "invalid content digest in response") + } + + if actual != expected { + return errors.Errorf("got digest %s, expected %s", actual, expected) } return nil } + +func (pw *pushWriter) Truncate(size int64) error { + // TODO: if blob close request and start new request at offset + // TODO: always error on manifest + return errors.New("cannot truncate remote upload") +} diff --git a/remotes/docker/resolver.go b/remotes/docker/resolver.go index 8b55d7c19..2dbd5bed3 100644 --- a/remotes/docker/resolver.go +++ b/remotes/docker/resolver.go @@ -52,6 +52,8 @@ type ResolverOptions struct { // Client is the http client to used when making registry requests Client *http.Client + + // TODO: Add upload status tracker } // NewResolver returns a new resolver to a Docker registry @@ -216,8 +218,9 @@ func (r *dockerResolver) Pusher(ctx context.Context, ref string) (remotes.Pusher } type dockerBase struct { - base url.URL - token string + base url.URL + locator string + token string client *http.Client useBasic bool @@ -257,6 +260,7 @@ func (r *dockerResolver) base(refspec reference.Spec) (*dockerBase, error) { return &dockerBase{ base: base, + locator: refspec.Locator, client: r.client, username: username, secret: secret, diff --git a/remotes/handlers.go b/remotes/handlers.go index 06d75a5ca..ca99e1d0e 100644 --- a/remotes/handlers.go +++ b/remotes/handlers.go @@ -76,6 +76,8 @@ func fetch(ctx context.Context, ingester content.Ingester, fetcher Fetcher, desc return content.Copy(cw, rc, desc.Size, desc.Digest) } +// PushHandler returns a handler that will push all content from the provider +// using a writer from the pusher. func PushHandler(provider content.Provider, pusher Pusher) images.HandlerFunc { return func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) { ctx = log.WithLogger(ctx, log.G(ctx).WithFields(logrus.Fields{ @@ -84,13 +86,29 @@ func PushHandler(provider content.Provider, pusher Pusher) images.HandlerFunc { "size": desc.Size, })) - log.G(ctx).Debug("push") - r, err := provider.Reader(ctx, desc.Digest) - if err != nil { - return nil, err - } - defer r.Close() - - return nil, pusher.Push(ctx, desc, r) + err := push(ctx, provider, pusher, desc) + return nil, err } } + +func push(ctx context.Context, provider content.Provider, pusher Pusher, desc ocispec.Descriptor) error { + log.G(ctx).Debug("push") + + cw, err := pusher.Push(ctx, desc) + if err != nil { + if !content.IsExists(err) { + return err + } + + return nil + } + defer cw.Close() + + rc, err := provider.Reader(ctx, desc.Digest) + if err != nil { + return err + } + defer rc.Close() + + return content.Copy(cw, rc, desc.Size, desc.Digest) +} diff --git a/remotes/resolver.go b/remotes/resolver.go index 0dde5bfc8..d5fa60fc6 100644 --- a/remotes/resolver.go +++ b/remotes/resolver.go @@ -4,6 +4,7 @@ import ( "context" "io" + "github.com/containerd/containerd/content" ocispec "github.com/opencontainers/image-spec/specs-go/v1" ) @@ -37,9 +38,9 @@ type Fetcher interface { } type Pusher interface { - // Push pushes the resource identified by the descriptor using the - // passed in reader. - Push(ctx context.Context, d ocispec.Descriptor, r io.Reader) error + // Push returns a content writer for the given resource identified + // by the descriptor. + Push(ctx context.Context, d ocispec.Descriptor) (content.Writer, error) } // FetcherFunc allows package users to implement a Fetcher with just a From 636a24eef6c144a3aea85a3c712258e99ea809ad Mon Sep 17 00:00:00 2001 From: Derek McGowan Date: Mon, 5 Jun 2017 22:28:20 -0700 Subject: [PATCH 3/3] Add status tracker for Docker remote push Update push client to use status tracker Signed-off-by: Derek McGowan --- cmd/dist/common.go | 1 + cmd/dist/push.go | 107 ++++++++++---------------------- remotes/docker/pusher.go | 83 ++++++++++++++++--------- remotes/docker/resolver.go | 18 ++++-- remotes/docker/resolver_test.go | 10 +-- remotes/docker/status.go | 46 ++++++++++++++ 6 files changed, 152 insertions(+), 113 deletions(-) create mode 100644 remotes/docker/status.go diff --git a/cmd/dist/common.go b/cmd/dist/common.go index 501a37ab9..7fdd187ae 100644 --- a/cmd/dist/common.go +++ b/cmd/dist/common.go @@ -120,6 +120,7 @@ func getResolver(ctx context.Context, clicontext *cli.Context) (remotes.Resolver } options := docker.ResolverOptions{ PlainHTTP: clicontext.Bool("plain-http"), + Tracker: pushTracker, } if username != "" { if secret == "" { diff --git a/cmd/dist/push.go b/cmd/dist/push.go index 44a450392..ef6dbe36c 100644 --- a/cmd/dist/push.go +++ b/cmd/dist/push.go @@ -2,7 +2,6 @@ package main import ( "context" - "io" "os" "sync" "text/tabwriter" @@ -13,6 +12,7 @@ import ( "github.com/containerd/containerd/log" "github.com/containerd/containerd/progress" "github.com/containerd/containerd/remotes" + "github.com/containerd/containerd/remotes/docker" digest "github.com/opencontainers/go-digest" ocispec "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" @@ -20,6 +20,10 @@ import ( "golang.org/x/sync/errgroup" ) +var ( + pushTracker = docker.NewInMemoryTracker() +) + var pushCommand = cli.Command{ Name: "push", Usage: "push an image to a remote", @@ -77,7 +81,7 @@ var pushCommand = cli.Command{ if err != nil { return err } - ongoing := newPushJobs() + ongoing := newPushJobs(pushTracker) eg, ctx := errgroup.WithContext(ctx) @@ -135,37 +139,6 @@ var pushCommand = cli.Command{ }, } -type pushTracker struct { - closed bool - started time.Time - updated time.Time - written int64 - total int64 -} - -func (pt *pushTracker) Write(p []byte) (int, error) { - pt.written += int64(len(p)) - pt.updated = time.Now() - return len(p), nil -} - -func (pt *pushTracker) Close() error { - pt.closed = true - pt.updated = time.Now() - return nil -} - -//type pushWrapper struct { -// jobs *pushjobs -// pusher remotes.Pusher -//} -// -//func (pw pushWrapper) Push(ctx context.Context, desc ocispec.Descriptor, r io.Reader) error { -// tr := pw.jobs.track(remotes.MakeRefKey(ctx, desc), desc.Size) -// defer tr.Close() -// return pw.pusher.Push(ctx, desc, io.TeeReader(r, tr)) -//} - type pushStatus struct { name string started bool @@ -174,22 +147,19 @@ type pushStatus struct { } type pushjobs struct { - jobs map[string]*pushTracker + jobs map[string]struct{} ordered []string + tracker docker.StatusTracker mu sync.Mutex } -func newPushJobs() *pushjobs { - return &pushjobs{jobs: make(map[string]*pushTracker)} +func newPushJobs(tracker docker.StatusTracker) *pushjobs { + return &pushjobs{ + jobs: make(map[string]struct{}), + tracker: tracker, + } } -//func (j *pushjobs) wrapPusher(p remotes.Pusher) remotes.Pusher { -// return pushWrapper{ -// jobs: j, -// pusher: p, -// } -//} - func (j *pushjobs) add(ref string) { j.mu.Lock() defer j.mu.Unlock() @@ -198,52 +168,39 @@ func (j *pushjobs) add(ref string) { return } j.ordered = append(j.ordered, ref) - j.jobs[ref] = nil -} - -func (j *pushjobs) track(ref string, size int64) io.WriteCloser { - j.mu.Lock() - defer j.mu.Unlock() - - if _, ok := j.jobs[ref]; !ok { - j.ordered = append(j.ordered, ref) - } - - pt := &pushTracker{ - started: time.Now(), - total: size, - } - j.jobs[ref] = pt - return pt + j.jobs[ref] = struct{}{} } func (j *pushjobs) status() []statusInfo { j.mu.Lock() defer j.mu.Unlock() - status := make([]statusInfo, 0, len(j.jobs)) + statuses := make([]statusInfo, 0, len(j.jobs)) for _, name := range j.ordered { - tracker := j.jobs[name] si := statusInfo{ Ref: name, } - if tracker != nil { - si.Offset = tracker.written - si.Total = tracker.total - si.StartedAt = tracker.started - si.UpdatedAt = tracker.updated - if tracker.closed { - si.Status = "done" - } else if tracker.written >= tracker.total { - si.Status = "committing" + + status, err := j.tracker.GetStatus(name) + if err != nil { + si.Status = "waiting" + } else { + si.Offset = status.Offset + si.Total = status.Total + si.StartedAt = status.StartedAt + si.UpdatedAt = status.UpdatedAt + if status.Offset >= status.Total { + if status.UploadUUID == "" { + si.Status = "done" + } else { + si.Status = "committing" + } } else { si.Status = "uploading" } - } else { - si.Status = "waiting" } - status = append(status, si) + statuses = append(statuses, si) } - return status + return statuses } diff --git a/remotes/docker/pusher.go b/remotes/docker/pusher.go index 0b87828f0..f99701820 100644 --- a/remotes/docker/pusher.go +++ b/remotes/docker/pusher.go @@ -12,7 +12,7 @@ import ( "github.com/containerd/containerd/content" "github.com/containerd/containerd/images" "github.com/containerd/containerd/log" - "github.com/containerd/containerd/reference" + "github.com/containerd/containerd/remotes" digest "github.com/opencontainers/go-digest" ocispec "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" @@ -21,13 +21,24 @@ import ( type dockerPusher struct { *dockerBase tag string + + // TODO: namespace tracker + tracker StatusTracker } func (p dockerPusher) Push(ctx context.Context, desc ocispec.Descriptor) (content.Writer, error) { - // TODO: Check status, return already exists + ref := remotes.MakeRefKey(ctx, desc) + status, err := p.tracker.GetStatus(ref) + if err == nil { + if status.Offset == status.Total { + return nil, content.ErrExists + } + // TODO: Handle incomplete status + } else if !content.IsNotFound(err) { + return nil, errors.Wrap(err, "failed to get status") + } var ( - ref string isManifest bool existCheck string ) @@ -55,8 +66,13 @@ func (p dockerPusher) Push(ctx context.Context, desc ocispec.Descriptor) (conten log.G(ctx).WithError(err).Debugf("Unable to check existence, continuing with push") } else { if resp.StatusCode == http.StatusOK { + p.tracker.SetStatus(ref, Status{ + Status: content.Status{ + Ref: ref, + // TODO: Set updated time? + }, + }) return nil, content.ErrExists - // TODO: Update status with total 0 } if resp.StatusCode != http.StatusNotFound { // TODO: log error @@ -68,15 +84,11 @@ func (p dockerPusher) Push(ctx context.Context, desc ocispec.Descriptor) (conten if isManifest { var putPath string - refspec := reference.Spec{Locator: p.locator} if p.tag != "" { putPath = path.Join("manifests", p.tag) - refspec.Object = p.tag } else { putPath = path.Join("manifests", desc.Digest.String()) - refspec.Object = "@" + desc.Digest.String() } - ref = refspec.String() req, err = http.NewRequest(http.MethodPut, p.url(putPath), nil) if err != nil { @@ -100,7 +112,6 @@ func (p dockerPusher) Push(ctx context.Context, desc ocispec.Descriptor) (conten // TODO: log error return nil, errors.Errorf("unexpected response: %s", resp.Status) } - ref = resp.Header.Get("Docker-Upload-Uuid") location := resp.Header.Get("Location") // Support paths without host in location @@ -119,9 +130,16 @@ func (p dockerPusher) Push(ctx context.Context, desc ocispec.Descriptor) (conten req.URL.RawQuery = q.Encode() } - // TODO: Support chunked upload + p.tracker.SetStatus(ref, Status{ + Status: content.Status{ + Ref: ref, + Total: desc.Size, + Expected: desc.Digest, + StartedAt: time.Now(), + }, + }) - // TODO: Set status + // TODO: Support chunked upload pr, pw := io.Pipe() respC := make(chan *http.Response, 1) @@ -149,14 +167,8 @@ func (p dockerPusher) Push(ctx context.Context, desc ocispec.Descriptor) (conten pipe: pw, responseC: respC, isManifest: isManifest, - - // TODO: Move this - status: content.Status{ - Ref: ref, - Total: desc.Size, - Expected: desc.Digest, - StartedAt: time.Now(), - }, + expected: desc.Digest, + tracker: p.tracker, }, nil } @@ -168,14 +180,19 @@ type pushWriter struct { responseC <-chan *http.Response isManifest bool - // TODO: Move this to lookup from base - status content.Status + expected digest.Digest + tracker StatusTracker } func (pw *pushWriter) Write(p []byte) (n int, err error) { + status, err := pw.tracker.GetStatus(pw.ref) + if err != nil { + return n, err + } n, err = pw.pipe.Write(p) - pw.status.Offset += int64(n) - pw.status.UpdatedAt = time.Now() + status.Offset += int64(n) + status.UpdatedAt = time.Now() + pw.tracker.SetStatus(pw.ref, status) return } @@ -184,14 +201,17 @@ func (pw *pushWriter) Close() error { } func (pw *pushWriter) Status() (content.Status, error) { - // TODO: Lookup status from base tracker - return pw.status, nil + status, err := pw.tracker.GetStatus(pw.ref) + if err != nil { + return content.Status{}, err + } + return status.Status, nil } func (pw *pushWriter) Digest() digest.Digest { // TODO: Get rid of this function? - return pw.status.Expected + return pw.expected } func (pw *pushWriter) Commit(size int64, expected digest.Digest) error { @@ -211,10 +231,17 @@ func (pw *pushWriter) Commit(size int64, expected digest.Digest) error { return errors.New("no response") } - // TODO: Get status for size check + status, err := pw.tracker.GetStatus(pw.ref) + if err != nil { + return errors.Wrap(err, "failed to get status") + } + + if size > 0 && size != status.Offset { + return errors.Errorf("unxpected size %d, expected %d", status.Offset, size) + } if expected == "" { - expected = pw.status.Expected + expected = status.Expected } actual, err := digest.Parse(resp.Header.Get("Docker-Content-Digest")) diff --git a/remotes/docker/resolver.go b/remotes/docker/resolver.go index 2dbd5bed3..fe9626858 100644 --- a/remotes/docker/resolver.go +++ b/remotes/docker/resolver.go @@ -38,6 +38,7 @@ type dockerResolver struct { credentials func(string) (string, string, error) plainHTTP bool client *http.Client + tracker StatusTracker } // ResolverOptions are used to configured a new Docker register resolver @@ -53,15 +54,23 @@ type ResolverOptions struct { // Client is the http client to used when making registry requests Client *http.Client - // TODO: Add upload status tracker + // Tracker is used to track uploads to the registry. This is used + // since the registry does not have upload tracking and the existing + // mechanism for getting blob upload status is expensive. + Tracker StatusTracker } // NewResolver returns a new resolver to a Docker registry func NewResolver(options ResolverOptions) remotes.Resolver { + tracker := options.Tracker + if tracker == nil { + tracker = NewInMemoryTracker() + } return &dockerResolver{ credentials: options.Credentials, plainHTTP: options.PlainHTTP, client: options.Client, + tracker: tracker, } } @@ -214,13 +223,13 @@ func (r *dockerResolver) Pusher(ctx context.Context, ref string) (remotes.Pusher return dockerPusher{ dockerBase: base, tag: refspec.Object, + tracker: r.tracker, }, nil } type dockerBase struct { - base url.URL - locator string - token string + base url.URL + token string client *http.Client useBasic bool @@ -260,7 +269,6 @@ func (r *dockerResolver) base(refspec reference.Spec) (*dockerBase, error) { return &dockerBase{ base: base, - locator: refspec.Locator, client: r.client, username: username, secret: secret, diff --git a/remotes/docker/resolver_test.go b/remotes/docker/resolver_test.go index cf6f9fffe..32e79e4a6 100644 --- a/remotes/docker/resolver_test.go +++ b/remotes/docker/resolver_test.go @@ -158,7 +158,7 @@ func TestBadTokenResolver(t *testing.T) { } ctx := context.Background() - h := content(ocispec.MediaTypeImageManifest, []byte("not anything parse-able")) + h := newContent(ocispec.MediaTypeImageManifest, []byte("not anything parse-able")) base, ro, close := withTokenServer(th, creds)(logHandler{t, h}) defer close() @@ -247,10 +247,10 @@ func runBasicTest(t *testing.T, name string, sf func(h http.Handler) (string, Re ) m := newManifest( - content(ocispec.MediaTypeImageConfig, []byte("1")), - content(ocispec.MediaTypeImageLayerGzip, []byte("2")), + newContent(ocispec.MediaTypeImageConfig, []byte("1")), + newContent(ocispec.MediaTypeImageLayerGzip, []byte("2")), ) - mc := content(ocispec.MediaTypeImageManifest, m.OCIManifest()) + mc := newContent(ocispec.MediaTypeImageManifest, m.OCIManifest()) m.RegisterHandler(r, name) r.Handle(fmt.Sprintf("/v2/%s/manifests/%s", name, tag), mc) r.Handle(fmt.Sprintf("/v2/%s/manifests/%s", name, mc.Digest()), mc) @@ -331,7 +331,7 @@ type testContent struct { content []byte } -func content(mediaType string, b []byte) testContent { +func newContent(mediaType string, b []byte) testContent { return testContent{ mediaType: mediaType, content: b, diff --git a/remotes/docker/status.go b/remotes/docker/status.go new file mode 100644 index 000000000..34280f1d1 --- /dev/null +++ b/remotes/docker/status.go @@ -0,0 +1,46 @@ +package docker + +import ( + "sync" + + "github.com/containerd/containerd/content" +) + +type Status struct { + content.Status + + // UploadUUID is used by the Docker registry to reference blob uploads + UploadUUID string +} + +type StatusTracker interface { + GetStatus(string) (Status, error) + SetStatus(string, Status) +} + +type memoryStatusTracker struct { + statuses map[string]Status + m sync.Mutex +} + +func NewInMemoryTracker() StatusTracker { + return &memoryStatusTracker{ + statuses: map[string]Status{}, + } +} + +func (t *memoryStatusTracker) GetStatus(ref string) (Status, error) { + t.m.Lock() + defer t.m.Unlock() + status, ok := t.statuses[ref] + if !ok { + return Status{}, content.ErrNotFound + } + return status, nil +} + +func (t *memoryStatusTracker) SetStatus(ref string, status Status) { + t.m.Lock() + t.statuses[ref] = status + t.m.Unlock() +}