From a8264ec035740ed51f49122a4855a670aedb5e45 Mon Sep 17 00:00:00 2001 From: Lantao Liu Date: Tue, 13 Feb 2018 01:55:29 +0000 Subject: [PATCH] Support reopening container log. Signed-off-by: Lantao Liu --- hack/versions | 2 +- pkg/ioutil/writer_group.go | 22 ++++++++++----- pkg/ioutil/writer_group_test.go | 41 ++++++++++++++++++++------- pkg/server/container_attach.go | 4 +-- pkg/server/container_log_reopen.go | 26 +++++++++++++++-- pkg/server/container_start.go | 4 +-- pkg/server/io/container_io.go | 45 +++++++++++++----------------- pkg/server/restart.go | 2 +- 8 files changed, 93 insertions(+), 53 deletions(-) diff --git a/hack/versions b/hack/versions index 99cc8945d..9e456e7dd 100644 --- a/hack/versions +++ b/hack/versions @@ -10,5 +10,5 @@ if [ x"$KUBERNETES_REPO" = "xk8s.io" ] ; then fi # Not from vendor.conf. -CRITOOL_VERSION=240a840375cdabb5860c75c99e8b0d0a776006b4 +CRITOOL_VERSION=c87ea764cecbcbabbb51c5bdd10ea317181fdd62 CRITOOL_REPO=github.com/kubernetes-incubator/cri-tools diff --git a/pkg/ioutil/writer_group.go b/pkg/ioutil/writer_group.go index 7c3dde5cb..ea69ce88b 100644 --- a/pkg/ioutil/writer_group.go +++ b/pkg/ioutil/writer_group.go @@ -39,16 +39,24 @@ func NewWriterGroup() *WriterGroup { } } -// Add adds a writer into the group, returns an error when writer -// group is closed. -func (g *WriterGroup) Add(key string, w io.WriteCloser) error { +// Add adds a writer into the group. The writer will be closed +// if the writer group is closed. +func (g *WriterGroup) Add(key string, w io.WriteCloser) { g.mu.Lock() defer g.mu.Unlock() if g.closed { - return errors.New("wait group closed") + w.Close() + return } g.writers[key] = w - return nil +} + +// Get gets a writer from the group, returns nil if the writer +// doesn't exist. +func (g *WriterGroup) Get(key string) io.WriteCloser { + g.mu.Lock() + defer g.mu.Unlock() + return g.writers[key] } // Remove removes a writer from the group. @@ -84,8 +92,8 @@ func (g *WriterGroup) Write(p []byte) (int, error) { return len(p), nil } -// Close closes the writer group. Write or Add will return error -// after closed. +// Close closes the writer group. Write will return error after +// closed. func (g *WriterGroup) Close() { g.mu.Lock() defer g.mu.Unlock() diff --git a/pkg/ioutil/writer_group_test.go b/pkg/ioutil/writer_group_test.go index 26bc940b5..f2fbea72d 100644 --- a/pkg/ioutil/writer_group_test.go +++ b/pkg/ioutil/writer_group_test.go @@ -48,8 +48,7 @@ func TestClosedWriterGroup(t *testing.T) { wc := &writeCloser{} key, data := "test key", "test data" - err := wg.Add(key, wc) - assert.NoError(t, err) + wg.Add(key, wc) n, err := wg.Write([]byte(data)) assert.Equal(t, len(data), n) @@ -59,36 +58,58 @@ func TestClosedWriterGroup(t *testing.T) { wg.Close() assert.True(t, wc.closed) - err = wg.Add(key, &writeCloser{}) - assert.Error(t, err) + newWC := &writeCloser{} + wg.Add(key, newWC) + assert.True(t, newWC.closed) _, err = wg.Write([]byte(data)) assert.Error(t, err) } -func TestAddRemoveWriter(t *testing.T) { +func TestAddGetRemoveWriter(t *testing.T) { wg := NewWriterGroup() wc1, wc2 := &writeCloser{}, &writeCloser{} key1, key2 := "test key 1", "test key 2" - err := wg.Add(key1, wc1) - assert.NoError(t, err) - _, err = wg.Write([]byte("test data 1")) + wg.Add(key1, wc1) + _, err := wg.Write([]byte("test data 1")) assert.NoError(t, err) assert.Equal(t, "test data 1", wc1.buf.String()) - err = wg.Add(key2, wc2) - assert.NoError(t, err) + wg.Add(key2, wc2) _, err = wg.Write([]byte("test data 2")) assert.NoError(t, err) assert.Equal(t, "test data 1test data 2", wc1.buf.String()) assert.Equal(t, "test data 2", wc2.buf.String()) + assert.Equal(t, wc1, wg.Get(key1)) + wg.Remove(key1) _, err = wg.Write([]byte("test data 3")) assert.NoError(t, err) assert.Equal(t, "test data 1test data 2", wc1.buf.String()) assert.Equal(t, "test data 2test data 3", wc2.buf.String()) + assert.Equal(t, nil, wg.Get(key1)) + + wg.Close() +} + +func TestReplaceWriter(t *testing.T) { + wg := NewWriterGroup() + wc1, wc2 := &writeCloser{}, &writeCloser{} + key := "test-key" + + wg.Add(key, wc1) + _, err := wg.Write([]byte("test data 1")) + assert.NoError(t, err) + assert.Equal(t, "test data 1", wc1.buf.String()) + + wg.Add(key, wc2) + _, err = wg.Write([]byte("test data 2")) + assert.NoError(t, err) + assert.Equal(t, "test data 1", wc1.buf.String()) + assert.Equal(t, "test data 2", wc2.buf.String()) + wg.Close() } diff --git a/pkg/server/container_attach.go b/pkg/server/container_attach.go index 116c0cf4e..c823f0953 100644 --- a/pkg/server/container_attach.go +++ b/pkg/server/container_attach.go @@ -77,8 +77,6 @@ func (c *criContainerdService) attachContainer(ctx context.Context, id string, s }, } // TODO(random-liu): Figure out whether we need to support historical output. - if err := cntr.IO.Attach(opts); err != nil { - return fmt.Errorf("failed to attach container: %v", err) - } + cntr.IO.Attach(opts) return nil } diff --git a/pkg/server/container_log_reopen.go b/pkg/server/container_log_reopen.go index 3df4f3efd..b233e65d4 100644 --- a/pkg/server/container_log_reopen.go +++ b/pkg/server/container_log_reopen.go @@ -17,7 +17,7 @@ limitations under the License. package server import ( - "errors" + "fmt" "golang.org/x/net/context" runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2" @@ -25,7 +25,27 @@ import ( // ReopenContainerLog asks cri-containerd to reopen the stdout/stderr log file for the container. // This is often called after the log file has been rotated. -// TODO(random-liu): Implement this for kubelet log rotation. func (c *criContainerdService) ReopenContainerLog(ctx context.Context, r *runtime.ReopenContainerLogRequest) (*runtime.ReopenContainerLogResponse, error) { - return nil, errors.New("not implemented") + container, err := c.containerStore.Get(r.GetContainerId()) + if err != nil { + return nil, fmt.Errorf("an error occurred when try to find container %q: %v", r.GetContainerId(), err) + } + + if container.Status.Get().State() != runtime.ContainerState_CONTAINER_RUNNING { + return nil, fmt.Errorf("container is not running") + } + + // Create new container logger and replace the existing ones. + stdoutWC, stderrWC, err := createContainerLoggers(container.LogPath, container.Config.GetTty()) + if err != nil { + return nil, err + } + oldStdoutWC, oldStderrWC := container.IO.AddOutput("log", stdoutWC, stderrWC) + if oldStdoutWC != nil { + oldStdoutWC.Close() + } + if oldStderrWC != nil { + oldStderrWC.Close() + } + return &runtime.ReopenContainerLogResponse{}, nil } diff --git a/pkg/server/container_start.go b/pkg/server/container_start.go index d16cecf86..32fb2963a 100644 --- a/pkg/server/container_start.go +++ b/pkg/server/container_start.go @@ -110,9 +110,7 @@ func (c *criContainerdService) startContainer(ctx context.Context, } } }() - if err := cio.WithOutput("log", stdoutWC, stderrWC)(cntr.IO); err != nil { - return nil, fmt.Errorf("failed to add container log: %v", err) - } + cntr.IO.AddOutput("log", stdoutWC, stderrWC) cntr.IO.Pipe() return cntr.IO, nil } diff --git a/pkg/server/io/container_io.go b/pkg/server/io/container_io.go index b8d166a38..726071213 100644 --- a/pkg/server/io/container_io.go +++ b/pkg/server/io/container_io.go @@ -52,23 +52,6 @@ var _ cio.IO = &ContainerIO{} // ContainerIOOpts sets specific information to newly created ContainerIO. type ContainerIOOpts func(*ContainerIO) error -// WithOutput adds output stream to the container io. -func WithOutput(name string, stdout, stderr io.WriteCloser) ContainerIOOpts { - return func(c *ContainerIO) error { - if stdout != nil { - if err := c.stdoutGroup.Add(streamKey(c.id, name, Stdout), stdout); err != nil { - return err - } - } - if stderr != nil { - if err := c.stderrGroup.Add(streamKey(c.id, name, Stderr), stderr); err != nil { - return err - } - } - return nil - } -} - // WithFIFOs specifies existing fifos for the container io. func WithFIFOs(fifos *cio.FIFOSet) ContainerIOOpts { return func(c *ContainerIO) error { @@ -149,7 +132,7 @@ func (c *ContainerIO) Pipe() { // Attach attaches container stdio. // TODO(random-liu): Use pools.Copy in docker to reduce memory usage? -func (c *ContainerIO) Attach(opts AttachOptions) error { +func (c *ContainerIO) Attach(opts AttachOptions) { var wg sync.WaitGroup key := util.GenerateID() stdinKey := streamKey(c.id, "attach-"+key, Stdin) @@ -202,21 +185,33 @@ func (c *ContainerIO) Attach(opts AttachOptions) error { if opts.Stdout != nil { wg.Add(1) wc, close := cioutil.NewWriteCloseInformer(opts.Stdout) - if err := c.stdoutGroup.Add(stdoutKey, wc); err != nil { - return err - } + c.stdoutGroup.Add(stdoutKey, wc) go attachStream(stdoutKey, close) } if !opts.Tty && opts.Stderr != nil { wg.Add(1) wc, close := cioutil.NewWriteCloseInformer(opts.Stderr) - if err := c.stderrGroup.Add(stderrKey, wc); err != nil { - return err - } + c.stderrGroup.Add(stderrKey, wc) go attachStream(stderrKey, close) } wg.Wait() - return nil +} + +// AddOutput adds new write closers to the container stream, and returns existing +// write closers if there are any. +func (c *ContainerIO) AddOutput(name string, stdout, stderr io.WriteCloser) (io.WriteCloser, io.WriteCloser) { + var oldStdout, oldStderr io.WriteCloser + if stdout != nil { + key := streamKey(c.id, name, Stdout) + oldStdout = c.stdoutGroup.Get(key) + c.stdoutGroup.Add(key, stdout) + } + if stderr != nil { + key := streamKey(c.id, name, Stderr) + oldStderr = c.stderrGroup.Get(key) + c.stderrGroup.Add(key, stderr) + } + return oldStdout, oldStderr } // Cancel cancels container io. diff --git a/pkg/server/restart.go b/pkg/server/restart.go index 04887034e..158c4b178 100644 --- a/pkg/server/restart.go +++ b/pkg/server/restart.go @@ -161,11 +161,11 @@ func loadContainer(ctx context.Context, cntr containerd.Container, containerDir } containerIO, err = cio.NewContainerIO(id, cio.WithFIFOs(fifos), - cio.WithOutput("log", stdoutWC, stderrWC), ) if err != nil { return nil, err } + containerIO.AddOutput("log", stdoutWC, stderrWC) containerIO.Pipe() return containerIO, nil })