Merge pull request #1027 from Random-Liu/fix-log-ending-newline

Fix the log ending newline handling.
This commit is contained in:
Lantao Liu 2019-01-24 10:22:03 -08:00 committed by GitHub
commit 97c7a1b17b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 154 additions and 26 deletions

View File

@ -30,6 +30,66 @@ import (
runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2"
)
func TestContainerLogWithoutTailingNewLine(t *testing.T) {
testPodLogDir, err := ioutil.TempDir("/tmp", "container-log-without-tailing-newline")
require.NoError(t, err)
defer os.RemoveAll(testPodLogDir)
t.Log("Create a sandbox with log directory")
sbConfig := PodSandboxConfig("sandbox", "container-log-without-tailing-newline",
WithPodLogDirectory(testPodLogDir),
)
sb, err := runtimeService.RunPodSandbox(sbConfig, *runtimeHandler)
require.NoError(t, err)
defer func() {
assert.NoError(t, runtimeService.StopPodSandbox(sb))
assert.NoError(t, runtimeService.RemovePodSandbox(sb))
}()
const (
testImage = "busybox"
containerName = "test-container"
)
t.Logf("Pull test image %q", testImage)
img, err := imageService.PullImage(&runtime.ImageSpec{Image: testImage}, nil)
require.NoError(t, err)
defer func() {
assert.NoError(t, imageService.RemoveImage(&runtime.ImageSpec{Image: img}))
}()
t.Log("Create a container with log path")
cnConfig := ContainerConfig(
containerName,
testImage,
WithCommand("sh", "-c", "printf abcd"),
WithLogPath(containerName),
)
cn, err := runtimeService.CreateContainer(sb, cnConfig, sbConfig)
require.NoError(t, err)
t.Log("Start the container")
require.NoError(t, runtimeService.StartContainer(cn))
t.Log("Wait for container to finish running")
require.NoError(t, Eventually(func() (bool, error) {
s, err := runtimeService.ContainerStatus(cn)
if err != nil {
return false, err
}
if s.GetState() == runtime.ContainerState_CONTAINER_EXITED {
return true, nil
}
return false, nil
}, time.Second, 30*time.Second))
t.Log("Check container log")
content, err := ioutil.ReadFile(filepath.Join(testPodLogDir, containerName))
assert.NoError(t, err)
checkContainerLog(t, string(content), []string{
fmt.Sprintf("%s %s %s", runtime.Stdout, runtime.LogTagPartial, "abcd"),
})
}
func TestLongContainerLog(t *testing.T) {
testPodLogDir, err := ioutil.TempDir("/tmp", "long-container-log")
require.NoError(t, err)
@ -66,9 +126,9 @@ func TestLongContainerLog(t *testing.T) {
longLineCmd := fmt.Sprintf("i=0; while [ $i -lt %d ]; do printf %s; i=$((i+1)); done", maxSize+1, "c")
cnConfig := ContainerConfig(
containerName,
"busybox",
testImage,
WithCommand("sh", "-c",
fmt.Sprintf("%s; echo; %s; echo; %s", shortLineCmd, maxLenLineCmd, longLineCmd)),
fmt.Sprintf("%s; echo; %s; echo; %s; echo", shortLineCmd, maxLenLineCmd, longLineCmd)),
WithLogPath(containerName),
)
cn, err := runtimeService.CreateContainer(sb, cnConfig, sbConfig)

View File

@ -112,17 +112,17 @@ func convertEvent(e *gogotypes.Any) (string, interface{}, error) {
return "", nil, errors.Wrap(err, "failed to unmarshalany")
}
switch evt.(type) {
switch e := evt.(type) {
case *eventtypes.TaskExit:
id = evt.(*eventtypes.TaskExit).ContainerID
id = e.ContainerID
case *eventtypes.TaskOOM:
id = evt.(*eventtypes.TaskOOM).ContainerID
id = e.ContainerID
case *eventtypes.ImageCreate:
id = evt.(*eventtypes.ImageCreate).Name
id = e.Name
case *eventtypes.ImageUpdate:
id = evt.(*eventtypes.ImageUpdate).Name
id = e.Name
case *eventtypes.ImageDelete:
id = evt.(*eventtypes.ImageDelete).Name
id = e.Name
default:
return "", nil, errors.New("unsupported event")
}
@ -200,9 +200,8 @@ func (em *eventMonitor) handleEvent(any interface{}) error {
ctx, cancel := context.WithTimeout(ctx, handleEventTimeout)
defer cancel()
switch any.(type) {
switch e := any.(type) {
case *eventtypes.TaskExit:
e := any.(*eventtypes.TaskExit)
logrus.Infof("TaskExit event %+v", e)
// Use ID instead of ContainerID to rule out TaskExit event for exec.
cntr, err := em.c.containerStore.Get(e.ID)
@ -226,7 +225,6 @@ func (em *eventMonitor) handleEvent(any interface{}) error {
}
return nil
case *eventtypes.TaskOOM:
e := any.(*eventtypes.TaskOOM)
logrus.Infof("TaskOOM event %+v", e)
// For TaskOOM, we only care which container it belongs to.
cntr, err := em.c.containerStore.Get(e.ContainerID)
@ -244,15 +242,12 @@ func (em *eventMonitor) handleEvent(any interface{}) error {
return errors.Wrap(err, "failed to update container status for TaskOOM event")
}
case *eventtypes.ImageCreate:
e := any.(*eventtypes.ImageCreate)
logrus.Infof("ImageCreate event %+v", e)
return em.c.updateImage(ctx, e.Name)
case *eventtypes.ImageUpdate:
e := any.(*eventtypes.ImageUpdate)
logrus.Infof("ImageUpdate event %+v", e)
return em.c.updateImage(ctx, e.Name)
case *eventtypes.ImageDelete:
e := any.(*eventtypes.ImageDelete)
logrus.Infof("ImageDelete event %+v", e)
return em.c.updateImage(ctx, e.Name)
}

View File

@ -19,6 +19,7 @@ package io
import (
"bufio"
"bytes"
"fmt"
"io"
"io/ioutil"
"time"
@ -61,6 +62,56 @@ func NewCRILogger(path string, w io.Writer, stream StreamType, maxLen int) (io.W
return pwc, stop
}
// bufio.ReadLine in golang eats both read errors and tailing newlines
// (See https://golang.org/pkg/bufio/#Reader.ReadLine). When reading
// to io.EOF, it is impossible for the caller to figure out whether
// there is a newline at the end, for example:
// 1) When reading "CONTENT\n", it returns "CONTENT" without error;
// 2) When reading "CONTENT", it also returns "CONTENT" without error.
//
// To differentiate these 2 cases, we need to write a readLine function
// ourselves to not ignore the error.
//
// The code is similar with https://golang.org/src/bufio/bufio.go?s=9537:9604#L359.
// The only difference is that it returns all errors from `ReadSlice`.
//
// readLine returns err != nil if and only if line does not end with a new line.
func readLine(b *bufio.Reader) (line []byte, isPrefix bool, err error) {
line, err = b.ReadSlice('\n')
if err == bufio.ErrBufferFull {
// Handle the case where "\r\n" straddles the buffer.
if len(line) > 0 && line[len(line)-1] == '\r' {
// Unread the last '\r'
if err := b.UnreadByte(); err != nil {
panic(fmt.Sprintf("invalid unread %v", err))
}
line = line[:len(line)-1]
}
return line, true, nil
}
if len(line) == 0 {
if err != nil {
line = nil
}
return
}
if line[len(line)-1] == '\n' {
// "ReadSlice returns err != nil if and only if line does not end in delim"
// (See https://golang.org/pkg/bufio/#Reader.ReadSlice).
if err != nil {
panic(fmt.Sprintf("full read with unexpected error %v", err))
}
drop := 1
if len(line) > 1 && line[len(line)-2] == '\r' {
drop = 2
}
line = line[:len(line)-drop]
}
return
}
func redirectLogs(path string, rc io.ReadCloser, w io.Writer, s StreamType, maxLen int) {
defer rc.Close()
var (
@ -88,7 +139,16 @@ func redirectLogs(path string, rc io.ReadCloser, w io.Writer, s StreamType, maxL
}
for {
var stop bool
newLine, isPrefix, err := r.ReadLine()
newLine, isPrefix, err := readLine(r)
// NOTE(random-liu): readLine can return actual content even if there is an error.
if len(newLine) > 0 {
// Buffer returned by ReadLine will change after
// next read, copy it.
l := make([]byte, len(newLine))
copy(l, newLine)
buf = append(buf, l)
length += len(l)
}
if err != nil {
if err == io.EOF {
logrus.Debugf("Getting EOF from stream %q while redirecting to log file %q", s, path)
@ -101,13 +161,6 @@ func redirectLogs(path string, rc io.ReadCloser, w io.Writer, s StreamType, maxL
}
// Stop after writing the content left in buffer.
stop = true
} else {
// Buffer returned by ReadLine will change after
// next read, copy it.
l := make([]byte, len(newLine))
copy(l, newLine)
buf = append(buf, l)
length += len(l)
}
if maxLen > 0 && length > maxLen {
exceedLen := length - maxLen
@ -125,7 +178,14 @@ func redirectLogs(path string, rc io.ReadCloser, w io.Writer, s StreamType, maxL
if isPrefix {
continue
}
if stop {
// readLine only returns error when the message doesn't
// end with a newline, in that case it should be treated
// as a partial line.
writeLine(partial, bytes.Join(buf, nil))
} else {
writeLine(full, bytes.Join(buf, nil))
}
buf = nil
length = 0
if stop {

View File

@ -72,7 +72,7 @@ func TestRedirectLogs(t *testing.T) {
maxLen: maxLen,
tag: []runtime.LogTag{
runtime.LogTagFull,
runtime.LogTagFull,
runtime.LogTagPartial,
},
content: []string{
"test stderr log 1",
@ -222,6 +222,19 @@ func TestRedirectLogs(t *testing.T) {
strings.Repeat("a", defaultBufSize*10+20),
},
},
"log length longer than buffer size with tailing \\r\\n": {
input: strings.Repeat("a", defaultBufSize-1) + "\r\n" + strings.Repeat("a", defaultBufSize-1) + "\r\n",
stream: Stdout,
maxLen: -1,
tag: []runtime.LogTag{
runtime.LogTagFull,
runtime.LogTagFull,
},
content: []string{
strings.Repeat("a", defaultBufSize-1),
strings.Repeat("a", defaultBufSize-1),
},
},
} {
t.Logf("TestCase %q", desc)
rc := ioutil.NopCloser(strings.NewReader(test.input))

View File

@ -22,7 +22,7 @@ import "strings"
// Comparison is case insensitive.
func InStringSlice(ss []string, str string) bool {
for _, s := range ss {
if strings.ToLower(s) == strings.ToLower(str) {
if strings.EqualFold(s, str) {
return true
}
}
@ -34,7 +34,7 @@ func InStringSlice(ss []string, str string) bool {
func SubtractStringSlice(ss []string, str string) []string {
var res []string
for _, s := range ss {
if strings.ToLower(s) == strings.ToLower(str) {
if strings.EqualFold(s, str) {
continue
}
res = append(res, s)