Add container attach support.

Signed-off-by: Lantao Liu <lantaol@google.com>
This commit is contained in:
Lantao Liu
2017-08-10 21:27:31 +00:00
parent 77b703f1e7
commit 45ee2e554a
24 changed files with 840 additions and 371 deletions

317
pkg/server/io/io.go Normal file
View File

@@ -0,0 +1,317 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package agents
import (
"errors"
"io"
"os"
"strings"
"sync"
"syscall"
"github.com/containerd/containerd"
"github.com/containerd/fifo"
"github.com/golang/glog"
"golang.org/x/net/context"
"github.com/kubernetes-incubator/cri-containerd/pkg/ioutil"
"github.com/kubernetes-incubator/cri-containerd/pkg/util"
)
// StreamType is the type of the stream, stdout/stderr.
type StreamType string
const (
// Stdin stream type.
Stdin StreamType = "stdin"
// Stdout stream type.
Stdout StreamType = "stdout"
// Stderr stream type.
Stderr StreamType = "stderr"
)
type wgCloser struct {
ctx context.Context
wg *sync.WaitGroup
set []io.Closer
cancel context.CancelFunc
}
func (g *wgCloser) Wait() {
g.wg.Wait()
}
func (g *wgCloser) Close() {
for _, f := range g.set {
f.Close()
}
}
func (g *wgCloser) Cancel() {
g.cancel()
}
// streamKey generates a key for the stream.
func streamKey(id, name string, stream StreamType) string {
return strings.Join([]string{id, name, string(stream)}, "-")
}
// ContainerIO holds the container io.
type ContainerIO struct {
dir string
stdinPath string
stdoutPath string
stderrPath string
id string
tty bool
stdin bool
stdout *ioutil.WriterGroup
stderr *ioutil.WriterGroup
closer *wgCloser
}
var _ containerd.IO = &ContainerIO{}
// Opts sets specific information to newly created ContainerIO.
type Opts func(*ContainerIO) error
// WithStdin enables stdin of the container io.
func WithStdin(stdin bool) Opts {
return func(c *ContainerIO) error {
c.stdin = stdin
return nil
}
}
// WithOutput adds output stream to the container io.
func WithOutput(name string, stdout, stderr io.WriteCloser) Opts {
return func(c *ContainerIO) error {
if stdout != nil {
if err := c.stdout.Add(streamKey(c.id, name, Stdout), stdout); err != nil {
return err
}
}
if stderr != nil {
if err := c.stderr.Add(streamKey(c.id, name, Stderr), stderr); err != nil {
return err
}
}
return nil
}
}
// WithTerminal enables tty of the container io.
func WithTerminal(tty bool) Opts {
return func(c *ContainerIO) error {
c.tty = tty
return nil
}
}
// NewContainerIO creates container io.
func NewContainerIO(id string, opts ...Opts) (*ContainerIO, error) {
fifos, err := containerd.NewFifos(id)
if err != nil {
return nil, err
}
c := &ContainerIO{
id: id,
dir: fifos.Dir,
stdoutPath: fifos.Out,
stderrPath: fifos.Err,
stdout: ioutil.NewWriterGroup(),
stderr: ioutil.NewWriterGroup(),
}
for _, opt := range opts {
if err := opt(c); err != nil {
return nil, err
}
}
if c.stdin {
c.stdinPath = fifos.In
}
return c, nil
}
// Config returns io config.
func (c *ContainerIO) Config() containerd.IOConfig {
return containerd.IOConfig{
Terminal: c.tty,
Stdin: c.stdinPath,
Stdout: c.stdoutPath,
Stderr: c.stderrPath,
}
}
// Pipe creates container fifos and pipe container output
// to output stream.
func (c *ContainerIO) Pipe() (err error) {
var (
f io.ReadWriteCloser
set []io.Closer
ctx, cancel = context.WithCancel(context.Background())
wg = &sync.WaitGroup{}
)
defer func() {
if err != nil {
for _, f := range set {
f.Close()
}
cancel()
}
}()
if c.stdinPath != "" {
// Just create the stdin, only open it when used.
if f, err = fifo.OpenFifo(ctx, c.stdinPath, syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
return err
}
f.Close()
}
if f, err = fifo.OpenFifo(ctx, c.stdoutPath, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
return err
}
set = append(set, f)
wg.Add(1)
go func(r io.ReadCloser) {
if _, err := io.Copy(c.stdout, r); err != nil {
glog.Errorf("Failed to redirect stdout of container %q: %v", c.id, err)
}
r.Close()
c.stdout.Close()
wg.Done()
glog.V(2).Infof("Finish piping stdout of container %q", c.id)
}(f)
if f, err = fifo.OpenFifo(ctx, c.stderrPath, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
return err
}
set = append(set, f)
if !c.tty {
wg.Add(1)
go func(r io.ReadCloser) {
if _, err := io.Copy(c.stderr, r); err != nil {
glog.Errorf("Failed to redirect stderr of container %q: %v", c.id, err)
}
r.Close()
c.stderr.Close()
wg.Done()
glog.V(2).Infof("Finish piping stderr of container %q", c.id)
}(f)
}
c.closer = &wgCloser{
wg: wg,
set: set,
ctx: ctx,
cancel: cancel,
}
return nil
}
// Attach attaches container stdio.
func (c *ContainerIO) Attach(stdin io.Reader, stdout, stderr io.WriteCloser) error {
if c.closer == nil {
return errors.New("container io is not initialized")
}
var wg sync.WaitGroup
key := util.GenerateID()
stdinKey := streamKey(c.id, "attach-"+key, Stdin)
stdoutKey := streamKey(c.id, "attach-"+key, Stdout)
stderrKey := streamKey(c.id, "attach-"+key, Stderr)
var stdinCloser io.Closer
if c.stdinPath != "" && stdin != nil {
f, err := fifo.OpenFifo(c.closer.ctx, c.stdinPath, syscall.O_WRONLY|syscall.O_NONBLOCK, 0700)
if err != nil {
return err
}
// Also increase wait group here, so that `closer.Wait` will
// also wait for this fifo to be closed.
c.closer.wg.Add(1)
wg.Add(1)
go func(w io.WriteCloser) {
if _, err := io.Copy(w, stdin); err != nil {
glog.Errorf("Failed to redirect stdin for container attach %q: %v", c.id, err)
}
w.Close()
glog.V(2).Infof("Attach stream %q closed", stdinKey)
// No matter what, when stdin is closed (io.Copy unblock), close stdout and stderr
if stdout != nil {
c.stdout.Remove(stdoutKey)
}
if stderr != nil {
c.stderr.Remove(stderrKey)
}
wg.Done()
c.closer.wg.Done()
}(f)
stdinCloser = f
}
attachStream := func(key string, close <-chan struct{}) {
<-close
glog.V(2).Infof("Attach stream %q closed", key)
// Make sure stdin gets closed.
if stdinCloser != nil {
stdinCloser.Close()
}
wg.Done()
}
if stdout != nil {
wg.Add(1)
wc, close := ioutil.NewWriteCloseInformer(stdout)
if err := c.stdout.Add(stdoutKey, wc); err != nil {
return err
}
go attachStream(stdoutKey, close)
}
if !c.tty && stderr != nil {
wg.Add(1)
wc, close := ioutil.NewWriteCloseInformer(stderr)
if err := c.stderr.Add(stderrKey, wc); err != nil {
return err
}
go attachStream(stderrKey, close)
}
wg.Wait()
return nil
}
// Cancel cancels container io.
func (c *ContainerIO) Cancel() {
c.closer.Cancel()
}
// Wait waits container io to finish.
func (c *ContainerIO) Wait() {
c.closer.Wait()
}
// Close closes all FIFOs.
func (c *ContainerIO) Close() error {
if c.closer != nil {
c.closer.Close()
}
if c.dir != "" {
return os.RemoveAll(c.dir)
}
return nil
}

90
pkg/server/io/logger.go Normal file
View File

@@ -0,0 +1,90 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package agents
import (
"bufio"
"bytes"
"fmt"
"io"
"io/ioutil"
"os"
"time"
"github.com/golang/glog"
cioutil "github.com/kubernetes-incubator/cri-containerd/pkg/ioutil"
)
const (
// delimiter used in CRI logging format.
delimiter = ' '
// eof is end-of-line.
eol = '\n'
// timestampFormat is the timestamp format used in CRI logging format.
timestampFormat = time.RFC3339Nano
// pipeBufSize is the system PIPE_BUF size, on linux it is 4096 bytes.
// POSIX.1 says that write less than PIPE_BUF is atmoic.
pipeBufSize = 4096
// bufSize is the size of the read buffer.
bufSize = pipeBufSize - len(timestampFormat) - len(Stdout) - 2 /*2 delimiter*/ - 1 /*eol*/
)
// NewDiscardLogger creates logger which discards all the input.
func NewDiscardLogger() io.WriteCloser {
return cioutil.NewNopWriteCloser(ioutil.Discard)
}
// NewCRILogger returns a write closer which redirect container log into
// log file, and decorate the log line into CRI defined format.
func NewCRILogger(path string, stream StreamType) (io.WriteCloser, error) {
glog.V(4).Infof("Start reading log file %q", path)
prc, pwc := io.Pipe()
f, err := os.OpenFile(path, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0640)
if err != nil {
return nil, fmt.Errorf("failed to open log file: %v", err)
}
go redirectLogs(path, prc, f, stream)
return pwc, nil
}
func redirectLogs(path string, rc io.ReadCloser, wc io.WriteCloser, stream StreamType) {
defer rc.Close()
defer wc.Close()
streamBytes := []byte(stream)
delimiterBytes := []byte{delimiter}
r := bufio.NewReaderSize(rc, bufSize)
for {
// TODO(random-liu): Better define CRI log format, and escape newline in log.
lineBytes, _, err := r.ReadLine()
if err == io.EOF {
glog.V(4).Infof("Finish redirecting log file %q", path)
return
}
if err != nil {
glog.Errorf("An error occurred when redirecting log file %q: %v", path, err)
return
}
timestampBytes := time.Now().AppendFormat(nil, time.RFC3339Nano)
data := bytes.Join([][]byte{timestampBytes, streamBytes, lineBytes}, delimiterBytes)
data = append(data, eol)
if _, err := wc.Write(data); err != nil {
glog.Errorf("Fail to write log line %q: %v", data, err)
}
// Continue on write error to drain the input.
}
}

View File

@@ -0,0 +1,81 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package agents
import (
"bytes"
"io/ioutil"
"strings"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
cioutil "github.com/kubernetes-incubator/cri-containerd/pkg/ioutil"
)
func TestRedirectLogs(t *testing.T) {
for desc, test := range map[string]struct {
input string
stream StreamType
content []string
}{
"stdout log": {
input: "test stdout log 1\ntest stdout log 2",
stream: Stdout,
content: []string{
"test stdout log 1",
"test stdout log 2",
},
},
"stderr log": {
input: "test stderr log 1\ntest stderr log 2",
stream: Stderr,
content: []string{
"test stderr log 1",
"test stderr log 2",
},
},
"long log": {
input: strings.Repeat("a", bufSize+10) + "\n",
stream: Stdout,
content: []string{
strings.Repeat("a", bufSize),
strings.Repeat("a", 10),
},
},
} {
t.Logf("TestCase %q", desc)
rc := ioutil.NopCloser(strings.NewReader(test.input))
buf := bytes.NewBuffer(nil)
wc := cioutil.NewNopWriteCloser(buf)
redirectLogs("test-path", rc, wc, test.stream)
output := buf.String()
lines := strings.Split(output, "\n")
lines = lines[:len(lines)-1] // Discard empty string after last \n
assert.Len(t, lines, len(test.content))
for i := range lines {
fields := strings.SplitN(lines[i], string([]byte{delimiter}), 3)
require.Len(t, fields, 3)
_, err := time.Parse(timestampFormat, fields[0])
assert.NoError(t, err)
assert.EqualValues(t, test.stream, fields[1])
assert.Equal(t, test.content[i], fields[2])
}
}
}