diff --git a/docs/config.md b/docs/config.md index d5c481ff2..1f5c30e5d 100644 --- a/docs/config.md +++ b/docs/config.md @@ -27,6 +27,11 @@ The explanation and default value of each configuration item are as follows: # enable_tls_streaming enables the TLS streaming support. enable_tls_streaming = false + # max_container_log_line_size is the maximum log line size in bytes for a container. + # Log line longer than the limit will be split into multiple lines. -1 means no + # limit. + max_container_log_line_size = 16384 + # "plugins.cri.containerd" contains config related to containerd [plugins.cri.containerd] diff --git a/integration/container_log_test.go b/integration/container_log_test.go new file mode 100644 index 000000000..561e2a883 --- /dev/null +++ b/integration/container_log_test.go @@ -0,0 +1,113 @@ +/* +Copyright 2018 The containerd Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package integration + +import ( + "fmt" + "io/ioutil" + "os" + "path/filepath" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2" +) + +func TestLongContainerLog(t *testing.T) { + testPodLogDir, err := ioutil.TempDir("/tmp", "long-container-log") + require.NoError(t, err) + defer os.RemoveAll(testPodLogDir) + + t.Log("Create a sandbox with log directory") + sbConfig := PodSandboxConfig("sandbox", "long-container-log", + WithPodLogDirectory(testPodLogDir), + ) + sb, err := runtimeService.RunPodSandbox(sbConfig) + require.NoError(t, err) + defer func() { + assert.NoError(t, runtimeService.StopPodSandbox(sb)) + assert.NoError(t, runtimeService.RemovePodSandbox(sb)) + }() + + const ( + testImage = "busybox" + containerName = "test-container" + ) + t.Logf("Pull test image %q", testImage) + img, err := imageService.PullImage(&runtime.ImageSpec{Image: testImage}, nil) + require.NoError(t, err) + defer func() { + assert.NoError(t, imageService.RemoveImage(&runtime.ImageSpec{Image: img})) + }() + + t.Log("Create a container with log path") + config, err := CRIConfig() + require.NoError(t, err) + maxSize := config.MaxContainerLogLineSize + shortLineCmd := fmt.Sprintf("i=0; while [ $i -lt %d ]; do printf %s; i=$((i+1)); done", maxSize-1, "a") + maxLenLineCmd := fmt.Sprintf("i=0; while [ $i -lt %d ]; do printf %s; i=$((i+1)); done", maxSize, "b") + longLineCmd := fmt.Sprintf("i=0; while [ $i -lt %d ]; do printf %s; i=$((i+1)); done", maxSize+1, "c") + cnConfig := ContainerConfig( + containerName, + "busybox", + WithCommand("sh", "-c", + fmt.Sprintf("%s; echo; %s; echo; %s", shortLineCmd, maxLenLineCmd, longLineCmd)), + WithLogPath(containerName), + ) + cn, err := runtimeService.CreateContainer(sb, cnConfig, sbConfig) + require.NoError(t, err) + + t.Log("Start the container") + require.NoError(t, runtimeService.StartContainer(cn)) + + 
t.Log("Wait for container to finish running") + require.NoError(t, Eventually(func() (bool, error) { + s, err := runtimeService.ContainerStatus(cn) + if err != nil { + return false, err + } + if s.GetState() == runtime.ContainerState_CONTAINER_EXITED { + return true, nil + } + return false, nil + }, time.Second, 30*time.Second)) + + t.Log("Check container log") + content, err := ioutil.ReadFile(filepath.Join(testPodLogDir, containerName)) + assert.NoError(t, err) + checkContainerLog(t, string(content), []string{ + fmt.Sprintf("%s %s %s", runtime.Stdout, runtime.LogTagFull, strings.Repeat("a", maxSize-1)), + fmt.Sprintf("%s %s %s", runtime.Stdout, runtime.LogTagFull, strings.Repeat("b", maxSize)), + fmt.Sprintf("%s %s %s", runtime.Stdout, runtime.LogTagPartial, strings.Repeat("c", maxSize)), + fmt.Sprintf("%s %s %s", runtime.Stdout, runtime.LogTagFull, "c"), + }) +} + +func checkContainerLog(t *testing.T, log string, messages []string) { + lines := strings.Split(strings.TrimSpace(log), "\n") + require.Len(t, lines, len(messages), "log line number should match") + for i, line := range lines { + parts := strings.SplitN(line, " ", 2) + require.Len(t, parts, 2) + _, err := time.Parse(time.RFC3339Nano, parts[0]) + assert.NoError(t, err, "timestamp should be in RFC3339Nano format") + assert.Equal(t, messages[i], parts[1], "log content should match") + } +} diff --git a/integration/container_update_resources_test.go b/integration/container_update_resources_test.go index 660f48826..36e6b7374 100644 --- a/integration/container_update_resources_test.go +++ b/integration/container_update_resources_test.go @@ -37,7 +37,7 @@ func checkMemoryLimit(t *testing.T, spec *runtimespec.Spec, memLimit int64) { } func TestUpdateContainerResources(t *testing.T) { - t.Logf("Create a sandbox") + t.Log("Create a sandbox") sbConfig := PodSandboxConfig("sandbox", "update-container-resources") sb, err := runtimeService.RunPodSandbox(sbConfig) require.NoError(t, err) @@ -46,7 +46,7 @@ func 
TestUpdateContainerResources(t *testing.T) { assert.NoError(t, runtimeService.RemovePodSandbox(sb)) }() - t.Logf("Create a container with memory limit") + t.Log("Create a container with memory limit") cnConfig := ContainerConfig( "container", pauseImage, @@ -57,48 +57,48 @@ func TestUpdateContainerResources(t *testing.T) { cn, err := runtimeService.CreateContainer(sb, cnConfig, sbConfig) require.NoError(t, err) - t.Logf("Check memory limit in container OCI spec") + t.Log("Check memory limit in container OCI spec") container, err := containerdClient.LoadContainer(context.Background(), cn) require.NoError(t, err) spec, err := container.Spec(context.Background()) require.NoError(t, err) checkMemoryLimit(t, spec, 2*1024*1024) - t.Logf("Update container memory limit after created") + t.Log("Update container memory limit after created") err = runtimeService.UpdateContainerResources(cn, &runtime.LinuxContainerResources{ MemoryLimitInBytes: 4 * 1024 * 1024, }) require.NoError(t, err) - t.Logf("Check memory limit in container OCI spec") + t.Log("Check memory limit in container OCI spec") spec, err = container.Spec(context.Background()) require.NoError(t, err) checkMemoryLimit(t, spec, 4*1024*1024) - t.Logf("Start the container") + t.Log("Start the container") require.NoError(t, runtimeService.StartContainer(cn)) task, err := container.Task(context.Background(), nil) require.NoError(t, err) - t.Logf("Check memory limit in cgroup") + t.Log("Check memory limit in cgroup") cgroup, err := cgroups.Load(cgroups.V1, cgroups.PidPath(int(task.Pid()))) require.NoError(t, err) stat, err := cgroup.Stat(cgroups.IgnoreNotExist) require.NoError(t, err) assert.Equal(t, uint64(4*1024*1024), stat.Memory.Usage.Limit) - t.Logf("Update container memory limit after started") + t.Log("Update container memory limit after started") err = runtimeService.UpdateContainerResources(cn, &runtime.LinuxContainerResources{ MemoryLimitInBytes: 8 * 1024 * 1024, }) require.NoError(t, err) - t.Logf("Check memory 
limit in container OCI spec") + t.Log("Check memory limit in container OCI spec") spec, err = container.Spec(context.Background()) require.NoError(t, err) checkMemoryLimit(t, spec, 8*1024*1024) - t.Logf("Check memory limit in cgroup") + t.Log("Check memory limit in cgroup") stat, err = cgroup.Stat(cgroups.IgnoreNotExist) require.NoError(t, err) assert.Equal(t, uint64(8*1024*1024), stat.Memory.Usage.Limit) diff --git a/integration/test_utils.go b/integration/test_utils.go index ebcca5c9f..c9c31f14d 100644 --- a/integration/test_utils.go +++ b/integration/test_utils.go @@ -18,6 +18,7 @@ package integration import ( "context" + "encoding/json" "flag" "fmt" "os/exec" @@ -28,12 +29,15 @@ import ( "github.com/containerd/containerd" "github.com/pkg/errors" "github.com/sirupsen/logrus" + "google.golang.org/grpc" "k8s.io/kubernetes/pkg/kubelet/apis/cri" runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2" "k8s.io/kubernetes/pkg/kubelet/remote" + kubeletutil "k8s.io/kubernetes/pkg/kubelet/util" api "github.com/containerd/cri/pkg/api/v1" "github.com/containerd/cri/pkg/client" + criconfig "github.com/containerd/cri/pkg/config" "github.com/containerd/cri/pkg/constants" "github.com/containerd/cri/pkg/util" ) @@ -100,6 +104,7 @@ func ConnectDaemons() error { // Opts sets specific information in pod sandbox config. type PodSandboxOpts func(*runtime.PodSandboxConfig) +// Set host network. func WithHostNetwork(p *runtime.PodSandboxConfig) { if p.Linux == nil { p.Linux = &runtime.LinuxPodSandboxConfig{} @@ -114,6 +119,13 @@ func WithHostNetwork(p *runtime.PodSandboxConfig) { } } +// Add pod log directory. +func WithPodLogDirectory(dir string) PodSandboxOpts { + return func(p *runtime.PodSandboxConfig) { + p.LogDirectory = dir + } +} + // PodSandboxConfig generates a pod sandbox config for test. 
func PodSandboxConfig(name, ns string, opts ...PodSandboxOpts) *runtime.PodSandboxConfig { config := &runtime.PodSandboxConfig{ @@ -137,52 +149,59 @@ func PodSandboxConfig(name, ns string, opts ...PodSandboxOpts) *runtime.PodSandb type ContainerOpts func(*runtime.ContainerConfig) func WithTestLabels() ContainerOpts { - return func(cf *runtime.ContainerConfig) { - cf.Labels = map[string]string{"key": "value"} + return func(c *runtime.ContainerConfig) { + c.Labels = map[string]string{"key": "value"} } } func WithTestAnnotations() ContainerOpts { - return func(cf *runtime.ContainerConfig) { - cf.Annotations = map[string]string{"a.b.c": "test"} + return func(c *runtime.ContainerConfig) { + c.Annotations = map[string]string{"a.b.c": "test"} } } // Add container resource limits. func WithResources(r *runtime.LinuxContainerResources) ContainerOpts { - return func(cf *runtime.ContainerConfig) { - if cf.Linux == nil { - cf.Linux = &runtime.LinuxContainerConfig{} + return func(c *runtime.ContainerConfig) { + if c.Linux == nil { + c.Linux = &runtime.LinuxContainerConfig{} } - cf.Linux.Resources = r + c.Linux.Resources = r } } // Add container command. -func WithCommand(c string, args ...string) ContainerOpts { - return func(cf *runtime.ContainerConfig) { - cf.Command = []string{c} - cf.Args = args +func WithCommand(cmd string, args ...string) ContainerOpts { + return func(c *runtime.ContainerConfig) { + c.Command = []string{cmd} + c.Args = args } } // Add pid namespace mode. 
func WithPidNamespace(mode runtime.NamespaceMode) ContainerOpts { - return func(cf *runtime.ContainerConfig) { - if cf.Linux == nil { - cf.Linux = &runtime.LinuxContainerConfig{} + return func(c *runtime.ContainerConfig) { + if c.Linux == nil { + c.Linux = &runtime.LinuxContainerConfig{} } - if cf.Linux.SecurityContext == nil { - cf.Linux.SecurityContext = &runtime.LinuxContainerSecurityContext{} + if c.Linux.SecurityContext == nil { + c.Linux.SecurityContext = &runtime.LinuxContainerSecurityContext{} } - if cf.Linux.SecurityContext.NamespaceOptions == nil { - cf.Linux.SecurityContext.NamespaceOptions = &runtime.NamespaceOption{} + if c.Linux.SecurityContext.NamespaceOptions == nil { + c.Linux.SecurityContext.NamespaceOptions = &runtime.NamespaceOption{} } - cf.Linux.SecurityContext.NamespaceOptions.Pid = mode + c.Linux.SecurityContext.NamespaceOptions.Pid = mode } } +// Add container log path. +func WithLogPath(path string) ContainerOpts { + return func(c *runtime.ContainerConfig) { + c.LogPath = path + } +} + // ContainerConfig creates a container config given a name and image name // and additional container config options func ContainerConfig(name, image string, opts ...ContainerOpts) *runtime.ContainerConfig { @@ -247,3 +266,27 @@ func PidOf(name string) (int, error) { } return strconv.Atoi(output) } + +// CRIConfig gets current cri config from containerd. 
+func CRIConfig() (*criconfig.Config, error) { + addr, dialer, err := kubeletutil.GetAddressAndDialer(*criEndpoint) + if err != nil { + return nil, errors.Wrap(err, "failed to get dialer") + } + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + conn, err := grpc.DialContext(ctx, addr, grpc.WithInsecure(), grpc.WithDialer(dialer)) + if err != nil { + return nil, errors.Wrap(err, "failed to connect cri endpoint") + } + client := runtime.NewRuntimeServiceClient(conn) + resp, err := client.Status(ctx, &runtime.StatusRequest{Verbose: true}) + if err != nil { + return nil, errors.Wrap(err, "failed to get status") + } + config := &criconfig.Config{} + if err := json.Unmarshal([]byte(resp.Info["config"]), config); err != nil { + return nil, errors.Wrap(err, "failed to unmarshal config") + } + return config, nil +} diff --git a/pkg/config/config.go b/pkg/config/config.go index d9e417d75..8ec2be834 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -96,6 +96,10 @@ type PluginConfig struct { SystemdCgroup bool `toml:"systemd_cgroup" json:"systemdCgroup"` // EnableTLSStreaming indicates to enable the TLS streaming support. EnableTLSStreaming bool `toml:"enable_tls_streaming" json:"enableTLSStreaming"` + // MaxContainerLogLineSize is the maximum log line size in bytes for a container. + // Log line longer than the limit will be split into multiple lines. Non-positive + // value means no limit. + MaxContainerLogLineSize int `toml:"max_container_log_line_size" json:"maxContainerLogSize"` } // Config contains all configurations for cri server. 
@@ -129,13 +133,14 @@ func DefaultConfig() PluginConfig { Root: "", }, }, - StreamServerAddress: "", - StreamServerPort: "10010", - EnableSelinux: false, - EnableTLSStreaming: false, - SandboxImage: "k8s.gcr.io/pause:3.1", - StatsCollectPeriod: 10, - SystemdCgroup: false, + StreamServerAddress: "", + StreamServerPort: "10010", + EnableSelinux: false, + EnableTLSStreaming: false, + SandboxImage: "k8s.gcr.io/pause:3.1", + StatsCollectPeriod: 10, + SystemdCgroup: false, + MaxContainerLogLineSize: 16 * 1024, Registry: Registry{ Mirrors: map[string]Mirror{ "docker.io": { diff --git a/pkg/ioutil/write_closer.go b/pkg/ioutil/write_closer.go index d1fa8ed50..987169fe1 100644 --- a/pkg/ioutil/write_closer.go +++ b/pkg/ioutil/write_closer.go @@ -16,7 +16,10 @@ limitations under the License. package ioutil -import "io" +import ( + "io" + "sync" +) // writeCloseInformer wraps passed in write closer with a close channel. // Caller could wait on the close channel for the write closer to be @@ -66,3 +69,34 @@ func (n *nopWriteCloser) Write(p []byte) (int, error) { func (n *nopWriteCloser) Close() error { return nil } + +// serialWriteCloser wraps a write closer and makes sure all writes +// are done in serial. +// Parallel writes won't interleave with each other. Use case: +// 1) Pipe: Write content longer than PIPE_BUF. +// See http://man7.org/linux/man-pages/man7/pipe.7.html +// 2) <3.14 Linux Kernel: write is not atomic +// See http://man7.org/linux/man-pages/man2/write.2.html +type serialWriteCloser struct { + mu sync.Mutex + wc io.WriteCloser +} + +// NewSerialWriteCloser creates a SerialWriteCloser from a write closer. +func NewSerialWriteCloser(wc io.WriteCloser) io.WriteCloser { + return &serialWriteCloser{wc: wc} +} + +// Write writes data to the wrapped write closer while holding the lock, so parallel writes are serialized. +func (s *serialWriteCloser) Write(data []byte) (int, error) { + s.mu.Lock() + defer s.mu.Unlock() + return s.wc.Write(data) +} + +// Close closes the write closer. 
+func (s *serialWriteCloser) Close() error { + s.mu.Lock() + defer s.mu.Unlock() + return s.wc.Close() +} diff --git a/pkg/ioutil/write_closer_test.go b/pkg/ioutil/write_closer_test.go index 67f6b02ed..8b1eb4b20 100644 --- a/pkg/ioutil/write_closer_test.go +++ b/pkg/ioutil/write_closer_test.go @@ -17,9 +17,16 @@ limitations under the License. package ioutil import ( + "io/ioutil" + "os" + "sort" + "strconv" + "strings" + "sync" "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestWriteCloseInformer(t *testing.T) { @@ -47,3 +54,55 @@ func TestWriteCloseInformer(t *testing.T) { assert.Fail(t, "write closer not closed") } } + +func TestSerialWriteCloser(t *testing.T) { + const ( + // Test 10 times to make sure it always pass. + testCount = 10 + + goroutine = 10 + dataLen = 100000 + ) + for n := 0; n < testCount; n++ { + testData := make([][]byte, goroutine) + for i := 0; i < goroutine; i++ { + testData[i] = []byte(repeatNumber(i, dataLen) + "\n") + } + + f, err := ioutil.TempFile("/tmp", "serial-write-closer") + require.NoError(t, err) + defer os.RemoveAll(f.Name()) + defer f.Close() + wc := NewSerialWriteCloser(f) + defer wc.Close() + + // Write data in parallel + var wg sync.WaitGroup + wg.Add(goroutine) + for i := 0; i < goroutine; i++ { + go func(id int) { + n, err := wc.Write(testData[id]) + assert.NoError(t, err) + assert.Equal(t, dataLen+1, n) + wg.Done() + }(i) + } + wg.Wait() + wc.Close() + + // Check test result + content, err := ioutil.ReadFile(f.Name()) + require.NoError(t, err) + resultData := strings.Split(strings.TrimSpace(string(content)), "\n") + require.Len(t, resultData, goroutine) + sort.Strings(resultData) + for i := 0; i < goroutine; i++ { + expected := repeatNumber(i, dataLen) + assert.Equal(t, expected, resultData[i]) + } + } +} + +func repeatNumber(num, count int) string { + return strings.Repeat(strconv.Itoa(num), count) +} diff --git a/pkg/server/container_log_reopen.go 
b/pkg/server/container_log_reopen.go index 144cc2cc5..515cc20b9 100644 --- a/pkg/server/container_log_reopen.go +++ b/pkg/server/container_log_reopen.go @@ -36,7 +36,7 @@ func (c *criService) ReopenContainerLog(ctx context.Context, r *runtime.ReopenCo } // Create new container logger and replace the existing ones. - stdoutWC, stderrWC, err := createContainerLoggers(container.LogPath, container.Config.GetTty()) + stdoutWC, stderrWC, err := c.createContainerLoggers(container.LogPath, container.Config.GetTty()) if err != nil { return nil, err } diff --git a/pkg/server/container_start.go b/pkg/server/container_start.go index 26e3d216d..ed4b8b43f 100644 --- a/pkg/server/container_start.go +++ b/pkg/server/container_start.go @@ -18,6 +18,7 @@ package server import ( "io" + "os" "time" "github.com/containerd/containerd" @@ -29,6 +30,7 @@ import ( runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2" ctrdutil "github.com/containerd/cri/pkg/containerd/util" + cioutil "github.com/containerd/cri/pkg/ioutil" cio "github.com/containerd/cri/pkg/server/io" containerstore "github.com/containerd/cri/pkg/store/container" sandboxstore "github.com/containerd/cri/pkg/store/sandbox" @@ -97,20 +99,10 @@ func (c *criService) startContainer(ctx context.Context, } ioCreation := func(id string) (_ containerdio.IO, err error) { - stdoutWC, stderrWC, err := createContainerLoggers(meta.LogPath, config.GetTty()) + stdoutWC, stderrWC, err := c.createContainerLoggers(meta.LogPath, config.GetTty()) if err != nil { return nil, errors.Wrap(err, "failed to create container loggers") } - defer func() { - if err != nil { - if stdoutWC != nil { - stdoutWC.Close() - } - if stderrWC != nil { - stderrWC.Close() - } - } - }() cntr.IO.AddOutput("log", stdoutWC, stderrWC) cntr.IO.Pipe() return cntr.IO, nil @@ -142,24 +134,36 @@ func (c *criService) startContainer(ctx context.Context, return nil } -// Create container loggers and return write closer for stdout and stderr. 
-func createContainerLoggers(logPath string, tty bool) (stdout io.WriteCloser, stderr io.WriteCloser, err error) { +// createContainerLoggers creates container loggers and returns write closers for stdout and stderr. +func (c *criService) createContainerLoggers(logPath string, tty bool) (stdout io.WriteCloser, stderr io.WriteCloser, err error) { if logPath != "" { // Only generate container log when log path is specified. - if stdout, err = cio.NewCRILogger(logPath, cio.Stdout); err != nil { - return nil, nil, errors.Wrap(err, "failed to start container stdout logger") + f, err := os.OpenFile(logPath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0640) + if err != nil { + return nil, nil, errors.Wrap(err, "failed to create and open log file") } defer func() { if err != nil { - stdout.Close() + f.Close() } }() + var stdoutCh, stderrCh <-chan struct{} + wc := cioutil.NewSerialWriteCloser(f) + stdout, stdoutCh = cio.NewCRILogger(logPath, wc, cio.Stdout, c.config.MaxContainerLogLineSize) // Only redirect stderr when there is no tty. 
if !tty { - if stderr, err = cio.NewCRILogger(logPath, cio.Stderr); err != nil { - return nil, nil, errors.Wrap(err, "failed to start container stderr logger") - } + stderr, stderrCh = cio.NewCRILogger(logPath, wc, cio.Stderr, c.config.MaxContainerLogLineSize) } + go func() { + if stdoutCh != nil { + <-stdoutCh + } + if stderrCh != nil { + <-stderrCh + } + logrus.Debugf("Finish redirecting log file %q, closing it", logPath) + f.Close() + }() } else { stdout = cio.NewDiscardLogger() stderr = cio.NewDiscardLogger() diff --git a/pkg/server/io/logger.go b/pkg/server/io/logger.go index 3d7816bc4..9d2af2769 100644 --- a/pkg/server/io/logger.go +++ b/pkg/server/io/logger.go @@ -21,10 +21,8 @@ import ( "bytes" "io" "io/ioutil" - "os" "time" - "github.com/pkg/errors" "github.com/sirupsen/logrus" runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2" @@ -38,11 +36,8 @@ const ( eol = '\n' // timestampFormat is the timestamp format used in CRI logging format. timestampFormat = time.RFC3339Nano - // pipeBufSize is the system PIPE_BUF size, on linux it is 4096 bytes. - // POSIX.1 says that write less than PIPE_BUF is atmoic. - pipeBufSize = 4096 - // bufSize is the size of the read buffer. - bufSize = pipeBufSize - len(timestampFormat) - len(Stdout) - len(runtime.LogTagPartial) - 3 /*3 delimiter*/ - 1 /*eol*/ + // defaultBufSize is the default size of the read buffer in bytes. + defaultBufSize = 4096 ) // NewDiscardLogger creates logger which discards all the input. @@ -51,46 +46,91 @@ func NewDiscardLogger() io.WriteCloser { } // NewCRILogger returns a write closer which redirect container log into -// log file, and decorate the log line into CRI defined format. -func NewCRILogger(path string, stream StreamType) (io.WriteCloser, error) { - logrus.Debugf("Start writing log file %q", path) +// log file, and decorate the log line into CRI defined format. It also +// returns a channel which indicates whether the logger is stopped. 
+// maxLen is the max length limit of a line. A line longer than the +// limit will be cut into multiple lines. +func NewCRILogger(path string, w io.Writer, stream StreamType, maxLen int) (io.WriteCloser, <-chan struct{}) { + logrus.Debugf("Start writing stream %q to log file %q", stream, path) prc, pwc := io.Pipe() - f, err := os.OpenFile(path, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0640) - if err != nil { - return nil, errors.Wrap(err, "failed to open log file") - } - go redirectLogs(path, prc, f, stream) - return pwc, nil + stop := make(chan struct{}) + go func() { + redirectLogs(path, prc, w, stream, maxLen) + close(stop) + }() + return pwc, stop } -func redirectLogs(path string, rc io.ReadCloser, wc io.WriteCloser, stream StreamType) { +func redirectLogs(path string, rc io.ReadCloser, w io.Writer, s StreamType, maxLen int) { defer rc.Close() - defer wc.Close() - streamBytes := []byte(stream) - delimiterBytes := []byte{delimiter} - partialBytes := []byte(runtime.LogTagPartial) - fullBytes := []byte(runtime.LogTagFull) - r := bufio.NewReaderSize(rc, bufSize) - for { - lineBytes, isPrefix, err := r.ReadLine() - if err == io.EOF { - logrus.Debugf("Finish redirecting log file %q", path) - return - } - if err != nil { - logrus.WithError(err).Errorf("An error occurred when redirecting log file %q", path) - return - } - tagBytes := fullBytes - if isPrefix { - tagBytes = partialBytes - } - timestampBytes := time.Now().AppendFormat(nil, time.RFC3339Nano) - data := bytes.Join([][]byte{timestampBytes, streamBytes, tagBytes, lineBytes}, delimiterBytes) - data = append(data, eol) - if _, err := wc.Write(data); err != nil { - logrus.WithError(err).Errorf("Fail to write %q log to log file %q", stream, path) - } - // Continue on write error to drain the input. 
+ var ( + stream = []byte(s) + delimiter = []byte{delimiter} + partial = []byte(runtime.LogTagPartial) + full = []byte(runtime.LogTagFull) + buf [][]byte + length int + bufSize = defaultBufSize + ) + // Make sure bufSize <= maxLen + if maxLen > 0 && maxLen < bufSize { + bufSize = maxLen } + r := bufio.NewReaderSize(rc, bufSize) + writeLine := func(tag, line []byte) { + timestamp := time.Now().AppendFormat(nil, timestampFormat) + data := bytes.Join([][]byte{timestamp, stream, tag, line}, delimiter) + data = append(data, eol) + if _, err := w.Write(data); err != nil { + logrus.WithError(err).Errorf("Fail to write %q log to log file %q", s, path) + // Continue on write error to drain the container output. + } + } + for { + var stop bool + newLine, isPrefix, err := r.ReadLine() + if err != nil { + if err == io.EOF { + logrus.Debugf("Getting EOF from stream %q while redirecting to log file %q", s, path) + } else { + logrus.WithError(err).Errorf("An error occurred when redirecting stream %q to log file %q", s, path) + } + if length == 0 { + // No content left to write, break. + break + } + // Stop after writing the content left in buffer. + stop = true + } else { + // Buffer returned by ReadLine will change after + // next read, copy it. + l := make([]byte, len(newLine)) + copy(l, newLine) + buf = append(buf, l) + length += len(l) + } + if maxLen > 0 && length > maxLen { + exceedLen := length - maxLen + last := buf[len(buf)-1] + if exceedLen > len(last) { + // exceedLen must be <= len(last), or else the buffer + // should have been written in the previous iteration. 
+ panic("exceed length should <= last buffer size") + } + buf[len(buf)-1] = last[:len(last)-exceedLen] + writeLine(partial, bytes.Join(buf, nil)) + buf = [][]byte{last[len(last)-exceedLen:]} + length = exceedLen + } + if isPrefix { + continue + } + writeLine(full, bytes.Join(buf, nil)) + buf = nil + length = 0 + if stop { + break + } + } + logrus.Debugf("Finish redirecting stream %q to log file %q", s, path) } diff --git a/pkg/server/io/logger_test.go b/pkg/server/io/logger_test.go index 54ed8d3ec..45d45cc47 100644 --- a/pkg/server/io/logger_test.go +++ b/pkg/server/io/logger_test.go @@ -31,15 +31,19 @@ import ( ) func TestRedirectLogs(t *testing.T) { + // defaultBufSize is even number + const maxLen = defaultBufSize * 4 for desc, test := range map[string]struct { input string stream StreamType + maxLen int tag []runtime.LogTag content []string }{ "stdout log": { - input: "test stdout log 1\ntest stdout log 2", + input: "test stdout log 1\ntest stdout log 2\n", stream: Stdout, + maxLen: maxLen, tag: []runtime.LogTag{ runtime.LogTagFull, runtime.LogTagFull, @@ -50,8 +54,9 @@ func TestRedirectLogs(t *testing.T) { }, }, "stderr log": { - input: "test stderr log 1\ntest stderr log 2", + input: "test stderr log 1\ntest stderr log 2\n", stream: Stderr, + maxLen: maxLen, tag: []runtime.LogTag{ runtime.LogTagFull, runtime.LogTagFull, @@ -61,18 +66,160 @@ func TestRedirectLogs(t *testing.T) { "test stderr log 2", }, }, - "long log": { - input: strings.Repeat("a", 2*bufSize+10) + "\n", - stream: Stdout, + "log ends without newline": { + input: "test stderr log 1\ntest stderr log 2", + stream: Stderr, + maxLen: maxLen, tag: []runtime.LogTag{ + runtime.LogTagFull, + runtime.LogTagFull, + }, + content: []string{ + "test stderr log 1", + "test stderr log 2", + }, + }, + "log length equal to buffer size": { + input: strings.Repeat("a", defaultBufSize) + "\n" + strings.Repeat("a", defaultBufSize) + "\n", + stream: Stdout, + maxLen: maxLen, + tag: []runtime.LogTag{ + 
runtime.LogTagFull, + runtime.LogTagFull, + }, + content: []string{ + strings.Repeat("a", defaultBufSize), + strings.Repeat("a", defaultBufSize), + }, + }, + "log length longer than buffer size": { + input: strings.Repeat("a", defaultBufSize*2+10) + "\n" + strings.Repeat("a", defaultBufSize*2+20) + "\n", + stream: Stdout, + maxLen: maxLen, + tag: []runtime.LogTag{ + runtime.LogTagFull, + runtime.LogTagFull, + }, + content: []string{ + strings.Repeat("a", defaultBufSize*2+10), + strings.Repeat("a", defaultBufSize*2+20), + }, + }, + "log length equal to max length": { + input: strings.Repeat("a", maxLen) + "\n" + strings.Repeat("a", maxLen) + "\n", + stream: Stdout, + maxLen: maxLen, + tag: []runtime.LogTag{ + runtime.LogTagFull, + runtime.LogTagFull, + }, + content: []string{ + strings.Repeat("a", maxLen), + strings.Repeat("a", maxLen), + }, + }, + "log length exceed max length by 1": { + input: strings.Repeat("a", maxLen+1) + "\n" + strings.Repeat("a", maxLen+1) + "\n", + stream: Stdout, + maxLen: maxLen, + tag: []runtime.LogTag{ + runtime.LogTagPartial, + runtime.LogTagFull, + runtime.LogTagPartial, + runtime.LogTagFull, + }, + content: []string{ + strings.Repeat("a", maxLen), + "a", + strings.Repeat("a", maxLen), + "a", + }, + }, + "log length longer than max length": { + input: strings.Repeat("a", maxLen*2) + "\n" + strings.Repeat("a", maxLen*2+1) + "\n", + stream: Stdout, + maxLen: maxLen, + tag: []runtime.LogTag{ + runtime.LogTagPartial, + runtime.LogTagFull, runtime.LogTagPartial, runtime.LogTagPartial, runtime.LogTagFull, }, content: []string{ - strings.Repeat("a", bufSize), - strings.Repeat("a", bufSize), + strings.Repeat("a", maxLen), + strings.Repeat("a", maxLen), + strings.Repeat("a", maxLen), + strings.Repeat("a", maxLen), + "a", + }, + }, + "max length shorter than buffer size": { + input: strings.Repeat("a", defaultBufSize*3/2+10) + "\n" + strings.Repeat("a", defaultBufSize*3/2+20) + "\n", + stream: Stdout, + maxLen: defaultBufSize / 2, + tag: 
[]runtime.LogTag{ + runtime.LogTagPartial, + runtime.LogTagPartial, + runtime.LogTagPartial, + runtime.LogTagFull, + runtime.LogTagPartial, + runtime.LogTagPartial, + runtime.LogTagPartial, + runtime.LogTagFull, + }, + content: []string{ + strings.Repeat("a", defaultBufSize*1/2), + strings.Repeat("a", defaultBufSize*1/2), + strings.Repeat("a", defaultBufSize*1/2), strings.Repeat("a", 10), + strings.Repeat("a", defaultBufSize*1/2), + strings.Repeat("a", defaultBufSize*1/2), + strings.Repeat("a", defaultBufSize*1/2), + strings.Repeat("a", 20), + }, + }, + "log length longer than max length, and (maxLen % defaultBufSize != 0)": { + input: strings.Repeat("a", defaultBufSize*2+10) + "\n" + strings.Repeat("a", defaultBufSize*2+20) + "\n", + stream: Stdout, + maxLen: defaultBufSize * 3 / 2, + tag: []runtime.LogTag{ + runtime.LogTagPartial, + runtime.LogTagFull, + runtime.LogTagPartial, + runtime.LogTagFull, + }, + content: []string{ + strings.Repeat("a", defaultBufSize*3/2), + strings.Repeat("a", defaultBufSize*1/2+10), + strings.Repeat("a", defaultBufSize*3/2), + strings.Repeat("a", defaultBufSize*1/2+20), + }, + }, + "no limit if max length is 0": { + input: strings.Repeat("a", defaultBufSize*10+10) + "\n" + strings.Repeat("a", defaultBufSize*10+20) + "\n", + stream: Stdout, + maxLen: 0, + tag: []runtime.LogTag{ + runtime.LogTagFull, + runtime.LogTagFull, + }, + content: []string{ + strings.Repeat("a", defaultBufSize*10+10), + strings.Repeat("a", defaultBufSize*10+20), + }, + }, + "no limit if max length is negative": { + input: strings.Repeat("a", defaultBufSize*10+10) + "\n" + strings.Repeat("a", defaultBufSize*10+20) + "\n", + stream: Stdout, + maxLen: -1, + tag: []runtime.LogTag{ + runtime.LogTagFull, + runtime.LogTagFull, + }, + content: []string{ + strings.Repeat("a", defaultBufSize*10+10), + strings.Repeat("a", defaultBufSize*10+20), }, }, } { @@ -80,7 +227,7 @@ func TestRedirectLogs(t *testing.T) { rc := ioutil.NopCloser(strings.NewReader(test.input)) buf := 
bytes.NewBuffer(nil) wc := cioutil.NewNopWriteCloser(buf) - redirectLogs("test-path", rc, wc, test.stream) + redirectLogs("test-path", rc, wc, test.stream, test.maxLen) output := buf.String() lines := strings.Split(output, "\n") lines = lines[:len(lines)-1] // Discard empty string after last \n diff --git a/pkg/server/restart.go b/pkg/server/restart.go index d37f8a287..4b11b7d77 100644 --- a/pkg/server/restart.go +++ b/pkg/server/restart.go @@ -78,9 +78,7 @@ func (c *criService) recover(ctx context.Context) error { return errors.Wrap(err, "failed to list containers") } for _, container := range containers { - containerDir := c.getContainerRootDir(container.ID()) - volatileContainerDir := c.getVolatileContainerRootDir(container.ID()) - cntr, err := loadContainer(ctx, container, containerDir, volatileContainerDir) + cntr, err := c.loadContainer(ctx, container) if err != nil { logrus.WithError(err).Errorf("Failed to load container %q", container.ID()) continue @@ -149,8 +147,10 @@ func (c *criService) recover(ctx context.Context) error { } // loadContainer loads container from containerd and status checkpoint. -func loadContainer(ctx context.Context, cntr containerd.Container, containerDir, volatileContainerDir string) (containerstore.Container, error) { +func (c *criService) loadContainer(ctx context.Context, cntr containerd.Container) (containerstore.Container, error) { id := cntr.ID() + containerDir := c.getContainerRootDir(id) + volatileContainerDir := c.getVolatileContainerRootDir(id) var container containerstore.Container // Load container metadata. exts, err := cntr.Extensions(ctx) @@ -176,11 +176,21 @@ func loadContainer(ctx context.Context, cntr containerd.Container, containerDir, // Load up-to-date status from containerd. 
var containerIO *cio.ContainerIO - t, err := cntr.Task(ctx, func(fifos *containerdio.FIFOSet) (containerdio.IO, error) { - stdoutWC, stderrWC, err := createContainerLoggers(meta.LogPath, meta.Config.GetTty()) + t, err := cntr.Task(ctx, func(fifos *containerdio.FIFOSet) (_ containerdio.IO, err error) { + stdoutWC, stderrWC, err := c.createContainerLoggers(meta.LogPath, meta.Config.GetTty()) if err != nil { return nil, err } + defer func() { + if err != nil { + if stdoutWC != nil { + stdoutWC.Close() + } + if stderrWC != nil { + stderrWC.Close() + } + } + }() containerIO, err = cio.NewContainerIO(id, cio.WithFIFOs(fifos), )