diff --git a/remotes/docker/pusher.go b/remotes/docker/pusher.go index ef6e8056a..2dfe42d4f 100644 --- a/remotes/docker/pusher.go +++ b/remotes/docker/pusher.go @@ -23,6 +23,7 @@ import ( "io" "net/http" "net/url" + "path" "strings" "sync" "time" @@ -137,6 +138,9 @@ func (p dockerPusher) push(ctx context.Context, desc ocispec.Descriptor, ref str if exists { p.tracker.SetStatus(ref, Status{ Committed: true, + PushStatus: PushStatus{ + Exists: true, + }, Status: content.Status{ Ref: ref, Total: desc.Size, @@ -164,6 +168,7 @@ func (p dockerPusher) push(ctx context.Context, desc ocispec.Descriptor, ref str // Start upload request req = p.request(host, http.MethodPost, "blobs", "uploads/") + mountedFrom := "" var resp *http.Response if fromRepo := selectRepositoryMountCandidate(p.refspec, desc.Annotations); fromRepo != "" { preq := requestWithMountFrom(req, desc.Digest.String(), fromRepo) @@ -180,11 +185,14 @@ func (p dockerPusher) push(ctx context.Context, desc ocispec.Descriptor, ref str return nil, err } - if resp.StatusCode == http.StatusUnauthorized { + switch resp.StatusCode { + case http.StatusUnauthorized: log.G(ctx).Debugf("failed to mount from repository %s", fromRepo) resp.Body.Close() resp = nil + case http.StatusCreated: + mountedFrom = path.Join(p.refspec.Locator, fromRepo) } } @@ -204,6 +212,9 @@ func (p dockerPusher) push(ctx context.Context, desc ocispec.Descriptor, ref str case http.StatusCreated: p.tracker.SetStatus(ref, Status{ Committed: true, + PushStatus: PushStatus{ + MountedFrom: mountedFrom, + }, Status: content.Status{ Ref: ref, Total: desc.Size, diff --git a/remotes/docker/pusher_test.go b/remotes/docker/pusher_test.go index d982a7de5..57ded5bea 100644 --- a/remotes/docker/pusher_test.go +++ b/remotes/docker/pusher_test.go @@ -31,6 +31,7 @@ import ( "github.com/containerd/containerd/content" "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/reference" "github.com/containerd/containerd/remotes" "github.com/opencontainers/go-digest" ocispec "github.com/opencontainers/image-spec/specs-go/v1" @@ -69,7 +70,7 @@ func TestGetManifestPath(t *testing.T) { func TestPusherErrClosedRetry(t *testing.T) { ctx := context.Background() - p, reg, done := samplePusher(t) + p, reg, _, done := samplePusher(t) defer done() layerContent := []byte("test") @@ -88,7 +89,7 @@ func TestPusherErrClosedRetry(t *testing.T) { // TestPusherErrReset tests the push method if the request needs to be retried // i.e when ErrReset occurs func TestPusherErrReset(t *testing.T) { - p, reg, done := samplePusher(t) + p, reg, _, done := samplePusher(t) defer done() p.object = "latest@sha256:55d31f3af94c797b65b310569803cacc1c9f4a34bf61afcdc8138f89345c8308" @@ -149,7 +150,7 @@ func tryUpload(ctx context.Context, t *testing.T, p dockerPusher, layerContent [ return cw.Commit(ctx, 0, "") } -func samplePusher(t *testing.T) (dockerPusher, *uploadableMockRegistry, func()) { +func samplePusher(t *testing.T) (dockerPusher, *uploadableMockRegistry, StatusTrackLocker, func()) { reg := &uploadableMockRegistry{ availableContents: make([]string, 0), } @@ -158,8 +159,12 @@ func samplePusher(t *testing.T) (dockerPusher, *uploadableMockRegistry, func()) if err != nil { t.Fatal(err) } + tracker := NewInMemoryTracker() return dockerPusher{ dockerBase: &dockerBase{ + refspec: reference.Spec{ + Locator: "sample", + }, repository: "sample", hosts: []RegistryHost{ { @@ -172,8 +177,8 @@ func samplePusher(t *testing.T) (dockerPusher, *uploadableMockRegistry, func()) }, }, object: "sample", - tracker: NewInMemoryTracker(), - }, reg, s.Close + tracker: tracker, + }, reg, tracker, s.Close } var manifestRegexp = regexp.MustCompile(`/([a-z0-9]+)/manifests/(.*)`) @@ -204,11 +209,21 @@ func (u *uploadableMockRegistry) defaultHandler(w http.ResponseWriter, r *http.R } else { w.Header().Set("Location", "/cannotupload") } + dgstr := digest.Canonical.Digester() + if _, err := io.Copy(dgstr.Hash(), r.Body); err != nil { w.WriteHeader(http.StatusInternalServerError) return } + + query := r.URL.Query() + if query.Has("mount") && query.Get("from") == "always-mount" { + w.Header().Set("Docker-Content-Digest", dgstr.Digest().String()) + w.WriteHeader(http.StatusCreated) + return + } + u.availableContents = append(u.availableContents, dgstr.Digest().String()) w.WriteHeader(http.StatusAccepted) return @@ -262,7 +277,7 @@ func (u *uploadableMockRegistry) isContentAlreadyExist(c string) bool { func Test_dockerPusher_push(t *testing.T) { - p, reg, done := samplePusher(t) + p, reg, tracker, done := samplePusher(t) defer done() reg.uploadable = true @@ -280,6 +295,7 @@ func Test_dockerPusher_push(t *testing.T) { mediatype string ref string unavailableOnFail bool + annotations map[string]string } tests := []struct { name string @@ -288,6 +304,7 @@ func Test_dockerPusher_push(t *testing.T) { args args checkerFunc func(writer *pushWriter) bool wantErr error + wantStatus *PushStatus }{ { name: "when a manifest is pushed", @@ -321,6 +338,68 @@ func Test_dockerPusher_push(t *testing.T) { unavailableOnFail: false, }, wantErr: fmt.Errorf("content %v on remote: %w", digest.FromBytes(manifestContent), errdefs.ErrAlreadyExists), + wantStatus: &PushStatus{ + Exists: true, + MountedFrom: "", + }, + }, + { + name: "success cross-repo mount a blob layer", + dp: p, + // Not needed to set the base object as it is used to generate path only in case of manifests + // dockerBaseObject: + args: args{ + content: layerContent, + mediatype: ocispec.MediaTypeImageLayer, + ref: fmt.Sprintf("layer2-%s", layerContentDigest.String()), + unavailableOnFail: false, + annotations: map[string]string{ + distributionSourceLabelKey("sample"): "always-mount", + }, + }, + checkerFunc: func(writer *pushWriter) bool { + select { + case resp := <-writer.respC: + // 201 should be the response code when uploading a new blob + return resp.StatusCode == http.StatusCreated + case <-writer.errC: + return false + } + }, + wantErr: fmt.Errorf("content %v on remote: %w", digest.FromBytes(layerContent), errdefs.ErrAlreadyExists), + wantStatus: &PushStatus{ + MountedFrom: "sample/always-mount", + Exists: false, + }, + }, + { + name: "failed to cross-repo mount a blob layer", + dp: p, + // Not needed to set the base object as it is used to generate path only in case of manifests + // dockerBaseObject: + args: args{ + content: layerContent, + mediatype: ocispec.MediaTypeImageLayer, + ref: fmt.Sprintf("layer3-%s", layerContentDigest.String()), + unavailableOnFail: false, + annotations: map[string]string{ + distributionSourceLabelKey("sample"): "never-mount", + }, + }, + checkerFunc: func(writer *pushWriter) bool { + select { + case resp := <-writer.respC: + // 201 should be the response code when uploading a new blob + return resp.StatusCode == http.StatusCreated + case <-writer.errC: + return false + } + }, + wantErr: nil, + wantStatus: &PushStatus{ + MountedFrom: "", + Exists: false, + }, }, { name: "trying to push a blob layer", @@ -343,14 +422,20 @@ func Test_dockerPusher_push(t *testing.T) { } }, wantErr: nil, + wantStatus: &PushStatus{ + MountedFrom: "", + Exists: false, + }, }, } for _, test := range tests { + test := test t.Run(test.name, func(t *testing.T) { desc := ocispec.Descriptor{ - MediaType: test.args.mediatype, - Digest: digest.FromBytes(test.args.content), - Size: int64(len(test.args.content)), + MediaType: test.args.mediatype, + Digest: digest.FromBytes(test.args.content), + Size: int64(len(test.args.content)), + Annotations: test.args.annotations, } test.dp.object = test.dockerBaseObject @@ -358,6 +443,13 @@ func Test_dockerPusher_push(t *testing.T) { got, err := test.dp.push(context.Background(), desc, test.args.ref, test.args.unavailableOnFail) assert.Equal(t, test.wantErr, err) + + if test.wantStatus != nil { + status, err := tracker.GetStatus(test.args.ref) + assert.NoError(t, err) + assert.Equal(t, *test.wantStatus, status.PushStatus) + } + // if an error is expected, further comparisons are not required. if test.wantErr != nil { return diff --git a/remotes/docker/status.go b/remotes/docker/status.go index 1f7b278ae..1a9227725 100644 --- a/remotes/docker/status.go +++ b/remotes/docker/status.go @@ -36,6 +36,17 @@ type Status struct { // UploadUUID is used by the Docker registry to reference blob uploads UploadUUID string + + // PushStatus contains status related to push. + PushStatus +} + +type PushStatus struct { + // MountedFrom is the source content was cross-repo mounted from (empty if no cross-repo mount was performed). + MountedFrom string + + // Exists indicates whether content already exists in the repository and wasn't uploaded. + Exists bool } // StatusTracker to track status of operations