Fix the log ending newline handling.

Signed-off-by: Lantao Liu <lantaol@google.com>
This commit is contained in:
Lantao Liu 2019-01-23 16:00:26 -08:00
parent 8976690320
commit 50ac40097e
2 changed files with 83 additions and 10 deletions

View File

@ -19,6 +19,7 @@ package io
import ( import (
"bufio" "bufio"
"bytes" "bytes"
"fmt"
"io" "io"
"io/ioutil" "io/ioutil"
"time" "time"
@ -61,6 +62,56 @@ func NewCRILogger(path string, w io.Writer, stream StreamType, maxLen int) (io.W
return pwc, stop return pwc, stop
} }
// bufio.ReadLine in golang eats both read errors and tailing newlines
// (See https://golang.org/pkg/bufio/#Reader.ReadLine). When reading
// to io.EOF, it is impossible for the caller to figure out whether
// there is a newline at the end, for example:
// 1) When reading "CONTENT\n", it returns "CONTENT" without error;
// 2) When reading "CONTENT", it also returns "CONTENT" without error.
//
// To differentiate these 2 cases, we need to write a readLine function
// ourselves to not ignore the error.
//
// The code is similar with https://golang.org/src/bufio/bufio.go?s=9537:9604#L359.
// The only difference is that it returns all errors from `ReadSlice`.
//
// readLine returns err != nil if and only if line does not end with a new line.
func readLine(b *bufio.Reader) (line []byte, isPrefix bool, err error) {
line, err = b.ReadSlice('\n')
if err == bufio.ErrBufferFull {
// Handle the case where "\r\n" straddles the buffer.
if len(line) > 0 && line[len(line)-1] == '\r' {
// Unread the last '\r'
if err := b.UnreadByte(); err != nil {
panic(fmt.Sprintf("invalid unread %v", err))
}
line = line[:len(line)-1]
}
return line, true, nil
}
if len(line) == 0 {
if err != nil {
line = nil
}
return
}
if line[len(line)-1] == '\n' {
// "ReadSlice returns err != nil if and only if line does not end in delim"
// (See https://golang.org/pkg/bufio/#Reader.ReadSlice).
if err != nil {
panic(fmt.Sprintf("full read with unexpected error %v", err))
}
drop := 1
if len(line) > 1 && line[len(line)-2] == '\r' {
drop = 2
}
line = line[:len(line)-drop]
}
return
}
func redirectLogs(path string, rc io.ReadCloser, w io.Writer, s StreamType, maxLen int) { func redirectLogs(path string, rc io.ReadCloser, w io.Writer, s StreamType, maxLen int) {
defer rc.Close() defer rc.Close()
var ( var (
@ -88,7 +139,16 @@ func redirectLogs(path string, rc io.ReadCloser, w io.Writer, s StreamType, maxL
} }
for { for {
var stop bool var stop bool
newLine, isPrefix, err := r.ReadLine() newLine, isPrefix, err := readLine(r)
// NOTE(random-liu): readLine can return actual content even if there is an error.
if len(newLine) > 0 {
// Buffer returned by ReadLine will change after
// next read, copy it.
l := make([]byte, len(newLine))
copy(l, newLine)
buf = append(buf, l)
length += len(l)
}
if err != nil { if err != nil {
if err == io.EOF { if err == io.EOF {
logrus.Debugf("Getting EOF from stream %q while redirecting to log file %q", s, path) logrus.Debugf("Getting EOF from stream %q while redirecting to log file %q", s, path)
@ -101,13 +161,6 @@ func redirectLogs(path string, rc io.ReadCloser, w io.Writer, s StreamType, maxL
} }
// Stop after writing the content left in buffer. // Stop after writing the content left in buffer.
stop = true stop = true
} else {
// Buffer returned by ReadLine will change after
// next read, copy it.
l := make([]byte, len(newLine))
copy(l, newLine)
buf = append(buf, l)
length += len(l)
} }
if maxLen > 0 && length > maxLen { if maxLen > 0 && length > maxLen {
exceedLen := length - maxLen exceedLen := length - maxLen
@ -125,7 +178,14 @@ func redirectLogs(path string, rc io.ReadCloser, w io.Writer, s StreamType, maxL
if isPrefix { if isPrefix {
continue continue
} }
writeLine(full, bytes.Join(buf, nil)) if stop {
// readLine only returns error when the message doesn't
// end with a newline, in that case it should be treated
// as a partial line.
writeLine(partial, bytes.Join(buf, nil))
} else {
writeLine(full, bytes.Join(buf, nil))
}
buf = nil buf = nil
length = 0 length = 0
if stop { if stop {

View File

@ -72,7 +72,7 @@ func TestRedirectLogs(t *testing.T) {
maxLen: maxLen, maxLen: maxLen,
tag: []runtime.LogTag{ tag: []runtime.LogTag{
runtime.LogTagFull, runtime.LogTagFull,
runtime.LogTagFull, runtime.LogTagPartial,
}, },
content: []string{ content: []string{
"test stderr log 1", "test stderr log 1",
@ -222,6 +222,19 @@ func TestRedirectLogs(t *testing.T) {
strings.Repeat("a", defaultBufSize*10+20), strings.Repeat("a", defaultBufSize*10+20),
}, },
}, },
"log length longer than buffer size with tailing \\r\\n": {
input: strings.Repeat("a", defaultBufSize-1) + "\r\n" + strings.Repeat("a", defaultBufSize-1) + "\r\n",
stream: Stdout,
maxLen: -1,
tag: []runtime.LogTag{
runtime.LogTagFull,
runtime.LogTagFull,
},
content: []string{
strings.Repeat("a", defaultBufSize-1),
strings.Repeat("a", defaultBufSize-1),
},
},
} { } {
t.Logf("TestCase %q", desc) t.Logf("TestCase %q", desc)
rc := ioutil.NopCloser(strings.NewReader(test.input)) rc := ioutil.NopCloser(strings.NewReader(test.input))