From e657e1eb1449161e697fc7bcbc3fb1ec63889739 Mon Sep 17 00:00:00 2001 From: Lantao Liu Date: Tue, 30 May 2017 16:36:06 +0000 Subject: [PATCH 1/2] Add container logging support. Signed-off-by: Lantao Liu --- pkg/server/agents/agents.go | 51 +++++++++++++++ pkg/server/agents/logger.go | 116 ++++++++++++++++++++++++++++++++++ pkg/server/container_start.go | 24 ++++--- pkg/server/sandbox_run.go | 25 ++++---- pkg/server/service.go | 4 ++ 5 files changed, 196 insertions(+), 24 deletions(-) create mode 100644 pkg/server/agents/agents.go create mode 100644 pkg/server/agents/logger.go diff --git a/pkg/server/agents/agents.go b/pkg/server/agents/agents.go new file mode 100644 index 000000000..1da854d52 --- /dev/null +++ b/pkg/server/agents/agents.go @@ -0,0 +1,51 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package agents + +import "io" + +// StreamType is the type of the stream, stdout/stderr. +type StreamType string + +const ( + // Stdout stream type. + Stdout StreamType = "stdout" + // Stderr stream type. + Stderr StreamType = "stderr" +) + +// Agent is the a running agent perform a specific task, e.g. redirect and +// decorate log, redirect stream etc. +type Agent interface { + // Start starts the logger. + Start() error +} + +// AgentFactory is the factory to create required agents. +type AgentFactory interface { + // NewSandboxLogger creates a sandbox logging agent. + NewSandboxLogger(io.ReadCloser) Agent + // NewContainerLogger creates a container logging agent. + NewContainerLogger(string, StreamType, io.ReadCloser) Agent +} + +type agentFactory struct{} + +// NewAgentFactory creates a new agent factory. +func NewAgentFactory() AgentFactory { + return &agentFactory{} +} diff --git a/pkg/server/agents/logger.go b/pkg/server/agents/logger.go new file mode 100644 index 000000000..ebcb748d3 --- /dev/null +++ b/pkg/server/agents/logger.go @@ -0,0 +1,116 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package agents + +import ( + "bufio" + "bytes" + "fmt" + "io" + "io/ioutil" + "os" + "time" + + "github.com/golang/glog" +) + +const ( + // delimiter used in CRI logging format. + delimiter = ' ' + // eof is end-of-line. + eol = '\n' + // timestampFormat is the timestamp format used in CRI logging format. + timestampFormat = time.RFC3339Nano + // pipeBufSize is the system PIPE_BUF size, on linux it is 4096 bytes. + // POSIX.1 says that write less than PIPE_BUF is atmoic. + pipeBufSize = 4096 + // bufSize is the size of the read buffer. + bufSize = pipeBufSize - len(timestampFormat) - len(Stdout) - 2 /*2 delimiter*/ - 1 /*eol*/ +) + +// sandboxLogger is the log agent used for sandbox. +// It discards sandbox all output for now. +type sandboxLogger struct { + rc io.ReadCloser +} + +func (*agentFactory) NewSandboxLogger(rc io.ReadCloser) Agent { + return &sandboxLogger{rc: rc} +} + +func (s *sandboxLogger) Start() error { + go func() { + // Discard the output for now. + io.Copy(ioutil.Discard, s.rc) // nolint: errcheck + s.rc.Close() + }() + return nil +} + +// containerLogger is the log agent used for container. +// It redirect container log into CRI log file, and decorate the log +// line into CRI defined format. +type containerLogger struct { + path string + stream StreamType + rc io.ReadCloser +} + +func (*agentFactory) NewContainerLogger(path string, stream StreamType, rc io.ReadCloser) Agent { + return &containerLogger{ + path: path, + stream: stream, + rc: rc, + } +} + +func (c *containerLogger) Start() error { + glog.V(4).Infof("Start reading log file %q", c.path) + wc, err := os.OpenFile(c.path, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0640) + if err != nil { + return fmt.Errorf("failed to open log file %q: %v", c.path, err) + } + go c.redirectLogs(wc) + return nil +} + +func (c *containerLogger) redirectLogs(wc io.WriteCloser) { + defer c.rc.Close() + defer wc.Close() + streamBytes := []byte(c.stream) + delimiterBytes := []byte{delimiter} + r := bufio.NewReaderSize(c.rc, bufSize) + for { + // TODO(random-liu): Better define CRI log format, and escape newline in log. + lineBytes, _, err := r.ReadLine() + if err == io.EOF { + glog.V(4).Infof("Finish redirecting log file %q", c.path) + return + } + if err != nil { + glog.Errorf("An error occurred when redirecting log file %q: %v", c.path, err) + return + } + timestampBytes := time.Now().AppendFormat(nil, time.RFC3339Nano) + data := bytes.Join([][]byte{timestampBytes, streamBytes, lineBytes}, delimiterBytes) + data = append(data, eol) + if _, err := wc.Write(data); err != nil { + glog.Errorf("Fail to write log line %q: %v", data, err) + } + // Continue on write error to drain the input. + } +} diff --git a/pkg/server/container_start.go b/pkg/server/container_start.go index 9a38fb8b7..84aff28d0 100644 --- a/pkg/server/container_start.go +++ b/pkg/server/container_start.go @@ -21,6 +21,7 @@ import ( "fmt" "io" "os" + "path/filepath" "time" "github.com/containerd/containerd/api/services/execution" @@ -35,6 +36,7 @@ import ( runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1" "github.com/kubernetes-incubator/cri-containerd/pkg/metadata" + "github.com/kubernetes-incubator/cri-containerd/pkg/server/agents" ) // StartContainer starts the container. @@ -155,16 +157,18 @@ func (c *criContainerdService) startContainer(ctx context.Context, id string, me w.Close() }(stdinPipe) } - go func(r io.ReadCloser) { - io.Copy(os.Stdout, r) // nolint: errcheck - r.Close() - }(stdoutPipe) - // Only redirect stderr when there is no tty. - if !config.GetTty() { - go func(r io.ReadCloser) { - io.Copy(os.Stderr, r) // nolint: errcheck - r.Close() - }(stderrPipe) + if config.GetLogPath() != "" { + // Only generate container log when log path is specified. + logPath := filepath.Join(sandboxConfig.GetLogDirectory(), config.GetLogPath()) + if err := c.agentFactory.NewContainerLogger(logPath, agents.Stdout, stdoutPipe).Start(); err != nil { + return fmt.Errorf("failed to start container stdout logger: %v", err) + } + // Only redirect stderr when there is no tty. + if !config.GetTty() { + if err := c.agentFactory.NewContainerLogger(logPath, agents.Stderr, stderrPipe).Start(); err != nil { + return fmt.Errorf("failed to start container stderr logger: %v", err) + } + } } // Get rootfs mounts. diff --git a/pkg/server/sandbox_run.go b/pkg/server/sandbox_run.go index 74c7ae17b..ddbc59a17 100644 --- a/pkg/server/sandbox_run.go +++ b/pkg/server/sandbox_run.go @@ -19,8 +19,6 @@ package server import ( "encoding/json" "fmt" - "io" - "io/ioutil" "strings" "time" @@ -116,24 +114,23 @@ func (c *criContainerdService) RunPodSandbox(ctx context.Context, r *runtime.Run } }() - // TODO(random-liu): [P1] Moving following logging related logic into util functions. // Discard sandbox container output because we don't care about it. _, stdout, stderr := getStreamingPipes(sandboxRootDir) _, stdoutPipe, stderrPipe, err := c.prepareStreamingPipes(ctx, "", stdout, stderr) if err != nil { return nil, fmt.Errorf("failed to prepare streaming pipes: %v", err) } - for _, f := range []io.ReadCloser{stdoutPipe, stderrPipe} { - defer func(cl io.Closer) { - if retErr != nil { - cl.Close() - } - }(f) - go func(r io.ReadCloser) { - // Discard the output for now. - io.Copy(ioutil.Discard, r) // nolint: errcheck - r.Close() - }(f) + defer func() { + if retErr != nil { + stdoutPipe.Close() + stderrPipe.Close() + } + }() + if err := c.agentFactory.NewSandboxLogger(stdoutPipe).Start(); err != nil { + return nil, fmt.Errorf("failed to start sandbox stdout logger: %v", err) + } + if err := c.agentFactory.NewSandboxLogger(stderrPipe).Start(); err != nil { + return nil, fmt.Errorf("failed to start sandbox stderr logger: %v", err) } // Start sandbox container. diff --git a/pkg/server/service.go b/pkg/server/service.go index 6a9f92d2e..54de8257a 100644 --- a/pkg/server/service.go +++ b/pkg/server/service.go @@ -38,6 +38,7 @@ import ( "github.com/kubernetes-incubator/cri-containerd/pkg/metadata/store" osinterface "github.com/kubernetes-incubator/cri-containerd/pkg/os" "github.com/kubernetes-incubator/cri-containerd/pkg/registrar" + "github.com/kubernetes-incubator/cri-containerd/pkg/server/agents" ) // CRIContainerdService is the interface implement CRI remote service server. @@ -87,6 +88,8 @@ type criContainerdService struct { healthService healthapi.HealthClient // netPlugin is used to setup and teardown network when run/stop pod sandbox. netPlugin ocicni.CNIPlugin + // agentFactory is the factory to create agent used in the cri containerd service. + agentFactory agents.AgentFactory } // NewCRIContainerdService returns a new instance of CRIContainerdService @@ -111,6 +114,7 @@ func NewCRIContainerdService(conn *grpc.ClientConn, rootDir, networkPluginBinDir rootfsService: rootfsapi.NewRootFSClient(conn), versionService: versionapi.NewVersionClient(conn), healthService: healthapi.NewHealthClient(conn), + agentFactory: agents.NewAgentFactory(), } netPlugin, err := ocicni.InitCNI(networkPluginBinDir, networkPluginConfDir) From 69fcf97583d06420ce5279b7c4d08dd095860384 Mon Sep 17 00:00:00 2001 From: Lantao Liu Date: Tue, 30 May 2017 16:36:43 +0000 Subject: [PATCH 2/2] Add unit test Signed-off-by: Lantao Liu --- pkg/server/agents/logger_test.go | 90 +++++++++++++++++++ .../agents/testing/fake_agent_factory.go | 50 +++++++++++ pkg/server/service_test.go | 2 + 3 files changed, 142 insertions(+) create mode 100644 pkg/server/agents/logger_test.go create mode 100644 pkg/server/agents/testing/fake_agent_factory.go diff --git a/pkg/server/agents/logger_test.go b/pkg/server/agents/logger_test.go new file mode 100644 index 000000000..c7a958530 --- /dev/null +++ b/pkg/server/agents/logger_test.go @@ -0,0 +1,90 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package agents + +import ( + "bytes" + "io/ioutil" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// writeCloserBuffer is a writecloser wrapper for bytes.Buffer +// with a nop closer. +type writeCloserBuffer struct { + *bytes.Buffer +} + +// nop close +func (*writeCloserBuffer) Close() error { return nil } + +func TestRedirectLogs(t *testing.T) { + f := NewAgentFactory() + // f.NewContainerLogger( + for desc, test := range map[string]struct { + input string + stream StreamType + content []string + }{ + "stdout log": { + input: "test stdout log 1\ntest stdout log 2", + stream: Stdout, + content: []string{ + "test stdout log 1", + "test stdout log 2", + }, + }, + "stderr log": { + input: "test stderr log 1\ntest stderr log 2", + stream: Stderr, + content: []string{ + "test stderr log 1", + "test stderr log 2", + }, + }, + "long log": { + input: strings.Repeat("a", bufSize+10) + "\n", + stream: Stdout, + content: []string{ + strings.Repeat("a", bufSize), + strings.Repeat("a", 10), + }, + }, + } { + t.Logf("TestCase %q", desc) + rc := ioutil.NopCloser(strings.NewReader(test.input)) + c := f.NewContainerLogger("test-path", test.stream, rc).(*containerLogger) + wc := &writeCloserBuffer{bytes.NewBuffer(nil)} + c.redirectLogs(wc) + output := wc.String() + lines := strings.Split(output, "\n") + lines = lines[:len(lines)-1] // Discard empty string after last \n + assert.Len(t, lines, len(test.content)) + for i := range lines { + fields := strings.SplitN(lines[i], string([]byte{delimiter}), 3) + require.Len(t, fields, 3) + _, err := time.Parse(timestampFormat, fields[0]) + assert.NoError(t, err) + assert.EqualValues(t, test.stream, fields[1]) + assert.Equal(t, test.content[i], fields[2]) + } + } +} diff --git a/pkg/server/agents/testing/fake_agent_factory.go b/pkg/server/agents/testing/fake_agent_factory.go new file mode 100644 index 000000000..ffb6408a1 --- /dev/null +++ b/pkg/server/agents/testing/fake_agent_factory.go @@ -0,0 +1,50 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package testing + +import ( + "io" + + "github.com/kubernetes-incubator/cri-containerd/pkg/server/agents" +) + +// FakeAgent is a fake agent for test. +type FakeAgent struct{} + +// Start always return nil not. +// TODO(random-liu): Inject error and test error. +func (f *FakeAgent) Start() error { + return nil +} + +// FakeAgentFactory is a fake agent factory for test. +type FakeAgentFactory struct{} + +// NewFakeAgentFactory creates fake agent factory. +func NewFakeAgentFactory() agents.AgentFactory { + return &FakeAgentFactory{} +} + +// NewSandboxLogger creates a fake agent as sandbox logger. +func (*FakeAgentFactory) NewSandboxLogger(rc io.ReadCloser) agents.Agent { + return &FakeAgent{} +} + +// NewContainerLogger creates a fake agent as container logger. +func (*FakeAgentFactory) NewContainerLogger(string, agents.StreamType, io.ReadCloser) agents.Agent { + return &FakeAgent{} +} diff --git a/pkg/server/service_test.go b/pkg/server/service_test.go index 8b73f6fd4..5be122796 100644 --- a/pkg/server/service_test.go +++ b/pkg/server/service_test.go @@ -31,6 +31,7 @@ import ( "github.com/kubernetes-incubator/cri-containerd/pkg/metadata/store" ostesting "github.com/kubernetes-incubator/cri-containerd/pkg/os/testing" "github.com/kubernetes-incubator/cri-containerd/pkg/registrar" + agentstesting "github.com/kubernetes-incubator/cri-containerd/pkg/server/agents/testing" servertesting "github.com/kubernetes-incubator/cri-containerd/pkg/server/testing" imagedigest "github.com/opencontainers/go-digest" imagespec "github.com/opencontainers/image-spec/specs-go/v1" @@ -68,6 +69,7 @@ func newTestCRIContainerdService() *criContainerdService { containerService: servertesting.NewFakeExecutionClient(), rootfsService: servertesting.NewFakeRootfsClient(), netPlugin: servertesting.NewFakeCNIPlugin(), + agentFactory: agentstesting.NewFakeAgentFactory(), } }