Add support for CRI partial log.

Signed-off-by: Lantao Liu <lantaol@google.com>
This commit is contained in:
Lantao Liu 2017-11-17 07:09:43 +00:00
parent cb0d97e74c
commit 48726ecd27
3 changed files with 34 additions and 10 deletions

View File

@ -27,6 +27,7 @@ import (
"github.com/containerd/containerd" "github.com/containerd/containerd"
"github.com/containerd/fifo" "github.com/containerd/fifo"
"golang.org/x/net/context" "golang.org/x/net/context"
"k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
) )
// AttachOptions specifies how to attach to a container. // AttachOptions specifies how to attach to a container.
@ -47,9 +48,9 @@ const (
// Stdin stream type. // Stdin stream type.
Stdin StreamType = "stdin" Stdin StreamType = "stdin"
// Stdout stream type. // Stdout stream type.
Stdout StreamType = "stdout" Stdout StreamType = StreamType(runtime.Stdout)
// Stderr stream type. // Stderr stream type.
Stderr StreamType = "stderr" Stderr StreamType = StreamType(runtime.Stderr)
) )
type wgCloser struct { type wgCloser struct {

View File

@ -26,6 +26,7 @@ import (
"time" "time"
"github.com/golang/glog" "github.com/golang/glog"
"k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
cioutil "github.com/kubernetes-incubator/cri-containerd/pkg/ioutil" cioutil "github.com/kubernetes-incubator/cri-containerd/pkg/ioutil"
) )
@ -41,7 +42,7 @@ const (
// POSIX.1 says that write less than PIPE_BUF is atmoic. // POSIX.1 says that write less than PIPE_BUF is atmoic.
pipeBufSize = 4096 pipeBufSize = 4096
// bufSize is the size of the read buffer. // bufSize is the size of the read buffer.
bufSize = pipeBufSize - len(timestampFormat) - len(Stdout) - 2 /*2 delimiter*/ - 1 /*eol*/ bufSize = pipeBufSize - len(timestampFormat) - len(Stdout) - len(runtime.LogTagPartial) - 3 /*3 delimiter*/ - 1 /*eol*/
) )
// NewDiscardLogger creates logger which discards all the input. // NewDiscardLogger creates logger which discards all the input.
@ -67,10 +68,11 @@ func redirectLogs(path string, rc io.ReadCloser, wc io.WriteCloser, stream Strea
defer wc.Close() defer wc.Close()
streamBytes := []byte(stream) streamBytes := []byte(stream)
delimiterBytes := []byte{delimiter} delimiterBytes := []byte{delimiter}
partialBytes := []byte(runtime.LogTagPartial)
fullBytes := []byte(runtime.LogTagFull)
r := bufio.NewReaderSize(rc, bufSize) r := bufio.NewReaderSize(rc, bufSize)
for { for {
// TODO(random-liu): Better define CRI log format, and escape newline in log. lineBytes, isPrefix, err := r.ReadLine()
lineBytes, _, err := r.ReadLine()
if err == io.EOF { if err == io.EOF {
glog.V(4).Infof("Finish redirecting log file %q", path) glog.V(4).Infof("Finish redirecting log file %q", path)
return return
@ -79,8 +81,12 @@ func redirectLogs(path string, rc io.ReadCloser, wc io.WriteCloser, stream Strea
glog.Errorf("An error occurred when redirecting log file %q: %v", path, err) glog.Errorf("An error occurred when redirecting log file %q: %v", path, err)
return return
} }
tagBytes := fullBytes
if isPrefix {
tagBytes = partialBytes
}
timestampBytes := time.Now().AppendFormat(nil, time.RFC3339Nano) timestampBytes := time.Now().AppendFormat(nil, time.RFC3339Nano)
data := bytes.Join([][]byte{timestampBytes, streamBytes, lineBytes}, delimiterBytes) data := bytes.Join([][]byte{timestampBytes, streamBytes, tagBytes, lineBytes}, delimiterBytes)
data = append(data, eol) data = append(data, eol)
if _, err := wc.Write(data); err != nil { if _, err := wc.Write(data); err != nil {
glog.Errorf("Fail to write %q log to log file %q: %v", stream, path, err) glog.Errorf("Fail to write %q log to log file %q: %v", stream, path, err)

View File

@ -25,6 +25,7 @@ import (
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
cioutil "github.com/kubernetes-incubator/cri-containerd/pkg/ioutil" cioutil "github.com/kubernetes-incubator/cri-containerd/pkg/ioutil"
) )
@ -33,11 +34,16 @@ func TestRedirectLogs(t *testing.T) {
for desc, test := range map[string]struct { for desc, test := range map[string]struct {
input string input string
stream StreamType stream StreamType
tag []runtime.LogTag
content []string content []string
}{ }{
"stdout log": { "stdout log": {
input: "test stdout log 1\ntest stdout log 2", input: "test stdout log 1\ntest stdout log 2",
stream: Stdout, stream: Stdout,
tag: []runtime.LogTag{
runtime.LogTagFull,
runtime.LogTagFull,
},
content: []string{ content: []string{
"test stdout log 1", "test stdout log 1",
"test stdout log 2", "test stdout log 2",
@ -46,15 +52,25 @@ func TestRedirectLogs(t *testing.T) {
"stderr log": { "stderr log": {
input: "test stderr log 1\ntest stderr log 2", input: "test stderr log 1\ntest stderr log 2",
stream: Stderr, stream: Stderr,
tag: []runtime.LogTag{
runtime.LogTagFull,
runtime.LogTagFull,
},
content: []string{ content: []string{
"test stderr log 1", "test stderr log 1",
"test stderr log 2", "test stderr log 2",
}, },
}, },
"long log": { "long log": {
input: strings.Repeat("a", bufSize+10) + "\n", input: strings.Repeat("a", 2*bufSize+10) + "\n",
stream: Stdout, stream: Stdout,
tag: []runtime.LogTag{
runtime.LogTagPartial,
runtime.LogTagPartial,
runtime.LogTagFull,
},
content: []string{ content: []string{
strings.Repeat("a", bufSize),
strings.Repeat("a", bufSize), strings.Repeat("a", bufSize),
strings.Repeat("a", 10), strings.Repeat("a", 10),
}, },
@ -70,12 +86,13 @@ func TestRedirectLogs(t *testing.T) {
lines = lines[:len(lines)-1] // Discard empty string after last \n lines = lines[:len(lines)-1] // Discard empty string after last \n
assert.Len(t, lines, len(test.content)) assert.Len(t, lines, len(test.content))
for i := range lines { for i := range lines {
fields := strings.SplitN(lines[i], string([]byte{delimiter}), 3) fields := strings.SplitN(lines[i], string([]byte{delimiter}), 4)
require.Len(t, fields, 3) require.Len(t, fields, 4)
_, err := time.Parse(timestampFormat, fields[0]) _, err := time.Parse(timestampFormat, fields[0])
assert.NoError(t, err) assert.NoError(t, err)
assert.EqualValues(t, test.stream, fields[1]) assert.EqualValues(t, test.stream, fields[1])
assert.Equal(t, test.content[i], fields[2]) assert.Equal(t, string(test.tag[i]), fields[2])
assert.Equal(t, test.content[i], fields[3])
} }
} }
} }