diff --git a/integration/container_log_test.go b/integration/container_log_test.go index 4f62298aa..27f6dbce2 100644 --- a/integration/container_log_test.go +++ b/integration/container_log_test.go @@ -30,6 +30,66 @@ import ( runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2" ) +func TestContainerLogWithoutTailingNewLine(t *testing.T) { + testPodLogDir, err := ioutil.TempDir("/tmp", "container-log-without-tailing-newline") + require.NoError(t, err) + defer os.RemoveAll(testPodLogDir) + + t.Log("Create a sandbox with log directory") + sbConfig := PodSandboxConfig("sandbox", "container-log-without-tailing-newline", + WithPodLogDirectory(testPodLogDir), + ) + sb, err := runtimeService.RunPodSandbox(sbConfig, *runtimeHandler) + require.NoError(t, err) + defer func() { + assert.NoError(t, runtimeService.StopPodSandbox(sb)) + assert.NoError(t, runtimeService.RemovePodSandbox(sb)) + }() + + const ( + testImage = "busybox" + containerName = "test-container" + ) + t.Logf("Pull test image %q", testImage) + img, err := imageService.PullImage(&runtime.ImageSpec{Image: testImage}, nil) + require.NoError(t, err) + defer func() { + assert.NoError(t, imageService.RemoveImage(&runtime.ImageSpec{Image: img})) + }() + + t.Log("Create a container with log path") + cnConfig := ContainerConfig( + containerName, + testImage, + WithCommand("sh", "-c", "printf abcd"), + WithLogPath(containerName), + ) + cn, err := runtimeService.CreateContainer(sb, cnConfig, sbConfig) + require.NoError(t, err) + + t.Log("Start the container") + require.NoError(t, runtimeService.StartContainer(cn)) + + t.Log("Wait for container to finish running") + require.NoError(t, Eventually(func() (bool, error) { + s, err := runtimeService.ContainerStatus(cn) + if err != nil { + return false, err + } + if s.GetState() == runtime.ContainerState_CONTAINER_EXITED { + return true, nil + } + return false, nil + }, time.Second, 30*time.Second)) + + t.Log("Check container log") + content, err := ioutil.ReadFile(filepath.Join(testPodLogDir, containerName)) + assert.NoError(t, err) + checkContainerLog(t, string(content), []string{ + fmt.Sprintf("%s %s %s", runtime.Stdout, runtime.LogTagPartial, "abcd"), + }) +} + func TestLongContainerLog(t *testing.T) { testPodLogDir, err := ioutil.TempDir("/tmp", "long-container-log") require.NoError(t, err) @@ -66,9 +126,9 @@ func TestLongContainerLog(t *testing.T) { longLineCmd := fmt.Sprintf("i=0; while [ $i -lt %d ]; do printf %s; i=$((i+1)); done", maxSize+1, "c") cnConfig := ContainerConfig( containerName, - "busybox", + testImage, WithCommand("sh", "-c", - fmt.Sprintf("%s; echo; %s; echo; %s", shortLineCmd, maxLenLineCmd, longLineCmd)), + fmt.Sprintf("%s; echo; %s; echo; %s; echo", shortLineCmd, maxLenLineCmd, longLineCmd)), WithLogPath(containerName), ) cn, err := runtimeService.CreateContainer(sb, cnConfig, sbConfig) diff --git a/pkg/server/events.go b/pkg/server/events.go index b3a8609be..87169b544 100644 --- a/pkg/server/events.go +++ b/pkg/server/events.go @@ -112,17 +112,17 @@ func convertEvent(e *gogotypes.Any) (string, interface{}, error) { return "", nil, errors.Wrap(err, "failed to unmarshalany") } - switch evt.(type) { + switch e := evt.(type) { case *eventtypes.TaskExit: - id = evt.(*eventtypes.TaskExit).ContainerID + id = e.ContainerID case *eventtypes.TaskOOM: - id = evt.(*eventtypes.TaskOOM).ContainerID + id = e.ContainerID case *eventtypes.ImageCreate: - id = evt.(*eventtypes.ImageCreate).Name + id = e.Name case *eventtypes.ImageUpdate: - id = evt.(*eventtypes.ImageUpdate).Name + id = e.Name case *eventtypes.ImageDelete: - id = evt.(*eventtypes.ImageDelete).Name + id = e.Name default: return "", nil, errors.New("unsupported event") } @@ -200,9 +200,8 @@ func (em *eventMonitor) handleEvent(any interface{}) error { ctx, cancel := context.WithTimeout(ctx, handleEventTimeout) defer cancel() - switch any.(type) { + switch e := any.(type) { case *eventtypes.TaskExit: - e := any.(*eventtypes.TaskExit) logrus.Infof("TaskExit event %+v", e) // Use ID instead of ContainerID to rule out TaskExit event for exec. cntr, err := em.c.containerStore.Get(e.ID) @@ -226,7 +225,6 @@ func (em *eventMonitor) handleEvent(any interface{}) error { } return nil case *eventtypes.TaskOOM: - e := any.(*eventtypes.TaskOOM) logrus.Infof("TaskOOM event %+v", e) // For TaskOOM, we only care which container it belongs to. cntr, err := em.c.containerStore.Get(e.ContainerID) @@ -244,15 +242,12 @@ func (em *eventMonitor) handleEvent(any interface{}) error { return errors.Wrap(err, "failed to update container status for TaskOOM event") } case *eventtypes.ImageCreate: - e := any.(*eventtypes.ImageCreate) logrus.Infof("ImageCreate event %+v", e) return em.c.updateImage(ctx, e.Name) case *eventtypes.ImageUpdate: - e := any.(*eventtypes.ImageUpdate) logrus.Infof("ImageUpdate event %+v", e) return em.c.updateImage(ctx, e.Name) case *eventtypes.ImageDelete: - e := any.(*eventtypes.ImageDelete) logrus.Infof("ImageDelete event %+v", e) return em.c.updateImage(ctx, e.Name) } diff --git a/pkg/server/io/logger.go b/pkg/server/io/logger.go index 9d2af2769..c0a0d11ca 100644 --- a/pkg/server/io/logger.go +++ b/pkg/server/io/logger.go @@ -19,6 +19,7 @@ package io import ( "bufio" "bytes" + "fmt" "io" "io/ioutil" "time" @@ -61,6 +62,56 @@ func NewCRILogger(path string, w io.Writer, stream StreamType, maxLen int) (io.W return pwc, stop } +// bufio.ReadLine in golang eats both read errors and tailing newlines +// (See https://golang.org/pkg/bufio/#Reader.ReadLine). When reading +// to io.EOF, it is impossible for the caller to figure out whether +// there is a newline at the end, for example: +// 1) When reading "CONTENT\n", it returns "CONTENT" without error; +// 2) When reading "CONTENT", it also returns "CONTENT" without error. +// +// To differentiate these 2 cases, we need to write a readLine function +// ourselves to not ignore the error. +// +// The code is similar with https://golang.org/src/bufio/bufio.go?s=9537:9604#L359. +// The only difference is that it returns all errors from `ReadSlice`. +// +// readLine returns err != nil if and only if line does not end with a new line. +func readLine(b *bufio.Reader) (line []byte, isPrefix bool, err error) { + line, err = b.ReadSlice('\n') + if err == bufio.ErrBufferFull { + // Handle the case where "\r\n" straddles the buffer. + if len(line) > 0 && line[len(line)-1] == '\r' { + // Unread the last '\r' + if err := b.UnreadByte(); err != nil { + panic(fmt.Sprintf("invalid unread %v", err)) + } + line = line[:len(line)-1] + } + return line, true, nil + } + + if len(line) == 0 { + if err != nil { + line = nil + } + return + } + + if line[len(line)-1] == '\n' { + // "ReadSlice returns err != nil if and only if line does not end in delim" + // (See https://golang.org/pkg/bufio/#Reader.ReadSlice). + if err != nil { + panic(fmt.Sprintf("full read with unexpected error %v", err)) + } + drop := 1 + if len(line) > 1 && line[len(line)-2] == '\r' { + drop = 2 + } + line = line[:len(line)-drop] + } + return +} + func redirectLogs(path string, rc io.ReadCloser, w io.Writer, s StreamType, maxLen int) { defer rc.Close() var ( @@ -88,7 +139,16 @@ func redirectLogs(path string, rc io.ReadCloser, w io.Writer, s StreamType, maxL } for { var stop bool - newLine, isPrefix, err := r.ReadLine() + newLine, isPrefix, err := readLine(r) + // NOTE(random-liu): readLine can return actual content even if there is an error. + if len(newLine) > 0 { + // Buffer returned by ReadLine will change after + // next read, copy it. + l := make([]byte, len(newLine)) + copy(l, newLine) + buf = append(buf, l) + length += len(l) + } if err != nil { if err == io.EOF { logrus.Debugf("Getting EOF from stream %q while redirecting to log file %q", s, path) @@ -101,13 +161,6 @@ func redirectLogs(path string, rc io.ReadCloser, w io.Writer, s StreamType, maxL } // Stop after writing the content left in buffer. stop = true - } else { - // Buffer returned by ReadLine will change after - // next read, copy it. - l := make([]byte, len(newLine)) - copy(l, newLine) - buf = append(buf, l) - length += len(l) } if maxLen > 0 && length > maxLen { exceedLen := length - maxLen @@ -125,7 +178,14 @@ func redirectLogs(path string, rc io.ReadCloser, w io.Writer, s StreamType, maxL if isPrefix { continue } - writeLine(full, bytes.Join(buf, nil)) + if stop { + // readLine only returns error when the message doesn't + // end with a newline, in that case it should be treated + // as a partial line. + writeLine(partial, bytes.Join(buf, nil)) + } else { + writeLine(full, bytes.Join(buf, nil)) + } buf = nil length = 0 if stop { diff --git a/pkg/server/io/logger_test.go b/pkg/server/io/logger_test.go index 45d45cc47..8c0384412 100644 --- a/pkg/server/io/logger_test.go +++ b/pkg/server/io/logger_test.go @@ -72,7 +72,7 @@ func TestRedirectLogs(t *testing.T) { maxLen: maxLen, tag: []runtime.LogTag{ runtime.LogTagFull, - runtime.LogTagFull, + runtime.LogTagPartial, }, content: []string{ "test stderr log 1", @@ -222,6 +222,19 @@ func TestRedirectLogs(t *testing.T) { strings.Repeat("a", defaultBufSize*10+20), }, }, + "log length longer than buffer size with tailing \\r\\n": { + input: strings.Repeat("a", defaultBufSize-1) + "\r\n" + strings.Repeat("a", defaultBufSize-1) + "\r\n", + stream: Stdout, + maxLen: -1, + tag: []runtime.LogTag{ + runtime.LogTagFull, + runtime.LogTagFull, + }, + content: []string{ + strings.Repeat("a", defaultBufSize-1), + strings.Repeat("a", defaultBufSize-1), + }, + }, } { t.Logf("TestCase %q", desc) rc := ioutil.NopCloser(strings.NewReader(test.input)) diff --git a/pkg/util/strings.go b/pkg/util/strings.go index d5cbc2e8e..4d06ecbf0 100644 --- a/pkg/util/strings.go +++ b/pkg/util/strings.go @@ -22,7 +22,7 @@ import "strings" // Comparison is case insensitive. func InStringSlice(ss []string, str string) bool { for _, s := range ss { - if strings.ToLower(s) == strings.ToLower(str) { + if strings.EqualFold(s, str) { return true } } @@ -34,7 +34,7 @@ func InStringSlice(ss []string, str string) bool { func SubtractStringSlice(ss []string, str string) []string { var res []string for _, s := range ss { - if strings.ToLower(s) == strings.ToLower(str) { + if strings.EqualFold(s, str) { continue } res = append(res, s)