Add container logging support.

Signed-off-by: Lantao Liu <lantaol@google.com>
This commit is contained in:
Lantao Liu 2017-05-30 16:36:06 +00:00
parent 264619a858
commit e657e1eb14
5 changed files with 196 additions and 24 deletions

View File

@ -0,0 +1,51 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package agents
import "io"
// StreamType is the type of the stream, stdout/stderr.
type StreamType string
const (
// Stdout stream type.
Stdout StreamType = "stdout"
// Stderr stream type.
Stderr StreamType = "stderr"
)
// Agent is the a running agent perform a specific task, e.g. redirect and
// decorate log, redirect stream etc.
type Agent interface {
// Start starts the logger.
Start() error
}
// AgentFactory is the factory to create required agents.
type AgentFactory interface {
// NewSandboxLogger creates a sandbox logging agent.
NewSandboxLogger(io.ReadCloser) Agent
// NewContainerLogger creates a container logging agent.
NewContainerLogger(string, StreamType, io.ReadCloser) Agent
}
type agentFactory struct{}
// NewAgentFactory creates a new agent factory.
func NewAgentFactory() AgentFactory {
return &agentFactory{}
}

116
pkg/server/agents/logger.go Normal file
View File

@ -0,0 +1,116 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package agents
import (
"bufio"
"bytes"
"fmt"
"io"
"io/ioutil"
"os"
"time"
"github.com/golang/glog"
)
const (
// delimiter used in CRI logging format.
delimiter = ' '
// eof is end-of-line.
eol = '\n'
// timestampFormat is the timestamp format used in CRI logging format.
timestampFormat = time.RFC3339Nano
// pipeBufSize is the system PIPE_BUF size, on linux it is 4096 bytes.
// POSIX.1 says that write less than PIPE_BUF is atmoic.
pipeBufSize = 4096
// bufSize is the size of the read buffer.
bufSize = pipeBufSize - len(timestampFormat) - len(Stdout) - 2 /*2 delimiter*/ - 1 /*eol*/
)
// sandboxLogger is the log agent used for sandbox.
// It discards sandbox all output for now.
type sandboxLogger struct {
rc io.ReadCloser
}
func (*agentFactory) NewSandboxLogger(rc io.ReadCloser) Agent {
return &sandboxLogger{rc: rc}
}
func (s *sandboxLogger) Start() error {
go func() {
// Discard the output for now.
io.Copy(ioutil.Discard, s.rc) // nolint: errcheck
s.rc.Close()
}()
return nil
}
// containerLogger is the log agent used for container.
// It redirect container log into CRI log file, and decorate the log
// line into CRI defined format.
type containerLogger struct {
path string
stream StreamType
rc io.ReadCloser
}
func (*agentFactory) NewContainerLogger(path string, stream StreamType, rc io.ReadCloser) Agent {
return &containerLogger{
path: path,
stream: stream,
rc: rc,
}
}
func (c *containerLogger) Start() error {
glog.V(4).Infof("Start reading log file %q", c.path)
wc, err := os.OpenFile(c.path, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0640)
if err != nil {
return fmt.Errorf("failed to open log file %q: %v", c.path, err)
}
go c.redirectLogs(wc)
return nil
}
func (c *containerLogger) redirectLogs(wc io.WriteCloser) {
defer c.rc.Close()
defer wc.Close()
streamBytes := []byte(c.stream)
delimiterBytes := []byte{delimiter}
r := bufio.NewReaderSize(c.rc, bufSize)
for {
// TODO(random-liu): Better define CRI log format, and escape newline in log.
lineBytes, _, err := r.ReadLine()
if err == io.EOF {
glog.V(4).Infof("Finish redirecting log file %q", c.path)
return
}
if err != nil {
glog.Errorf("An error occurred when redirecting log file %q: %v", c.path, err)
return
}
timestampBytes := time.Now().AppendFormat(nil, time.RFC3339Nano)
data := bytes.Join([][]byte{timestampBytes, streamBytes, lineBytes}, delimiterBytes)
data = append(data, eol)
if _, err := wc.Write(data); err != nil {
glog.Errorf("Fail to write log line %q: %v", data, err)
}
// Continue on write error to drain the input.
}
}

View File

@ -21,6 +21,7 @@ import (
"fmt" "fmt"
"io" "io"
"os" "os"
"path/filepath"
"time" "time"
"github.com/containerd/containerd/api/services/execution" "github.com/containerd/containerd/api/services/execution"
@ -35,6 +36,7 @@ import (
runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1" runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1"
"github.com/kubernetes-incubator/cri-containerd/pkg/metadata" "github.com/kubernetes-incubator/cri-containerd/pkg/metadata"
"github.com/kubernetes-incubator/cri-containerd/pkg/server/agents"
) )
// StartContainer starts the container. // StartContainer starts the container.
@ -155,16 +157,18 @@ func (c *criContainerdService) startContainer(ctx context.Context, id string, me
w.Close() w.Close()
}(stdinPipe) }(stdinPipe)
} }
go func(r io.ReadCloser) { if config.GetLogPath() != "" {
io.Copy(os.Stdout, r) // nolint: errcheck // Only generate container log when log path is specified.
r.Close() logPath := filepath.Join(sandboxConfig.GetLogDirectory(), config.GetLogPath())
}(stdoutPipe) if err := c.agentFactory.NewContainerLogger(logPath, agents.Stdout, stdoutPipe).Start(); err != nil {
// Only redirect stderr when there is no tty. return fmt.Errorf("failed to start container stdout logger: %v", err)
if !config.GetTty() { }
go func(r io.ReadCloser) { // Only redirect stderr when there is no tty.
io.Copy(os.Stderr, r) // nolint: errcheck if !config.GetTty() {
r.Close() if err := c.agentFactory.NewContainerLogger(logPath, agents.Stderr, stderrPipe).Start(); err != nil {
}(stderrPipe) return fmt.Errorf("failed to start container stderr logger: %v", err)
}
}
} }
// Get rootfs mounts. // Get rootfs mounts.

View File

@ -19,8 +19,6 @@ package server
import ( import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"io"
"io/ioutil"
"strings" "strings"
"time" "time"
@ -116,24 +114,23 @@ func (c *criContainerdService) RunPodSandbox(ctx context.Context, r *runtime.Run
} }
}() }()
// TODO(random-liu): [P1] Moving following logging related logic into util functions.
// Discard sandbox container output because we don't care about it. // Discard sandbox container output because we don't care about it.
_, stdout, stderr := getStreamingPipes(sandboxRootDir) _, stdout, stderr := getStreamingPipes(sandboxRootDir)
_, stdoutPipe, stderrPipe, err := c.prepareStreamingPipes(ctx, "", stdout, stderr) _, stdoutPipe, stderrPipe, err := c.prepareStreamingPipes(ctx, "", stdout, stderr)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to prepare streaming pipes: %v", err) return nil, fmt.Errorf("failed to prepare streaming pipes: %v", err)
} }
for _, f := range []io.ReadCloser{stdoutPipe, stderrPipe} { defer func() {
defer func(cl io.Closer) { if retErr != nil {
if retErr != nil { stdoutPipe.Close()
cl.Close() stderrPipe.Close()
} }
}(f) }()
go func(r io.ReadCloser) { if err := c.agentFactory.NewSandboxLogger(stdoutPipe).Start(); err != nil {
// Discard the output for now. return nil, fmt.Errorf("failed to start sandbox stdout logger: %v", err)
io.Copy(ioutil.Discard, r) // nolint: errcheck }
r.Close() if err := c.agentFactory.NewSandboxLogger(stderrPipe).Start(); err != nil {
}(f) return nil, fmt.Errorf("failed to start sandbox stderr logger: %v", err)
} }
// Start sandbox container. // Start sandbox container.

View File

@ -38,6 +38,7 @@ import (
"github.com/kubernetes-incubator/cri-containerd/pkg/metadata/store" "github.com/kubernetes-incubator/cri-containerd/pkg/metadata/store"
osinterface "github.com/kubernetes-incubator/cri-containerd/pkg/os" osinterface "github.com/kubernetes-incubator/cri-containerd/pkg/os"
"github.com/kubernetes-incubator/cri-containerd/pkg/registrar" "github.com/kubernetes-incubator/cri-containerd/pkg/registrar"
"github.com/kubernetes-incubator/cri-containerd/pkg/server/agents"
) )
// CRIContainerdService is the interface implement CRI remote service server. // CRIContainerdService is the interface implement CRI remote service server.
@ -87,6 +88,8 @@ type criContainerdService struct {
healthService healthapi.HealthClient healthService healthapi.HealthClient
// netPlugin is used to setup and teardown network when run/stop pod sandbox. // netPlugin is used to setup and teardown network when run/stop pod sandbox.
netPlugin ocicni.CNIPlugin netPlugin ocicni.CNIPlugin
// agentFactory is the factory to create agent used in the cri containerd service.
agentFactory agents.AgentFactory
} }
// NewCRIContainerdService returns a new instance of CRIContainerdService // NewCRIContainerdService returns a new instance of CRIContainerdService
@ -111,6 +114,7 @@ func NewCRIContainerdService(conn *grpc.ClientConn, rootDir, networkPluginBinDir
rootfsService: rootfsapi.NewRootFSClient(conn), rootfsService: rootfsapi.NewRootFSClient(conn),
versionService: versionapi.NewVersionClient(conn), versionService: versionapi.NewVersionClient(conn),
healthService: healthapi.NewHealthClient(conn), healthService: healthapi.NewHealthClient(conn),
agentFactory: agents.NewAgentFactory(),
} }
netPlugin, err := ocicni.InitCNI(networkPluginBinDir, networkPluginConfDir) netPlugin, err := ocicni.InitCNI(networkPluginBinDir, networkPluginConfDir)