diff --git a/remotes/docker/pusher.go b/remotes/docker/pusher.go index 593aa5d93..b705c34fa 100644 --- a/remotes/docker/pusher.go +++ b/remotes/docker/pusher.go @@ -78,7 +78,7 @@ func (p dockerPusher) push(ctx context.Context, desc ocispec.Descriptor, ref str if status.Committed && status.Offset == status.Total { return nil, errors.Wrapf(errdefs.ErrAlreadyExists, "ref %v", ref) } - if unavailableOnFail { + if unavailableOnFail && status.ErrClosed == nil { // Another push of this ref is happening elsewhere. The rest of function // will continue only when `errdefs.IsNotFound(err) == true` (i.e. there // is no actively-tracked ref already). @@ -354,6 +354,12 @@ func (pw *pushWriter) Write(p []byte) (n int, err error) { } func (pw *pushWriter) Close() error { + status, err := pw.tracker.GetStatus(pw.ref) + if err == nil && !status.Committed { + // Closing an incomplete writer. Record this as an error so that following write can retry it. + status.ErrClosed = errors.New("closed incomplete writer") + pw.tracker.SetStatus(pw.ref, status) + } return pw.pipe.Close() } diff --git a/remotes/docker/pusher_test.go b/remotes/docker/pusher_test.go index 8b81eb3b2..2dfe9a8d4 100644 --- a/remotes/docker/pusher_test.go +++ b/remotes/docker/pusher_test.go @@ -17,10 +17,20 @@ package docker import ( + "context" + "fmt" + "io" + "net/http" + "net/http/httptest" + "net/url" "reflect" + "regexp" + "strings" "testing" + "github.com/containerd/containerd/content" digest "github.com/opencontainers/go-digest" + ocispec "github.com/opencontainers/image-spec/specs-go/v1" ) func TestGetManifestPath(t *testing.T) { @@ -50,3 +60,104 @@ func TestGetManifestPath(t *testing.T) { } } } + +// TestPusherErrClosedRetry tests if retrying work when error occurred on close. +func TestPusherErrClosedRetry(t *testing.T) { + ctx := context.Background() + + p, reg, done := samplePusher(t) + defer done() + + layerContent := []byte("test") + reg.uploadable = false + if err := tryUpload(ctx, t, p, layerContent); err == nil { + t.Errorf("upload should fail but succeeded") + } + + // retry + reg.uploadable = true + if err := tryUpload(ctx, t, p, layerContent); err != nil { + t.Errorf("upload should succeed but got %v", err) + } +} + +func tryUpload(ctx context.Context, t *testing.T, p dockerPusher, layerContent []byte) error { + desc := ocispec.Descriptor{ + MediaType: ocispec.MediaTypeImageLayerGzip, + Digest: digest.FromBytes(layerContent), + Size: int64(len(layerContent)), + } + cw, err := p.Writer(ctx, content.WithRef("test-1"), content.WithDescriptor(desc)) + if err != nil { + return err + } + defer cw.Close() + if _, err := cw.Write(layerContent); err != nil { + return err + } + return cw.Commit(ctx, 0, "") +} + +func samplePusher(t *testing.T) (dockerPusher, *uploadableMockRegistry, func()) { + reg := &uploadableMockRegistry{} + s := httptest.NewServer(reg) + u, err := url.Parse(s.URL) + if err != nil { + t.Fatal(err) + } + return dockerPusher{ + dockerBase: &dockerBase{ + repository: "sample", + hosts: []RegistryHost{ + { + Client: s.Client(), + Host: u.Host, + Scheme: u.Scheme, + Path: u.Path, + Capabilities: HostCapabilityPush | HostCapabilityResolve, + }, + }, + }, + object: "sample", + tracker: NewInMemoryTracker(), + }, reg, s.Close +} + +var manifestRegexp = regexp.MustCompile(`/([a-z0-9]+)/manifests/(.*)`) +var blobUploadRegexp = regexp.MustCompile(`/([a-z0-9]+)/blobs/uploads/`) + +// uploadableMockRegistry provides minimal registry APIs which are enough to serve requests from dockerPusher. +type uploadableMockRegistry struct { + uploadable bool +} + +func (u *uploadableMockRegistry) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if r.Method == "POST" { + if matches := blobUploadRegexp.FindStringSubmatch(r.URL.Path); len(matches) != 0 { + if u.uploadable { + w.Header().Set("Location", "/upload") + } else { + w.Header().Set("Location", "/cannotupload") + } + w.WriteHeader(202) + return + } + } else if r.Method == "PUT" { + mfstMatches := manifestRegexp.FindStringSubmatch(r.URL.Path) + if len(mfstMatches) != 0 || strings.HasPrefix(r.URL.Path, "/upload") { + dgstr := digest.Canonical.Digester() + if _, err := io.Copy(dgstr.Hash(), r.Body); err != nil { + w.WriteHeader(500) + return + } + w.Header().Set("Docker-Content-Digest", dgstr.Digest().String()) + w.WriteHeader(201) + return + } else if r.URL.Path == "/cannotupload" { + w.WriteHeader(500) + return + } + } + fmt.Println(r) + w.WriteHeader(404) +} diff --git a/remotes/docker/status.go b/remotes/docker/status.go index 9751edac7..08768c297 100644 --- a/remotes/docker/status.go +++ b/remotes/docker/status.go @@ -31,6 +31,9 @@ type Status struct { Committed bool + // ErrClosed contains error encountered on close. + ErrClosed error + // UploadUUID is used by the Docker registry to reference blob uploads UploadUUID string }