From eaa7ca80dcc1ea3e3dffe1382d96d77377720c30 Mon Sep 17 00:00:00 2001 From: "Jonathan A. Sternberg" Date: Fri, 28 Feb 2025 11:29:51 -0600 Subject: [PATCH] proxy: break up writes from the remote writer to avoid grpc limits The remote content writer proxy already has the capability to break up large files into multiple writes, but the current API doesn't recognize when it's about to exceed the limits and attempts to send the data over grpc in one message instead of breaking it into multiple messages. This changes the behavior of `Write` to automatically break up the size of the content based on the max send message size. Signed-off-by: Jonathan A. Sternberg (cherry picked from commit f25f36c334144d87233e06b0de90522ebd97e144) --- core/content/proxy/content_writer.go | 45 +++++++++++++++++----------- core/content/testsuite/testsuite.go | 16 ++++++++-- 2 files changed, 41 insertions(+), 20 deletions(-) diff --git a/core/content/proxy/content_writer.go b/core/content/proxy/content_writer.go index 214a0a335..98818f234 100644 --- a/core/content/proxy/content_writer.go +++ b/core/content/proxy/content_writer.go @@ -26,6 +26,7 @@ import ( digest "github.com/opencontainers/go-digest" "github.com/containerd/containerd/v2/core/content" + "github.com/containerd/containerd/v2/defaults" "github.com/containerd/containerd/v2/pkg/protobuf" ) @@ -76,27 +77,37 @@ func (rw *remoteWriter) Digest() digest.Digest { } func (rw *remoteWriter) Write(p []byte) (n int, err error) { - offset := rw.offset + const maxBufferSize = defaults.DefaultMaxSendMsgSize >> 1 + for i := 0; i < len(p); i += maxBufferSize { + offset := rw.offset - resp, err := rw.send(&contentapi.WriteContentRequest{ - Action: contentapi.WriteAction_WRITE, - Offset: offset, - Data: p, - }) - if err != nil { - return 0, fmt.Errorf("failed to send write: %w", errgrpc.ToNative(err)) - } + end := i + maxBufferSize + if end > len(p) { + end = len(p) + } + data := p[i:end] - n = int(resp.Offset - offset) - if n < len(p) { - err = io.ErrShortWrite - } + resp, err := rw.send(&contentapi.WriteContentRequest{ + Action: contentapi.WriteAction_WRITE, + Offset: offset, + Data: data, + }) + if err != nil { + return 0, fmt.Errorf("failed to send write: %w", errgrpc.ToNative(err)) + } - rw.offset += int64(n) - if resp.Digest != "" { - rw.digest = digest.Digest(resp.Digest) + written := int(resp.Offset - offset) + rw.offset += int64(written) + if resp.Digest != "" { + rw.digest = digest.Digest(resp.Digest) + } + n += written + + if written < len(data) { + return n, io.ErrShortWrite + } } - return + return n, nil } func (rw *remoteWriter) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) (err error) { diff --git a/core/content/testsuite/testsuite.go b/core/content/testsuite/testsuite.go index 3221a355d..fc3e2a500 100644 --- a/core/content/testsuite/testsuite.go +++ b/core/content/testsuite/testsuite.go @@ -175,6 +175,13 @@ func checkContentStoreWriter(ctx context.Context, t *testing.T, cs content.Store } defer w4.Close() + c5, d5 := createContent(16 << 21) + w5, err := content.OpenWriter(ctx, cs, content.WithRef("c5"), content.WithDescriptor(ocispec.Descriptor{Size: int64(len(c5)), Digest: d5})) + if err != nil { + t.Fatal(err) + } + defer w5.Close() + smallbuf := make([]byte, 32) for _, s := range []struct { content []byte @@ -201,6 +208,11 @@ func checkContentStoreWriter(ctx context.Context, t *testing.T, cs content.Store digest: d4, writer: w4, }, + { + content: c5, + digest: d5, + writer: w5, + }, } { n, err := io.CopyBuffer(s.writer, bytes.NewReader(s.content), smallbuf) if err != nil { @@ -647,7 +659,6 @@ func checkLabels(ctx context.Context, t *testing.T, cs content.Store) { if err := checkInfo(ctx, cs, d1, info, preCommit, postCommit, preUpdate, postUpdate); err != nil { t.Fatalf("Check info failed: %+v", err) } - } func checkResume(rf func(context.Context, content.Writer, []byte, int64, int64, digest.Digest) error) func(ctx context.Context, t *testing.T, cs content.Store) { @@ -734,7 +745,6 @@ func resumeDiscard(ctx context.Context, w content.Writer, b []byte, written, siz } if err := w.Commit(ctx, size, dgst); err != nil { return fmt.Errorf("commit failed: %w", err) - } return nil } @@ -931,7 +941,6 @@ func checkCrossNSAppend(ctx context.Context, t *testing.T, cs content.Store) { if err := checkContent(ctx2, cs, d2, info2, t1, t3, t1, t3); err != nil { t.Fatal(err) } - } func checkCrossNSIsolate(ctx context.Context, t *testing.T, cs content.Store) { @@ -1129,6 +1138,7 @@ func checkInfo(ctx context.Context, cs content.Store, d digest.Digest, expected return nil } + func checkContent(ctx context.Context, cs content.Store, d digest.Digest, expected content.Info, c1, c2, u1, u2 time.Time) error { if err := checkInfo(ctx, cs, d, expected, c1, c2, u1, u2); err != nil { return err