diff --git a/core/content/proxy/content_writer.go b/core/content/proxy/content_writer.go index 214a0a335..98818f234 100644 --- a/core/content/proxy/content_writer.go +++ b/core/content/proxy/content_writer.go @@ -26,6 +26,7 @@ import ( digest "github.com/opencontainers/go-digest" "github.com/containerd/containerd/v2/core/content" + "github.com/containerd/containerd/v2/defaults" "github.com/containerd/containerd/v2/pkg/protobuf" ) @@ -76,27 +77,37 @@ func (rw *remoteWriter) Digest() digest.Digest { } func (rw *remoteWriter) Write(p []byte) (n int, err error) { - offset := rw.offset + const maxBufferSize = defaults.DefaultMaxSendMsgSize >> 1 + for i := 0; i < len(p); i += maxBufferSize { + offset := rw.offset - resp, err := rw.send(&contentapi.WriteContentRequest{ - Action: contentapi.WriteAction_WRITE, - Offset: offset, - Data: p, - }) - if err != nil { - return 0, fmt.Errorf("failed to send write: %w", errgrpc.ToNative(err)) - } + end := i + maxBufferSize + if end > len(p) { + end = len(p) + } + data := p[i:end] - n = int(resp.Offset - offset) - if n < len(p) { - err = io.ErrShortWrite - } + resp, err := rw.send(&contentapi.WriteContentRequest{ + Action: contentapi.WriteAction_WRITE, + Offset: offset, + Data: data, + }) + if err != nil { + return 0, fmt.Errorf("failed to send write: %w", errgrpc.ToNative(err)) + } - rw.offset += int64(n) - if resp.Digest != "" { - rw.digest = digest.Digest(resp.Digest) + written := int(resp.Offset - offset) + rw.offset += int64(written) + if resp.Digest != "" { + rw.digest = digest.Digest(resp.Digest) + } + n += written + + if written < len(data) { + return n, io.ErrShortWrite + } } - return + return n, nil } func (rw *remoteWriter) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) (err error) { diff --git a/core/content/testsuite/testsuite.go b/core/content/testsuite/testsuite.go index 3221a355d..fc3e2a500 100644 --- a/core/content/testsuite/testsuite.go +++ b/core/content/testsuite/testsuite.go @@ -175,6 +175,13 @@ func checkContentStoreWriter(ctx context.Context, t *testing.T, cs content.Store } defer w4.Close() + c5, d5 := createContent(16 << 21) + w5, err := content.OpenWriter(ctx, cs, content.WithRef("c5"), content.WithDescriptor(ocispec.Descriptor{Size: int64(len(c5)), Digest: d5})) + if err != nil { + t.Fatal(err) + } + defer w5.Close() + smallbuf := make([]byte, 32) for _, s := range []struct { content []byte @@ -201,6 +208,11 @@ func checkContentStoreWriter(ctx context.Context, t *testing.T, cs content.Store digest: d4, writer: w4, }, + { + content: c5, + digest: d5, + writer: w5, + }, } { n, err := io.CopyBuffer(s.writer, bytes.NewReader(s.content), smallbuf) if err != nil { @@ -647,7 +659,6 @@ func checkLabels(ctx context.Context, t *testing.T, cs content.Store) { if err := checkInfo(ctx, cs, d1, info, preCommit, postCommit, preUpdate, postUpdate); err != nil { t.Fatalf("Check info failed: %+v", err) } - } func checkResume(rf func(context.Context, content.Writer, []byte, int64, int64, digest.Digest) error) func(ctx context.Context, t *testing.T, cs content.Store) { @@ -734,7 +745,6 @@ func resumeDiscard(ctx context.Context, w content.Writer, b []byte, written, siz } if err := w.Commit(ctx, size, dgst); err != nil { return fmt.Errorf("commit failed: %w", err) - } return nil } @@ -931,7 +941,6 @@ func checkCrossNSAppend(ctx context.Context, t *testing.T, cs content.Store) { if err := checkContent(ctx2, cs, d2, info2, t1, t3, t1, t3); err != nil { t.Fatal(err) } - } func checkCrossNSIsolate(ctx context.Context, t *testing.T, cs content.Store) { @@ -1129,6 +1138,7 @@ func checkInfo(ctx context.Context, cs content.Store, d digest.Digest, expected return nil } + func checkContent(ctx context.Context, cs content.Store, d digest.Digest, expected content.Info, c1, c2, u1, u2 time.Time) error { if err := checkInfo(ctx, cs, d, expected, c1, c2, u1, u2); err != nil { return err