diff --git a/pkg/server/io/helpers.go b/pkg/server/io/helpers.go index 88b5c076d..d41a67726 100644 --- a/pkg/server/io/helpers.go +++ b/pkg/server/io/helpers.go @@ -27,6 +27,7 @@ import ( "github.com/containerd/containerd" "github.com/containerd/fifo" "golang.org/x/net/context" + "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" ) // AttachOptions specifies how to attach to a container. @@ -47,9 +48,9 @@ const ( // Stdin stream type. Stdin StreamType = "stdin" // Stdout stream type. - Stdout StreamType = "stdout" + Stdout StreamType = StreamType(runtime.Stdout) // Stderr stream type. - Stderr StreamType = "stderr" + Stderr StreamType = StreamType(runtime.Stderr) ) type wgCloser struct { diff --git a/pkg/server/io/logger.go b/pkg/server/io/logger.go index da2c4fc0e..11f85a625 100644 --- a/pkg/server/io/logger.go +++ b/pkg/server/io/logger.go @@ -26,6 +26,7 @@ import ( "time" "github.com/golang/glog" + "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" cioutil "github.com/kubernetes-incubator/cri-containerd/pkg/ioutil" ) @@ -41,7 +42,7 @@ const ( // POSIX.1 says that write less than PIPE_BUF is atmoic. pipeBufSize = 4096 // bufSize is the size of the read buffer. - bufSize = pipeBufSize - len(timestampFormat) - len(Stdout) - 2 /*2 delimiter*/ - 1 /*eol*/ + bufSize = pipeBufSize - len(timestampFormat) - len(Stdout) - len(runtime.LogTagPartial) - 3 /*3 delimiter*/ - 1 /*eol*/ ) // NewDiscardLogger creates logger which discards all the input. @@ -67,10 +68,11 @@ func redirectLogs(path string, rc io.ReadCloser, wc io.WriteCloser, stream Strea defer wc.Close() streamBytes := []byte(stream) delimiterBytes := []byte{delimiter} + partialBytes := []byte(runtime.LogTagPartial) + fullBytes := []byte(runtime.LogTagFull) r := bufio.NewReaderSize(rc, bufSize) for { - // TODO(random-liu): Better define CRI log format, and escape newline in log. - lineBytes, _, err := r.ReadLine() + lineBytes, isPrefix, err := r.ReadLine() if err == io.EOF { glog.V(4).Infof("Finish redirecting log file %q", path) return @@ -79,8 +81,12 @@ func redirectLogs(path string, rc io.ReadCloser, wc io.WriteCloser, stream Strea glog.Errorf("An error occurred when redirecting log file %q: %v", path, err) return } + tagBytes := fullBytes + if isPrefix { + tagBytes = partialBytes + } timestampBytes := time.Now().AppendFormat(nil, time.RFC3339Nano) - data := bytes.Join([][]byte{timestampBytes, streamBytes, lineBytes}, delimiterBytes) + data := bytes.Join([][]byte{timestampBytes, streamBytes, tagBytes, lineBytes}, delimiterBytes) data = append(data, eol) if _, err := wc.Write(data); err != nil { glog.Errorf("Fail to write %q log to log file %q: %v", stream, path, err) diff --git a/pkg/server/io/logger_test.go b/pkg/server/io/logger_test.go index 9b788c8b2..1dbc03916 100644 --- a/pkg/server/io/logger_test.go +++ b/pkg/server/io/logger_test.go @@ -25,6 +25,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" cioutil "github.com/kubernetes-incubator/cri-containerd/pkg/ioutil" ) @@ -33,11 +34,16 @@ func TestRedirectLogs(t *testing.T) { for desc, test := range map[string]struct { input string stream StreamType + tag []runtime.LogTag content []string }{ "stdout log": { input: "test stdout log 1\ntest stdout log 2", stream: Stdout, + tag: []runtime.LogTag{ + runtime.LogTagFull, + runtime.LogTagFull, + }, content: []string{ "test stdout log 1", "test stdout log 2", @@ -46,15 +52,25 @@ func TestRedirectLogs(t *testing.T) { "stderr log": { input: "test stderr log 1\ntest stderr log 2", stream: Stderr, + tag: []runtime.LogTag{ + runtime.LogTagFull, + runtime.LogTagFull, + }, content: []string{ "test stderr log 1", "test stderr log 2", }, }, "long log": { - input: strings.Repeat("a", bufSize+10) + "\n", + input: strings.Repeat("a", 2*bufSize+10) + "\n", stream: Stdout, + tag: []runtime.LogTag{ + runtime.LogTagPartial, + runtime.LogTagPartial, + runtime.LogTagFull, + }, content: []string{ + strings.Repeat("a", bufSize), strings.Repeat("a", bufSize), strings.Repeat("a", 10), }, @@ -70,12 +86,13 @@ func TestRedirectLogs(t *testing.T) { lines = lines[:len(lines)-1] // Discard empty string after last \n assert.Len(t, lines, len(test.content)) for i := range lines { - fields := strings.SplitN(lines[i], string([]byte{delimiter}), 3) - require.Len(t, fields, 3) + fields := strings.SplitN(lines[i], string([]byte{delimiter}), 4) + require.Len(t, fields, 4) _, err := time.Parse(timestampFormat, fields[0]) assert.NoError(t, err) assert.EqualValues(t, test.stream, fields[1]) - assert.Equal(t, test.content[i], fields[2]) + assert.Equal(t, string(test.tag[i]), fields[2]) + assert.Equal(t, test.content[i], fields[3]) } } }