Merge pull request #56 from Random-Liu/add-container-logging

Add container logging support.
This commit is contained in:
Lantao Liu 2017-06-02 10:11:41 -07:00 committed by GitHub
commit 42131acc68
8 changed files with 338 additions and 24 deletions

View File

@ -0,0 +1,51 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package agents
import "io"
// StreamType is the type of the stream, stdout/stderr.
type StreamType string
const (
// Stdout stream type.
Stdout StreamType = "stdout"
// Stderr stream type.
Stderr StreamType = "stderr"
)
// Agent is the a running agent perform a specific task, e.g. redirect and
// decorate log, redirect stream etc.
type Agent interface {
// Start starts the logger.
Start() error
}
// AgentFactory is the factory to create required agents.
type AgentFactory interface {
// NewSandboxLogger creates a sandbox logging agent.
NewSandboxLogger(io.ReadCloser) Agent
// NewContainerLogger creates a container logging agent.
NewContainerLogger(string, StreamType, io.ReadCloser) Agent
}
type agentFactory struct{}
// NewAgentFactory creates a new agent factory.
func NewAgentFactory() AgentFactory {
return &agentFactory{}
}

116
pkg/server/agents/logger.go Normal file
View File

@ -0,0 +1,116 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package agents
import (
"bufio"
"bytes"
"fmt"
"io"
"io/ioutil"
"os"
"time"
"github.com/golang/glog"
)
const (
// delimiter used in CRI logging format.
delimiter = ' '
// eof is end-of-line.
eol = '\n'
// timestampFormat is the timestamp format used in CRI logging format.
timestampFormat = time.RFC3339Nano
// pipeBufSize is the system PIPE_BUF size, on linux it is 4096 bytes.
// POSIX.1 says that write less than PIPE_BUF is atmoic.
pipeBufSize = 4096
// bufSize is the size of the read buffer.
bufSize = pipeBufSize - len(timestampFormat) - len(Stdout) - 2 /*2 delimiter*/ - 1 /*eol*/
)
// sandboxLogger is the log agent used for sandbox.
// It discards sandbox all output for now.
type sandboxLogger struct {
rc io.ReadCloser
}
func (*agentFactory) NewSandboxLogger(rc io.ReadCloser) Agent {
return &sandboxLogger{rc: rc}
}
func (s *sandboxLogger) Start() error {
go func() {
// Discard the output for now.
io.Copy(ioutil.Discard, s.rc) // nolint: errcheck
s.rc.Close()
}()
return nil
}
// containerLogger is the log agent used for container.
// It redirect container log into CRI log file, and decorate the log
// line into CRI defined format.
type containerLogger struct {
path string
stream StreamType
rc io.ReadCloser
}
func (*agentFactory) NewContainerLogger(path string, stream StreamType, rc io.ReadCloser) Agent {
return &containerLogger{
path: path,
stream: stream,
rc: rc,
}
}
func (c *containerLogger) Start() error {
glog.V(4).Infof("Start reading log file %q", c.path)
wc, err := os.OpenFile(c.path, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0640)
if err != nil {
return fmt.Errorf("failed to open log file %q: %v", c.path, err)
}
go c.redirectLogs(wc)
return nil
}
func (c *containerLogger) redirectLogs(wc io.WriteCloser) {
defer c.rc.Close()
defer wc.Close()
streamBytes := []byte(c.stream)
delimiterBytes := []byte{delimiter}
r := bufio.NewReaderSize(c.rc, bufSize)
for {
// TODO(random-liu): Better define CRI log format, and escape newline in log.
lineBytes, _, err := r.ReadLine()
if err == io.EOF {
glog.V(4).Infof("Finish redirecting log file %q", c.path)
return
}
if err != nil {
glog.Errorf("An error occurred when redirecting log file %q: %v", c.path, err)
return
}
timestampBytes := time.Now().AppendFormat(nil, time.RFC3339Nano)
data := bytes.Join([][]byte{timestampBytes, streamBytes, lineBytes}, delimiterBytes)
data = append(data, eol)
if _, err := wc.Write(data); err != nil {
glog.Errorf("Fail to write log line %q: %v", data, err)
}
// Continue on write error to drain the input.
}
}

View File

@ -0,0 +1,90 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package agents
import (
"bytes"
"io/ioutil"
"strings"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// writeCloserBuffer is a writecloser wrapper for bytes.Buffer
// with a nop closer.
type writeCloserBuffer struct {
*bytes.Buffer
}
// nop close
func (*writeCloserBuffer) Close() error { return nil }
func TestRedirectLogs(t *testing.T) {
f := NewAgentFactory()
// f.NewContainerLogger(
for desc, test := range map[string]struct {
input string
stream StreamType
content []string
}{
"stdout log": {
input: "test stdout log 1\ntest stdout log 2",
stream: Stdout,
content: []string{
"test stdout log 1",
"test stdout log 2",
},
},
"stderr log": {
input: "test stderr log 1\ntest stderr log 2",
stream: Stderr,
content: []string{
"test stderr log 1",
"test stderr log 2",
},
},
"long log": {
input: strings.Repeat("a", bufSize+10) + "\n",
stream: Stdout,
content: []string{
strings.Repeat("a", bufSize),
strings.Repeat("a", 10),
},
},
} {
t.Logf("TestCase %q", desc)
rc := ioutil.NopCloser(strings.NewReader(test.input))
c := f.NewContainerLogger("test-path", test.stream, rc).(*containerLogger)
wc := &writeCloserBuffer{bytes.NewBuffer(nil)}
c.redirectLogs(wc)
output := wc.String()
lines := strings.Split(output, "\n")
lines = lines[:len(lines)-1] // Discard empty string after last \n
assert.Len(t, lines, len(test.content))
for i := range lines {
fields := strings.SplitN(lines[i], string([]byte{delimiter}), 3)
require.Len(t, fields, 3)
_, err := time.Parse(timestampFormat, fields[0])
assert.NoError(t, err)
assert.EqualValues(t, test.stream, fields[1])
assert.Equal(t, test.content[i], fields[2])
}
}
}

View File

@ -0,0 +1,50 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package testing
import (
"io"
"github.com/kubernetes-incubator/cri-containerd/pkg/server/agents"
)
// FakeAgent is a fake agent for test.
type FakeAgent struct{}
// Start always return nil not.
// TODO(random-liu): Inject error and test error.
func (f *FakeAgent) Start() error {
return nil
}
// FakeAgentFactory is a fake agent factory for test.
type FakeAgentFactory struct{}
// NewFakeAgentFactory creates fake agent factory.
func NewFakeAgentFactory() agents.AgentFactory {
return &FakeAgentFactory{}
}
// NewSandboxLogger creates a fake agent as sandbox logger.
func (*FakeAgentFactory) NewSandboxLogger(rc io.ReadCloser) agents.Agent {
return &FakeAgent{}
}
// NewContainerLogger creates a fake agent as container logger.
func (*FakeAgentFactory) NewContainerLogger(string, agents.StreamType, io.ReadCloser) agents.Agent {
return &FakeAgent{}
}

View File

@ -21,6 +21,7 @@ import (
"fmt"
"io"
"os"
"path/filepath"
"time"
"github.com/containerd/containerd/api/services/execution"
@ -35,6 +36,7 @@ import (
runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1"
"github.com/kubernetes-incubator/cri-containerd/pkg/metadata"
"github.com/kubernetes-incubator/cri-containerd/pkg/server/agents"
)
// StartContainer starts the container.
@ -155,16 +157,18 @@ func (c *criContainerdService) startContainer(ctx context.Context, id string, me
w.Close()
}(stdinPipe)
}
go func(r io.ReadCloser) {
io.Copy(os.Stdout, r) // nolint: errcheck
r.Close()
}(stdoutPipe)
// Only redirect stderr when there is no tty.
if !config.GetTty() {
go func(r io.ReadCloser) {
io.Copy(os.Stderr, r) // nolint: errcheck
r.Close()
}(stderrPipe)
if config.GetLogPath() != "" {
// Only generate container log when log path is specified.
logPath := filepath.Join(sandboxConfig.GetLogDirectory(), config.GetLogPath())
if err := c.agentFactory.NewContainerLogger(logPath, agents.Stdout, stdoutPipe).Start(); err != nil {
return fmt.Errorf("failed to start container stdout logger: %v", err)
}
// Only redirect stderr when there is no tty.
if !config.GetTty() {
if err := c.agentFactory.NewContainerLogger(logPath, agents.Stderr, stderrPipe).Start(); err != nil {
return fmt.Errorf("failed to start container stderr logger: %v", err)
}
}
}
// Get rootfs mounts.

View File

@ -19,8 +19,6 @@ package server
import (
"encoding/json"
"fmt"
"io"
"io/ioutil"
"strings"
"time"
@ -116,24 +114,23 @@ func (c *criContainerdService) RunPodSandbox(ctx context.Context, r *runtime.Run
}
}()
// TODO(random-liu): [P1] Moving following logging related logic into util functions.
// Discard sandbox container output because we don't care about it.
_, stdout, stderr := getStreamingPipes(sandboxRootDir)
_, stdoutPipe, stderrPipe, err := c.prepareStreamingPipes(ctx, "", stdout, stderr)
if err != nil {
return nil, fmt.Errorf("failed to prepare streaming pipes: %v", err)
}
for _, f := range []io.ReadCloser{stdoutPipe, stderrPipe} {
defer func(cl io.Closer) {
if retErr != nil {
cl.Close()
}
}(f)
go func(r io.ReadCloser) {
// Discard the output for now.
io.Copy(ioutil.Discard, r) // nolint: errcheck
r.Close()
}(f)
defer func() {
if retErr != nil {
stdoutPipe.Close()
stderrPipe.Close()
}
}()
if err := c.agentFactory.NewSandboxLogger(stdoutPipe).Start(); err != nil {
return nil, fmt.Errorf("failed to start sandbox stdout logger: %v", err)
}
if err := c.agentFactory.NewSandboxLogger(stderrPipe).Start(); err != nil {
return nil, fmt.Errorf("failed to start sandbox stderr logger: %v", err)
}
// Start sandbox container.

View File

@ -38,6 +38,7 @@ import (
"github.com/kubernetes-incubator/cri-containerd/pkg/metadata/store"
osinterface "github.com/kubernetes-incubator/cri-containerd/pkg/os"
"github.com/kubernetes-incubator/cri-containerd/pkg/registrar"
"github.com/kubernetes-incubator/cri-containerd/pkg/server/agents"
)
// CRIContainerdService is the interface implement CRI remote service server.
@ -87,6 +88,8 @@ type criContainerdService struct {
healthService healthapi.HealthClient
// netPlugin is used to setup and teardown network when run/stop pod sandbox.
netPlugin ocicni.CNIPlugin
// agentFactory is the factory to create agent used in the cri containerd service.
agentFactory agents.AgentFactory
}
// NewCRIContainerdService returns a new instance of CRIContainerdService
@ -111,6 +114,7 @@ func NewCRIContainerdService(conn *grpc.ClientConn, rootDir, networkPluginBinDir
rootfsService: rootfsapi.NewRootFSClient(conn),
versionService: versionapi.NewVersionClient(conn),
healthService: healthapi.NewHealthClient(conn),
agentFactory: agents.NewAgentFactory(),
}
netPlugin, err := ocicni.InitCNI(networkPluginBinDir, networkPluginConfDir)

View File

@ -31,6 +31,7 @@ import (
"github.com/kubernetes-incubator/cri-containerd/pkg/metadata/store"
ostesting "github.com/kubernetes-incubator/cri-containerd/pkg/os/testing"
"github.com/kubernetes-incubator/cri-containerd/pkg/registrar"
agentstesting "github.com/kubernetes-incubator/cri-containerd/pkg/server/agents/testing"
servertesting "github.com/kubernetes-incubator/cri-containerd/pkg/server/testing"
imagedigest "github.com/opencontainers/go-digest"
imagespec "github.com/opencontainers/image-spec/specs-go/v1"
@ -68,6 +69,7 @@ func newTestCRIContainerdService() *criContainerdService {
containerService: servertesting.NewFakeExecutionClient(),
rootfsService: servertesting.NewFakeRootfsClient(),
netPlugin: servertesting.NewFakeCNIPlugin(),
agentFactory: agentstesting.NewFakeAgentFactory(),
}
}