proxy: break up writes from the remote writer to avoid grpc limits

The remote content writer proxy already has the capability to break up
large files into multiple writes, but the current API doesn't recognize
when it's about to exceed the limits and attempts to send the data over
grpc in one message instead of breaking it into multiple messages.

This changes the behavior of `Write` to automatically break up the size
of the content based on the max send message size.

Signed-off-by: Jonathan A. Sternberg <jonathan.sternberg@docker.com>
(cherry picked from commit f25f36c334144d87233e06b0de90522ebd97e144)
This commit is contained in:
Jonathan A. Sternberg 2025-02-28 11:29:51 -06:00
parent 67bb32a8b2
commit eaa7ca80dc
No known key found for this signature in database
GPG Key ID: 6603D4B96394F6B1
2 changed files with 41 additions and 20 deletions

View File

@ -26,6 +26,7 @@ import (
digest "github.com/opencontainers/go-digest"
"github.com/containerd/containerd/v2/core/content"
"github.com/containerd/containerd/v2/defaults"
"github.com/containerd/containerd/v2/pkg/protobuf"
)
@ -76,27 +77,37 @@ func (rw *remoteWriter) Digest() digest.Digest {
}
func (rw *remoteWriter) Write(p []byte) (n int, err error) {
offset := rw.offset
const maxBufferSize = defaults.DefaultMaxSendMsgSize >> 1
for i := 0; i < len(p); i += maxBufferSize {
offset := rw.offset
resp, err := rw.send(&contentapi.WriteContentRequest{
Action: contentapi.WriteAction_WRITE,
Offset: offset,
Data: p,
})
if err != nil {
return 0, fmt.Errorf("failed to send write: %w", errgrpc.ToNative(err))
}
end := i + maxBufferSize
if end > len(p) {
end = len(p)
}
data := p[i:end]
n = int(resp.Offset - offset)
if n < len(p) {
err = io.ErrShortWrite
}
resp, err := rw.send(&contentapi.WriteContentRequest{
Action: contentapi.WriteAction_WRITE,
Offset: offset,
Data: data,
})
if err != nil {
return 0, fmt.Errorf("failed to send write: %w", errgrpc.ToNative(err))
}
rw.offset += int64(n)
if resp.Digest != "" {
rw.digest = digest.Digest(resp.Digest)
written := int(resp.Offset - offset)
rw.offset += int64(written)
if resp.Digest != "" {
rw.digest = digest.Digest(resp.Digest)
}
n += written
if written < len(data) {
return n, io.ErrShortWrite
}
}
return
return n, nil
}
func (rw *remoteWriter) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) (err error) {

View File

@ -175,6 +175,13 @@ func checkContentStoreWriter(ctx context.Context, t *testing.T, cs content.Store
}
defer w4.Close()
c5, d5 := createContent(16 << 21)
w5, err := content.OpenWriter(ctx, cs, content.WithRef("c5"), content.WithDescriptor(ocispec.Descriptor{Size: int64(len(c5)), Digest: d5}))
if err != nil {
t.Fatal(err)
}
defer w5.Close()
smallbuf := make([]byte, 32)
for _, s := range []struct {
content []byte
@ -201,6 +208,11 @@ func checkContentStoreWriter(ctx context.Context, t *testing.T, cs content.Store
digest: d4,
writer: w4,
},
{
content: c5,
digest: d5,
writer: w5,
},
} {
n, err := io.CopyBuffer(s.writer, bytes.NewReader(s.content), smallbuf)
if err != nil {
@ -647,7 +659,6 @@ func checkLabels(ctx context.Context, t *testing.T, cs content.Store) {
if err := checkInfo(ctx, cs, d1, info, preCommit, postCommit, preUpdate, postUpdate); err != nil {
t.Fatalf("Check info failed: %+v", err)
}
}
func checkResume(rf func(context.Context, content.Writer, []byte, int64, int64, digest.Digest) error) func(ctx context.Context, t *testing.T, cs content.Store) {
@ -734,7 +745,6 @@ func resumeDiscard(ctx context.Context, w content.Writer, b []byte, written, siz
}
if err := w.Commit(ctx, size, dgst); err != nil {
return fmt.Errorf("commit failed: %w", err)
}
return nil
}
@ -931,7 +941,6 @@ func checkCrossNSAppend(ctx context.Context, t *testing.T, cs content.Store) {
if err := checkContent(ctx2, cs, d2, info2, t1, t3, t1, t3); err != nil {
t.Fatal(err)
}
}
func checkCrossNSIsolate(ctx context.Context, t *testing.T, cs content.Store) {
@ -1129,6 +1138,7 @@ func checkInfo(ctx context.Context, cs content.Store, d digest.Digest, expected
return nil
}
func checkContent(ctx context.Context, cs content.Store, d digest.Digest, expected content.Info, c1, c2, u1, u2 time.Time) error {
if err := checkInfo(ctx, cs, d, expected, c1, c2, u1, u2); err != nil {
return err