Handle large output in v2 shim with TTY

Reized the I/O buffers to align with the size of the kernel buffers with fifos
and move the close aspect of the console to key off of the stdin closing.

Fixes #3738

Signed-off-by: Michael Crosby <crosbymichael@gmail.com>
This commit is contained in:
Michael Crosby 2019-10-11 12:16:55 -04:00
parent 38937f0607
commit f8cca26f3c
5 changed files with 114 additions and 7 deletions

View File

@ -1578,3 +1578,105 @@ func TestShimSockLength(t *testing.T) {
<-statusC
}
func TestContainerExecLargeOutputWithTTY(t *testing.T) {
t.Parallel()
client, err := newClient(t, address)
if err != nil {
t.Fatal(err)
}
defer client.Close()
var (
image Image
ctx, cancel = testContext(t)
id = t.Name()
)
defer cancel()
image, err = client.GetImage(ctx, testImage)
if err != nil {
t.Fatal(err)
}
container, err := client.NewContainer(ctx, id, WithNewSnapshot(id, image), WithNewSpec(oci.WithImageConfig(image), withProcessArgs("sleep", "999")))
if err != nil {
t.Fatal(err)
}
defer container.Delete(ctx, WithSnapshotCleanup)
task, err := container.NewTask(ctx, empty())
if err != nil {
t.Fatal(err)
}
defer task.Delete(ctx)
finishedC, err := task.Wait(ctx)
if err != nil {
t.Fatal(err)
}
if err := task.Start(ctx); err != nil {
t.Fatal(err)
}
for i := 0; i < 100; i++ {
spec, err := container.Spec(ctx)
if err != nil {
t.Fatal(err)
}
// start an exec process without running the original container process info
processSpec := spec.Process
withExecArgs(processSpec, "sh", "-c", `seq -s " " 1000000`)
stdout := bytes.NewBuffer(nil)
execID := t.Name() + "_exec"
process, err := task.Exec(ctx, execID, processSpec, cio.NewCreator(withByteBuffers(stdout), withProcessTTY()))
if err != nil {
t.Fatal(err)
}
processStatusC, err := process.Wait(ctx)
if err != nil {
t.Fatal(err)
}
if err := process.Start(ctx); err != nil {
t.Fatal(err)
}
// wait for the exec to return
status := <-processStatusC
code, _, err := status.Result()
if err != nil {
t.Fatal(err)
}
if code != 0 {
t.Errorf("expected exec exit code 0 but received %d", code)
}
if _, err := process.Delete(ctx); err != nil {
t.Fatal(err)
}
const expectedSuffix = "999999 1000000"
stdoutString := stdout.String()
if !strings.Contains(stdoutString, expectedSuffix) {
t.Fatalf("process output does not end with %q at iteration %d, here are the last 20 characters of the output:\n\n %q", expectedSuffix, i, stdoutString[len(stdoutString)-20:])
}
}
if err := task.Kill(ctx, syscall.SIGKILL); err != nil {
t.Error(err)
}
<-finishedC
}
func withProcessTTY() cio.Opt {
return func(opt *cio.Streams) {
cio.WithTerminal(opt)
}
}

View File

@ -40,7 +40,9 @@ import (
var bufPool = sync.Pool{
New: func() interface{} {
buffer := make([]byte, 32<<10)
// setting to 4096 to align with PIPE_BUF
// http://man7.org/linux/man-pages/man7/pipe.7.html
buffer := make([]byte, 4096)
return &buffer
},
}

View File

@ -55,7 +55,7 @@ var (
empty = &ptypes.Empty{}
bufPool = sync.Pool{
New: func() interface{} {
buffer := make([]byte, 32<<10)
buffer := make([]byte, 4096)
return &buffer
},
}

View File

@ -55,6 +55,7 @@ func (p *linuxPlatform) CopyConsole(ctx context.Context, console console.Console
io.CopyBuffer(epollConsole, in, *bp)
// we need to shutdown epollConsole when pipe broken
epollConsole.Shutdown(p.epoller.CloseConsole)
epollConsole.Close()
}()
}
@ -73,9 +74,8 @@ func (p *linuxPlatform) CopyConsole(ctx context.Context, console console.Console
p := bufPool.Get().(*[]byte)
defer bufPool.Put(p)
io.CopyBuffer(outw, epollConsole, *p)
epollConsole.Close()
outr.Close()
outw.Close()
outr.Close()
wg.Done()
}()
cwg.Wait()

View File

@ -32,7 +32,9 @@ import (
var bufPool = sync.Pool{
New: func() interface{} {
buffer := make([]byte, 32<<10)
// setting to 4096 to align with PIPE_BUF
// http://man7.org/linux/man-pages/man7/pipe.7.html
buffer := make([]byte, 4096)
return &buffer
},
}
@ -77,6 +79,7 @@ func (p *linuxPlatform) CopyConsole(ctx context.Context, console console.Console
io.CopyBuffer(epollConsole, in, *bp)
// we need to shutdown epollConsole when pipe broken
epollConsole.Shutdown(p.epoller.CloseConsole)
epollConsole.Close()
}()
}
@ -95,9 +98,9 @@ func (p *linuxPlatform) CopyConsole(ctx context.Context, console console.Console
buf := bufPool.Get().(*[]byte)
defer bufPool.Put(buf)
io.CopyBuffer(outw, epollConsole, *buf)
epollConsole.Close()
outr.Close()
outw.Close()
outr.Close()
wg.Done()
}()
cwg.Wait()