From e657e1eb1449161e697fc7bcbc3fb1ec63889739 Mon Sep 17 00:00:00 2001 From: Lantao Liu Date: Tue, 30 May 2017 16:36:06 +0000 Subject: [PATCH] Add container logging support. Signed-off-by: Lantao Liu --- pkg/server/agents/agents.go | 51 +++++++++++++++ pkg/server/agents/logger.go | 116 ++++++++++++++++++++++++++++++++++ pkg/server/container_start.go | 24 ++++--- pkg/server/sandbox_run.go | 25 ++++---- pkg/server/service.go | 4 ++ 5 files changed, 196 insertions(+), 24 deletions(-) create mode 100644 pkg/server/agents/agents.go create mode 100644 pkg/server/agents/logger.go diff --git a/pkg/server/agents/agents.go b/pkg/server/agents/agents.go new file mode 100644 index 000000000..1da854d52 --- /dev/null +++ b/pkg/server/agents/agents.go @@ -0,0 +1,51 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package agents + +import "io" + +// StreamType is the type of the stream, stdout/stderr. +type StreamType string + +const ( + // Stdout stream type. + Stdout StreamType = "stdout" + // Stderr stream type. + Stderr StreamType = "stderr" +) + +// Agent is the a running agent perform a specific task, e.g. redirect and +// decorate log, redirect stream etc. +type Agent interface { + // Start starts the logger. + Start() error +} + +// AgentFactory is the factory to create required agents. +type AgentFactory interface { + // NewSandboxLogger creates a sandbox logging agent. + NewSandboxLogger(io.ReadCloser) Agent + // NewContainerLogger creates a container logging agent. + NewContainerLogger(string, StreamType, io.ReadCloser) Agent +} + +type agentFactory struct{} + +// NewAgentFactory creates a new agent factory. +func NewAgentFactory() AgentFactory { + return &agentFactory{} +} diff --git a/pkg/server/agents/logger.go b/pkg/server/agents/logger.go new file mode 100644 index 000000000..ebcb748d3 --- /dev/null +++ b/pkg/server/agents/logger.go @@ -0,0 +1,116 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package agents + +import ( + "bufio" + "bytes" + "fmt" + "io" + "io/ioutil" + "os" + "time" + + "github.com/golang/glog" +) + +const ( + // delimiter used in CRI logging format. + delimiter = ' ' + // eof is end-of-line. + eol = '\n' + // timestampFormat is the timestamp format used in CRI logging format. + timestampFormat = time.RFC3339Nano + // pipeBufSize is the system PIPE_BUF size, on linux it is 4096 bytes. + // POSIX.1 says that write less than PIPE_BUF is atmoic. + pipeBufSize = 4096 + // bufSize is the size of the read buffer. + bufSize = pipeBufSize - len(timestampFormat) - len(Stdout) - 2 /*2 delimiter*/ - 1 /*eol*/ +) + +// sandboxLogger is the log agent used for sandbox. +// It discards sandbox all output for now. +type sandboxLogger struct { + rc io.ReadCloser +} + +func (*agentFactory) NewSandboxLogger(rc io.ReadCloser) Agent { + return &sandboxLogger{rc: rc} +} + +func (s *sandboxLogger) Start() error { + go func() { + // Discard the output for now. + io.Copy(ioutil.Discard, s.rc) // nolint: errcheck + s.rc.Close() + }() + return nil +} + +// containerLogger is the log agent used for container. +// It redirect container log into CRI log file, and decorate the log +// line into CRI defined format. +type containerLogger struct { + path string + stream StreamType + rc io.ReadCloser +} + +func (*agentFactory) NewContainerLogger(path string, stream StreamType, rc io.ReadCloser) Agent { + return &containerLogger{ + path: path, + stream: stream, + rc: rc, + } +} + +func (c *containerLogger) Start() error { + glog.V(4).Infof("Start reading log file %q", c.path) + wc, err := os.OpenFile(c.path, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0640) + if err != nil { + return fmt.Errorf("failed to open log file %q: %v", c.path, err) + } + go c.redirectLogs(wc) + return nil +} + +func (c *containerLogger) redirectLogs(wc io.WriteCloser) { + defer c.rc.Close() + defer wc.Close() + streamBytes := []byte(c.stream) + delimiterBytes := []byte{delimiter} + r := bufio.NewReaderSize(c.rc, bufSize) + for { + // TODO(random-liu): Better define CRI log format, and escape newline in log. + lineBytes, _, err := r.ReadLine() + if err == io.EOF { + glog.V(4).Infof("Finish redirecting log file %q", c.path) + return + } + if err != nil { + glog.Errorf("An error occurred when redirecting log file %q: %v", c.path, err) + return + } + timestampBytes := time.Now().AppendFormat(nil, time.RFC3339Nano) + data := bytes.Join([][]byte{timestampBytes, streamBytes, lineBytes}, delimiterBytes) + data = append(data, eol) + if _, err := wc.Write(data); err != nil { + glog.Errorf("Fail to write log line %q: %v", data, err) + } + // Continue on write error to drain the input. + } +} diff --git a/pkg/server/container_start.go b/pkg/server/container_start.go index 9a38fb8b7..84aff28d0 100644 --- a/pkg/server/container_start.go +++ b/pkg/server/container_start.go @@ -21,6 +21,7 @@ import ( "fmt" "io" "os" + "path/filepath" "time" "github.com/containerd/containerd/api/services/execution" @@ -35,6 +36,7 @@ import ( runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1" "github.com/kubernetes-incubator/cri-containerd/pkg/metadata" + "github.com/kubernetes-incubator/cri-containerd/pkg/server/agents" ) // StartContainer starts the container. @@ -155,16 +157,18 @@ func (c *criContainerdService) startContainer(ctx context.Context, id string, me w.Close() }(stdinPipe) } - go func(r io.ReadCloser) { - io.Copy(os.Stdout, r) // nolint: errcheck - r.Close() - }(stdoutPipe) - // Only redirect stderr when there is no tty. - if !config.GetTty() { - go func(r io.ReadCloser) { - io.Copy(os.Stderr, r) // nolint: errcheck - r.Close() - }(stderrPipe) + if config.GetLogPath() != "" { + // Only generate container log when log path is specified. + logPath := filepath.Join(sandboxConfig.GetLogDirectory(), config.GetLogPath()) + if err := c.agentFactory.NewContainerLogger(logPath, agents.Stdout, stdoutPipe).Start(); err != nil { + return fmt.Errorf("failed to start container stdout logger: %v", err) + } + // Only redirect stderr when there is no tty. + if !config.GetTty() { + if err := c.agentFactory.NewContainerLogger(logPath, agents.Stderr, stderrPipe).Start(); err != nil { + return fmt.Errorf("failed to start container stderr logger: %v", err) + } + } } // Get rootfs mounts. diff --git a/pkg/server/sandbox_run.go b/pkg/server/sandbox_run.go index 74c7ae17b..ddbc59a17 100644 --- a/pkg/server/sandbox_run.go +++ b/pkg/server/sandbox_run.go @@ -19,8 +19,6 @@ package server import ( "encoding/json" "fmt" - "io" - "io/ioutil" "strings" "time" @@ -116,24 +114,23 @@ func (c *criContainerdService) RunPodSandbox(ctx context.Context, r *runtime.Run } }() - // TODO(random-liu): [P1] Moving following logging related logic into util functions. // Discard sandbox container output because we don't care about it. _, stdout, stderr := getStreamingPipes(sandboxRootDir) _, stdoutPipe, stderrPipe, err := c.prepareStreamingPipes(ctx, "", stdout, stderr) if err != nil { return nil, fmt.Errorf("failed to prepare streaming pipes: %v", err) } - for _, f := range []io.ReadCloser{stdoutPipe, stderrPipe} { - defer func(cl io.Closer) { - if retErr != nil { - cl.Close() - } - }(f) - go func(r io.ReadCloser) { - // Discard the output for now. - io.Copy(ioutil.Discard, r) // nolint: errcheck - r.Close() - }(f) + defer func() { + if retErr != nil { + stdoutPipe.Close() + stderrPipe.Close() + } + }() + if err := c.agentFactory.NewSandboxLogger(stdoutPipe).Start(); err != nil { + return nil, fmt.Errorf("failed to start sandbox stdout logger: %v", err) + } + if err := c.agentFactory.NewSandboxLogger(stderrPipe).Start(); err != nil { + return nil, fmt.Errorf("failed to start sandbox stderr logger: %v", err) } // Start sandbox container. diff --git a/pkg/server/service.go b/pkg/server/service.go index 6a9f92d2e..54de8257a 100644 --- a/pkg/server/service.go +++ b/pkg/server/service.go @@ -38,6 +38,7 @@ import ( "github.com/kubernetes-incubator/cri-containerd/pkg/metadata/store" osinterface "github.com/kubernetes-incubator/cri-containerd/pkg/os" "github.com/kubernetes-incubator/cri-containerd/pkg/registrar" + "github.com/kubernetes-incubator/cri-containerd/pkg/server/agents" ) // CRIContainerdService is the interface implement CRI remote service server. @@ -87,6 +88,8 @@ type criContainerdService struct { healthService healthapi.HealthClient // netPlugin is used to setup and teardown network when run/stop pod sandbox. netPlugin ocicni.CNIPlugin + // agentFactory is the factory to create agent used in the cri containerd service. + agentFactory agents.AgentFactory } // NewCRIContainerdService returns a new instance of CRIContainerdService @@ -111,6 +114,7 @@ func NewCRIContainerdService(conn *grpc.ClientConn, rootDir, networkPluginBinDir rootfsService: rootfsapi.NewRootFSClient(conn), versionService: versionapi.NewVersionClient(conn), healthService: healthapi.NewHealthClient(conn), + agentFactory: agents.NewAgentFactory(), } netPlugin, err := ocicni.InitCNI(networkPluginBinDir, networkPluginConfDir)