remotes/docker: Add MountedFrom and Exists push status
This makes it possible to check whether content didn't actually need to be pushed to the remote registry and was cross-repo mounted or already existed. Signed-off-by: Paweł Gronowski <pawel.gronowski@docker.com>
This commit is contained in:
parent
7cd72cce99
commit
dfc7590d5a
@ -23,6 +23,7 @@ import (
|
|||||||
"io"
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
"net/url"
|
||||||
|
"path"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
@ -137,6 +138,9 @@ func (p dockerPusher) push(ctx context.Context, desc ocispec.Descriptor, ref str
|
|||||||
if exists {
|
if exists {
|
||||||
p.tracker.SetStatus(ref, Status{
|
p.tracker.SetStatus(ref, Status{
|
||||||
Committed: true,
|
Committed: true,
|
||||||
|
PushStatus: PushStatus{
|
||||||
|
Exists: true,
|
||||||
|
},
|
||||||
Status: content.Status{
|
Status: content.Status{
|
||||||
Ref: ref,
|
Ref: ref,
|
||||||
Total: desc.Size,
|
Total: desc.Size,
|
||||||
@ -164,6 +168,7 @@ func (p dockerPusher) push(ctx context.Context, desc ocispec.Descriptor, ref str
|
|||||||
// Start upload request
|
// Start upload request
|
||||||
req = p.request(host, http.MethodPost, "blobs", "uploads/")
|
req = p.request(host, http.MethodPost, "blobs", "uploads/")
|
||||||
|
|
||||||
|
mountedFrom := ""
|
||||||
var resp *http.Response
|
var resp *http.Response
|
||||||
if fromRepo := selectRepositoryMountCandidate(p.refspec, desc.Annotations); fromRepo != "" {
|
if fromRepo := selectRepositoryMountCandidate(p.refspec, desc.Annotations); fromRepo != "" {
|
||||||
preq := requestWithMountFrom(req, desc.Digest.String(), fromRepo)
|
preq := requestWithMountFrom(req, desc.Digest.String(), fromRepo)
|
||||||
@ -180,11 +185,14 @@ func (p dockerPusher) push(ctx context.Context, desc ocispec.Descriptor, ref str
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if resp.StatusCode == http.StatusUnauthorized {
|
switch resp.StatusCode {
|
||||||
|
case http.StatusUnauthorized:
|
||||||
log.G(ctx).Debugf("failed to mount from repository %s", fromRepo)
|
log.G(ctx).Debugf("failed to mount from repository %s", fromRepo)
|
||||||
|
|
||||||
resp.Body.Close()
|
resp.Body.Close()
|
||||||
resp = nil
|
resp = nil
|
||||||
|
case http.StatusCreated:
|
||||||
|
mountedFrom = path.Join(p.refspec.Locator, fromRepo)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -204,6 +212,9 @@ func (p dockerPusher) push(ctx context.Context, desc ocispec.Descriptor, ref str
|
|||||||
case http.StatusCreated:
|
case http.StatusCreated:
|
||||||
p.tracker.SetStatus(ref, Status{
|
p.tracker.SetStatus(ref, Status{
|
||||||
Committed: true,
|
Committed: true,
|
||||||
|
PushStatus: PushStatus{
|
||||||
|
MountedFrom: mountedFrom,
|
||||||
|
},
|
||||||
Status: content.Status{
|
Status: content.Status{
|
||||||
Ref: ref,
|
Ref: ref,
|
||||||
Total: desc.Size,
|
Total: desc.Size,
|
||||||
|
@ -31,6 +31,7 @@ import (
|
|||||||
|
|
||||||
"github.com/containerd/containerd/content"
|
"github.com/containerd/containerd/content"
|
||||||
"github.com/containerd/containerd/errdefs"
|
"github.com/containerd/containerd/errdefs"
|
||||||
|
"github.com/containerd/containerd/reference"
|
||||||
"github.com/containerd/containerd/remotes"
|
"github.com/containerd/containerd/remotes"
|
||||||
"github.com/opencontainers/go-digest"
|
"github.com/opencontainers/go-digest"
|
||||||
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
|
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
|
||||||
@ -69,7 +70,7 @@ func TestGetManifestPath(t *testing.T) {
|
|||||||
func TestPusherErrClosedRetry(t *testing.T) {
|
func TestPusherErrClosedRetry(t *testing.T) {
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
|
|
||||||
p, reg, done := samplePusher(t)
|
p, reg, _, done := samplePusher(t)
|
||||||
defer done()
|
defer done()
|
||||||
|
|
||||||
layerContent := []byte("test")
|
layerContent := []byte("test")
|
||||||
@ -88,7 +89,7 @@ func TestPusherErrClosedRetry(t *testing.T) {
|
|||||||
// TestPusherErrReset tests the push method if the request needs to be retried
|
// TestPusherErrReset tests the push method if the request needs to be retried
|
||||||
// i.e when ErrReset occurs
|
// i.e when ErrReset occurs
|
||||||
func TestPusherErrReset(t *testing.T) {
|
func TestPusherErrReset(t *testing.T) {
|
||||||
p, reg, done := samplePusher(t)
|
p, reg, _, done := samplePusher(t)
|
||||||
defer done()
|
defer done()
|
||||||
|
|
||||||
p.object = "latest@sha256:55d31f3af94c797b65b310569803cacc1c9f4a34bf61afcdc8138f89345c8308"
|
p.object = "latest@sha256:55d31f3af94c797b65b310569803cacc1c9f4a34bf61afcdc8138f89345c8308"
|
||||||
@ -149,7 +150,7 @@ func tryUpload(ctx context.Context, t *testing.T, p dockerPusher, layerContent [
|
|||||||
return cw.Commit(ctx, 0, "")
|
return cw.Commit(ctx, 0, "")
|
||||||
}
|
}
|
||||||
|
|
||||||
func samplePusher(t *testing.T) (dockerPusher, *uploadableMockRegistry, func()) {
|
func samplePusher(t *testing.T) (dockerPusher, *uploadableMockRegistry, StatusTrackLocker, func()) {
|
||||||
reg := &uploadableMockRegistry{
|
reg := &uploadableMockRegistry{
|
||||||
availableContents: make([]string, 0),
|
availableContents: make([]string, 0),
|
||||||
}
|
}
|
||||||
@ -158,8 +159,12 @@ func samplePusher(t *testing.T) (dockerPusher, *uploadableMockRegistry, func())
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
tracker := NewInMemoryTracker()
|
||||||
return dockerPusher{
|
return dockerPusher{
|
||||||
dockerBase: &dockerBase{
|
dockerBase: &dockerBase{
|
||||||
|
refspec: reference.Spec{
|
||||||
|
Locator: "sample",
|
||||||
|
},
|
||||||
repository: "sample",
|
repository: "sample",
|
||||||
hosts: []RegistryHost{
|
hosts: []RegistryHost{
|
||||||
{
|
{
|
||||||
@ -172,8 +177,8 @@ func samplePusher(t *testing.T) (dockerPusher, *uploadableMockRegistry, func())
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
object: "sample",
|
object: "sample",
|
||||||
tracker: NewInMemoryTracker(),
|
tracker: tracker,
|
||||||
}, reg, s.Close
|
}, reg, tracker, s.Close
|
||||||
}
|
}
|
||||||
|
|
||||||
var manifestRegexp = regexp.MustCompile(`/([a-z0-9]+)/manifests/(.*)`)
|
var manifestRegexp = regexp.MustCompile(`/([a-z0-9]+)/manifests/(.*)`)
|
||||||
@ -204,11 +209,21 @@ func (u *uploadableMockRegistry) defaultHandler(w http.ResponseWriter, r *http.R
|
|||||||
} else {
|
} else {
|
||||||
w.Header().Set("Location", "/cannotupload")
|
w.Header().Set("Location", "/cannotupload")
|
||||||
}
|
}
|
||||||
|
|
||||||
dgstr := digest.Canonical.Digester()
|
dgstr := digest.Canonical.Digester()
|
||||||
|
|
||||||
if _, err := io.Copy(dgstr.Hash(), r.Body); err != nil {
|
if _, err := io.Copy(dgstr.Hash(), r.Body); err != nil {
|
||||||
w.WriteHeader(http.StatusInternalServerError)
|
w.WriteHeader(http.StatusInternalServerError)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
query := r.URL.Query()
|
||||||
|
if query.Has("mount") && query.Get("from") == "always-mount" {
|
||||||
|
w.Header().Set("Docker-Content-Digest", dgstr.Digest().String())
|
||||||
|
w.WriteHeader(http.StatusCreated)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
u.availableContents = append(u.availableContents, dgstr.Digest().String())
|
u.availableContents = append(u.availableContents, dgstr.Digest().String())
|
||||||
w.WriteHeader(http.StatusAccepted)
|
w.WriteHeader(http.StatusAccepted)
|
||||||
return
|
return
|
||||||
@ -262,7 +277,7 @@ func (u *uploadableMockRegistry) isContentAlreadyExist(c string) bool {
|
|||||||
|
|
||||||
func Test_dockerPusher_push(t *testing.T) {
|
func Test_dockerPusher_push(t *testing.T) {
|
||||||
|
|
||||||
p, reg, done := samplePusher(t)
|
p, reg, tracker, done := samplePusher(t)
|
||||||
defer done()
|
defer done()
|
||||||
|
|
||||||
reg.uploadable = true
|
reg.uploadable = true
|
||||||
@ -280,6 +295,7 @@ func Test_dockerPusher_push(t *testing.T) {
|
|||||||
mediatype string
|
mediatype string
|
||||||
ref string
|
ref string
|
||||||
unavailableOnFail bool
|
unavailableOnFail bool
|
||||||
|
annotations map[string]string
|
||||||
}
|
}
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
name string
|
name string
|
||||||
@ -288,6 +304,7 @@ func Test_dockerPusher_push(t *testing.T) {
|
|||||||
args args
|
args args
|
||||||
checkerFunc func(writer *pushWriter) bool
|
checkerFunc func(writer *pushWriter) bool
|
||||||
wantErr error
|
wantErr error
|
||||||
|
wantStatus *PushStatus
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
name: "when a manifest is pushed",
|
name: "when a manifest is pushed",
|
||||||
@ -321,6 +338,68 @@ func Test_dockerPusher_push(t *testing.T) {
|
|||||||
unavailableOnFail: false,
|
unavailableOnFail: false,
|
||||||
},
|
},
|
||||||
wantErr: fmt.Errorf("content %v on remote: %w", digest.FromBytes(manifestContent), errdefs.ErrAlreadyExists),
|
wantErr: fmt.Errorf("content %v on remote: %w", digest.FromBytes(manifestContent), errdefs.ErrAlreadyExists),
|
||||||
|
wantStatus: &PushStatus{
|
||||||
|
Exists: true,
|
||||||
|
MountedFrom: "",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "success cross-repo mount a blob layer",
|
||||||
|
dp: p,
|
||||||
|
// Not needed to set the base object as it is used to generate path only in case of manifests
|
||||||
|
// dockerBaseObject:
|
||||||
|
args: args{
|
||||||
|
content: layerContent,
|
||||||
|
mediatype: ocispec.MediaTypeImageLayer,
|
||||||
|
ref: fmt.Sprintf("layer2-%s", layerContentDigest.String()),
|
||||||
|
unavailableOnFail: false,
|
||||||
|
annotations: map[string]string{
|
||||||
|
distributionSourceLabelKey("sample"): "always-mount",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
checkerFunc: func(writer *pushWriter) bool {
|
||||||
|
select {
|
||||||
|
case resp := <-writer.respC:
|
||||||
|
// 201 should be the response code when uploading a new blob
|
||||||
|
return resp.StatusCode == http.StatusCreated
|
||||||
|
case <-writer.errC:
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
},
|
||||||
|
wantErr: fmt.Errorf("content %v on remote: %w", digest.FromBytes(layerContent), errdefs.ErrAlreadyExists),
|
||||||
|
wantStatus: &PushStatus{
|
||||||
|
MountedFrom: "sample/always-mount",
|
||||||
|
Exists: false,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "failed to cross-repo mount a blob layer",
|
||||||
|
dp: p,
|
||||||
|
// Not needed to set the base object as it is used to generate path only in case of manifests
|
||||||
|
// dockerBaseObject:
|
||||||
|
args: args{
|
||||||
|
content: layerContent,
|
||||||
|
mediatype: ocispec.MediaTypeImageLayer,
|
||||||
|
ref: fmt.Sprintf("layer3-%s", layerContentDigest.String()),
|
||||||
|
unavailableOnFail: false,
|
||||||
|
annotations: map[string]string{
|
||||||
|
distributionSourceLabelKey("sample"): "never-mount",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
checkerFunc: func(writer *pushWriter) bool {
|
||||||
|
select {
|
||||||
|
case resp := <-writer.respC:
|
||||||
|
// 201 should be the response code when uploading a new blob
|
||||||
|
return resp.StatusCode == http.StatusCreated
|
||||||
|
case <-writer.errC:
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
},
|
||||||
|
wantErr: nil,
|
||||||
|
wantStatus: &PushStatus{
|
||||||
|
MountedFrom: "",
|
||||||
|
Exists: false,
|
||||||
|
},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "trying to push a blob layer",
|
name: "trying to push a blob layer",
|
||||||
@ -343,14 +422,20 @@ func Test_dockerPusher_push(t *testing.T) {
|
|||||||
}
|
}
|
||||||
},
|
},
|
||||||
wantErr: nil,
|
wantErr: nil,
|
||||||
|
wantStatus: &PushStatus{
|
||||||
|
MountedFrom: "",
|
||||||
|
Exists: false,
|
||||||
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
for _, test := range tests {
|
for _, test := range tests {
|
||||||
|
test := test
|
||||||
t.Run(test.name, func(t *testing.T) {
|
t.Run(test.name, func(t *testing.T) {
|
||||||
desc := ocispec.Descriptor{
|
desc := ocispec.Descriptor{
|
||||||
MediaType: test.args.mediatype,
|
MediaType: test.args.mediatype,
|
||||||
Digest: digest.FromBytes(test.args.content),
|
Digest: digest.FromBytes(test.args.content),
|
||||||
Size: int64(len(test.args.content)),
|
Size: int64(len(test.args.content)),
|
||||||
|
Annotations: test.args.annotations,
|
||||||
}
|
}
|
||||||
|
|
||||||
test.dp.object = test.dockerBaseObject
|
test.dp.object = test.dockerBaseObject
|
||||||
@ -358,6 +443,13 @@ func Test_dockerPusher_push(t *testing.T) {
|
|||||||
got, err := test.dp.push(context.Background(), desc, test.args.ref, test.args.unavailableOnFail)
|
got, err := test.dp.push(context.Background(), desc, test.args.ref, test.args.unavailableOnFail)
|
||||||
|
|
||||||
assert.Equal(t, test.wantErr, err)
|
assert.Equal(t, test.wantErr, err)
|
||||||
|
|
||||||
|
if test.wantStatus != nil {
|
||||||
|
status, err := tracker.GetStatus(test.args.ref)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Equal(t, *test.wantStatus, status.PushStatus)
|
||||||
|
}
|
||||||
|
|
||||||
// if an error is expected, further comparisons are not required.
|
// if an error is expected, further comparisons are not required.
|
||||||
if test.wantErr != nil {
|
if test.wantErr != nil {
|
||||||
return
|
return
|
||||||
|
@ -36,6 +36,17 @@ type Status struct {
|
|||||||
|
|
||||||
// UploadUUID is used by the Docker registry to reference blob uploads
|
// UploadUUID is used by the Docker registry to reference blob uploads
|
||||||
UploadUUID string
|
UploadUUID string
|
||||||
|
|
||||||
|
// PushStatus contains status related to push.
|
||||||
|
PushStatus
|
||||||
|
}
|
||||||
|
|
||||||
|
type PushStatus struct {
|
||||||
|
// MountedFrom is the source content was cross-repo mounted from (empty if no cross-repo mount was performed).
|
||||||
|
MountedFrom string
|
||||||
|
|
||||||
|
// Exists indicates whether content already exists in the repository and wasn't uploaded.
|
||||||
|
Exists bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// StatusTracker to track status of operations
|
// StatusTracker to track status of operations
|
||||||
|
Loading…
Reference in New Issue
Block a user