Merge pull request #11457 from jsternberg/backport-remote-write-grpc-limits-exceeded
[release/2.0] Update remote content to break up writes to avoid grpc message size limits
This commit is contained in:
		| @@ -26,6 +26,7 @@ import ( | ||||
| 	digest "github.com/opencontainers/go-digest" | ||||
|  | ||||
| 	"github.com/containerd/containerd/v2/core/content" | ||||
| 	"github.com/containerd/containerd/v2/defaults" | ||||
| 	"github.com/containerd/containerd/v2/pkg/protobuf" | ||||
| ) | ||||
|  | ||||
| @@ -76,27 +77,37 @@ func (rw *remoteWriter) Digest() digest.Digest { | ||||
| } | ||||
|  | ||||
| func (rw *remoteWriter) Write(p []byte) (n int, err error) { | ||||
| 	offset := rw.offset | ||||
| 	const maxBufferSize = defaults.DefaultMaxSendMsgSize >> 1 | ||||
| 	for i := 0; i < len(p); i += maxBufferSize { | ||||
| 		offset := rw.offset | ||||
|  | ||||
| 	resp, err := rw.send(&contentapi.WriteContentRequest{ | ||||
| 		Action: contentapi.WriteAction_WRITE, | ||||
| 		Offset: offset, | ||||
| 		Data:   p, | ||||
| 	}) | ||||
| 	if err != nil { | ||||
| 		return 0, fmt.Errorf("failed to send write: %w", errgrpc.ToNative(err)) | ||||
| 	} | ||||
| 		end := i + maxBufferSize | ||||
| 		if end > len(p) { | ||||
| 			end = len(p) | ||||
| 		} | ||||
| 		data := p[i:end] | ||||
|  | ||||
| 	n = int(resp.Offset - offset) | ||||
| 	if n < len(p) { | ||||
| 		err = io.ErrShortWrite | ||||
| 	} | ||||
| 		resp, err := rw.send(&contentapi.WriteContentRequest{ | ||||
| 			Action: contentapi.WriteAction_WRITE, | ||||
| 			Offset: offset, | ||||
| 			Data:   data, | ||||
| 		}) | ||||
| 		if err != nil { | ||||
| 			return 0, fmt.Errorf("failed to send write: %w", errgrpc.ToNative(err)) | ||||
| 		} | ||||
|  | ||||
| 	rw.offset += int64(n) | ||||
| 	if resp.Digest != "" { | ||||
| 		rw.digest = digest.Digest(resp.Digest) | ||||
| 		written := int(resp.Offset - offset) | ||||
| 		rw.offset += int64(written) | ||||
| 		if resp.Digest != "" { | ||||
| 			rw.digest = digest.Digest(resp.Digest) | ||||
| 		} | ||||
| 		n += written | ||||
|  | ||||
| 		if written < len(data) { | ||||
| 			return n, io.ErrShortWrite | ||||
| 		} | ||||
| 	} | ||||
| 	return | ||||
| 	return n, nil | ||||
| } | ||||
|  | ||||
| func (rw *remoteWriter) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) (err error) { | ||||
|   | ||||
| @@ -175,6 +175,13 @@ func checkContentStoreWriter(ctx context.Context, t *testing.T, cs content.Store | ||||
| 	} | ||||
| 	defer w4.Close() | ||||
|  | ||||
| 	c5, d5 := createContent(16 << 21) | ||||
| 	w5, err := content.OpenWriter(ctx, cs, content.WithRef("c5"), content.WithDescriptor(ocispec.Descriptor{Size: int64(len(c5)), Digest: d5})) | ||||
| 	if err != nil { | ||||
| 		t.Fatal(err) | ||||
| 	} | ||||
| 	defer w5.Close() | ||||
|  | ||||
| 	smallbuf := make([]byte, 32) | ||||
| 	for _, s := range []struct { | ||||
| 		content []byte | ||||
| @@ -201,6 +208,11 @@ func checkContentStoreWriter(ctx context.Context, t *testing.T, cs content.Store | ||||
| 			digest:  d4, | ||||
| 			writer:  w4, | ||||
| 		}, | ||||
| 		{ | ||||
| 			content: c5, | ||||
| 			digest:  d5, | ||||
| 			writer:  w5, | ||||
| 		}, | ||||
| 	} { | ||||
| 		n, err := io.CopyBuffer(s.writer, bytes.NewReader(s.content), smallbuf) | ||||
| 		if err != nil { | ||||
| @@ -647,7 +659,6 @@ func checkLabels(ctx context.Context, t *testing.T, cs content.Store) { | ||||
| 	if err := checkInfo(ctx, cs, d1, info, preCommit, postCommit, preUpdate, postUpdate); err != nil { | ||||
| 		t.Fatalf("Check info failed: %+v", err) | ||||
| 	} | ||||
|  | ||||
| } | ||||
|  | ||||
| func checkResume(rf func(context.Context, content.Writer, []byte, int64, int64, digest.Digest) error) func(ctx context.Context, t *testing.T, cs content.Store) { | ||||
| @@ -734,7 +745,6 @@ func resumeDiscard(ctx context.Context, w content.Writer, b []byte, written, siz | ||||
| 	} | ||||
| 	if err := w.Commit(ctx, size, dgst); err != nil { | ||||
| 		return fmt.Errorf("commit failed: %w", err) | ||||
|  | ||||
| 	} | ||||
| 	return nil | ||||
| } | ||||
| @@ -931,7 +941,6 @@ func checkCrossNSAppend(ctx context.Context, t *testing.T, cs content.Store) { | ||||
| 	if err := checkContent(ctx2, cs, d2, info2, t1, t3, t1, t3); err != nil { | ||||
| 		t.Fatal(err) | ||||
| 	} | ||||
|  | ||||
| } | ||||
|  | ||||
| func checkCrossNSIsolate(ctx context.Context, t *testing.T, cs content.Store) { | ||||
| @@ -1129,6 +1138,7 @@ func checkInfo(ctx context.Context, cs content.Store, d digest.Digest, expected | ||||
|  | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func checkContent(ctx context.Context, cs content.Store, d digest.Digest, expected content.Info, c1, c2, u1, u2 time.Time) error { | ||||
| 	if err := checkInfo(ctx, cs, d, expected, c1, c2, u1, u2); err != nil { | ||||
| 		return err | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 Derek McGowan
					Derek McGowan