remotes: fix dockerPusher to handle abort correctly

`dockerPusher` provides `pushWriter` which implements `content.Writer`.
However, even if `pushWriter` become abort status (i.e. `Close()` is called
before `Commit()`), `dockerPusher` doesn't recognise that status and treats that
writer as on-going.
This behaviour doesn't allow the client to retry an aborted push.

This commit fixes this issue.
This commit also adds an test to ensure that the issue is fixed.

Signed-off-by: Kohei Tokunaga <ktokunaga.mail@gmail.com>
This commit is contained in:
Kohei Tokunaga 2021-11-13 21:09:03 +09:00
parent aa1b073616
commit a97564411c
3 changed files with 121 additions and 1 deletions

View File

@ -78,7 +78,7 @@ func (p dockerPusher) push(ctx context.Context, desc ocispec.Descriptor, ref str
if status.Committed && status.Offset == status.Total {
return nil, errors.Wrapf(errdefs.ErrAlreadyExists, "ref %v", ref)
}
if unavailableOnFail {
if unavailableOnFail && status.ErrClosed == nil {
// Another push of this ref is happening elsewhere. The rest of function
// will continue only when `errdefs.IsNotFound(err) == true` (i.e. there
// is no actively-tracked ref already).
@ -354,6 +354,12 @@ func (pw *pushWriter) Write(p []byte) (n int, err error) {
}
func (pw *pushWriter) Close() error {
status, err := pw.tracker.GetStatus(pw.ref)
if err == nil && !status.Committed {
// Closing an incomplete writer. Record this as an error so that following write can retry it.
status.ErrClosed = errors.New("closed incomplete writer")
pw.tracker.SetStatus(pw.ref, status)
}
return pw.pipe.Close()
}

View File

@ -17,10 +17,20 @@
package docker
import (
"context"
"fmt"
"io"
"net/http"
"net/http/httptest"
"net/url"
"reflect"
"regexp"
"strings"
"testing"
"github.com/containerd/containerd/content"
digest "github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
)
func TestGetManifestPath(t *testing.T) {
@ -50,3 +60,104 @@ func TestGetManifestPath(t *testing.T) {
}
}
}
// TestPusherErrClosedRetry tests if retrying work when error occurred on close.
func TestPusherErrClosedRetry(t *testing.T) {
ctx := context.Background()
p, reg, done := samplePusher(t)
defer done()
layerContent := []byte("test")
reg.uploadable = false
if err := tryUpload(ctx, t, p, layerContent); err == nil {
t.Errorf("upload should fail but succeeded")
}
// retry
reg.uploadable = true
if err := tryUpload(ctx, t, p, layerContent); err != nil {
t.Errorf("upload should succeed but got %v", err)
}
}
func tryUpload(ctx context.Context, t *testing.T, p dockerPusher, layerContent []byte) error {
desc := ocispec.Descriptor{
MediaType: ocispec.MediaTypeImageLayerGzip,
Digest: digest.FromBytes(layerContent),
Size: int64(len(layerContent)),
}
cw, err := p.Writer(ctx, content.WithRef("test-1"), content.WithDescriptor(desc))
if err != nil {
return err
}
defer cw.Close()
if _, err := cw.Write(layerContent); err != nil {
return err
}
return cw.Commit(ctx, 0, "")
}
func samplePusher(t *testing.T) (dockerPusher, *uploadableMockRegistry, func()) {
reg := &uploadableMockRegistry{}
s := httptest.NewServer(reg)
u, err := url.Parse(s.URL)
if err != nil {
t.Fatal(err)
}
return dockerPusher{
dockerBase: &dockerBase{
repository: "sample",
hosts: []RegistryHost{
{
Client: s.Client(),
Host: u.Host,
Scheme: u.Scheme,
Path: u.Path,
Capabilities: HostCapabilityPush | HostCapabilityResolve,
},
},
},
object: "sample",
tracker: NewInMemoryTracker(),
}, reg, s.Close
}
var manifestRegexp = regexp.MustCompile(`/([a-z0-9]+)/manifests/(.*)`)
var blobUploadRegexp = regexp.MustCompile(`/([a-z0-9]+)/blobs/uploads/`)
// uploadableMockRegistry provides minimal registry APIs which are enough to serve requests from dockerPusher.
type uploadableMockRegistry struct {
uploadable bool
}
func (u *uploadableMockRegistry) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if r.Method == "POST" {
if matches := blobUploadRegexp.FindStringSubmatch(r.URL.Path); len(matches) != 0 {
if u.uploadable {
w.Header().Set("Location", "/upload")
} else {
w.Header().Set("Location", "/cannotupload")
}
w.WriteHeader(202)
return
}
} else if r.Method == "PUT" {
mfstMatches := manifestRegexp.FindStringSubmatch(r.URL.Path)
if len(mfstMatches) != 0 || strings.HasPrefix(r.URL.Path, "/upload") {
dgstr := digest.Canonical.Digester()
if _, err := io.Copy(dgstr.Hash(), r.Body); err != nil {
w.WriteHeader(500)
return
}
w.Header().Set("Docker-Content-Digest", dgstr.Digest().String())
w.WriteHeader(201)
return
} else if r.URL.Path == "/cannotupload" {
w.WriteHeader(500)
return
}
}
fmt.Println(r)
w.WriteHeader(404)
}

View File

@ -31,6 +31,9 @@ type Status struct {
Committed bool
// ErrClosed contains error encountered on close.
ErrClosed error
// UploadUUID is used by the Docker registry to reference blob uploads
UploadUUID string
}