Add max_container_log_size

Signed-off-by: Lantao Liu <lantaol@google.com>
This commit is contained in:
Lantao Liu 2018-05-04 23:09:43 -07:00
parent b39546ce2b
commit 405f57f8e0
9 changed files with 390 additions and 86 deletions

View File

@ -27,6 +27,11 @@ The explanation and default value of each configuration item are as follows:
# enable_tls_streaming enables the TLS streaming support.
enable_tls_streaming = false
# max_container_log_line_size is the maximum log line size in bytes for a container.
# Log line longer than the limit will be split into multiple lines. -1 means no
# limit.
max_container_log_line_size = 16384
# "plugins.cri.containerd" contains config related to containerd
[plugins.cri.containerd]

View File

@ -96,6 +96,10 @@ type PluginConfig struct {
SystemdCgroup bool `toml:"systemd_cgroup" json:"systemdCgroup"`
// EnableTLSStreaming indicates to enable the TLS streaming support.
EnableTLSStreaming bool `toml:"enable_tls_streaming" json:"enableTLSStreaming"`
// MaxContainerLogLineSize is the maximum log line size in bytes for a container.
// Log line longer than the limit will be split into multiple lines. Non-positive
// value means no limit.
MaxContainerLogLineSize int `toml:"max_container_log_line_size" json:"maxContainerLogSize"`
}
// Config contains all configurations for cri server.
@ -129,13 +133,14 @@ func DefaultConfig() PluginConfig {
Root: "",
},
},
StreamServerAddress: "",
StreamServerPort: "10010",
EnableSelinux: false,
EnableTLSStreaming: false,
SandboxImage: "k8s.gcr.io/pause:3.1",
StatsCollectPeriod: 10,
SystemdCgroup: false,
StreamServerAddress: "",
StreamServerPort: "10010",
EnableSelinux: false,
EnableTLSStreaming: false,
SandboxImage: "k8s.gcr.io/pause:3.1",
StatsCollectPeriod: 10,
SystemdCgroup: false,
MaxContainerLogLineSize: 16 * 1024,
Registry: Registry{
Mirrors: map[string]Mirror{
"docker.io": {

View File

@ -16,7 +16,10 @@ limitations under the License.
package ioutil
import "io"
import (
"io"
"sync"
)
// writeCloseInformer wraps passed in write closer with a close channel.
// Caller could wait on the close channel for the write closer to be
@ -66,3 +69,34 @@ func (n *nopWriteCloser) Write(p []byte) (int, error) {
func (n *nopWriteCloser) Close() error {
return nil
}
// serialWriteCloser wraps a write closer and makes sure all writes
// are done in serial.
// Parallel write won't intersect with each other. Use case:
// 1) Pipe: Write content longer than PIPE_BUF.
// See http://man7.org/linux/man-pages/man7/pipe.7.html
// 2) <3.14 Linux Kernel: write is not atomic
// See http://man7.org/linux/man-pages/man2/write.2.html
type serialWriteCloser struct {
mu sync.Mutex
wc io.WriteCloser
}
// NewSerialWriteCloser creates a SerialWriteCloser from a write closer.
func NewSerialWriteCloser(wc io.WriteCloser) io.WriteCloser {
return &serialWriteCloser{wc: wc}
}
// Write writes a group of byte arrays in order atomically.
func (s *serialWriteCloser) Write(data []byte) (int, error) {
s.mu.Lock()
defer s.mu.Unlock()
return s.wc.Write(data)
}
// Close closes the write closer.
func (s *serialWriteCloser) Close() error {
s.mu.Lock()
defer s.mu.Unlock()
return s.wc.Close()
}

View File

@ -17,9 +17,16 @@ limitations under the License.
package ioutil
import (
"io/ioutil"
"os"
"sort"
"strconv"
"strings"
"sync"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestWriteCloseInformer(t *testing.T) {
@ -47,3 +54,55 @@ func TestWriteCloseInformer(t *testing.T) {
assert.Fail(t, "write closer not closed")
}
}
func TestSerialWriteCloser(t *testing.T) {
const (
// Test 10 times to make sure it always pass.
testCount = 10
goroutine = 10
dataLen = 100000
)
for n := 0; n < testCount; n++ {
testData := make([][]byte, goroutine)
for i := 0; i < goroutine; i++ {
testData[i] = []byte(repeatNumber(i, dataLen) + "\n")
}
f, err := ioutil.TempFile("/tmp", "serial-write-closer")
require.NoError(t, err)
defer os.RemoveAll(f.Name())
defer f.Close()
wc := NewSerialWriteCloser(f)
defer wc.Close()
// Write data in parallel
var wg sync.WaitGroup
wg.Add(goroutine)
for i := 0; i < goroutine; i++ {
go func(id int) {
n, err := wc.Write(testData[id])
assert.NoError(t, err)
assert.Equal(t, dataLen+1, n)
wg.Done()
}(i)
}
wg.Wait()
wc.Close()
// Check test result
content, err := ioutil.ReadFile(f.Name())
require.NoError(t, err)
resultData := strings.Split(strings.TrimSpace(string(content)), "\n")
require.Len(t, resultData, goroutine)
sort.Strings(resultData)
for i := 0; i < goroutine; i++ {
expected := repeatNumber(i, dataLen)
assert.Equal(t, expected, resultData[i])
}
}
}
func repeatNumber(num, count int) string {
return strings.Repeat(strconv.Itoa(num), count)
}

View File

@ -36,7 +36,7 @@ func (c *criService) ReopenContainerLog(ctx context.Context, r *runtime.ReopenCo
}
// Create new container logger and replace the existing ones.
stdoutWC, stderrWC, err := createContainerLoggers(container.LogPath, container.Config.GetTty())
stdoutWC, stderrWC, err := c.createContainerLoggers(container.LogPath, container.Config.GetTty())
if err != nil {
return nil, err
}

View File

@ -18,6 +18,7 @@ package server
import (
"io"
"os"
"time"
"github.com/containerd/containerd"
@ -29,6 +30,7 @@ import (
runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2"
ctrdutil "github.com/containerd/cri/pkg/containerd/util"
cioutil "github.com/containerd/cri/pkg/ioutil"
cio "github.com/containerd/cri/pkg/server/io"
containerstore "github.com/containerd/cri/pkg/store/container"
sandboxstore "github.com/containerd/cri/pkg/store/sandbox"
@ -97,20 +99,10 @@ func (c *criService) startContainer(ctx context.Context,
}
ioCreation := func(id string) (_ containerdio.IO, err error) {
stdoutWC, stderrWC, err := createContainerLoggers(meta.LogPath, config.GetTty())
stdoutWC, stderrWC, err := c.createContainerLoggers(meta.LogPath, config.GetTty())
if err != nil {
return nil, errors.Wrap(err, "failed to create container loggers")
}
defer func() {
if err != nil {
if stdoutWC != nil {
stdoutWC.Close()
}
if stderrWC != nil {
stderrWC.Close()
}
}
}()
cntr.IO.AddOutput("log", stdoutWC, stderrWC)
cntr.IO.Pipe()
return cntr.IO, nil
@ -142,24 +134,36 @@ func (c *criService) startContainer(ctx context.Context,
return nil
}
// Create container loggers and return write closer for stdout and stderr.
func createContainerLoggers(logPath string, tty bool) (stdout io.WriteCloser, stderr io.WriteCloser, err error) {
// createContainerLoggers creates container loggers and return write closer for stdout and stderr.
func (c *criService) createContainerLoggers(logPath string, tty bool) (stdout io.WriteCloser, stderr io.WriteCloser, err error) {
if logPath != "" {
// Only generate container log when log path is specified.
if stdout, err = cio.NewCRILogger(logPath, cio.Stdout); err != nil {
return nil, nil, errors.Wrap(err, "failed to start container stdout logger")
f, err := os.OpenFile(logPath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0640)
if err != nil {
return nil, nil, errors.Wrap(err, "failed to create and open log file")
}
defer func() {
if err != nil {
stdout.Close()
f.Close()
}
}()
var stdoutCh, stderrCh <-chan struct{}
wc := cioutil.NewSerialWriteCloser(f)
stdout, stdoutCh = cio.NewCRILogger(logPath, wc, cio.Stdout, c.config.MaxContainerLogLineSize)
// Only redirect stderr when there is no tty.
if !tty {
if stderr, err = cio.NewCRILogger(logPath, cio.Stderr); err != nil {
return nil, nil, errors.Wrap(err, "failed to start container stderr logger")
}
stderr, stderrCh = cio.NewCRILogger(logPath, wc, cio.Stderr, c.config.MaxContainerLogLineSize)
}
go func() {
if stdoutCh != nil {
<-stdoutCh
}
if stderrCh != nil {
<-stderrCh
}
logrus.Debugf("Finish redirecting log file %q, closing it", logPath)
f.Close()
}()
} else {
stdout = cio.NewDiscardLogger()
stderr = cio.NewDiscardLogger()

View File

@ -21,10 +21,8 @@ import (
"bytes"
"io"
"io/ioutil"
"os"
"time"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2"
@ -38,11 +36,8 @@ const (
eol = '\n'
// timestampFormat is the timestamp format used in CRI logging format.
timestampFormat = time.RFC3339Nano
// pipeBufSize is the system PIPE_BUF size, on linux it is 4096 bytes.
// POSIX.1 says that write less than PIPE_BUF is atmoic.
pipeBufSize = 4096
// bufSize is the size of the read buffer.
bufSize = pipeBufSize - len(timestampFormat) - len(Stdout) - len(runtime.LogTagPartial) - 3 /*3 delimiter*/ - 1 /*eol*/
// defaultBufSize is the default size of the read buffer in bytes.
defaultBufSize = 4096
)
// NewDiscardLogger creates logger which discards all the input.
@ -51,46 +46,91 @@ func NewDiscardLogger() io.WriteCloser {
}
// NewCRILogger returns a write closer which redirect container log into
// log file, and decorate the log line into CRI defined format.
func NewCRILogger(path string, stream StreamType) (io.WriteCloser, error) {
logrus.Debugf("Start writing log file %q", path)
// log file, and decorate the log line into CRI defined format. It also
// returns a channel which indicates whether the logger is stopped.
// maxLen is the max length limit of a line. A line longer than the
// limit will be cut into multiple lines.
func NewCRILogger(path string, w io.Writer, stream StreamType, maxLen int) (io.WriteCloser, <-chan struct{}) {
logrus.Debugf("Start writing stream %q to log file %q", stream, path)
prc, pwc := io.Pipe()
f, err := os.OpenFile(path, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0640)
if err != nil {
return nil, errors.Wrap(err, "failed to open log file")
}
go redirectLogs(path, prc, f, stream)
return pwc, nil
stop := make(chan struct{})
go func() {
redirectLogs(path, prc, w, stream, maxLen)
close(stop)
}()
return pwc, stop
}
func redirectLogs(path string, rc io.ReadCloser, wc io.WriteCloser, stream StreamType) {
func redirectLogs(path string, rc io.ReadCloser, w io.Writer, s StreamType, maxLen int) {
defer rc.Close()
defer wc.Close()
streamBytes := []byte(stream)
delimiterBytes := []byte{delimiter}
partialBytes := []byte(runtime.LogTagPartial)
fullBytes := []byte(runtime.LogTagFull)
r := bufio.NewReaderSize(rc, bufSize)
for {
lineBytes, isPrefix, err := r.ReadLine()
if err == io.EOF {
logrus.Debugf("Finish redirecting log file %q", path)
return
}
if err != nil {
logrus.WithError(err).Errorf("An error occurred when redirecting log file %q", path)
return
}
tagBytes := fullBytes
if isPrefix {
tagBytes = partialBytes
}
timestampBytes := time.Now().AppendFormat(nil, time.RFC3339Nano)
data := bytes.Join([][]byte{timestampBytes, streamBytes, tagBytes, lineBytes}, delimiterBytes)
data = append(data, eol)
if _, err := wc.Write(data); err != nil {
logrus.WithError(err).Errorf("Fail to write %q log to log file %q", stream, path)
}
// Continue on write error to drain the input.
var (
stream = []byte(s)
delimiter = []byte{delimiter}
partial = []byte(runtime.LogTagPartial)
full = []byte(runtime.LogTagFull)
buf [][]byte
length int
bufSize = defaultBufSize
)
// Make sure bufSize <= maxLen
if maxLen > 0 && maxLen < bufSize {
bufSize = maxLen
}
r := bufio.NewReaderSize(rc, bufSize)
writeLine := func(tag, line []byte) {
timestamp := time.Now().AppendFormat(nil, timestampFormat)
data := bytes.Join([][]byte{timestamp, stream, tag, line}, delimiter)
data = append(data, eol)
if _, err := w.Write(data); err != nil {
logrus.WithError(err).Errorf("Fail to write %q log to log file %q", s, path)
// Continue on write error to drain the container output.
}
}
for {
var stop bool
newLine, isPrefix, err := r.ReadLine()
if err != nil {
if err == io.EOF {
logrus.Debugf("Getting EOF from stream %q while redirecting to log file %q", s, path)
} else {
logrus.WithError(err).Errorf("An error occurred when redirecting stream %q to log file %q", s, path)
}
if length == 0 {
// No content left to write, break.
break
}
// Stop after writing the content left in buffer.
stop = true
} else {
// Buffer returned by ReadLine will change after
// next read, copy it.
l := make([]byte, len(newLine))
copy(l, newLine)
buf = append(buf, l)
length += len(l)
}
if maxLen > 0 && length > maxLen {
exceedLen := length - maxLen
last := buf[len(buf)-1]
if exceedLen > len(last) {
// exceedLen must <= len(last), or else the buffer
// should have be written in the previous iteration.
panic("exceed length should <= last buffer size")
}
buf[len(buf)-1] = last[:len(last)-exceedLen]
writeLine(partial, bytes.Join(buf, nil))
buf = [][]byte{last[len(last)-exceedLen:]}
length = exceedLen
}
if isPrefix {
continue
}
writeLine(full, bytes.Join(buf, nil))
buf = nil
length = 0
if stop {
break
}
}
logrus.Debugf("Finish redirecting stream %q to log file %q", s, path)
}

View File

@ -31,15 +31,19 @@ import (
)
func TestRedirectLogs(t *testing.T) {
// defaultBufSize is even number
const maxLen = defaultBufSize * 4
for desc, test := range map[string]struct {
input string
stream StreamType
maxLen int
tag []runtime.LogTag
content []string
}{
"stdout log": {
input: "test stdout log 1\ntest stdout log 2",
input: "test stdout log 1\ntest stdout log 2\n",
stream: Stdout,
maxLen: maxLen,
tag: []runtime.LogTag{
runtime.LogTagFull,
runtime.LogTagFull,
@ -50,8 +54,9 @@ func TestRedirectLogs(t *testing.T) {
},
},
"stderr log": {
input: "test stderr log 1\ntest stderr log 2",
input: "test stderr log 1\ntest stderr log 2\n",
stream: Stderr,
maxLen: maxLen,
tag: []runtime.LogTag{
runtime.LogTagFull,
runtime.LogTagFull,
@ -61,18 +66,160 @@ func TestRedirectLogs(t *testing.T) {
"test stderr log 2",
},
},
"long log": {
input: strings.Repeat("a", 2*bufSize+10) + "\n",
stream: Stdout,
"log ends without newline": {
input: "test stderr log 1\ntest stderr log 2",
stream: Stderr,
maxLen: maxLen,
tag: []runtime.LogTag{
runtime.LogTagFull,
runtime.LogTagFull,
},
content: []string{
"test stderr log 1",
"test stderr log 2",
},
},
"log length equal to buffer size": {
input: strings.Repeat("a", defaultBufSize) + "\n" + strings.Repeat("a", defaultBufSize) + "\n",
stream: Stdout,
maxLen: maxLen,
tag: []runtime.LogTag{
runtime.LogTagFull,
runtime.LogTagFull,
},
content: []string{
strings.Repeat("a", defaultBufSize),
strings.Repeat("a", defaultBufSize),
},
},
"log length longer than buffer size": {
input: strings.Repeat("a", defaultBufSize*2+10) + "\n" + strings.Repeat("a", defaultBufSize*2+20) + "\n",
stream: Stdout,
maxLen: maxLen,
tag: []runtime.LogTag{
runtime.LogTagFull,
runtime.LogTagFull,
},
content: []string{
strings.Repeat("a", defaultBufSize*2+10),
strings.Repeat("a", defaultBufSize*2+20),
},
},
"log length equal to max length": {
input: strings.Repeat("a", maxLen) + "\n" + strings.Repeat("a", maxLen) + "\n",
stream: Stdout,
maxLen: maxLen,
tag: []runtime.LogTag{
runtime.LogTagFull,
runtime.LogTagFull,
},
content: []string{
strings.Repeat("a", maxLen),
strings.Repeat("a", maxLen),
},
},
"log length exceed max length by 1": {
input: strings.Repeat("a", maxLen+1) + "\n" + strings.Repeat("a", maxLen+1) + "\n",
stream: Stdout,
maxLen: maxLen,
tag: []runtime.LogTag{
runtime.LogTagPartial,
runtime.LogTagFull,
runtime.LogTagPartial,
runtime.LogTagFull,
},
content: []string{
strings.Repeat("a", maxLen),
"a",
strings.Repeat("a", maxLen),
"a",
},
},
"log length longer than max length": {
input: strings.Repeat("a", maxLen*2) + "\n" + strings.Repeat("a", maxLen*2+1) + "\n",
stream: Stdout,
maxLen: maxLen,
tag: []runtime.LogTag{
runtime.LogTagPartial,
runtime.LogTagFull,
runtime.LogTagPartial,
runtime.LogTagPartial,
runtime.LogTagFull,
},
content: []string{
strings.Repeat("a", bufSize),
strings.Repeat("a", bufSize),
strings.Repeat("a", maxLen),
strings.Repeat("a", maxLen),
strings.Repeat("a", maxLen),
strings.Repeat("a", maxLen),
"a",
},
},
"max length shorter than buffer size": {
input: strings.Repeat("a", defaultBufSize*3/2+10) + "\n" + strings.Repeat("a", defaultBufSize*3/2+20) + "\n",
stream: Stdout,
maxLen: defaultBufSize / 2,
tag: []runtime.LogTag{
runtime.LogTagPartial,
runtime.LogTagPartial,
runtime.LogTagPartial,
runtime.LogTagFull,
runtime.LogTagPartial,
runtime.LogTagPartial,
runtime.LogTagPartial,
runtime.LogTagFull,
},
content: []string{
strings.Repeat("a", defaultBufSize*1/2),
strings.Repeat("a", defaultBufSize*1/2),
strings.Repeat("a", defaultBufSize*1/2),
strings.Repeat("a", 10),
strings.Repeat("a", defaultBufSize*1/2),
strings.Repeat("a", defaultBufSize*1/2),
strings.Repeat("a", defaultBufSize*1/2),
strings.Repeat("a", 20),
},
},
"log length longer than max length, and (maxLen % defaultBufSize != 0)": {
input: strings.Repeat("a", defaultBufSize*2+10) + "\n" + strings.Repeat("a", defaultBufSize*2+20) + "\n",
stream: Stdout,
maxLen: defaultBufSize * 3 / 2,
tag: []runtime.LogTag{
runtime.LogTagPartial,
runtime.LogTagFull,
runtime.LogTagPartial,
runtime.LogTagFull,
},
content: []string{
strings.Repeat("a", defaultBufSize*3/2),
strings.Repeat("a", defaultBufSize*1/2+10),
strings.Repeat("a", defaultBufSize*3/2),
strings.Repeat("a", defaultBufSize*1/2+20),
},
},
"no limit if max length is 0": {
input: strings.Repeat("a", defaultBufSize*10+10) + "\n" + strings.Repeat("a", defaultBufSize*10+20) + "\n",
stream: Stdout,
maxLen: 0,
tag: []runtime.LogTag{
runtime.LogTagFull,
runtime.LogTagFull,
},
content: []string{
strings.Repeat("a", defaultBufSize*10+10),
strings.Repeat("a", defaultBufSize*10+20),
},
},
"no limit if max length is negative": {
input: strings.Repeat("a", defaultBufSize*10+10) + "\n" + strings.Repeat("a", defaultBufSize*10+20) + "\n",
stream: Stdout,
maxLen: -1,
tag: []runtime.LogTag{
runtime.LogTagFull,
runtime.LogTagFull,
},
content: []string{
strings.Repeat("a", defaultBufSize*10+10),
strings.Repeat("a", defaultBufSize*10+20),
},
},
} {
@ -80,7 +227,7 @@ func TestRedirectLogs(t *testing.T) {
rc := ioutil.NopCloser(strings.NewReader(test.input))
buf := bytes.NewBuffer(nil)
wc := cioutil.NewNopWriteCloser(buf)
redirectLogs("test-path", rc, wc, test.stream)
redirectLogs("test-path", rc, wc, test.stream, test.maxLen)
output := buf.String()
lines := strings.Split(output, "\n")
lines = lines[:len(lines)-1] // Discard empty string after last \n

View File

@ -78,9 +78,7 @@ func (c *criService) recover(ctx context.Context) error {
return errors.Wrap(err, "failed to list containers")
}
for _, container := range containers {
containerDir := c.getContainerRootDir(container.ID())
volatileContainerDir := c.getVolatileContainerRootDir(container.ID())
cntr, err := loadContainer(ctx, container, containerDir, volatileContainerDir)
cntr, err := c.loadContainer(ctx, container)
if err != nil {
logrus.WithError(err).Errorf("Failed to load container %q", container.ID())
continue
@ -149,8 +147,10 @@ func (c *criService) recover(ctx context.Context) error {
}
// loadContainer loads container from containerd and status checkpoint.
func loadContainer(ctx context.Context, cntr containerd.Container, containerDir, volatileContainerDir string) (containerstore.Container, error) {
func (c *criService) loadContainer(ctx context.Context, cntr containerd.Container) (containerstore.Container, error) {
id := cntr.ID()
containerDir := c.getContainerRootDir(id)
volatileContainerDir := c.getVolatileContainerRootDir(id)
var container containerstore.Container
// Load container metadata.
exts, err := cntr.Extensions(ctx)
@ -176,11 +176,21 @@ func loadContainer(ctx context.Context, cntr containerd.Container, containerDir,
// Load up-to-date status from containerd.
var containerIO *cio.ContainerIO
t, err := cntr.Task(ctx, func(fifos *containerdio.FIFOSet) (containerdio.IO, error) {
stdoutWC, stderrWC, err := createContainerLoggers(meta.LogPath, meta.Config.GetTty())
t, err := cntr.Task(ctx, func(fifos *containerdio.FIFOSet) (_ containerdio.IO, err error) {
stdoutWC, stderrWC, err := c.createContainerLoggers(meta.LogPath, meta.Config.GetTty())
if err != nil {
return nil, err
}
defer func() {
if err != nil {
if stdoutWC != nil {
stdoutWC.Close()
}
if stderrWC != nil {
stderrWC.Close()
}
}
}()
containerIO, err = cio.NewContainerIO(id,
cio.WithFIFOs(fifos),
)