diff --git a/client.go b/client.go index 08095d5df..6f2f09155 100644 --- a/client.go +++ b/client.go @@ -209,10 +209,6 @@ type RemoteContext struct { // afterwards. Unpacking is required to run an image. Unpack bool - // PushWrapper allows hooking into the push method. This can be used - // track content that is being sent to the remote. - PushWrapper func(remotes.Pusher) remotes.Pusher - // BaseHandlers are a set of handlers which get are called on dispatch. // These handlers always get called before any operation specific // handlers. @@ -251,15 +247,6 @@ func WithImageHandler(h images.Handler) RemoteOpts { } } -// WithPushWrapper is used to wrap a pusher to hook into -// the push content as it is sent to a remote. -func WithPushWrapper(w func(remotes.Pusher) remotes.Pusher) RemoteOpts { - return func(client *Client, c *RemoteContext) error { - c.PushWrapper = w - return nil - } -} - func (c *Client) Pull(ctx context.Context, ref string, opts ...RemoteOpts) (Image, error) { pullCtx := defaultRemoteContext() for _, o := range opts { @@ -318,10 +305,6 @@ func (c *Client) Push(ctx context.Context, ref string, desc ocispec.Descriptor, return err } - if pushCtx.PushWrapper != nil { - pusher = pushCtx.PushWrapper(pusher) - } - var m sync.Mutex manifestStack := []ocispec.Descriptor{} diff --git a/cmd/dist/push.go b/cmd/dist/push.go index 439b33521..44a450392 100644 --- a/cmd/dist/push.go +++ b/cmd/dist/push.go @@ -92,7 +92,6 @@ var pushCommand = cli.Command{ return client.Push(ctx, ref, desc, containerd.WithResolver(resolver), containerd.WithImageHandler(jobHandler), - containerd.WithPushWrapper(ongoing.wrapPusher), ) }) @@ -156,16 +155,16 @@ func (pt *pushTracker) Close() error { return nil } -type pushWrapper struct { - jobs *pushjobs - pusher remotes.Pusher -} - -func (pw pushWrapper) Push(ctx context.Context, desc ocispec.Descriptor, r io.Reader) error { - tr := pw.jobs.track(remotes.MakeRefKey(ctx, desc), desc.Size) - defer tr.Close() - return pw.pusher.Push(ctx, desc, io.TeeReader(r, tr)) -} +//type pushWrapper struct { +// jobs *pushjobs +// pusher remotes.Pusher +//} +// +//func (pw pushWrapper) Push(ctx context.Context, desc ocispec.Descriptor, r io.Reader) error { +// tr := pw.jobs.track(remotes.MakeRefKey(ctx, desc), desc.Size) +// defer tr.Close() +// return pw.pusher.Push(ctx, desc, io.TeeReader(r, tr)) +//} type pushStatus struct { name string @@ -184,12 +183,12 @@ func newPushJobs() *pushjobs { return &pushjobs{jobs: make(map[string]*pushTracker)} } -func (j *pushjobs) wrapPusher(p remotes.Pusher) remotes.Pusher { - return pushWrapper{ - jobs: j, - pusher: p, - } -} +//func (j *pushjobs) wrapPusher(p remotes.Pusher) remotes.Pusher { +// return pushWrapper{ +// jobs: j, +// pusher: p, +// } +//} func (j *pushjobs) add(ref string) { j.mu.Lock() diff --git a/cmd/dist/pushobject.go b/cmd/dist/pushobject.go index 196036a88..fab26b8fa 100644 --- a/cmd/dist/pushobject.go +++ b/cmd/dist/pushobject.go @@ -3,6 +3,7 @@ package main import ( "fmt" + "github.com/containerd/containerd/content" "github.com/containerd/containerd/log" digest "github.com/opencontainers/go-digest" ocispec "github.com/opencontainers/image-spec/specs-go/v1" @@ -63,8 +64,13 @@ var pushObjectCommand = cli.Command{ } defer rc.Close() + cw, err := pusher.Push(ctx, desc) + if err != nil { + return err + } + // TODO: Progress reader - if err = pusher.Push(ctx, desc, rc); err != nil { + if err := content.Copy(cw, rc, desc.Size, desc.Digest); err != nil { return err } diff --git a/remotes/docker/pusher.go b/remotes/docker/pusher.go index faeaa09ba..0b87828f0 100644 --- a/remotes/docker/pusher.go +++ b/remotes/docker/pusher.go @@ -1,16 +1,19 @@ package docker import ( - "bytes" "context" "io" "io/ioutil" "net/http" "path" "strings" + "time" + "github.com/containerd/containerd/content" "github.com/containerd/containerd/images" "github.com/containerd/containerd/log" + "github.com/containerd/containerd/reference" + digest "github.com/opencontainers/go-digest" ocispec "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" ) @@ -20,8 +23,11 @@ type dockerPusher struct { tag string } -func (p dockerPusher) Push(ctx context.Context, desc ocispec.Descriptor, r io.Reader) error { +func (p dockerPusher) Push(ctx context.Context, desc ocispec.Descriptor) (content.Writer, error) { + // TODO: Check status, return already exists + var ( + ref string isManifest bool existCheck string ) @@ -37,79 +43,64 @@ func (p dockerPusher) Push(ctx context.Context, desc ocispec.Descriptor, r io.Re req, err := http.NewRequest(http.MethodHead, p.url(existCheck), nil) if err != nil { - return err + return nil, err } req.Header.Set("Accept", strings.Join([]string{desc.MediaType, `*`}, ", ")) resp, err := p.doRequestWithRetries(ctx, req, nil) if err != nil { if errors.Cause(err) != ErrInvalidAuthorization { - return err + return nil, err } log.G(ctx).WithError(err).Debugf("Unable to check existence, continuing with push") } else { if resp.StatusCode == http.StatusOK { - return nil + return nil, content.ErrExists + // TODO: Update status with total 0 } if resp.StatusCode != http.StatusNotFound { // TODO: log error - return errors.Errorf("unexpected response: %s", resp.Status) + return nil, errors.Errorf("unexpected response: %s", resp.Status) } } // TODO: Lookup related objects for cross repository push if isManifest { - // Read all to use bytes.Reader for using GetBody - b, err := ioutil.ReadAll(r) - if err != nil { - return errors.Wrap(err, "failed to read manifest") - } var putPath string + refspec := reference.Spec{Locator: p.locator} if p.tag != "" { putPath = path.Join("manifests", p.tag) + refspec.Object = p.tag } else { putPath = path.Join("manifests", desc.Digest.String()) + refspec.Object = "@" + desc.Digest.String() } + ref = refspec.String() - req, err := http.NewRequest(http.MethodPut, p.url(putPath), nil) + req, err = http.NewRequest(http.MethodPut, p.url(putPath), nil) if err != nil { - return err - } - req.ContentLength = int64(len(b)) - req.Body = ioutil.NopCloser(bytes.NewReader(b)) - req.GetBody = func() (io.ReadCloser, error) { - return ioutil.NopCloser(bytes.NewReader(b)), nil + return nil, err } req.Header.Add("Content-Type", desc.MediaType) - - resp, err := p.doRequestWithRetries(ctx, req, nil) - if err != nil { - return err - } - if resp.StatusCode != http.StatusCreated { - // TODO: log error - return errors.Errorf("unexpected response: %s", resp.Status) - } } else { // TODO: Do monolithic upload if size is small - // TODO: Turn multi-request blob uploader into ingester - // Start upload request - req, err := http.NewRequest(http.MethodPost, p.url("blobs", "uploads")+"/", nil) + req, err = http.NewRequest(http.MethodPost, p.url("blobs", "uploads")+"/", nil) if err != nil { - return err + return nil, err } resp, err := p.doRequestWithRetries(ctx, req, nil) if err != nil { - return err + return nil, err } if resp.StatusCode != http.StatusAccepted { // TODO: log error - return errors.Errorf("unexpected response: %s", resp.Status) + return nil, errors.Errorf("unexpected response: %s", resp.Status) } + ref = resp.Header.Get("Docker-Upload-Uuid") location := resp.Header.Get("Location") // Support paths without host in location @@ -119,26 +110,127 @@ func (p dockerPusher) Push(ctx context.Context, desc ocispec.Descriptor, r io.Re location = u.String() } - // TODO: Support chunked upload - req, err = http.NewRequest(http.MethodPut, location, r) + req, err = http.NewRequest(http.MethodPut, location, nil) if err != nil { - return err + return nil, err } q := req.URL.Query() q.Add("digest", desc.Digest.String()) req.URL.RawQuery = q.Encode() - req.ContentLength = desc.Size + } + // TODO: Support chunked upload + + // TODO: Set status + + pr, pw := io.Pipe() + respC := make(chan *http.Response, 1) + + req.Body = ioutil.NopCloser(pr) + req.ContentLength = desc.Size + + go func() { + defer close(respC) resp, err = p.doRequest(ctx, req) if err != nil { - return err + pr.CloseWithError(err) + return } if resp.StatusCode != http.StatusCreated { // TODO: log error - return errors.Errorf("unexpected response: %s", resp.Status) + pr.CloseWithError(errors.Errorf("unexpected response: %s", resp.Status)) } + respC <- resp + }() + return &pushWriter{ + base: p.dockerBase, + ref: ref, + pipe: pw, + responseC: respC, + isManifest: isManifest, + + // TODO: Move this + status: content.Status{ + Ref: ref, + Total: desc.Size, + Expected: desc.Digest, + StartedAt: time.Now(), + }, + }, nil +} + +type pushWriter struct { + base *dockerBase + ref string + + pipe *io.PipeWriter + responseC <-chan *http.Response + isManifest bool + + // TODO: Move this to lookup from base + status content.Status +} + +func (pw *pushWriter) Write(p []byte) (n int, err error) { + n, err = pw.pipe.Write(p) + pw.status.Offset += int64(n) + pw.status.UpdatedAt = time.Now() + return +} + +func (pw *pushWriter) Close() error { + return pw.pipe.Close() +} + +func (pw *pushWriter) Status() (content.Status, error) { + // TODO: Lookup status from base tracker + return pw.status, nil + +} + +func (pw *pushWriter) Digest() digest.Digest { + // TODO: Get rid of this function? + return pw.status.Expected +} + +func (pw *pushWriter) Commit(size int64, expected digest.Digest) error { + // Check whether read has already thrown an error + if _, err := pw.pipe.Write([]byte{}); err != nil && err != io.ErrClosedPipe { + return errors.Wrap(err, "pipe error before commit") + } + + if err := pw.pipe.Close(); err != nil { + return err + } + // TODO: Update status to determine committing + + // TODO: timeout waiting for response + resp := <-pw.responseC + if resp == nil { + return errors.New("no response") + } + + // TODO: Get status for size check + + if expected == "" { + expected = pw.status.Expected + } + + actual, err := digest.Parse(resp.Header.Get("Docker-Content-Digest")) + if err != nil { + return errors.Wrap(err, "invalid content digest in response") + } + + if actual != expected { + return errors.Errorf("got digest %s, expected %s", actual, expected) } return nil } + +func (pw *pushWriter) Truncate(size int64) error { + // TODO: if blob close request and start new request at offset + // TODO: always error on manifest + return errors.New("cannot truncate remote upload") +} diff --git a/remotes/docker/resolver.go b/remotes/docker/resolver.go index 8b55d7c19..2dbd5bed3 100644 --- a/remotes/docker/resolver.go +++ b/remotes/docker/resolver.go @@ -52,6 +52,8 @@ type ResolverOptions struct { // Client is the http client to used when making registry requests Client *http.Client + + // TODO: Add upload status tracker } // NewResolver returns a new resolver to a Docker registry @@ -216,8 +218,9 @@ func (r *dockerResolver) Pusher(ctx context.Context, ref string) (remotes.Pusher } type dockerBase struct { - base url.URL - token string + base url.URL + locator string + token string client *http.Client useBasic bool @@ -257,6 +260,7 @@ func (r *dockerResolver) base(refspec reference.Spec) (*dockerBase, error) { return &dockerBase{ base: base, + locator: refspec.Locator, client: r.client, username: username, secret: secret, diff --git a/remotes/handlers.go b/remotes/handlers.go index 06d75a5ca..ca99e1d0e 100644 --- a/remotes/handlers.go +++ b/remotes/handlers.go @@ -76,6 +76,8 @@ func fetch(ctx context.Context, ingester content.Ingester, fetcher Fetcher, desc return content.Copy(cw, rc, desc.Size, desc.Digest) } +// PushHandler returns a handler that will push all content from the provider +// using a writer from the pusher. func PushHandler(provider content.Provider, pusher Pusher) images.HandlerFunc { return func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) { ctx = log.WithLogger(ctx, log.G(ctx).WithFields(logrus.Fields{ @@ -84,13 +86,29 @@ func PushHandler(provider content.Provider, pusher Pusher) images.HandlerFunc { "size": desc.Size, })) - log.G(ctx).Debug("push") - r, err := provider.Reader(ctx, desc.Digest) - if err != nil { - return nil, err - } - defer r.Close() - - return nil, pusher.Push(ctx, desc, r) + err := push(ctx, provider, pusher, desc) + return nil, err } } + +func push(ctx context.Context, provider content.Provider, pusher Pusher, desc ocispec.Descriptor) error { + log.G(ctx).Debug("push") + + cw, err := pusher.Push(ctx, desc) + if err != nil { + if !content.IsExists(err) { + return err + } + + return nil + } + defer cw.Close() + + rc, err := provider.Reader(ctx, desc.Digest) + if err != nil { + return err + } + defer rc.Close() + + return content.Copy(cw, rc, desc.Size, desc.Digest) +} diff --git a/remotes/resolver.go b/remotes/resolver.go index 0dde5bfc8..d5fa60fc6 100644 --- a/remotes/resolver.go +++ b/remotes/resolver.go @@ -4,6 +4,7 @@ import ( "context" "io" + "github.com/containerd/containerd/content" ocispec "github.com/opencontainers/image-spec/specs-go/v1" ) @@ -37,9 +38,9 @@ type Fetcher interface { } type Pusher interface { - // Push pushes the resource identified by the descriptor using the - // passed in reader. - Push(ctx context.Context, d ocispec.Descriptor, r io.Reader) error + // Push returns a content writer for the given resource identified + // by the descriptor. + Push(ctx context.Context, d ocispec.Descriptor) (content.Writer, error) } // FetcherFunc allows package users to implement a Fetcher with just a