Support reopening container log.

Signed-off-by: Lantao Liu <lantaol@google.com>
This commit is contained in:
Lantao Liu 2018-02-13 01:55:29 +00:00
parent 8357315564
commit a8264ec035
8 changed files with 93 additions and 53 deletions

View File

@ -10,5 +10,5 @@ if [ x"$KUBERNETES_REPO" = "xk8s.io" ] ; then
fi fi
# Not from vendor.conf. # Not from vendor.conf.
CRITOOL_VERSION=240a840375cdabb5860c75c99e8b0d0a776006b4 CRITOOL_VERSION=c87ea764cecbcbabbb51c5bdd10ea317181fdd62
CRITOOL_REPO=github.com/kubernetes-incubator/cri-tools CRITOOL_REPO=github.com/kubernetes-incubator/cri-tools

View File

@ -39,16 +39,24 @@ func NewWriterGroup() *WriterGroup {
} }
} }
// Add adds a writer into the group, returns an error when writer // Add adds a writer into the group. The writer will be closed
// group is closed. // if the writer group is closed.
func (g *WriterGroup) Add(key string, w io.WriteCloser) error { func (g *WriterGroup) Add(key string, w io.WriteCloser) {
g.mu.Lock() g.mu.Lock()
defer g.mu.Unlock() defer g.mu.Unlock()
if g.closed { if g.closed {
return errors.New("wait group closed") w.Close()
return
} }
g.writers[key] = w g.writers[key] = w
return nil }
// Get gets a writer from the group, returns nil if the writer
// doesn't exist.
func (g *WriterGroup) Get(key string) io.WriteCloser {
g.mu.Lock()
defer g.mu.Unlock()
return g.writers[key]
} }
// Remove removes a writer from the group. // Remove removes a writer from the group.
@ -84,8 +92,8 @@ func (g *WriterGroup) Write(p []byte) (int, error) {
return len(p), nil return len(p), nil
} }
// Close closes the writer group. Write or Add will return error // Close closes the writer group. Write will return error after
// after closed. // closed.
func (g *WriterGroup) Close() { func (g *WriterGroup) Close() {
g.mu.Lock() g.mu.Lock()
defer g.mu.Unlock() defer g.mu.Unlock()

View File

@ -48,8 +48,7 @@ func TestClosedWriterGroup(t *testing.T) {
wc := &writeCloser{} wc := &writeCloser{}
key, data := "test key", "test data" key, data := "test key", "test data"
err := wg.Add(key, wc) wg.Add(key, wc)
assert.NoError(t, err)
n, err := wg.Write([]byte(data)) n, err := wg.Write([]byte(data))
assert.Equal(t, len(data), n) assert.Equal(t, len(data), n)
@ -59,36 +58,58 @@ func TestClosedWriterGroup(t *testing.T) {
wg.Close() wg.Close()
assert.True(t, wc.closed) assert.True(t, wc.closed)
err = wg.Add(key, &writeCloser{}) newWC := &writeCloser{}
assert.Error(t, err) wg.Add(key, newWC)
assert.True(t, newWC.closed)
_, err = wg.Write([]byte(data)) _, err = wg.Write([]byte(data))
assert.Error(t, err) assert.Error(t, err)
} }
func TestAddRemoveWriter(t *testing.T) { func TestAddGetRemoveWriter(t *testing.T) {
wg := NewWriterGroup() wg := NewWriterGroup()
wc1, wc2 := &writeCloser{}, &writeCloser{} wc1, wc2 := &writeCloser{}, &writeCloser{}
key1, key2 := "test key 1", "test key 2" key1, key2 := "test key 1", "test key 2"
err := wg.Add(key1, wc1) wg.Add(key1, wc1)
assert.NoError(t, err) _, err := wg.Write([]byte("test data 1"))
_, err = wg.Write([]byte("test data 1"))
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, "test data 1", wc1.buf.String()) assert.Equal(t, "test data 1", wc1.buf.String())
err = wg.Add(key2, wc2) wg.Add(key2, wc2)
assert.NoError(t, err)
_, err = wg.Write([]byte("test data 2")) _, err = wg.Write([]byte("test data 2"))
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, "test data 1test data 2", wc1.buf.String()) assert.Equal(t, "test data 1test data 2", wc1.buf.String())
assert.Equal(t, "test data 2", wc2.buf.String()) assert.Equal(t, "test data 2", wc2.buf.String())
assert.Equal(t, wc1, wg.Get(key1))
wg.Remove(key1) wg.Remove(key1)
_, err = wg.Write([]byte("test data 3")) _, err = wg.Write([]byte("test data 3"))
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, "test data 1test data 2", wc1.buf.String()) assert.Equal(t, "test data 1test data 2", wc1.buf.String())
assert.Equal(t, "test data 2test data 3", wc2.buf.String()) assert.Equal(t, "test data 2test data 3", wc2.buf.String())
assert.Equal(t, nil, wg.Get(key1))
wg.Close()
}
func TestReplaceWriter(t *testing.T) {
wg := NewWriterGroup()
wc1, wc2 := &writeCloser{}, &writeCloser{}
key := "test-key"
wg.Add(key, wc1)
_, err := wg.Write([]byte("test data 1"))
assert.NoError(t, err)
assert.Equal(t, "test data 1", wc1.buf.String())
wg.Add(key, wc2)
_, err = wg.Write([]byte("test data 2"))
assert.NoError(t, err)
assert.Equal(t, "test data 1", wc1.buf.String())
assert.Equal(t, "test data 2", wc2.buf.String())
wg.Close() wg.Close()
} }

View File

@ -77,8 +77,6 @@ func (c *criContainerdService) attachContainer(ctx context.Context, id string, s
}, },
} }
// TODO(random-liu): Figure out whether we need to support historical output. // TODO(random-liu): Figure out whether we need to support historical output.
if err := cntr.IO.Attach(opts); err != nil { cntr.IO.Attach(opts)
return fmt.Errorf("failed to attach container: %v", err)
}
return nil return nil
} }

View File

@ -17,7 +17,7 @@ limitations under the License.
package server package server
import ( import (
"errors" "fmt"
"golang.org/x/net/context" "golang.org/x/net/context"
runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2" runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2"
@ -25,7 +25,27 @@ import (
// ReopenContainerLog asks cri-containerd to reopen the stdout/stderr log file for the container. // ReopenContainerLog asks cri-containerd to reopen the stdout/stderr log file for the container.
// This is often called after the log file has been rotated. // This is often called after the log file has been rotated.
// TODO(random-liu): Implement this for kubelet log rotation.
func (c *criContainerdService) ReopenContainerLog(ctx context.Context, r *runtime.ReopenContainerLogRequest) (*runtime.ReopenContainerLogResponse, error) { func (c *criContainerdService) ReopenContainerLog(ctx context.Context, r *runtime.ReopenContainerLogRequest) (*runtime.ReopenContainerLogResponse, error) {
return nil, errors.New("not implemented") container, err := c.containerStore.Get(r.GetContainerId())
if err != nil {
return nil, fmt.Errorf("an error occurred when try to find container %q: %v", r.GetContainerId(), err)
}
if container.Status.Get().State() != runtime.ContainerState_CONTAINER_RUNNING {
return nil, fmt.Errorf("container is not running")
}
// Create new container logger and replace the existing ones.
stdoutWC, stderrWC, err := createContainerLoggers(container.LogPath, container.Config.GetTty())
if err != nil {
return nil, err
}
oldStdoutWC, oldStderrWC := container.IO.AddOutput("log", stdoutWC, stderrWC)
if oldStdoutWC != nil {
oldStdoutWC.Close()
}
if oldStderrWC != nil {
oldStderrWC.Close()
}
return &runtime.ReopenContainerLogResponse{}, nil
} }

View File

@ -110,9 +110,7 @@ func (c *criContainerdService) startContainer(ctx context.Context,
} }
} }
}() }()
if err := cio.WithOutput("log", stdoutWC, stderrWC)(cntr.IO); err != nil { cntr.IO.AddOutput("log", stdoutWC, stderrWC)
return nil, fmt.Errorf("failed to add container log: %v", err)
}
cntr.IO.Pipe() cntr.IO.Pipe()
return cntr.IO, nil return cntr.IO, nil
} }

View File

@ -52,23 +52,6 @@ var _ cio.IO = &ContainerIO{}
// ContainerIOOpts sets specific information to newly created ContainerIO. // ContainerIOOpts sets specific information to newly created ContainerIO.
type ContainerIOOpts func(*ContainerIO) error type ContainerIOOpts func(*ContainerIO) error
// WithOutput adds output stream to the container io.
func WithOutput(name string, stdout, stderr io.WriteCloser) ContainerIOOpts {
return func(c *ContainerIO) error {
if stdout != nil {
if err := c.stdoutGroup.Add(streamKey(c.id, name, Stdout), stdout); err != nil {
return err
}
}
if stderr != nil {
if err := c.stderrGroup.Add(streamKey(c.id, name, Stderr), stderr); err != nil {
return err
}
}
return nil
}
}
// WithFIFOs specifies existing fifos for the container io. // WithFIFOs specifies existing fifos for the container io.
func WithFIFOs(fifos *cio.FIFOSet) ContainerIOOpts { func WithFIFOs(fifos *cio.FIFOSet) ContainerIOOpts {
return func(c *ContainerIO) error { return func(c *ContainerIO) error {
@ -149,7 +132,7 @@ func (c *ContainerIO) Pipe() {
// Attach attaches container stdio. // Attach attaches container stdio.
// TODO(random-liu): Use pools.Copy in docker to reduce memory usage? // TODO(random-liu): Use pools.Copy in docker to reduce memory usage?
func (c *ContainerIO) Attach(opts AttachOptions) error { func (c *ContainerIO) Attach(opts AttachOptions) {
var wg sync.WaitGroup var wg sync.WaitGroup
key := util.GenerateID() key := util.GenerateID()
stdinKey := streamKey(c.id, "attach-"+key, Stdin) stdinKey := streamKey(c.id, "attach-"+key, Stdin)
@ -202,21 +185,33 @@ func (c *ContainerIO) Attach(opts AttachOptions) error {
if opts.Stdout != nil { if opts.Stdout != nil {
wg.Add(1) wg.Add(1)
wc, close := cioutil.NewWriteCloseInformer(opts.Stdout) wc, close := cioutil.NewWriteCloseInformer(opts.Stdout)
if err := c.stdoutGroup.Add(stdoutKey, wc); err != nil { c.stdoutGroup.Add(stdoutKey, wc)
return err
}
go attachStream(stdoutKey, close) go attachStream(stdoutKey, close)
} }
if !opts.Tty && opts.Stderr != nil { if !opts.Tty && opts.Stderr != nil {
wg.Add(1) wg.Add(1)
wc, close := cioutil.NewWriteCloseInformer(opts.Stderr) wc, close := cioutil.NewWriteCloseInformer(opts.Stderr)
if err := c.stderrGroup.Add(stderrKey, wc); err != nil { c.stderrGroup.Add(stderrKey, wc)
return err
}
go attachStream(stderrKey, close) go attachStream(stderrKey, close)
} }
wg.Wait() wg.Wait()
return nil }
// AddOutput adds new write closers to the container stream, and returns existing
// write closers if there are any.
func (c *ContainerIO) AddOutput(name string, stdout, stderr io.WriteCloser) (io.WriteCloser, io.WriteCloser) {
var oldStdout, oldStderr io.WriteCloser
if stdout != nil {
key := streamKey(c.id, name, Stdout)
oldStdout = c.stdoutGroup.Get(key)
c.stdoutGroup.Add(key, stdout)
}
if stderr != nil {
key := streamKey(c.id, name, Stderr)
oldStderr = c.stderrGroup.Get(key)
c.stderrGroup.Add(key, stderr)
}
return oldStdout, oldStderr
} }
// Cancel cancels container io. // Cancel cancels container io.

View File

@ -161,11 +161,11 @@ func loadContainer(ctx context.Context, cntr containerd.Container, containerDir
} }
containerIO, err = cio.NewContainerIO(id, containerIO, err = cio.NewContainerIO(id,
cio.WithFIFOs(fifos), cio.WithFIFOs(fifos),
cio.WithOutput("log", stdoutWC, stderrWC),
) )
if err != nil { if err != nil {
return nil, err return nil, err
} }
containerIO.AddOutput("log", stdoutWC, stderrWC)
containerIO.Pipe() containerIO.Pipe()
return containerIO, nil return containerIO, nil
}) })