Merge pull request #761 from Random-Liu/add-log-max-size

Add log max size
Lantao Liu 2018-06-15 15:56:04 -07:00 committed by GitHub
commit e3d57d240f
12 changed files with 576 additions and 116 deletions

View File

@ -27,6 +27,11 @@ The explanation and default value of each configuration item are as follows:
# enable_tls_streaming enables the TLS streaming support.
enable_tls_streaming = false
# max_container_log_line_size is the maximum log line size in bytes for a container.
# A log line longer than the limit will be split into multiple lines. -1 means no
# limit.
max_container_log_line_size = 16384
# "plugins.cri.containerd" contains config related to containerd
[plugins.cri.containerd]
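
As a side note on the max_container_log_line_size item above, an override in containerd's config.toml might look like the following minimal fragment (32768 is only an illustrative value, the [plugins.cri] table is the one this excerpt documents, and 16384 remains the shipped default):

[plugins.cri]
  # Split container log lines longer than 32 KiB; -1 disables splitting.
  max_container_log_line_size = 32768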

View File

@ -0,0 +1,113 @@
/*
Copyright 2018 The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package integration
import (
"fmt"
"io/ioutil"
"os"
"path/filepath"
"strings"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2"
)
func TestLongContainerLog(t *testing.T) {
testPodLogDir, err := ioutil.TempDir("/tmp", "long-container-log")
require.NoError(t, err)
defer os.RemoveAll(testPodLogDir)
t.Log("Create a sandbox with log directory")
sbConfig := PodSandboxConfig("sandbox", "long-container-log",
WithPodLogDirectory(testPodLogDir),
)
sb, err := runtimeService.RunPodSandbox(sbConfig)
require.NoError(t, err)
defer func() {
assert.NoError(t, runtimeService.StopPodSandbox(sb))
assert.NoError(t, runtimeService.RemovePodSandbox(sb))
}()
const (
testImage = "busybox"
containerName = "test-container"
)
t.Logf("Pull test image %q", testImage)
img, err := imageService.PullImage(&runtime.ImageSpec{Image: testImage}, nil)
require.NoError(t, err)
defer func() {
assert.NoError(t, imageService.RemoveImage(&runtime.ImageSpec{Image: img}))
}()
t.Log("Create a container with log path")
config, err := CRIConfig()
require.NoError(t, err)
maxSize := config.MaxContainerLogLineSize
shortLineCmd := fmt.Sprintf("i=0; while [ $i -lt %d ]; do printf %s; i=$((i+1)); done", maxSize-1, "a")
maxLenLineCmd := fmt.Sprintf("i=0; while [ $i -lt %d ]; do printf %s; i=$((i+1)); done", maxSize, "b")
longLineCmd := fmt.Sprintf("i=0; while [ $i -lt %d ]; do printf %s; i=$((i+1)); done", maxSize+1, "c")
cnConfig := ContainerConfig(
containerName,
"busybox",
WithCommand("sh", "-c",
fmt.Sprintf("%s; echo; %s; echo; %s", shortLineCmd, maxLenLineCmd, longLineCmd)),
WithLogPath(containerName),
)
cn, err := runtimeService.CreateContainer(sb, cnConfig, sbConfig)
require.NoError(t, err)
t.Log("Start the container")
require.NoError(t, runtimeService.StartContainer(cn))
t.Log("Wait for container to finish running")
require.NoError(t, Eventually(func() (bool, error) {
s, err := runtimeService.ContainerStatus(cn)
if err != nil {
return false, err
}
if s.GetState() == runtime.ContainerState_CONTAINER_EXITED {
return true, nil
}
return false, nil
}, time.Second, 30*time.Second))
t.Log("Check container log")
content, err := ioutil.ReadFile(filepath.Join(testPodLogDir, containerName))
assert.NoError(t, err)
checkContainerLog(t, string(content), []string{
fmt.Sprintf("%s %s %s", runtime.Stdout, runtime.LogTagFull, strings.Repeat("a", maxSize-1)),
fmt.Sprintf("%s %s %s", runtime.Stdout, runtime.LogTagFull, strings.Repeat("b", maxSize)),
fmt.Sprintf("%s %s %s", runtime.Stdout, runtime.LogTagPartial, strings.Repeat("c", maxSize)),
fmt.Sprintf("%s %s %s", runtime.Stdout, runtime.LogTagFull, "c"),
})
}
func checkContainerLog(t *testing.T, log string, messages []string) {
lines := strings.Split(strings.TrimSpace(log), "\n")
require.Len(t, lines, len(messages), "log line number should match")
for i, line := range lines {
parts := strings.SplitN(line, " ", 2)
require.Len(t, parts, 2)
_, err := time.Parse(time.RFC3339Nano, parts[0])
assert.NoError(t, err, "timestamp should be in RFC3339Nano format")
assert.Equal(t, messages[i], parts[1], "log content should match")
}
}
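
For context, the log file that checkContainerLog parses consists of CRI-formatted lines of the form "timestamp stream tag content". With the default 16384-byte limit, the file written by the container above would look roughly like this (timestamps are illustrative and the repeated characters are elided):

2018-06-15T15:56:04.000000001-07:00 stdout F aaa...a   <- 16383 bytes, one full line
2018-06-15T15:56:04.000000002-07:00 stdout F bbb...b   <- exactly 16384 bytes, still one full line
2018-06-15T15:56:04.000000003-07:00 stdout P ccc...c   <- first 16384 bytes of the over-long line, tagged partial
2018-06-15T15:56:04.000000004-07:00 stdout F c         <- the remaining byte completes the line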

View File

@ -37,7 +37,7 @@ func checkMemoryLimit(t *testing.T, spec *runtimespec.Spec, memLimit int64) {
}
func TestUpdateContainerResources(t *testing.T) {
t.Logf("Create a sandbox")
t.Log("Create a sandbox")
sbConfig := PodSandboxConfig("sandbox", "update-container-resources")
sb, err := runtimeService.RunPodSandbox(sbConfig)
require.NoError(t, err)
@ -46,7 +46,7 @@ func TestUpdateContainerResources(t *testing.T) {
assert.NoError(t, runtimeService.RemovePodSandbox(sb))
}()
t.Logf("Create a container with memory limit")
t.Log("Create a container with memory limit")
cnConfig := ContainerConfig(
"container",
pauseImage,
@ -57,48 +57,48 @@ func TestUpdateContainerResources(t *testing.T) {
cn, err := runtimeService.CreateContainer(sb, cnConfig, sbConfig)
require.NoError(t, err)
t.Logf("Check memory limit in container OCI spec")
t.Log("Check memory limit in container OCI spec")
container, err := containerdClient.LoadContainer(context.Background(), cn)
require.NoError(t, err)
spec, err := container.Spec(context.Background())
require.NoError(t, err)
checkMemoryLimit(t, spec, 2*1024*1024)
t.Logf("Update container memory limit after created")
t.Log("Update container memory limit after created")
err = runtimeService.UpdateContainerResources(cn, &runtime.LinuxContainerResources{
MemoryLimitInBytes: 4 * 1024 * 1024,
})
require.NoError(t, err)
t.Logf("Check memory limit in container OCI spec")
t.Log("Check memory limit in container OCI spec")
spec, err = container.Spec(context.Background())
require.NoError(t, err)
checkMemoryLimit(t, spec, 4*1024*1024)
t.Logf("Start the container")
t.Log("Start the container")
require.NoError(t, runtimeService.StartContainer(cn))
task, err := container.Task(context.Background(), nil)
require.NoError(t, err)
t.Logf("Check memory limit in cgroup")
t.Log("Check memory limit in cgroup")
cgroup, err := cgroups.Load(cgroups.V1, cgroups.PidPath(int(task.Pid())))
require.NoError(t, err)
stat, err := cgroup.Stat(cgroups.IgnoreNotExist)
require.NoError(t, err)
assert.Equal(t, uint64(4*1024*1024), stat.Memory.Usage.Limit)
t.Logf("Update container memory limit after started")
t.Log("Update container memory limit after started")
err = runtimeService.UpdateContainerResources(cn, &runtime.LinuxContainerResources{
MemoryLimitInBytes: 8 * 1024 * 1024,
})
require.NoError(t, err)
t.Logf("Check memory limit in container OCI spec")
t.Log("Check memory limit in container OCI spec")
spec, err = container.Spec(context.Background())
require.NoError(t, err)
checkMemoryLimit(t, spec, 8*1024*1024)
t.Logf("Check memory limit in cgroup")
t.Log("Check memory limit in cgroup")
stat, err = cgroup.Stat(cgroups.IgnoreNotExist)
require.NoError(t, err)
assert.Equal(t, uint64(8*1024*1024), stat.Memory.Usage.Limit)

View File

@ -18,6 +18,7 @@ package integration
import (
"context"
"encoding/json"
"flag"
"fmt"
"os/exec"
@ -28,12 +29,15 @@ import (
"github.com/containerd/containerd"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"google.golang.org/grpc"
"k8s.io/kubernetes/pkg/kubelet/apis/cri"
runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2"
"k8s.io/kubernetes/pkg/kubelet/remote"
kubeletutil "k8s.io/kubernetes/pkg/kubelet/util"
api "github.com/containerd/cri/pkg/api/v1"
"github.com/containerd/cri/pkg/client"
criconfig "github.com/containerd/cri/pkg/config"
"github.com/containerd/cri/pkg/constants"
"github.com/containerd/cri/pkg/util"
)
@ -100,6 +104,7 @@ func ConnectDaemons() error {
// Opts sets specific information in pod sandbox config.
type PodSandboxOpts func(*runtime.PodSandboxConfig)
// Set host network.
func WithHostNetwork(p *runtime.PodSandboxConfig) {
if p.Linux == nil {
p.Linux = &runtime.LinuxPodSandboxConfig{}
@ -114,6 +119,13 @@ func WithHostNetwork(p *runtime.PodSandboxConfig) {
}
}
// Add pod log directory.
func WithPodLogDirectory(dir string) PodSandboxOpts {
return func(p *runtime.PodSandboxConfig) {
p.LogDirectory = dir
}
}
// PodSandboxConfig generates a pod sandbox config for test.
func PodSandboxConfig(name, ns string, opts ...PodSandboxOpts) *runtime.PodSandboxConfig {
config := &runtime.PodSandboxConfig{
@ -137,52 +149,59 @@ func PodSandboxConfig(name, ns string, opts ...PodSandboxOpts) *runtime.PodSandb
type ContainerOpts func(*runtime.ContainerConfig)
func WithTestLabels() ContainerOpts {
return func(cf *runtime.ContainerConfig) {
cf.Labels = map[string]string{"key": "value"}
return func(c *runtime.ContainerConfig) {
c.Labels = map[string]string{"key": "value"}
}
}
func WithTestAnnotations() ContainerOpts {
return func(cf *runtime.ContainerConfig) {
cf.Annotations = map[string]string{"a.b.c": "test"}
return func(c *runtime.ContainerConfig) {
c.Annotations = map[string]string{"a.b.c": "test"}
}
}
// Add container resource limits.
func WithResources(r *runtime.LinuxContainerResources) ContainerOpts {
return func(cf *runtime.ContainerConfig) {
if cf.Linux == nil {
cf.Linux = &runtime.LinuxContainerConfig{}
return func(c *runtime.ContainerConfig) {
if c.Linux == nil {
c.Linux = &runtime.LinuxContainerConfig{}
}
cf.Linux.Resources = r
c.Linux.Resources = r
}
}
// Add container command.
func WithCommand(c string, args ...string) ContainerOpts {
return func(cf *runtime.ContainerConfig) {
cf.Command = []string{c}
cf.Args = args
func WithCommand(cmd string, args ...string) ContainerOpts {
return func(c *runtime.ContainerConfig) {
c.Command = []string{cmd}
c.Args = args
}
}
// Add pid namespace mode.
func WithPidNamespace(mode runtime.NamespaceMode) ContainerOpts {
return func(cf *runtime.ContainerConfig) {
if cf.Linux == nil {
cf.Linux = &runtime.LinuxContainerConfig{}
return func(c *runtime.ContainerConfig) {
if c.Linux == nil {
c.Linux = &runtime.LinuxContainerConfig{}
}
if cf.Linux.SecurityContext == nil {
cf.Linux.SecurityContext = &runtime.LinuxContainerSecurityContext{}
if c.Linux.SecurityContext == nil {
c.Linux.SecurityContext = &runtime.LinuxContainerSecurityContext{}
}
if cf.Linux.SecurityContext.NamespaceOptions == nil {
cf.Linux.SecurityContext.NamespaceOptions = &runtime.NamespaceOption{}
if c.Linux.SecurityContext.NamespaceOptions == nil {
c.Linux.SecurityContext.NamespaceOptions = &runtime.NamespaceOption{}
}
cf.Linux.SecurityContext.NamespaceOptions.Pid = mode
c.Linux.SecurityContext.NamespaceOptions.Pid = mode
}
}
// Add container log path.
func WithLogPath(path string) ContainerOpts {
return func(c *runtime.ContainerConfig) {
c.LogPath = path
}
}
// ContainerConfig creates a container config given a name and image name
// and additional container config options
func ContainerConfig(name, image string, opts ...ContainerOpts) *runtime.ContainerConfig {
@ -247,3 +266,27 @@ func PidOf(name string) (int, error) {
}
return strconv.Atoi(output)
}
// CRIConfig gets current cri config from containerd.
func CRIConfig() (*criconfig.Config, error) {
addr, dialer, err := kubeletutil.GetAddressAndDialer(*criEndpoint)
if err != nil {
return nil, errors.Wrap(err, "failed to get dialer")
}
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
conn, err := grpc.DialContext(ctx, addr, grpc.WithInsecure(), grpc.WithDialer(dialer))
if err != nil {
return nil, errors.Wrap(err, "failed to connect cri endpoint")
}
client := runtime.NewRuntimeServiceClient(conn)
resp, err := client.Status(ctx, &runtime.StatusRequest{Verbose: true})
if err != nil {
return nil, errors.Wrap(err, "failed to get status")
}
config := &criconfig.Config{}
if err := json.Unmarshal([]byte(resp.Info["config"]), config); err != nil {
return nil, errors.Wrap(err, "failed to unmarshal config")
}
return config, nil
}

View File

@ -96,6 +96,10 @@ type PluginConfig struct {
SystemdCgroup bool `toml:"systemd_cgroup" json:"systemdCgroup"`
// EnableTLSStreaming indicates to enable the TLS streaming support.
EnableTLSStreaming bool `toml:"enable_tls_streaming" json:"enableTLSStreaming"`
// MaxContainerLogLineSize is the maximum log line size in bytes for a container.
// A log line longer than the limit will be split into multiple lines. A non-positive
// value means no limit.
MaxContainerLogLineSize int `toml:"max_container_log_line_size" json:"maxContainerLogSize"`
}
// Config contains all configurations for cri server.
@ -129,13 +133,14 @@ func DefaultConfig() PluginConfig {
Root: "",
},
},
StreamServerAddress: "",
StreamServerPort: "10010",
EnableSelinux: false,
EnableTLSStreaming: false,
SandboxImage: "k8s.gcr.io/pause:3.1",
StatsCollectPeriod: 10,
SystemdCgroup: false,
StreamServerAddress: "",
StreamServerPort: "10010",
EnableSelinux: false,
EnableTLSStreaming: false,
SandboxImage: "k8s.gcr.io/pause:3.1",
StatsCollectPeriod: 10,
SystemdCgroup: false,
MaxContainerLogLineSize: 16 * 1024,
Registry: Registry{
Mirrors: map[string]Mirror{
"docker.io": {

View File

@ -16,7 +16,10 @@ limitations under the License.
package ioutil
import "io"
import (
"io"
"sync"
)
// writeCloseInformer wraps passed in write closer with a close channel.
// Caller could wait on the close channel for the write closer to be
@ -66,3 +69,34 @@ func (n *nopWriteCloser) Write(p []byte) (int, error) {
func (n *nopWriteCloser) Close() error {
return nil
}
// serialWriteCloser wraps a write closer and makes sure all writes
// are done in serial.
// Parallel writes won't intersect with each other. Use cases:
// 1) Pipe: Write content longer than PIPE_BUF.
// See http://man7.org/linux/man-pages/man7/pipe.7.html
// 2) <3.14 Linux Kernel: write is not atomic
// See http://man7.org/linux/man-pages/man2/write.2.html
type serialWriteCloser struct {
mu sync.Mutex
wc io.WriteCloser
}
// NewSerialWriteCloser creates a SerialWriteCloser from a write closer.
func NewSerialWriteCloser(wc io.WriteCloser) io.WriteCloser {
return &serialWriteCloser{wc: wc}
}
// Write writes a group of byte arrays in order atomically.
func (s *serialWriteCloser) Write(data []byte) (int, error) {
s.mu.Lock()
defer s.mu.Unlock()
return s.wc.Write(data)
}
// Close closes the write closer.
func (s *serialWriteCloser) Close() error {
s.mu.Lock()
defer s.mu.Unlock()
return s.wc.Close()
}
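
A minimal usage sketch of the wrapper (assumed file name, import path as used elsewhere in this change, mirroring the unit test below): several goroutines share one file, and each Write lands as a single contiguous chunk because of the internal mutex.

package main

import (
	"fmt"
	"log"
	"os"
	"strings"
	"sync"

	cioutil "github.com/containerd/cri/pkg/ioutil"
)

func main() {
	f, err := os.OpenFile("/tmp/serial-demo.log", os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0640)
	if err != nil {
		log.Fatal(err)
	}
	// Wrap the file so concurrent writes are serialized.
	wc := cioutil.NewSerialWriteCloser(f)
	defer wc.Close()

	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			// Longer than PIPE_BUF; without the wrapper such writes are not
			// guaranteed to land as one chunk (see the man pages cited above).
			line := fmt.Sprintf("writer %d: %s\n", id, strings.Repeat("x", 8192))
			if _, err := wc.Write([]byte(line)); err != nil {
				log.Printf("write failed: %v", err)
			}
		}(i)
	}
	wg.Wait()
}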

View File

@ -17,9 +17,16 @@ limitations under the License.
package ioutil
import (
"io/ioutil"
"os"
"sort"
"strconv"
"strings"
"sync"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestWriteCloseInformer(t *testing.T) {
@ -47,3 +54,55 @@ func TestWriteCloseInformer(t *testing.T) {
assert.Fail(t, "write closer not closed")
}
}
func TestSerialWriteCloser(t *testing.T) {
const (
// Test 10 times to make sure it always passes.
testCount = 10
goroutine = 10
dataLen = 100000
)
for n := 0; n < testCount; n++ {
testData := make([][]byte, goroutine)
for i := 0; i < goroutine; i++ {
testData[i] = []byte(repeatNumber(i, dataLen) + "\n")
}
f, err := ioutil.TempFile("/tmp", "serial-write-closer")
require.NoError(t, err)
defer os.RemoveAll(f.Name())
defer f.Close()
wc := NewSerialWriteCloser(f)
defer wc.Close()
// Write data in parallel
var wg sync.WaitGroup
wg.Add(goroutine)
for i := 0; i < goroutine; i++ {
go func(id int) {
n, err := wc.Write(testData[id])
assert.NoError(t, err)
assert.Equal(t, dataLen+1, n)
wg.Done()
}(i)
}
wg.Wait()
wc.Close()
// Check test result
content, err := ioutil.ReadFile(f.Name())
require.NoError(t, err)
resultData := strings.Split(strings.TrimSpace(string(content)), "\n")
require.Len(t, resultData, goroutine)
sort.Strings(resultData)
for i := 0; i < goroutine; i++ {
expected := repeatNumber(i, dataLen)
assert.Equal(t, expected, resultData[i])
}
}
}
func repeatNumber(num, count int) string {
return strings.Repeat(strconv.Itoa(num), count)
}

View File

@ -36,7 +36,7 @@ func (c *criService) ReopenContainerLog(ctx context.Context, r *runtime.ReopenCo
}
// Create new container loggers and replace the existing ones.
stdoutWC, stderrWC, err := createContainerLoggers(container.LogPath, container.Config.GetTty())
stdoutWC, stderrWC, err := c.createContainerLoggers(container.LogPath, container.Config.GetTty())
if err != nil {
return nil, err
}

View File

@ -18,6 +18,7 @@ package server
import (
"io"
"os"
"time"
"github.com/containerd/containerd"
@ -29,6 +30,7 @@ import (
runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2"
ctrdutil "github.com/containerd/cri/pkg/containerd/util"
cioutil "github.com/containerd/cri/pkg/ioutil"
cio "github.com/containerd/cri/pkg/server/io"
containerstore "github.com/containerd/cri/pkg/store/container"
sandboxstore "github.com/containerd/cri/pkg/store/sandbox"
@ -97,20 +99,10 @@ func (c *criService) startContainer(ctx context.Context,
}
ioCreation := func(id string) (_ containerdio.IO, err error) {
stdoutWC, stderrWC, err := createContainerLoggers(meta.LogPath, config.GetTty())
stdoutWC, stderrWC, err := c.createContainerLoggers(meta.LogPath, config.GetTty())
if err != nil {
return nil, errors.Wrap(err, "failed to create container loggers")
}
defer func() {
if err != nil {
if stdoutWC != nil {
stdoutWC.Close()
}
if stderrWC != nil {
stderrWC.Close()
}
}
}()
cntr.IO.AddOutput("log", stdoutWC, stderrWC)
cntr.IO.Pipe()
return cntr.IO, nil
@ -142,24 +134,36 @@ func (c *criService) startContainer(ctx context.Context,
return nil
}
// Create container loggers and return write closer for stdout and stderr.
func createContainerLoggers(logPath string, tty bool) (stdout io.WriteCloser, stderr io.WriteCloser, err error) {
// createContainerLoggers creates container loggers and returns write closers for stdout and stderr.
func (c *criService) createContainerLoggers(logPath string, tty bool) (stdout io.WriteCloser, stderr io.WriteCloser, err error) {
if logPath != "" {
// Only generate container log when log path is specified.
if stdout, err = cio.NewCRILogger(logPath, cio.Stdout); err != nil {
return nil, nil, errors.Wrap(err, "failed to start container stdout logger")
f, err := os.OpenFile(logPath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0640)
if err != nil {
return nil, nil, errors.Wrap(err, "failed to create and open log file")
}
defer func() {
if err != nil {
stdout.Close()
f.Close()
}
}()
var stdoutCh, stderrCh <-chan struct{}
wc := cioutil.NewSerialWriteCloser(f)
stdout, stdoutCh = cio.NewCRILogger(logPath, wc, cio.Stdout, c.config.MaxContainerLogLineSize)
// Only redirect stderr when there is no tty.
if !tty {
if stderr, err = cio.NewCRILogger(logPath, cio.Stderr); err != nil {
return nil, nil, errors.Wrap(err, "failed to start container stderr logger")
}
stderr, stderrCh = cio.NewCRILogger(logPath, wc, cio.Stderr, c.config.MaxContainerLogLineSize)
}
go func() {
if stdoutCh != nil {
<-stdoutCh
}
if stderrCh != nil {
<-stderrCh
}
logrus.Debugf("Finish redirecting log file %q, closing it", logPath)
f.Close()
}()
} else {
stdout = cio.NewDiscardLogger()
stderr = cio.NewDiscardLogger()

View File

@ -21,10 +21,8 @@ import (
"bytes"
"io"
"io/ioutil"
"os"
"time"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2"
@ -38,11 +36,8 @@ const (
eol = '\n'
// timestampFormat is the timestamp format used in CRI logging format.
timestampFormat = time.RFC3339Nano
// pipeBufSize is the system PIPE_BUF size; on Linux it is 4096 bytes.
// POSIX.1 says that writes of less than PIPE_BUF are atomic.
pipeBufSize = 4096
// bufSize is the size of the read buffer.
bufSize = pipeBufSize - len(timestampFormat) - len(Stdout) - len(runtime.LogTagPartial) - 3 /*3 delimiter*/ - 1 /*eol*/
// defaultBufSize is the default size of the read buffer in bytes.
defaultBufSize = 4096
)
// NewDiscardLogger creates logger which discards all the input.
@ -51,46 +46,91 @@ func NewDiscardLogger() io.WriteCloser {
}
// NewCRILogger returns a write closer which redirects container logs into
// log file, and decorate the log line into CRI defined format.
func NewCRILogger(path string, stream StreamType) (io.WriteCloser, error) {
logrus.Debugf("Start writing log file %q", path)
// a log file, and decorates each log line into the CRI-defined format. It also
// returns a channel which indicates whether the logger has stopped.
// maxLen is the maximum length of a line; a line longer than the
// limit will be split into multiple lines.
func NewCRILogger(path string, w io.Writer, stream StreamType, maxLen int) (io.WriteCloser, <-chan struct{}) {
logrus.Debugf("Start writing stream %q to log file %q", stream, path)
prc, pwc := io.Pipe()
f, err := os.OpenFile(path, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0640)
if err != nil {
return nil, errors.Wrap(err, "failed to open log file")
}
go redirectLogs(path, prc, f, stream)
return pwc, nil
stop := make(chan struct{})
go func() {
redirectLogs(path, prc, w, stream, maxLen)
close(stop)
}()
return pwc, stop
}
func redirectLogs(path string, rc io.ReadCloser, wc io.WriteCloser, stream StreamType) {
func redirectLogs(path string, rc io.ReadCloser, w io.Writer, s StreamType, maxLen int) {
defer rc.Close()
defer wc.Close()
streamBytes := []byte(stream)
delimiterBytes := []byte{delimiter}
partialBytes := []byte(runtime.LogTagPartial)
fullBytes := []byte(runtime.LogTagFull)
r := bufio.NewReaderSize(rc, bufSize)
for {
lineBytes, isPrefix, err := r.ReadLine()
if err == io.EOF {
logrus.Debugf("Finish redirecting log file %q", path)
return
}
if err != nil {
logrus.WithError(err).Errorf("An error occurred when redirecting log file %q", path)
return
}
tagBytes := fullBytes
if isPrefix {
tagBytes = partialBytes
}
timestampBytes := time.Now().AppendFormat(nil, time.RFC3339Nano)
data := bytes.Join([][]byte{timestampBytes, streamBytes, tagBytes, lineBytes}, delimiterBytes)
data = append(data, eol)
if _, err := wc.Write(data); err != nil {
logrus.WithError(err).Errorf("Fail to write %q log to log file %q", stream, path)
}
// Continue on write error to drain the input.
var (
stream = []byte(s)
delimiter = []byte{delimiter}
partial = []byte(runtime.LogTagPartial)
full = []byte(runtime.LogTagFull)
buf [][]byte
length int
bufSize = defaultBufSize
)
// Make sure bufSize <= maxLen
if maxLen > 0 && maxLen < bufSize {
bufSize = maxLen
}
r := bufio.NewReaderSize(rc, bufSize)
writeLine := func(tag, line []byte) {
timestamp := time.Now().AppendFormat(nil, timestampFormat)
data := bytes.Join([][]byte{timestamp, stream, tag, line}, delimiter)
data = append(data, eol)
if _, err := w.Write(data); err != nil {
logrus.WithError(err).Errorf("Fail to write %q log to log file %q", s, path)
// Continue on write error to drain the container output.
}
}
for {
var stop bool
newLine, isPrefix, err := r.ReadLine()
if err != nil {
if err == io.EOF {
logrus.Debugf("Getting EOF from stream %q while redirecting to log file %q", s, path)
} else {
logrus.WithError(err).Errorf("An error occurred when redirecting stream %q to log file %q", s, path)
}
if length == 0 {
// No content left to write, break.
break
}
// Stop after writing the content left in buffer.
stop = true
} else {
// Buffer returned by ReadLine will change after
// next read, copy it.
l := make([]byte, len(newLine))
copy(l, newLine)
buf = append(buf, l)
length += len(l)
}
if maxLen > 0 && length > maxLen {
exceedLen := length - maxLen
last := buf[len(buf)-1]
if exceedLen > len(last) {
// exceedLen must be <= len(last), or else the buffer
// should have been written in the previous iteration.
panic("exceed length should <= last buffer size")
}
buf[len(buf)-1] = last[:len(last)-exceedLen]
writeLine(partial, bytes.Join(buf, nil))
buf = [][]byte{last[len(last)-exceedLen:]}
length = exceedLen
}
if isPrefix {
continue
}
writeLine(full, bytes.Join(buf, nil))
buf = nil
length = 0
if stop {
break
}
}
logrus.Debugf("Finish redirecting stream %q to log file %q", s, path)
}
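
Driving the new signature outside the CRI service is also possible; a rough sketch (an in-memory buffer stands in for the shared log file, import path as used earlier in this change):

package main

import (
	"bytes"
	"fmt"

	cio "github.com/containerd/cri/pkg/server/io"
)

func main() {
	buf := bytes.NewBuffer(nil)
	// 16 KiB line limit, matching the default MaxContainerLogLineSize.
	wc, stop := cio.NewCRILogger("demo-path", buf, cio.Stdout, 16*1024)

	// Everything written to wc is reframed into CRI log lines by redirectLogs.
	if _, err := wc.Write([]byte("hello\nworld\n")); err != nil {
		fmt.Println("write error:", err)
	}

	// Closing the pipe lets redirectLogs drain the remaining input and exit,
	// which in turn closes the stop channel.
	wc.Close()
	<-stop

	fmt.Print(buf.String()) // two lines: "<timestamp> stdout F hello" and "<timestamp> stdout F world"
}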

View File

@ -31,15 +31,19 @@ import (
)
func TestRedirectLogs(t *testing.T) {
// defaultBufSize is an even number
const maxLen = defaultBufSize * 4
for desc, test := range map[string]struct {
input string
stream StreamType
maxLen int
tag []runtime.LogTag
content []string
}{
"stdout log": {
input: "test stdout log 1\ntest stdout log 2",
input: "test stdout log 1\ntest stdout log 2\n",
stream: Stdout,
maxLen: maxLen,
tag: []runtime.LogTag{
runtime.LogTagFull,
runtime.LogTagFull,
@ -50,8 +54,9 @@ func TestRedirectLogs(t *testing.T) {
},
},
"stderr log": {
input: "test stderr log 1\ntest stderr log 2",
input: "test stderr log 1\ntest stderr log 2\n",
stream: Stderr,
maxLen: maxLen,
tag: []runtime.LogTag{
runtime.LogTagFull,
runtime.LogTagFull,
@ -61,18 +66,160 @@ func TestRedirectLogs(t *testing.T) {
"test stderr log 2",
},
},
"long log": {
input: strings.Repeat("a", 2*bufSize+10) + "\n",
stream: Stdout,
"log ends without newline": {
input: "test stderr log 1\ntest stderr log 2",
stream: Stderr,
maxLen: maxLen,
tag: []runtime.LogTag{
runtime.LogTagFull,
runtime.LogTagFull,
},
content: []string{
"test stderr log 1",
"test stderr log 2",
},
},
"log length equal to buffer size": {
input: strings.Repeat("a", defaultBufSize) + "\n" + strings.Repeat("a", defaultBufSize) + "\n",
stream: Stdout,
maxLen: maxLen,
tag: []runtime.LogTag{
runtime.LogTagFull,
runtime.LogTagFull,
},
content: []string{
strings.Repeat("a", defaultBufSize),
strings.Repeat("a", defaultBufSize),
},
},
"log length longer than buffer size": {
input: strings.Repeat("a", defaultBufSize*2+10) + "\n" + strings.Repeat("a", defaultBufSize*2+20) + "\n",
stream: Stdout,
maxLen: maxLen,
tag: []runtime.LogTag{
runtime.LogTagFull,
runtime.LogTagFull,
},
content: []string{
strings.Repeat("a", defaultBufSize*2+10),
strings.Repeat("a", defaultBufSize*2+20),
},
},
"log length equal to max length": {
input: strings.Repeat("a", maxLen) + "\n" + strings.Repeat("a", maxLen) + "\n",
stream: Stdout,
maxLen: maxLen,
tag: []runtime.LogTag{
runtime.LogTagFull,
runtime.LogTagFull,
},
content: []string{
strings.Repeat("a", maxLen),
strings.Repeat("a", maxLen),
},
},
"log length exceed max length by 1": {
input: strings.Repeat("a", maxLen+1) + "\n" + strings.Repeat("a", maxLen+1) + "\n",
stream: Stdout,
maxLen: maxLen,
tag: []runtime.LogTag{
runtime.LogTagPartial,
runtime.LogTagFull,
runtime.LogTagPartial,
runtime.LogTagFull,
},
content: []string{
strings.Repeat("a", maxLen),
"a",
strings.Repeat("a", maxLen),
"a",
},
},
"log length longer than max length": {
input: strings.Repeat("a", maxLen*2) + "\n" + strings.Repeat("a", maxLen*2+1) + "\n",
stream: Stdout,
maxLen: maxLen,
tag: []runtime.LogTag{
runtime.LogTagPartial,
runtime.LogTagFull,
runtime.LogTagPartial,
runtime.LogTagPartial,
runtime.LogTagFull,
},
content: []string{
strings.Repeat("a", bufSize),
strings.Repeat("a", bufSize),
strings.Repeat("a", maxLen),
strings.Repeat("a", maxLen),
strings.Repeat("a", maxLen),
strings.Repeat("a", maxLen),
"a",
},
},
"max length shorter than buffer size": {
input: strings.Repeat("a", defaultBufSize*3/2+10) + "\n" + strings.Repeat("a", defaultBufSize*3/2+20) + "\n",
stream: Stdout,
maxLen: defaultBufSize / 2,
tag: []runtime.LogTag{
runtime.LogTagPartial,
runtime.LogTagPartial,
runtime.LogTagPartial,
runtime.LogTagFull,
runtime.LogTagPartial,
runtime.LogTagPartial,
runtime.LogTagPartial,
runtime.LogTagFull,
},
content: []string{
strings.Repeat("a", defaultBufSize*1/2),
strings.Repeat("a", defaultBufSize*1/2),
strings.Repeat("a", defaultBufSize*1/2),
strings.Repeat("a", 10),
strings.Repeat("a", defaultBufSize*1/2),
strings.Repeat("a", defaultBufSize*1/2),
strings.Repeat("a", defaultBufSize*1/2),
strings.Repeat("a", 20),
},
},
"log length longer than max length, and (maxLen % defaultBufSize != 0)": {
input: strings.Repeat("a", defaultBufSize*2+10) + "\n" + strings.Repeat("a", defaultBufSize*2+20) + "\n",
stream: Stdout,
maxLen: defaultBufSize * 3 / 2,
tag: []runtime.LogTag{
runtime.LogTagPartial,
runtime.LogTagFull,
runtime.LogTagPartial,
runtime.LogTagFull,
},
content: []string{
strings.Repeat("a", defaultBufSize*3/2),
strings.Repeat("a", defaultBufSize*1/2+10),
strings.Repeat("a", defaultBufSize*3/2),
strings.Repeat("a", defaultBufSize*1/2+20),
},
},
"no limit if max length is 0": {
input: strings.Repeat("a", defaultBufSize*10+10) + "\n" + strings.Repeat("a", defaultBufSize*10+20) + "\n",
stream: Stdout,
maxLen: 0,
tag: []runtime.LogTag{
runtime.LogTagFull,
runtime.LogTagFull,
},
content: []string{
strings.Repeat("a", defaultBufSize*10+10),
strings.Repeat("a", defaultBufSize*10+20),
},
},
"no limit if max length is negative": {
input: strings.Repeat("a", defaultBufSize*10+10) + "\n" + strings.Repeat("a", defaultBufSize*10+20) + "\n",
stream: Stdout,
maxLen: -1,
tag: []runtime.LogTag{
runtime.LogTagFull,
runtime.LogTagFull,
},
content: []string{
strings.Repeat("a", defaultBufSize*10+10),
strings.Repeat("a", defaultBufSize*10+20),
},
},
} {
@ -80,7 +227,7 @@ func TestRedirectLogs(t *testing.T) {
rc := ioutil.NopCloser(strings.NewReader(test.input))
buf := bytes.NewBuffer(nil)
wc := cioutil.NewNopWriteCloser(buf)
redirectLogs("test-path", rc, wc, test.stream)
redirectLogs("test-path", rc, wc, test.stream, test.maxLen)
output := buf.String()
lines := strings.Split(output, "\n")
lines = lines[:len(lines)-1] // Discard empty string after last \n

View File

@ -78,9 +78,7 @@ func (c *criService) recover(ctx context.Context) error {
return errors.Wrap(err, "failed to list containers")
}
for _, container := range containers {
containerDir := c.getContainerRootDir(container.ID())
volatileContainerDir := c.getVolatileContainerRootDir(container.ID())
cntr, err := loadContainer(ctx, container, containerDir, volatileContainerDir)
cntr, err := c.loadContainer(ctx, container)
if err != nil {
logrus.WithError(err).Errorf("Failed to load container %q", container.ID())
continue
@ -149,8 +147,10 @@ func (c *criService) recover(ctx context.Context) error {
}
// loadContainer loads container from containerd and status checkpoint.
func loadContainer(ctx context.Context, cntr containerd.Container, containerDir, volatileContainerDir string) (containerstore.Container, error) {
func (c *criService) loadContainer(ctx context.Context, cntr containerd.Container) (containerstore.Container, error) {
id := cntr.ID()
containerDir := c.getContainerRootDir(id)
volatileContainerDir := c.getVolatileContainerRootDir(id)
var container containerstore.Container
// Load container metadata.
exts, err := cntr.Extensions(ctx)
@ -176,11 +176,21 @@ func loadContainer(ctx context.Context, cntr containerd.Container, containerDir,
// Load up-to-date status from containerd.
var containerIO *cio.ContainerIO
t, err := cntr.Task(ctx, func(fifos *containerdio.FIFOSet) (containerdio.IO, error) {
stdoutWC, stderrWC, err := createContainerLoggers(meta.LogPath, meta.Config.GetTty())
t, err := cntr.Task(ctx, func(fifos *containerdio.FIFOSet) (_ containerdio.IO, err error) {
stdoutWC, stderrWC, err := c.createContainerLoggers(meta.LogPath, meta.Config.GetTty())
if err != nil {
return nil, err
}
defer func() {
if err != nil {
if stdoutWC != nil {
stdoutWC.Close()
}
if stderrWC != nil {
stderrWC.Close()
}
}
}()
containerIO, err = cio.NewContainerIO(id,
cio.WithFIFOs(fifos),
)