From 49ca87d7270091b8193301dc2f6759e9aa7c97b1 Mon Sep 17 00:00:00 2001 From: Kazuyoshi Kato Date: Mon, 16 May 2022 17:33:33 +0000 Subject: [PATCH 1/2] Limit the response size of ExecSync Signed-off-by: Kazuyoshi Kato --- pkg/cri/server/container_execsync.go | 39 +++++++++++++++++- pkg/cri/server/container_execsync_test.go | 49 +++++++++++++++++++++++ 2 files changed, 86 insertions(+), 2 deletions(-) create mode 100644 pkg/cri/server/container_execsync_test.go diff --git a/pkg/cri/server/container_execsync.go b/pkg/cri/server/container_execsync.go index 3bf8d85c0..15221f9bd 100644 --- a/pkg/cri/server/container_execsync.go +++ b/pkg/cri/server/container_execsync.go @@ -19,6 +19,7 @@ package server import ( "bytes" "context" + "errors" "fmt" "io" "syscall" @@ -38,14 +39,48 @@ import ( cioutil "github.com/containerd/containerd/pkg/ioutil" ) +type cappedWriter struct { + w io.WriteCloser + remain int +} + +var errNoRemain = errors.New("no more space to write") + +func (cw *cappedWriter) Write(p []byte) (int, error) { + if cw.remain <= 0 { + return 0, errNoRemain + } + + end := cw.remain + if end > len(p) { + end = len(p) + } + written, err := cw.w.Write(p[0:end]) + cw.remain -= written + + if err != nil { + return written, err + } + if written < len(p) { + return written, errNoRemain + } + return written, nil +} + +func (cw *cappedWriter) Close() error { + return cw.w.Close() +} + // ExecSync executes a command in the container, and returns the stdout output. // If command exits with a non-zero exit code, an error is returned. func (c *criService) ExecSync(ctx context.Context, r *runtime.ExecSyncRequest) (*runtime.ExecSyncResponse, error) { + const maxStreamSize = 1024 * 1024 * 16 + var stdout, stderr bytes.Buffer exitCode, err := c.execInContainer(ctx, r.GetContainerId(), execOptions{ cmd: r.GetCmd(), - stdout: cioutil.NewNopWriteCloser(&stdout), - stderr: cioutil.NewNopWriteCloser(&stderr), + stdout: &cappedWriter{w: cioutil.NewNopWriteCloser(&stdout), remain: maxStreamSize}, + stderr: &cappedWriter{w: cioutil.NewNopWriteCloser(&stderr), remain: maxStreamSize}, timeout: time.Duration(r.GetTimeout()) * time.Second, }) if err != nil { diff --git a/pkg/cri/server/container_execsync_test.go b/pkg/cri/server/container_execsync_test.go new file mode 100644 index 000000000..18856b8fc --- /dev/null +++ b/pkg/cri/server/container_execsync_test.go @@ -0,0 +1,49 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package server + +import ( + "bytes" + "testing" + + cioutil "github.com/containerd/containerd/pkg/ioutil" + "github.com/stretchr/testify/assert" +) + +func TestCWWrite(t *testing.T) { + var buf bytes.Buffer + cw := &cappedWriter{w: cioutil.NewNopWriteCloser(&buf), remain: 10} + + n, err := cw.Write([]byte("hello")) + assert.NoError(t, err) + assert.Equal(t, 5, n) + + n, err = cw.Write([]byte("helloworld")) + assert.Equal(t, []byte("hellohello"), buf.Bytes(), "partial write") + assert.Equal(t, 5, n) + assert.ErrorIs(t, err, errNoRemain) + + _, err = cw.Write([]byte("world")) + assert.ErrorIs(t, err, errNoRemain) +} + +func TestCWClose(t *testing.T) { + var buf bytes.Buffer + cw := &cappedWriter{w: cioutil.NewNopWriteCloser(&buf), remain: 5} + err := cw.Close() + assert.NoError(t, err) +} From 40aa4f3f1bd7bd72839fa05d1ff61d58cba4430b Mon Sep 17 00:00:00 2001 From: Kazuyoshi Kato Date: Thu, 2 Jun 2022 03:53:11 +0000 Subject: [PATCH 2/2] Implicitly discard the input to drain the reader Signed-off-by: Derek McGowan --- pkg/cri/server/container_execsync.go | 26 ++++++++++++++--------- pkg/cri/server/container_execsync_test.go | 11 ++++++---- 2 files changed, 23 insertions(+), 14 deletions(-) diff --git a/pkg/cri/server/container_execsync.go b/pkg/cri/server/container_execsync.go index 15221f9bd..1cb344435 100644 --- a/pkg/cri/server/container_execsync.go +++ b/pkg/cri/server/container_execsync.go @@ -19,7 +19,6 @@ package server import ( "bytes" "context" - "errors" "fmt" "io" "syscall" @@ -44,11 +43,9 @@ type cappedWriter struct { remain int } -var errNoRemain = errors.New("no more space to write") - func (cw *cappedWriter) Write(p []byte) (int, error) { if cw.remain <= 0 { - return 0, errNoRemain + return len(p), nil } end := cw.remain @@ -61,26 +58,35 @@ func (cw *cappedWriter) Write(p []byte) (int, error) { if err != nil { return written, err } - if written < len(p) { - return written, errNoRemain - } - return written, nil + return len(p), nil } func (cw *cappedWriter) Close() error { return cw.w.Close() } +func (cw *cappedWriter) isFull() bool { + return cw.remain <= 0 +} + // ExecSync executes a command in the container, and returns the stdout output. // If command exits with a non-zero exit code, an error is returned. func (c *criService) ExecSync(ctx context.Context, r *runtime.ExecSyncRequest) (*runtime.ExecSyncResponse, error) { const maxStreamSize = 1024 * 1024 * 16 var stdout, stderr bytes.Buffer + + // cappedWriter truncates the output. In that case, the size of + // the ExecSyncResponse will hit the CRI plugin's gRPC response limit. + // Thus the callers outside of the containerd process (e.g. Kubelet) never see + // the truncated output. + cout := &cappedWriter{w: cioutil.NewNopWriteCloser(&stdout), remain: maxStreamSize} + cerr := &cappedWriter{w: cioutil.NewNopWriteCloser(&stderr), remain: maxStreamSize} + exitCode, err := c.execInContainer(ctx, r.GetContainerId(), execOptions{ cmd: r.GetCmd(), - stdout: &cappedWriter{w: cioutil.NewNopWriteCloser(&stdout), remain: maxStreamSize}, - stderr: &cappedWriter{w: cioutil.NewNopWriteCloser(&stderr), remain: maxStreamSize}, + stdout: cout, + stderr: cerr, timeout: time.Duration(r.GetTimeout()) * time.Second, }) if err != nil { diff --git a/pkg/cri/server/container_execsync_test.go b/pkg/cri/server/container_execsync_test.go index 18856b8fc..989909649 100644 --- a/pkg/cri/server/container_execsync_test.go +++ b/pkg/cri/server/container_execsync_test.go @@ -33,12 +33,15 @@ func TestCWWrite(t *testing.T) { assert.Equal(t, 5, n) n, err = cw.Write([]byte("helloworld")) - assert.Equal(t, []byte("hellohello"), buf.Bytes(), "partial write") - assert.Equal(t, 5, n) - assert.ErrorIs(t, err, errNoRemain) + assert.NoError(t, err, "no errors even it hits the cap") + assert.Equal(t, 10, n, "no indication of partial write") + assert.True(t, cw.isFull()) + assert.Equal(t, []byte("hellohello"), buf.Bytes(), "the underlying writer is capped") _, err = cw.Write([]byte("world")) - assert.ErrorIs(t, err, errNoRemain) + assert.NoError(t, err) + assert.True(t, cw.isFull()) + assert.Equal(t, []byte("hellohello"), buf.Bytes(), "the underlying writer is capped") } func TestCWClose(t *testing.T) {