Merge pull request #6995 from akhilerm/retry-on-reset

This commit is contained in:
Samuel Karp
2022-09-28 10:36:55 -07:00
committed by GitHub
4 changed files with 499 additions and 88 deletions

View File

@@ -264,27 +264,20 @@ func (p dockerPusher) push(ctx context.Context, desc ocispec.Descriptor, ref str
// TODO: Support chunked upload
pr, pw := io.Pipe()
respC := make(chan response, 1)
body := io.NopCloser(pr)
pushw := newPushWriter(p.dockerBase, ref, desc.Digest, p.tracker, isManifest)
req.body = func() (io.ReadCloser, error) {
if body == nil {
return nil, errors.New("cannot reuse body, request must be retried")
}
// Only use the body once since pipe cannot be seeked
ob := body
body = nil
return ob, nil
pr, pw := io.Pipe()
pushw.setPipe(pw)
return io.NopCloser(pr), nil
}
req.size = desc.Size
go func() {
defer close(respC)
resp, err := req.doWithRetries(ctx, nil)
if err != nil {
respC <- response{err: err}
pr.CloseWithError(err)
pushw.setError(err)
pushw.Close()
return
}
@@ -293,20 +286,13 @@ func (p dockerPusher) push(ctx context.Context, desc ocispec.Descriptor, ref str
default:
err := remoteserrors.NewUnexpectedStatusErr(resp)
log.G(ctx).WithField("resp", resp).WithField("body", string(err.(remoteserrors.ErrUnexpectedStatus).Body)).Debug("unexpected response")
pr.CloseWithError(err)
pushw.setError(err)
pushw.Close()
}
respC <- response{Response: resp}
pushw.setResponse(resp)
}()
return &pushWriter{
base: p.dockerBase,
ref: ref,
pipe: pw,
responseC: respC,
isManifest: isManifest,
expected: desc.Digest,
tracker: p.tracker,
}, nil
return pushw, nil
}
func getManifestPath(object string, dgst digest.Digest) []string {
@@ -328,28 +314,80 @@ func getManifestPath(object string, dgst digest.Digest) []string {
return []string{"manifests", object}
}
type response struct {
*http.Response
err error
}
type pushWriter struct {
base *dockerBase
ref string
pipe *io.PipeWriter
responseC <-chan response
pipe *io.PipeWriter
pipeC chan *io.PipeWriter
respC chan *http.Response
errC chan error
isManifest bool
expected digest.Digest
tracker StatusTracker
}
func newPushWriter(db *dockerBase, ref string, expected digest.Digest, tracker StatusTracker, isManifest bool) *pushWriter {
// Initialize and create response
return &pushWriter{
base: db,
ref: ref,
expected: expected,
tracker: tracker,
pipeC: make(chan *io.PipeWriter, 1),
respC: make(chan *http.Response, 1),
errC: make(chan error, 1),
isManifest: isManifest,
}
}
func (pw *pushWriter) setPipe(p *io.PipeWriter) {
pw.pipeC <- p
}
func (pw *pushWriter) setError(err error) {
pw.errC <- err
}
func (pw *pushWriter) setResponse(resp *http.Response) {
pw.respC <- resp
}
func (pw *pushWriter) Write(p []byte) (n int, err error) {
status, err := pw.tracker.GetStatus(pw.ref)
if err != nil {
return n, err
}
if pw.pipe == nil {
p, ok := <-pw.pipeC
if !ok {
return 0, io.ErrClosedPipe
}
pw.pipe = p
} else {
select {
case p, ok := <-pw.pipeC:
if !ok {
return 0, io.ErrClosedPipe
}
pw.pipe.CloseWithError(content.ErrReset)
pw.pipe = p
// If content has already been written, the bytes
// cannot be written and the caller must reset
if status.Offset > 0 {
status.Offset = 0
status.UpdatedAt = time.Now()
pw.tracker.SetStatus(pw.ref, status)
return 0, content.ErrReset
}
default:
}
}
n, err = pw.pipe.Write(p)
status.Offset += int64(n)
status.UpdatedAt = time.Now()
@@ -358,13 +396,26 @@ func (pw *pushWriter) Write(p []byte) (n int, err error) {
}
func (pw *pushWriter) Close() error {
status, err := pw.tracker.GetStatus(pw.ref)
if err == nil && !status.Committed {
// Closing an incomplete writer. Record this as an error so that following write can retry it.
status.ErrClosed = errors.New("closed incomplete writer")
pw.tracker.SetStatus(pw.ref, status)
// Ensure pipeC is closed but handle `Close()` being
// called multiple times without panicking
select {
case _, ok := <-pw.pipeC:
if ok {
close(pw.pipeC)
}
default:
close(pw.pipeC)
}
return pw.pipe.Close()
if pw.pipe != nil {
status, err := pw.tracker.GetStatus(pw.ref)
if err == nil && !status.Committed {
// Closing an incomplete writer. Record this as an error so that following write can retry it.
status.ErrClosed = errors.New("closed incomplete writer")
pw.tracker.SetStatus(pw.ref, status)
}
return pw.pipe.Close()
}
return nil
}
func (pw *pushWriter) Status() (content.Status, error) {
@@ -391,18 +442,43 @@ func (pw *pushWriter) Commit(ctx context.Context, size int64, expected digest.Di
return err
}
// TODO: timeout waiting for response
resp := <-pw.responseC
if resp.err != nil {
return resp.err
var resp *http.Response
select {
case err := <-pw.errC:
if err != nil {
return err
}
case resp = <-pw.respC:
defer resp.Body.Close()
case p, ok := <-pw.pipeC:
// check whether the pipe has changed in the commit, because sometimes Write
// can complete successfully, but the pipe may have changed. In that case, the
// content needs to be reset.
if !ok {
return io.ErrClosedPipe
}
pw.pipe.CloseWithError(content.ErrReset)
pw.pipe = p
status, err := pw.tracker.GetStatus(pw.ref)
if err != nil {
return err
}
// If content has already been written, the bytes
// cannot be written again and the caller must reset
if status.Offset > 0 {
status.Offset = 0
status.UpdatedAt = time.Now()
pw.tracker.SetStatus(pw.ref, status)
return content.ErrReset
}
}
defer resp.Response.Body.Close()
// 201 is specified return status, some registries return
// 200, 202 or 204.
switch resp.StatusCode {
case http.StatusOK, http.StatusCreated, http.StatusNoContent, http.StatusAccepted:
default:
return remoteserrors.NewUnexpectedStatusErr(resp.Response)
return remoteserrors.NewUnexpectedStatusErr(resp)
}
status, err := pw.tracker.GetStatus(pw.ref)

View File

@@ -18,6 +18,7 @@ package docker
import (
"context"
"errors"
"fmt"
"io"
"net/http"
@@ -29,8 +30,11 @@ import (
"testing"
"github.com/containerd/containerd/content"
digest "github.com/opencontainers/go-digest"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/remotes"
"github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/stretchr/testify/assert"
)
func TestGetManifestPath(t *testing.T) {
@@ -81,6 +85,60 @@ func TestPusherErrClosedRetry(t *testing.T) {
}
}
// TestPusherErrReset tests the push method if the request needs to be retried
// i.e when ErrReset occurs
func TestPusherErrReset(t *testing.T) {
p, reg, done := samplePusher(t)
defer done()
p.object = "latest@sha256:55d31f3af94c797b65b310569803cacc1c9f4a34bf61afcdc8138f89345c8308"
reg.uploadable = true
reg.putHandlerFunc = func() func(w http.ResponseWriter, r *http.Request) bool {
// sets whether the request should timeout so that a reset error can occur and
// request will be retried
shouldTimeout := true
return func(w http.ResponseWriter, r *http.Request) bool {
if shouldTimeout {
shouldTimeout = !shouldTimeout
w.WriteHeader(http.StatusRequestTimeout)
return true
}
return false
}
}()
ct := []byte("manifest-content")
desc := ocispec.Descriptor{
MediaType: ocispec.MediaTypeImageManifest,
Digest: digest.FromBytes(ct),
Size: int64(len(ct)),
}
w, err := p.push(context.Background(), desc, remotes.MakeRefKey(context.Background(), desc), false)
assert.Equal(t, err, nil, "no error should be there")
w.Write(ct)
pw, _ := w.(*pushWriter)
select {
case p := <-pw.pipeC:
p.Write(ct)
case e := <-pw.errC:
assert.Failf(t, "error: %v while retrying request", e.Error())
}
select {
case resp := <-pw.respC:
assert.Equalf(t, resp.StatusCode, http.StatusCreated,
"201 should be the response code when uploading new content")
case <-pw.errC:
assert.Fail(t, "should not give error")
}
}
func tryUpload(ctx context.Context, t *testing.T, p dockerPusher, layerContent []byte) error {
desc := ocispec.Descriptor{
MediaType: ocispec.MediaTypeImageLayerGzip,
@@ -99,7 +157,9 @@ func tryUpload(ctx context.Context, t *testing.T, p dockerPusher, layerContent [
}
func samplePusher(t *testing.T) (dockerPusher, *uploadableMockRegistry, func()) {
reg := &uploadableMockRegistry{}
reg := &uploadableMockRegistry{
availableContents: make([]string, 0),
}
s := httptest.NewServer(reg)
u, err := url.Parse(s.URL)
if err != nil {
@@ -124,40 +184,203 @@ func samplePusher(t *testing.T) (dockerPusher, *uploadableMockRegistry, func())
}
var manifestRegexp = regexp.MustCompile(`/([a-z0-9]+)/manifests/(.*)`)
var blobUploadRegexp = regexp.MustCompile(`/([a-z0-9]+)/blobs/uploads/`)
var blobUploadRegexp = regexp.MustCompile(`/([a-z0-9]+)/blobs/uploads/(.*)`)
// uploadableMockRegistry provides minimal registry APIs which are enough to serve requests from dockerPusher.
type uploadableMockRegistry struct {
uploadable bool
availableContents []string
uploadable bool
putHandlerFunc func(w http.ResponseWriter, r *http.Request) bool
}
func (u *uploadableMockRegistry) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if r.Method == "POST" {
if r.Method == http.MethodPut && u.putHandlerFunc != nil {
// if true return the response witout calling default handler
if u.putHandlerFunc(w, r) {
return
}
}
u.defaultHandler(w, r)
}
func (u *uploadableMockRegistry) defaultHandler(w http.ResponseWriter, r *http.Request) {
if r.Method == http.MethodPost {
if matches := blobUploadRegexp.FindStringSubmatch(r.URL.Path); len(matches) != 0 {
if u.uploadable {
w.Header().Set("Location", "/upload")
} else {
w.Header().Set("Location", "/cannotupload")
}
w.WriteHeader(202)
dgstr := digest.Canonical.Digester()
if _, err := io.Copy(dgstr.Hash(), r.Body); err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
u.availableContents = append(u.availableContents, dgstr.Digest().String())
w.WriteHeader(http.StatusAccepted)
return
}
} else if r.Method == "PUT" {
} else if r.Method == http.MethodPut {
mfstMatches := manifestRegexp.FindStringSubmatch(r.URL.Path)
if len(mfstMatches) != 0 || strings.HasPrefix(r.URL.Path, "/upload") {
dgstr := digest.Canonical.Digester()
if _, err := io.Copy(dgstr.Hash(), r.Body); err != nil {
w.WriteHeader(500)
w.WriteHeader(http.StatusInternalServerError)
return
}
u.availableContents = append(u.availableContents, dgstr.Digest().String())
w.Header().Set("Docker-Content-Digest", dgstr.Digest().String())
w.WriteHeader(201)
w.WriteHeader(http.StatusCreated)
return
} else if r.URL.Path == "/cannotupload" {
w.WriteHeader(500)
w.WriteHeader(http.StatusInternalServerError)
return
}
} else if r.Method == http.MethodHead {
var content string
// check for both manifest and blob paths
if manifestMatch := manifestRegexp.FindStringSubmatch(r.URL.Path); len(manifestMatch) == 3 {
content = manifestMatch[2]
} else if blobMatch := blobUploadRegexp.FindStringSubmatch(r.URL.Path); len(blobMatch) == 3 {
content = blobMatch[2]
}
// if content is not found or if the path is not manifest or blob
// we return 404
if u.isContentAlreadyExist(content) {
w.WriteHeader(http.StatusOK)
} else {
w.WriteHeader(http.StatusNotFound)
}
return
}
fmt.Println(r)
w.WriteHeader(404)
w.WriteHeader(http.StatusNotFound)
}
// checks if the content is already present in the registry
func (u *uploadableMockRegistry) isContentAlreadyExist(c string) bool {
for _, ct := range u.availableContents {
if ct == c {
return true
}
}
return false
}
func Test_dockerPusher_push(t *testing.T) {
p, reg, done := samplePusher(t)
defer done()
reg.uploadable = true
manifestContent := []byte("manifest-content")
manifestContentDigest := digest.FromBytes(manifestContent)
layerContent := []byte("layer-content")
layerContentDigest := digest.FromBytes(layerContent)
// using a random object here
baseObject := "latest@sha256:55d31f3af94c797b65b310569803cacc1c9f4a34bf61afcdc8138f89345c8308"
type args struct {
content []byte
mediatype string
ref string
unavailableOnFail bool
}
tests := []struct {
name string
dp dockerPusher
dockerBaseObject string
args args
checkerFunc func(writer pushWriter) bool
wantErr error
}{
{
name: "when a manifest is pushed",
dp: p,
dockerBaseObject: baseObject,
args: args{
content: manifestContent,
mediatype: ocispec.MediaTypeImageManifest,
ref: fmt.Sprintf("manifest-%s", manifestContentDigest.String()),
unavailableOnFail: false,
},
checkerFunc: func(writer pushWriter) bool {
select {
case resp := <-writer.respC:
// 201 should be the response code when uploading a new manifest
return resp.StatusCode == http.StatusCreated
case <-writer.errC:
return false
}
},
wantErr: nil,
},
{
name: "trying to push content that already exists",
dp: p,
dockerBaseObject: baseObject,
args: args{
content: manifestContent,
mediatype: ocispec.MediaTypeImageManifest,
ref: fmt.Sprintf("manifest-%s", manifestContentDigest.String()),
unavailableOnFail: false,
},
wantErr: fmt.Errorf("content %v on remote: %w", digest.FromBytes(manifestContent), errdefs.ErrAlreadyExists),
},
{
name: "trying to push a blob layer",
dp: p,
// Not needed to set the base object as it is used to generate path only in case of manifests
// dockerBaseObject:
args: args{
content: layerContent,
mediatype: ocispec.MediaTypeImageLayer,
ref: fmt.Sprintf("layer-%s", layerContentDigest.String()),
unavailableOnFail: false,
},
checkerFunc: func(writer pushWriter) bool {
select {
case resp := <-writer.respC:
// 201 should be the response code when uploading a new blob
return resp.StatusCode == http.StatusCreated
case <-writer.errC:
return false
}
},
wantErr: nil,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
desc := ocispec.Descriptor{
MediaType: test.args.mediatype,
Digest: digest.FromBytes(test.args.content),
Size: int64(len(test.args.content)),
}
test.dp.object = test.dockerBaseObject
got, err := test.dp.push(context.Background(), desc, test.args.ref, test.args.unavailableOnFail)
assert.Equal(t, test.wantErr, err)
// if an error is expected, further comparisons are not required.
if test.wantErr != nil {
return
}
// write the content to the writer, this will be done when a Read() is called on the body of the request
got.Write(test.args.content)
pw, ok := got.(*pushWriter)
if !ok {
assert.Errorf(t, errors.New("unable to cast content.Writer to pushWriter"), "got %v instead of pushwriter", got)
}
// test whether a proper response has been received after the push operation
assert.True(t, test.checkerFunc(*pw))
})
}
}