From 77b703f1e701a79ade3fc25fc60c40add44fe476 Mon Sep 17 00:00:00 2001 From: Lantao Liu Date: Fri, 11 Aug 2017 03:03:14 +0000 Subject: [PATCH 1/2] Move generateID to util. Signed-off-by: Lantao Liu --- pkg/server/container_create.go | 3 ++- pkg/server/container_execsync.go | 4 +++- pkg/server/helpers.go | 6 ------ pkg/server/sandbox_run.go | 3 ++- pkg/util/id.go | 24 ++++++++++++++++++++++++ 5 files changed, 31 insertions(+), 9 deletions(-) create mode 100644 pkg/util/id.go diff --git a/pkg/server/container_create.go b/pkg/server/container_create.go index cea3a58db..136e8b8ca 100644 --- a/pkg/server/container_create.go +++ b/pkg/server/container_create.go @@ -31,6 +31,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" containerstore "github.com/kubernetes-incubator/cri-containerd/pkg/store/container" + "github.com/kubernetes-incubator/cri-containerd/pkg/util" ) // CreateContainer creates a new container in the given PodSandbox. @@ -54,7 +55,7 @@ func (c *criContainerdService) CreateContainer(ctx context.Context, r *runtime.C // Generate unique id and name for the container and reserve the name. // Reserve the container name to avoid concurrent `CreateContainer` request creating // the same container. - id := generateID() + id := util.GenerateID() name := makeContainerName(config.GetMetadata(), sandboxConfig.GetMetadata()) if err = c.containerNameIndex.Reserve(name, id); err != nil { return nil, fmt.Errorf("failed to reserve container name %q: %v", name, err) diff --git a/pkg/server/container_execsync.go b/pkg/server/container_execsync.go index 17be309e3..80180386a 100644 --- a/pkg/server/container_execsync.go +++ b/pkg/server/container_execsync.go @@ -29,6 +29,8 @@ import ( "golang.org/x/sys/unix" "k8s.io/client-go/tools/remotecommand" "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" + + "github.com/kubernetes-incubator/cri-containerd/pkg/util" ) // ExecSync executes a command in the container, and returns the stdout output. @@ -116,7 +118,7 @@ func (c *criContainerdService) execInContainer(ctx context.Context, id string, o // Create empty buffer if stdin is nil. opts.stdin = new(bytes.Buffer) } - execID := generateID() + execID := util.GenerateID() process, err := task.Exec(ctx, execID, pspec, containerd.NewIOWithTerminal( opts.stdin, opts.stdout, diff --git a/pkg/server/helpers.go b/pkg/server/helpers.go index d43851297..b4ecffbcc 100644 --- a/pkg/server/helpers.go +++ b/pkg/server/helpers.go @@ -29,7 +29,6 @@ import ( "github.com/containerd/containerd/content" "github.com/containerd/containerd/errdefs" "github.com/docker/distribution/reference" - "github.com/docker/docker/pkg/stringid" imagedigest "github.com/opencontainers/go-digest" "github.com/opencontainers/image-spec/identity" imagespec "github.com/opencontainers/image-spec/specs-go/v1" @@ -102,11 +101,6 @@ const ( containerMetadataLabel = "io.cri-containerd.container.metadata" ) -// generateID generates a random unique id. -func generateID() string { - return stringid.GenerateNonCryptoID() -} - // makeSandboxName generates sandbox name from sandbox metadata. The name // generated is unique as long as sandbox metadata is unique. func makeSandboxName(s *runtime.PodSandboxMetadata) string { diff --git a/pkg/server/sandbox_run.go b/pkg/server/sandbox_run.go index 983fbf82a..46d1bec17 100644 --- a/pkg/server/sandbox_run.go +++ b/pkg/server/sandbox_run.go @@ -35,6 +35,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" sandboxstore "github.com/kubernetes-incubator/cri-containerd/pkg/store/sandbox" + "github.com/kubernetes-incubator/cri-containerd/pkg/util" ) // RunPodSandbox creates and starts a pod-level sandbox. Runtimes should ensure @@ -50,7 +51,7 @@ func (c *criContainerdService) RunPodSandbox(ctx context.Context, r *runtime.Run config := r.GetConfig() // Generate unique id and name for the sandbox and reserve the name. - id := generateID() + id := util.GenerateID() name := makeSandboxName(config.GetMetadata()) // Reserve the sandbox name to avoid concurrent `RunPodSandbox` request starting the // same sandbox. diff --git a/pkg/util/id.go b/pkg/util/id.go new file mode 100644 index 000000000..ae7fbdcc6 --- /dev/null +++ b/pkg/util/id.go @@ -0,0 +1,24 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import "github.com/docker/docker/pkg/stringid" + +// GenerateID generates a random unique id. +func GenerateID() string { + return stringid.GenerateNonCryptoID() +} From 45ee2e554a226a86bef50889b579d94b4180e5fc Mon Sep 17 00:00:00 2001 From: Lantao Liu Date: Thu, 10 Aug 2017 21:27:31 +0000 Subject: [PATCH 2/2] Add container attach support. Signed-off-by: Lantao Liu --- hack/test-cri.sh | 2 +- pkg/ioutil/write_closer.go | 68 ++++ pkg/ioutil/write_closer_test.go | 49 +++ pkg/ioutil/writer_group.go | 103 ++++++ pkg/ioutil/writer_group_test.go | 94 ++++++ pkg/server/agents/agents.go | 51 --- .../agents/testing/fake_agent_factory.go | 50 --- pkg/server/container_attach.go | 63 +++- pkg/server/container_create.go | 24 +- pkg/server/container_execsync.go | 8 +- pkg/server/container_start.go | 75 +++-- pkg/server/events.go | 6 +- pkg/server/helpers.go | 33 -- pkg/server/helpers_test.go | 94 ------ pkg/server/io/io.go | 317 ++++++++++++++++++ pkg/server/{agents => io}/logger.go | 66 ++-- pkg/server/{agents => io}/logger_test.go | 21 +- pkg/server/sandbox_run.go | 22 +- pkg/server/sandbox_stop.go | 1 + pkg/server/service.go | 4 - pkg/server/service_test.go | 11 - pkg/server/streaming.go | 3 +- pkg/store/container/container.go | 12 +- pkg/store/container/container_test.go | 34 ++ 24 files changed, 840 insertions(+), 371 deletions(-) create mode 100644 pkg/ioutil/write_closer.go create mode 100644 pkg/ioutil/write_closer_test.go create mode 100644 pkg/ioutil/writer_group.go create mode 100644 pkg/ioutil/writer_group_test.go delete mode 100644 pkg/server/agents/agents.go delete mode 100644 pkg/server/agents/testing/fake_agent_factory.go create mode 100644 pkg/server/io/io.go rename pkg/server/{agents => io}/logger.go (58%) rename pkg/server/{agents => io}/logger_test.go (82%) diff --git a/hack/test-cri.sh b/hack/test-cri.sh index ac1db448e..4925481dd 100755 --- a/hack/test-cri.sh +++ b/hack/test-cri.sh @@ -21,7 +21,7 @@ source $(dirname "${BASH_SOURCE[0]}")/test-utils.sh # FOCUS focuses the test to run. FOCUS=${FOCUS:-} # SKIP skips the test to skip. -SKIP=${SKIP:-"attach|RunAsUser"} +SKIP=${SKIP:-"RunAsUser"} REPORT_DIR=${REPORT_DIR:-"/tmp/test-cri"} if [[ -z "${GOPATH}" ]]; then diff --git a/pkg/ioutil/write_closer.go b/pkg/ioutil/write_closer.go new file mode 100644 index 000000000..d1fa8ed50 --- /dev/null +++ b/pkg/ioutil/write_closer.go @@ -0,0 +1,68 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package ioutil + +import "io" + +// writeCloseInformer wraps passed in write closer with a close channel. +// Caller could wait on the close channel for the write closer to be +// closed. +type writeCloseInformer struct { + close chan struct{} + wc io.WriteCloser +} + +// NewWriteCloseInformer creates the writeCloseInformer from a write closer. +func NewWriteCloseInformer(wc io.WriteCloser) (io.WriteCloser, <-chan struct{}) { + close := make(chan struct{}) + return &writeCloseInformer{ + close: close, + wc: wc, + }, close +} + +// Write passes through the data into the internal write closer. +func (w *writeCloseInformer) Write(p []byte) (int, error) { + return w.wc.Write(p) +} + +// Close closes the internal write closer and inform the close channel. +func (w *writeCloseInformer) Close() error { + err := w.wc.Close() + close(w.close) + return err +} + +// nopWriteCloser wraps passed in writer with a nop close function. +type nopWriteCloser struct { + w io.Writer +} + +// NewNopWriteCloser creates the nopWriteCloser from a writer. +func NewNopWriteCloser(w io.Writer) io.WriteCloser { + return &nopWriteCloser{w: w} +} + +// Write passes through the data into the internal writer. +func (n *nopWriteCloser) Write(p []byte) (int, error) { + return n.w.Write(p) +} + +// Close is a nop close function. +func (n *nopWriteCloser) Close() error { + return nil +} diff --git a/pkg/ioutil/write_closer_test.go b/pkg/ioutil/write_closer_test.go new file mode 100644 index 000000000..67f6b02ed --- /dev/null +++ b/pkg/ioutil/write_closer_test.go @@ -0,0 +1,49 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package ioutil + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestWriteCloseInformer(t *testing.T) { + original := &writeCloser{} + wci, close := NewWriteCloseInformer(original) + data := "test" + + n, err := wci.Write([]byte(data)) + assert.Equal(t, len(data), n) + assert.Equal(t, data, original.buf.String()) + assert.NoError(t, err) + + select { + case <-close: + assert.Fail(t, "write closer closed") + default: + } + + wci.Close() + assert.True(t, original.closed) + + select { + case <-close: + default: + assert.Fail(t, "write closer not closed") + } +} diff --git a/pkg/ioutil/writer_group.go b/pkg/ioutil/writer_group.go new file mode 100644 index 000000000..603ecc7e5 --- /dev/null +++ b/pkg/ioutil/writer_group.go @@ -0,0 +1,103 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package ioutil + +import ( + "errors" + "io" + "sync" + + "github.com/golang/glog" +) + +// WriterGroup is a group of writers. Writer could be dynamically +// added and removed. +type WriterGroup struct { + mu sync.Mutex + writers map[string]io.WriteCloser + closed bool +} + +var _ io.Writer = &WriterGroup{} + +// NewWriterGroup creates an empty writer group. +func NewWriterGroup() *WriterGroup { + return &WriterGroup{ + writers: make(map[string]io.WriteCloser), + } +} + +// Add adds a writer into the group, returns an error when writer +// group is closed. +func (g *WriterGroup) Add(key string, w io.WriteCloser) error { + g.mu.Lock() + defer g.mu.Unlock() + if g.closed { + return errors.New("wait group closed") + } + g.writers[key] = w + return nil +} + +// Remove removes a writer from the group. +func (g *WriterGroup) Remove(key string) { + g.mu.Lock() + defer g.mu.Unlock() + w, ok := g.writers[key] + if !ok { + return + } + w.Close() + delete(g.writers, key) +} + +// Write writes data into each writer. If a writer returns error, +// it will be closed and removed from the writer group. It returns +// error if writer group is empty. +func (g *WriterGroup) Write(p []byte) (int, error) { + g.mu.Lock() + defer g.mu.Unlock() + for k, w := range g.writers { + n, err := w.Write(p) + if err != nil { + glog.Errorf("Writer %q write error: %v", k, err) + } else if len(p) != n { + glog.Errorf("Writer %q short write error", k) + } else { + continue + } + // The writer is closed or in bad state, remove it. + w.Close() + delete(g.writers, k) + } + if len(g.writers) == 0 { + return 0, errors.New("writer group is empty") + } + return len(p), nil +} + +// Close closes the writer group. Write or Add will return error +// after closed. +func (g *WriterGroup) Close() { + g.mu.Lock() + defer g.mu.Unlock() + for _, w := range g.writers { + w.Close() + } + g.writers = nil + g.closed = true +} diff --git a/pkg/ioutil/writer_group_test.go b/pkg/ioutil/writer_group_test.go new file mode 100644 index 000000000..26bc940b5 --- /dev/null +++ b/pkg/ioutil/writer_group_test.go @@ -0,0 +1,94 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package ioutil + +import ( + "bytes" + "testing" + + "github.com/stretchr/testify/assert" +) + +type writeCloser struct { + buf bytes.Buffer + closed bool +} + +func (wc *writeCloser) Write(p []byte) (int, error) { + return wc.buf.Write(p) +} + +func (wc *writeCloser) Close() error { + wc.closed = true + return nil +} + +func TestEmptyWriterGroup(t *testing.T) { + wg := NewWriterGroup() + _, err := wg.Write([]byte("test")) + assert.Error(t, err) +} + +func TestClosedWriterGroup(t *testing.T) { + wg := NewWriterGroup() + wc := &writeCloser{} + key, data := "test key", "test data" + + err := wg.Add(key, wc) + assert.NoError(t, err) + + n, err := wg.Write([]byte(data)) + assert.Equal(t, len(data), n) + assert.Equal(t, data, wc.buf.String()) + assert.NoError(t, err) + + wg.Close() + assert.True(t, wc.closed) + + err = wg.Add(key, &writeCloser{}) + assert.Error(t, err) + + _, err = wg.Write([]byte(data)) + assert.Error(t, err) +} + +func TestAddRemoveWriter(t *testing.T) { + wg := NewWriterGroup() + wc1, wc2 := &writeCloser{}, &writeCloser{} + key1, key2 := "test key 1", "test key 2" + + err := wg.Add(key1, wc1) + assert.NoError(t, err) + _, err = wg.Write([]byte("test data 1")) + assert.NoError(t, err) + assert.Equal(t, "test data 1", wc1.buf.String()) + + err = wg.Add(key2, wc2) + assert.NoError(t, err) + _, err = wg.Write([]byte("test data 2")) + assert.NoError(t, err) + assert.Equal(t, "test data 1test data 2", wc1.buf.String()) + assert.Equal(t, "test data 2", wc2.buf.String()) + + wg.Remove(key1) + _, err = wg.Write([]byte("test data 3")) + assert.NoError(t, err) + assert.Equal(t, "test data 1test data 2", wc1.buf.String()) + assert.Equal(t, "test data 2test data 3", wc2.buf.String()) + + wg.Close() +} diff --git a/pkg/server/agents/agents.go b/pkg/server/agents/agents.go deleted file mode 100644 index 1da854d52..000000000 --- a/pkg/server/agents/agents.go +++ /dev/null @@ -1,51 +0,0 @@ -/* -Copyright 2017 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package agents - -import "io" - -// StreamType is the type of the stream, stdout/stderr. -type StreamType string - -const ( - // Stdout stream type. - Stdout StreamType = "stdout" - // Stderr stream type. - Stderr StreamType = "stderr" -) - -// Agent is the a running agent perform a specific task, e.g. redirect and -// decorate log, redirect stream etc. -type Agent interface { - // Start starts the logger. - Start() error -} - -// AgentFactory is the factory to create required agents. -type AgentFactory interface { - // NewSandboxLogger creates a sandbox logging agent. - NewSandboxLogger(io.ReadCloser) Agent - // NewContainerLogger creates a container logging agent. - NewContainerLogger(string, StreamType, io.ReadCloser) Agent -} - -type agentFactory struct{} - -// NewAgentFactory creates a new agent factory. -func NewAgentFactory() AgentFactory { - return &agentFactory{} -} diff --git a/pkg/server/agents/testing/fake_agent_factory.go b/pkg/server/agents/testing/fake_agent_factory.go deleted file mode 100644 index ffb6408a1..000000000 --- a/pkg/server/agents/testing/fake_agent_factory.go +++ /dev/null @@ -1,50 +0,0 @@ -/* -Copyright 2017 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package testing - -import ( - "io" - - "github.com/kubernetes-incubator/cri-containerd/pkg/server/agents" -) - -// FakeAgent is a fake agent for test. -type FakeAgent struct{} - -// Start always return nil not. -// TODO(random-liu): Inject error and test error. -func (f *FakeAgent) Start() error { - return nil -} - -// FakeAgentFactory is a fake agent factory for test. -type FakeAgentFactory struct{} - -// NewFakeAgentFactory creates fake agent factory. -func NewFakeAgentFactory() agents.AgentFactory { - return &FakeAgentFactory{} -} - -// NewSandboxLogger creates a fake agent as sandbox logger. -func (*FakeAgentFactory) NewSandboxLogger(rc io.ReadCloser) agents.Agent { - return &FakeAgent{} -} - -// NewContainerLogger creates a fake agent as container logger. -func (*FakeAgentFactory) NewContainerLogger(string, agents.StreamType, io.ReadCloser) agents.Agent { - return &FakeAgent{} -} diff --git a/pkg/server/container_attach.go b/pkg/server/container_attach.go index 6b33018b6..a7621480c 100644 --- a/pkg/server/container_attach.go +++ b/pkg/server/container_attach.go @@ -17,14 +17,69 @@ limitations under the License. package server import ( - "errors" + "fmt" + "io" + "github.com/containerd/containerd" + "github.com/golang/glog" "golang.org/x/net/context" - + "k8s.io/client-go/tools/remotecommand" "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" ) // Attach prepares a streaming endpoint to attach to a running container, and returns the address. -func (c *criContainerdService) Attach(ctx context.Context, r *runtime.AttachRequest) (*runtime.AttachResponse, error) { - return nil, errors.New("not implemented") +func (c *criContainerdService) Attach(ctx context.Context, r *runtime.AttachRequest) (retRes *runtime.AttachResponse, retErr error) { + glog.V(2).Infof("Attach for %q with tty %v and stdin %v", r.GetContainerId(), r.GetTty(), r.GetStdin()) + defer func() { + if retErr == nil { + glog.V(2).Infof("Attach for %q returns URL %q", r.GetContainerId(), retRes.Url) + } + }() + + cntr, err := c.containerStore.Get(r.GetContainerId()) + if err != nil { + return nil, fmt.Errorf("failed to find container in store: %v", err) + } + state := cntr.Status.Get().State() + if state != runtime.ContainerState_CONTAINER_RUNNING { + return nil, fmt.Errorf("container is in %s state", criContainerStateToString(state)) + } + return c.streamServer.GetAttach(r) +} + +func (c *criContainerdService) attachContainer(ctx context.Context, id string, stdin io.Reader, stdout, stderr io.WriteCloser, + tty bool, resize <-chan remotecommand.TerminalSize) error { + // Get container from our container store. + cntr, err := c.containerStore.Get(id) + if err != nil { + return fmt.Errorf("failed to find container in store: %v", err) + } + id = cntr.ID + + state := cntr.Status.Get().State() + if state != runtime.ContainerState_CONTAINER_RUNNING { + return fmt.Errorf("container is in %s state", criContainerStateToString(state)) + } + + task, err := cntr.Container.Task(ctx, nil) + if err != nil { + return fmt.Errorf("failed to load task: %v", err) + } + handleResizing(resize, func(size remotecommand.TerminalSize) { + if err := task.Resize(ctx, uint32(size.Width), uint32(size.Height)); err != nil { + glog.Errorf("Failed to resize task %q console: %v", id, err) + } + }) + + // TODO(random-liu): Figure out whether we need to support historical output. + if err := cntr.IO.Attach(stdin, stdout, stderr); err != nil { + return fmt.Errorf("failed to attach container: %v", err) + } + + // Close stdin after first attach if StdinOnce is specified, otherwise stdin will + // be kept open until container exits. + if cntr.Config.StdinOnce { + task.CloseIO(ctx, containerd.WithStdinCloser) + } + return nil } diff --git a/pkg/server/container_create.go b/pkg/server/container_create.go index 136e8b8ca..ed69d428b 100644 --- a/pkg/server/container_create.go +++ b/pkg/server/container_create.go @@ -30,6 +30,7 @@ import ( "golang.org/x/net/context" "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" + cio "github.com/kubernetes-incubator/cri-containerd/pkg/server/io" containerstore "github.com/kubernetes-incubator/cri-containerd/pkg/store/container" "github.com/kubernetes-incubator/cri-containerd/pkg/util" ) @@ -119,6 +120,21 @@ func (c *criContainerdService) CreateContainer(ctx context.Context, r *runtime.C } }() + containerIO, err := cio.NewContainerIO(id, + cio.WithStdin(config.GetStdin()), + cio.WithTerminal(config.GetTty()), + ) + if err != nil { + return nil, fmt.Errorf("failed to create container io: %v", err) + } + defer func() { + if retErr != nil { + if err := containerIO.Close(); err != nil { + glog.Errorf("Failed to close container io %q : %v", id, err) + } + } + }() + metaBytes, err := meta.Encode() if err != nil { return nil, fmt.Errorf("failed to convert sandbox metadata: %+v, %v", meta, err) @@ -143,9 +159,11 @@ func (c *criContainerdService) CreateContainer(ctx context.Context, r *runtime.C } }() - container, err := containerstore.NewContainer(meta, - containerstore.Status{CreatedAt: time.Now().UnixNano()}, - containerstore.WithContainer(cntr)) + status := containerstore.Status{CreatedAt: time.Now().UnixNano()} + container, err := containerstore.NewContainer(meta, status, + containerstore.WithContainer(cntr), + containerstore.WithContainerIO(containerIO), + ) if err != nil { return nil, fmt.Errorf("failed to create internal container object for %q: %v", id, err) diff --git a/pkg/server/container_execsync.go b/pkg/server/container_execsync.go index 80180386a..3c0f3c4ed 100644 --- a/pkg/server/container_execsync.go +++ b/pkg/server/container_execsync.go @@ -20,6 +20,7 @@ import ( "bytes" "fmt" "io" + "io/ioutil" "time" "github.com/containerd/containerd" @@ -115,9 +116,14 @@ func (c *criContainerdService) execInContainer(ctx context.Context, id string, o pspec.Terminal = opts.tty if opts.stdin == nil { - // Create empty buffer if stdin is nil. opts.stdin = new(bytes.Buffer) } + if opts.stdout == nil { + opts.stdout = ioutil.Discard + } + if opts.stderr == nil { + opts.stderr = ioutil.Discard + } execID := util.GenerateID() process, err := task.Exec(ctx, execID, pspec, containerd.NewIOWithTerminal( opts.stdin, diff --git a/pkg/server/container_start.go b/pkg/server/container_start.go index 12c3d7724..73dde8b14 100644 --- a/pkg/server/container_start.go +++ b/pkg/server/container_start.go @@ -17,7 +17,6 @@ limitations under the License. package server import ( - "bytes" "fmt" "io" "path/filepath" @@ -28,7 +27,7 @@ import ( "golang.org/x/net/context" "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" - "github.com/kubernetes-incubator/cri-containerd/pkg/server/agents" + cio "github.com/kubernetes-incubator/cri-containerd/pkg/server/io" containerstore "github.com/kubernetes-incubator/cri-containerd/pkg/store/container" ) @@ -45,19 +44,18 @@ func (c *criContainerdService) StartContainer(ctx context.Context, r *runtime.St if err != nil { return nil, fmt.Errorf("an error occurred when try to find container %q: %v", r.GetContainerId(), err) } - id := container.ID var startErr error // update container status in one transaction to avoid race with event monitor. if err := container.Status.Update(func(status containerstore.Status) (containerstore.Status, error) { // Always apply status change no matter startContainer fails or not. Because startContainer // may change container state no matter it fails or succeeds. - startErr = c.startContainer(ctx, container.Container, container.Metadata, &status) + startErr = c.startContainer(ctx, container, &status) return status, nil }); startErr != nil { return nil, startErr } else if err != nil { - return nil, fmt.Errorf("failed to update container %q metadata: %v", id, err) + return nil, fmt.Errorf("failed to update container %q metadata: %v", container.ID, err) } return &runtime.StartContainerResponse{}, nil } @@ -65,11 +63,12 @@ func (c *criContainerdService) StartContainer(ctx context.Context, r *runtime.St // startContainer actually starts the container. The function needs to be run in one transaction. Any updates // to the status passed in will be applied no matter the function returns error or not. func (c *criContainerdService) startContainer(ctx context.Context, - container containerd.Container, - meta containerstore.Metadata, + cntr containerstore.Container, status *containerstore.Status) (retErr error) { + id := cntr.ID + meta := cntr.Metadata + container := cntr.Container config := meta.Config - id := container.ID() // Return error if container is not in created state. if status.State() != runtime.ContainerState_CONTAINER_CREATED { @@ -109,37 +108,49 @@ func (c *criContainerdService) startContainer(ctx context.Context, if err != nil { return fmt.Errorf("failed to get task status for sandbox container %q: %v", id, err) } - if taskStatus.Status != containerd.Running { return fmt.Errorf("sandbox container %q is not running", sandboxID) } - // Redirect the stream to std for now. - // TODO(random-liu): [P1] Support StdinOnce after container logging is added. - rStdoutPipe, wStdoutPipe := io.Pipe() - rStderrPipe, wStderrPipe := io.Pipe() - stdin := new(bytes.Buffer) - defer func() { - if retErr != nil { - rStdoutPipe.Close() - rStderrPipe.Close() - } - }() - if config.GetLogPath() != "" { - // Only generate container log when log path is specified. - logPath := filepath.Join(sandboxConfig.GetLogDirectory(), config.GetLogPath()) - if err = c.agentFactory.NewContainerLogger(logPath, agents.Stdout, rStdoutPipe).Start(); err != nil { - return fmt.Errorf("failed to start container stdout logger: %v", err) - } - // Only redirect stderr when there is no tty. - if !config.GetTty() { - if err = c.agentFactory.NewContainerLogger(logPath, agents.Stderr, rStderrPipe).Start(); err != nil { - return fmt.Errorf("failed to start container stderr logger: %v", err) + ioCreation := func(id string) (_ containerd.IO, err error) { + var stdoutWC, stderrWC io.WriteCloser + defer func() { + if err != nil { + if stdoutWC != nil { + stdoutWC.Close() + } + if stderrWC != nil { + stderrWC.Close() + } } + }() + if config.GetLogPath() != "" { + // Only generate container log when log path is specified. + logPath := filepath.Join(sandboxConfig.GetLogDirectory(), config.GetLogPath()) + if stdoutWC, err = cio.NewCRILogger(logPath, cio.Stdout); err != nil { + return nil, fmt.Errorf("failed to start container stdout logger: %v", err) + } + // Only redirect stderr when there is no tty. + if !config.GetTty() { + if stderrWC, err = cio.NewCRILogger(logPath, cio.Stderr); err != nil { + return nil, fmt.Errorf("failed to start container stderr logger: %v", err) + } + } + } else { + stdoutWC = cio.NewDiscardLogger() + stderrWC = cio.NewDiscardLogger() } + + if err := cio.WithOutput("log", stdoutWC, stderrWC)(cntr.IO); err != nil { + return nil, fmt.Errorf("failed to add container log: %v", err) + } + if err := cntr.IO.Pipe(); err != nil { + return nil, fmt.Errorf("failed to pipe container io: %v", err) + } + return cntr.IO, nil } - //TODO(Abhi): close stdin/pass a managed IOCreation - task, err := container.NewTask(ctx, containerd.NewIO(stdin, wStdoutPipe, wStderrPipe)) + + task, err := container.NewTask(ctx, ioCreation) if err != nil { return fmt.Errorf("failed to create containerd task: %v", err) } diff --git a/pkg/server/events.go b/pkg/server/events.go index 004dbe657..02adcb1d4 100644 --- a/pkg/server/events.go +++ b/pkg/server/events.go @@ -110,11 +110,11 @@ func (c *criContainerdService) handleEvent(evt *events.Envelope) { glog.Errorf("failed to stop container, task not found for container %q: %v", e.ContainerID, err) return } - } - if task != nil { + } else { + // TODO(random-liu): [P1] This may block the loop, we may want to spawn a worker if _, err = task.Delete(context.Background()); err != nil { + // TODO(random-liu): [P0] Enqueue the event and retry. if !errdefs.IsNotFound(err) { - // TODO(random-liu): [P0] Enqueue the event and retry. glog.Errorf("failed to stop container %q: %v", e.ContainerID, err) return } diff --git a/pkg/server/helpers.go b/pkg/server/helpers.go index b4ecffbcc..ccfc45e83 100644 --- a/pkg/server/helpers.go +++ b/pkg/server/helpers.go @@ -19,12 +19,10 @@ package server import ( "encoding/json" "fmt" - "io" "os" "path/filepath" "strconv" "strings" - "syscall" "github.com/containerd/containerd/content" "github.com/containerd/containerd/errdefs" @@ -157,37 +155,6 @@ func getSandboxDevShm(sandboxRootDir string) string { return filepath.Join(sandboxRootDir, "shm") } -// prepareStreamingPipes prepares stream named pipe for container. returns nil -// streaming handler if corresponding stream path is empty. -func (c *criContainerdService) prepareStreamingPipes(ctx context.Context, stdin, stdout, stderr string) ( - i io.WriteCloser, o io.ReadCloser, e io.ReadCloser, retErr error) { - pipes := map[string]io.ReadWriteCloser{} - for t, stream := range map[string]struct { - path string - flag int - }{ - "stdin": {stdin, syscall.O_WRONLY | syscall.O_CREAT | syscall.O_NONBLOCK}, - "stdout": {stdout, syscall.O_RDONLY | syscall.O_CREAT | syscall.O_NONBLOCK}, - "stderr": {stderr, syscall.O_RDONLY | syscall.O_CREAT | syscall.O_NONBLOCK}, - } { - if stream.path == "" { - continue - } - s, err := c.os.OpenFifo(ctx, stream.path, stream.flag, 0700) - if err != nil { - return nil, nil, nil, fmt.Errorf("failed to open named pipe %q: %v", - stream.path, err) - } - defer func(cl io.Closer) { - if retErr != nil { - cl.Close() - } - }(s) - pipes[t] = s - } - return pipes["stdin"], pipes["stdout"], pipes["stderr"], nil -} - // getNetworkNamespace returns the network namespace of a process. func getNetworkNamespace(pid uint32) string { return fmt.Sprintf(netNSFormat, pid) diff --git a/pkg/server/helpers_test.go b/pkg/server/helpers_test.go index 02fd1b7bc..40302f105 100644 --- a/pkg/server/helpers_test.go +++ b/pkg/server/helpers_test.go @@ -17,107 +17,13 @@ limitations under the License. package server import ( - "fmt" - "io" - "os" - "syscall" "testing" "github.com/containerd/containerd/reference" imagedigest "github.com/opencontainers/go-digest" "github.com/stretchr/testify/assert" - "golang.org/x/net/context" - - ostesting "github.com/kubernetes-incubator/cri-containerd/pkg/os/testing" ) -func TestPrepareStreamingPipes(t *testing.T) { - for desc, test := range map[string]struct { - stdin string - stdout string - stderr string - }{ - "empty stdin": { - stdout: "/test/stdout", - stderr: "/test/stderr", - }, - "empty stdout/stderr": { - stdin: "/test/stdin", - }, - "non-empty stdio": { - stdin: "/test/stdin", - stdout: "/test/stdout", - stderr: "/test/stderr", - }, - "empty stdio": {}, - } { - t.Logf("TestCase %q", desc) - c := newTestCRIContainerdService() - fakeOS := c.os.(*ostesting.FakeOS) - fakeOS.OpenFifoFn = func(ctx context.Context, fn string, flag int, perm os.FileMode) (io.ReadWriteCloser, error) { - expectFlag := syscall.O_RDONLY | syscall.O_CREAT | syscall.O_NONBLOCK - if fn == test.stdin { - expectFlag = syscall.O_WRONLY | syscall.O_CREAT | syscall.O_NONBLOCK - } - assert.Equal(t, expectFlag, flag) - assert.Equal(t, os.FileMode(0700), perm) - return nopReadWriteCloser{}, nil - } - i, o, e, err := c.prepareStreamingPipes(context.Background(), test.stdin, test.stdout, test.stderr) - assert.NoError(t, err) - assert.Equal(t, test.stdin != "", i != nil) - assert.Equal(t, test.stdout != "", o != nil) - assert.Equal(t, test.stderr != "", e != nil) - } -} - -type closeTestReadWriteCloser struct { - CloseFn func() error - nopReadWriteCloser -} - -func (c closeTestReadWriteCloser) Close() error { - return c.CloseFn() -} - -func TestPrepareStreamingPipesError(t *testing.T) { - stdin, stdout, stderr := "/test/stdin", "/test/stdout", "/test/stderr" - for desc, inject := range map[string]map[string]error{ - "should cleanup on stdin error": {stdin: fmt.Errorf("stdin error")}, - "should cleanup on stdout error": {stdout: fmt.Errorf("stdout error")}, - "should cleanup on stderr error": {stderr: fmt.Errorf("stderr error")}, - } { - t.Logf("TestCase %q", desc) - c := newTestCRIContainerdService() - fakeOS := c.os.(*ostesting.FakeOS) - openFlags := map[string]bool{ - stdin: false, - stdout: false, - stderr: false, - } - fakeOS.OpenFifoFn = func(ctx context.Context, fn string, flag int, perm os.FileMode) (io.ReadWriteCloser, error) { - if inject[fn] != nil { - return nil, inject[fn] - } - openFlags[fn] = !openFlags[fn] - testCloser := closeTestReadWriteCloser{} - testCloser.CloseFn = func() error { - openFlags[fn] = !openFlags[fn] - return nil - } - return testCloser, nil - } - i, o, e, err := c.prepareStreamingPipes(context.Background(), stdin, stdout, stderr) - assert.Error(t, err) - assert.Nil(t, i) - assert.Nil(t, o) - assert.Nil(t, e) - assert.False(t, openFlags[stdin]) - assert.False(t, openFlags[stdout]) - assert.False(t, openFlags[stderr]) - } -} - func TestNormalizeImageRef(t *testing.T) { for _, test := range []struct { input string diff --git a/pkg/server/io/io.go b/pkg/server/io/io.go new file mode 100644 index 000000000..855d9ed11 --- /dev/null +++ b/pkg/server/io/io.go @@ -0,0 +1,317 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package agents + +import ( + "errors" + "io" + "os" + "strings" + "sync" + "syscall" + + "github.com/containerd/containerd" + "github.com/containerd/fifo" + "github.com/golang/glog" + "golang.org/x/net/context" + + "github.com/kubernetes-incubator/cri-containerd/pkg/ioutil" + "github.com/kubernetes-incubator/cri-containerd/pkg/util" +) + +// StreamType is the type of the stream, stdout/stderr. +type StreamType string + +const ( + // Stdin stream type. + Stdin StreamType = "stdin" + // Stdout stream type. + Stdout StreamType = "stdout" + // Stderr stream type. + Stderr StreamType = "stderr" +) + +type wgCloser struct { + ctx context.Context + wg *sync.WaitGroup + set []io.Closer + cancel context.CancelFunc +} + +func (g *wgCloser) Wait() { + g.wg.Wait() +} + +func (g *wgCloser) Close() { + for _, f := range g.set { + f.Close() + } +} + +func (g *wgCloser) Cancel() { + g.cancel() +} + +// streamKey generates a key for the stream. +func streamKey(id, name string, stream StreamType) string { + return strings.Join([]string{id, name, string(stream)}, "-") +} + +// ContainerIO holds the container io. +type ContainerIO struct { + dir string + stdinPath string + stdoutPath string + stderrPath string + + id string + tty bool + stdin bool + stdout *ioutil.WriterGroup + stderr *ioutil.WriterGroup + + closer *wgCloser +} + +var _ containerd.IO = &ContainerIO{} + +// Opts sets specific information to newly created ContainerIO. +type Opts func(*ContainerIO) error + +// WithStdin enables stdin of the container io. +func WithStdin(stdin bool) Opts { + return func(c *ContainerIO) error { + c.stdin = stdin + return nil + } +} + +// WithOutput adds output stream to the container io. +func WithOutput(name string, stdout, stderr io.WriteCloser) Opts { + return func(c *ContainerIO) error { + if stdout != nil { + if err := c.stdout.Add(streamKey(c.id, name, Stdout), stdout); err != nil { + return err + } + } + if stderr != nil { + if err := c.stderr.Add(streamKey(c.id, name, Stderr), stderr); err != nil { + return err + } + } + return nil + } +} + +// WithTerminal enables tty of the container io. +func WithTerminal(tty bool) Opts { + return func(c *ContainerIO) error { + c.tty = tty + return nil + } +} + +// NewContainerIO creates container io. +func NewContainerIO(id string, opts ...Opts) (*ContainerIO, error) { + fifos, err := containerd.NewFifos(id) + if err != nil { + return nil, err + } + c := &ContainerIO{ + id: id, + dir: fifos.Dir, + stdoutPath: fifos.Out, + stderrPath: fifos.Err, + stdout: ioutil.NewWriterGroup(), + stderr: ioutil.NewWriterGroup(), + } + for _, opt := range opts { + if err := opt(c); err != nil { + return nil, err + } + } + if c.stdin { + c.stdinPath = fifos.In + } + return c, nil +} + +// Config returns io config. +func (c *ContainerIO) Config() containerd.IOConfig { + return containerd.IOConfig{ + Terminal: c.tty, + Stdin: c.stdinPath, + Stdout: c.stdoutPath, + Stderr: c.stderrPath, + } +} + +// Pipe creates container fifos and pipe container output +// to output stream. +func (c *ContainerIO) Pipe() (err error) { + var ( + f io.ReadWriteCloser + set []io.Closer + ctx, cancel = context.WithCancel(context.Background()) + wg = &sync.WaitGroup{} + ) + defer func() { + if err != nil { + for _, f := range set { + f.Close() + } + cancel() + } + }() + if c.stdinPath != "" { + // Just create the stdin, only open it when used. + if f, err = fifo.OpenFifo(ctx, c.stdinPath, syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil { + return err + } + f.Close() + } + + if f, err = fifo.OpenFifo(ctx, c.stdoutPath, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil { + return err + } + set = append(set, f) + wg.Add(1) + go func(r io.ReadCloser) { + if _, err := io.Copy(c.stdout, r); err != nil { + glog.Errorf("Failed to redirect stdout of container %q: %v", c.id, err) + } + r.Close() + c.stdout.Close() + wg.Done() + glog.V(2).Infof("Finish piping stdout of container %q", c.id) + }(f) + + if f, err = fifo.OpenFifo(ctx, c.stderrPath, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil { + return err + } + set = append(set, f) + if !c.tty { + wg.Add(1) + go func(r io.ReadCloser) { + if _, err := io.Copy(c.stderr, r); err != nil { + glog.Errorf("Failed to redirect stderr of container %q: %v", c.id, err) + } + r.Close() + c.stderr.Close() + wg.Done() + glog.V(2).Infof("Finish piping stderr of container %q", c.id) + }(f) + } + c.closer = &wgCloser{ + wg: wg, + set: set, + ctx: ctx, + cancel: cancel, + } + return nil +} + +// Attach attaches container stdio. +func (c *ContainerIO) Attach(stdin io.Reader, stdout, stderr io.WriteCloser) error { + if c.closer == nil { + return errors.New("container io is not initialized") + } + var wg sync.WaitGroup + key := util.GenerateID() + stdinKey := streamKey(c.id, "attach-"+key, Stdin) + stdoutKey := streamKey(c.id, "attach-"+key, Stdout) + stderrKey := streamKey(c.id, "attach-"+key, Stderr) + + var stdinCloser io.Closer + if c.stdinPath != "" && stdin != nil { + f, err := fifo.OpenFifo(c.closer.ctx, c.stdinPath, syscall.O_WRONLY|syscall.O_NONBLOCK, 0700) + if err != nil { + return err + } + // Also increase wait group here, so that `closer.Wait` will + // also wait for this fifo to be closed. + c.closer.wg.Add(1) + wg.Add(1) + go func(w io.WriteCloser) { + if _, err := io.Copy(w, stdin); err != nil { + glog.Errorf("Failed to redirect stdin for container attach %q: %v", c.id, err) + } + w.Close() + glog.V(2).Infof("Attach stream %q closed", stdinKey) + // No matter what, when stdin is closed (io.Copy unblock), close stdout and stderr + if stdout != nil { + c.stdout.Remove(stdoutKey) + } + if stderr != nil { + c.stderr.Remove(stderrKey) + } + wg.Done() + c.closer.wg.Done() + }(f) + stdinCloser = f + } + + attachStream := func(key string, close <-chan struct{}) { + <-close + glog.V(2).Infof("Attach stream %q closed", key) + // Make sure stdin gets closed. + if stdinCloser != nil { + stdinCloser.Close() + } + wg.Done() + } + + if stdout != nil { + wg.Add(1) + wc, close := ioutil.NewWriteCloseInformer(stdout) + if err := c.stdout.Add(stdoutKey, wc); err != nil { + return err + } + go attachStream(stdoutKey, close) + } + if !c.tty && stderr != nil { + wg.Add(1) + wc, close := ioutil.NewWriteCloseInformer(stderr) + if err := c.stderr.Add(stderrKey, wc); err != nil { + return err + } + go attachStream(stderrKey, close) + } + wg.Wait() + return nil +} + +// Cancel cancels container io. +func (c *ContainerIO) Cancel() { + c.closer.Cancel() +} + +// Wait waits container io to finish. +func (c *ContainerIO) Wait() { + c.closer.Wait() +} + +// Close closes all FIFOs. +func (c *ContainerIO) Close() error { + if c.closer != nil { + c.closer.Close() + } + if c.dir != "" { + return os.RemoveAll(c.dir) + } + return nil +} diff --git a/pkg/server/agents/logger.go b/pkg/server/io/logger.go similarity index 58% rename from pkg/server/agents/logger.go rename to pkg/server/io/logger.go index ebcb748d3..871507051 100644 --- a/pkg/server/agents/logger.go +++ b/pkg/server/io/logger.go @@ -26,6 +26,8 @@ import ( "time" "github.com/golang/glog" + + cioutil "github.com/kubernetes-incubator/cri-containerd/pkg/ioutil" ) const ( @@ -42,67 +44,39 @@ const ( bufSize = pipeBufSize - len(timestampFormat) - len(Stdout) - 2 /*2 delimiter*/ - 1 /*eol*/ ) -// sandboxLogger is the log agent used for sandbox. -// It discards sandbox all output for now. -type sandboxLogger struct { - rc io.ReadCloser +// NewDiscardLogger creates logger which discards all the input. +func NewDiscardLogger() io.WriteCloser { + return cioutil.NewNopWriteCloser(ioutil.Discard) } -func (*agentFactory) NewSandboxLogger(rc io.ReadCloser) Agent { - return &sandboxLogger{rc: rc} -} - -func (s *sandboxLogger) Start() error { - go func() { - // Discard the output for now. - io.Copy(ioutil.Discard, s.rc) // nolint: errcheck - s.rc.Close() - }() - return nil -} - -// containerLogger is the log agent used for container. -// It redirect container log into CRI log file, and decorate the log -// line into CRI defined format. -type containerLogger struct { - path string - stream StreamType - rc io.ReadCloser -} - -func (*agentFactory) NewContainerLogger(path string, stream StreamType, rc io.ReadCloser) Agent { - return &containerLogger{ - path: path, - stream: stream, - rc: rc, - } -} - -func (c *containerLogger) Start() error { - glog.V(4).Infof("Start reading log file %q", c.path) - wc, err := os.OpenFile(c.path, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0640) +// NewCRILogger returns a write closer which redirect container log into +// log file, and decorate the log line into CRI defined format. +func NewCRILogger(path string, stream StreamType) (io.WriteCloser, error) { + glog.V(4).Infof("Start reading log file %q", path) + prc, pwc := io.Pipe() + f, err := os.OpenFile(path, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0640) if err != nil { - return fmt.Errorf("failed to open log file %q: %v", c.path, err) + return nil, fmt.Errorf("failed to open log file: %v", err) } - go c.redirectLogs(wc) - return nil + go redirectLogs(path, prc, f, stream) + return pwc, nil } -func (c *containerLogger) redirectLogs(wc io.WriteCloser) { - defer c.rc.Close() +func redirectLogs(path string, rc io.ReadCloser, wc io.WriteCloser, stream StreamType) { + defer rc.Close() defer wc.Close() - streamBytes := []byte(c.stream) + streamBytes := []byte(stream) delimiterBytes := []byte{delimiter} - r := bufio.NewReaderSize(c.rc, bufSize) + r := bufio.NewReaderSize(rc, bufSize) for { // TODO(random-liu): Better define CRI log format, and escape newline in log. lineBytes, _, err := r.ReadLine() if err == io.EOF { - glog.V(4).Infof("Finish redirecting log file %q", c.path) + glog.V(4).Infof("Finish redirecting log file %q", path) return } if err != nil { - glog.Errorf("An error occurred when redirecting log file %q: %v", c.path, err) + glog.Errorf("An error occurred when redirecting log file %q: %v", path, err) return } timestampBytes := time.Now().AppendFormat(nil, time.RFC3339Nano) diff --git a/pkg/server/agents/logger_test.go b/pkg/server/io/logger_test.go similarity index 82% rename from pkg/server/agents/logger_test.go rename to pkg/server/io/logger_test.go index c7a958530..746768ba6 100644 --- a/pkg/server/agents/logger_test.go +++ b/pkg/server/io/logger_test.go @@ -25,20 +25,11 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + cioutil "github.com/kubernetes-incubator/cri-containerd/pkg/ioutil" ) -// writeCloserBuffer is a writecloser wrapper for bytes.Buffer -// with a nop closer. -type writeCloserBuffer struct { - *bytes.Buffer -} - -// nop close -func (*writeCloserBuffer) Close() error { return nil } - func TestRedirectLogs(t *testing.T) { - f := NewAgentFactory() - // f.NewContainerLogger( for desc, test := range map[string]struct { input string stream StreamType @@ -71,10 +62,10 @@ func TestRedirectLogs(t *testing.T) { } { t.Logf("TestCase %q", desc) rc := ioutil.NopCloser(strings.NewReader(test.input)) - c := f.NewContainerLogger("test-path", test.stream, rc).(*containerLogger) - wc := &writeCloserBuffer{bytes.NewBuffer(nil)} - c.redirectLogs(wc) - output := wc.String() + buf := bytes.NewBuffer(nil) + wc := cioutil.NewNopWriteCloser(buf) + redirectLogs("test-path", rc, wc, test.stream) + output := buf.String() lines := strings.Split(output, "\n") lines = lines[:len(lines)-1] // Discard empty string after last \n assert.Len(t, lines, len(test.content)) diff --git a/pkg/server/sandbox_run.go b/pkg/server/sandbox_run.go index 46d1bec17..ee17a8c82 100644 --- a/pkg/server/sandbox_run.go +++ b/pkg/server/sandbox_run.go @@ -17,9 +17,7 @@ limitations under the License. package server import ( - "bytes" "fmt" - "io" "os" "strings" "time" @@ -136,22 +134,6 @@ func (c *criContainerdService) RunPodSandbox(ctx context.Context, r *runtime.Run } }() - // Discard sandbox container output because we don't care about it. - rStdoutPipe, wStdoutPipe := io.Pipe() - rStderrPipe, wStderrPipe := io.Pipe() - defer func() { - if retErr != nil { - rStdoutPipe.Close() - rStderrPipe.Close() - } - }() - if err := c.agentFactory.NewSandboxLogger(rStdoutPipe).Start(); err != nil { - return nil, fmt.Errorf("failed to start sandbox stdout logger: %v", err) - } - if err := c.agentFactory.NewSandboxLogger(rStderrPipe).Start(); err != nil { - return nil, fmt.Errorf("failed to start sandbox stderr logger: %v", err) - } - // Setup sandbox /dev/shm, /etc/hosts and /etc/resolv.conf. if err = c.setupSandboxFiles(sandboxRootDir, config); err != nil { return nil, fmt.Errorf("failed to setup sandbox files: %v", err) @@ -168,8 +150,8 @@ func (c *criContainerdService) RunPodSandbox(ctx context.Context, r *runtime.Run // Create sandbox task in containerd. glog.V(5).Infof("Create sandbox container (id=%q, name=%q).", id, name) - //TODO(Abhi): close the stdin or pass newIOCreation with /dev/null stdin - task, err := container.NewTask(ctx, containerd.NewIO(new(bytes.Buffer), wStdoutPipe, wStderrPipe)) + // We don't need stdio for sandbox container. + task, err := container.NewTask(ctx, containerd.NullIO) if err != nil { return nil, fmt.Errorf("failed to create task for sandbox %q: %v", id, err) } diff --git a/pkg/server/sandbox_stop.go b/pkg/server/sandbox_stop.go index 924915ffb..13e1a504c 100644 --- a/pkg/server/sandbox_stop.go +++ b/pkg/server/sandbox_stop.go @@ -107,5 +107,6 @@ func (c *criContainerdService) stopSandboxContainer(ctx context.Context, contain if err != nil && !errdefs.IsNotFound(err) { return fmt.Errorf("failed to delete sandbox container: %v", err) } + return nil } diff --git a/pkg/server/service.go b/pkg/server/service.go index 5a70dfd0d..76f8cda07 100644 --- a/pkg/server/service.go +++ b/pkg/server/service.go @@ -31,7 +31,6 @@ import ( osinterface "github.com/kubernetes-incubator/cri-containerd/pkg/os" "github.com/kubernetes-incubator/cri-containerd/pkg/registrar" - "github.com/kubernetes-incubator/cri-containerd/pkg/server/agents" containerstore "github.com/kubernetes-incubator/cri-containerd/pkg/store/container" imagestore "github.com/kubernetes-incubator/cri-containerd/pkg/store/image" sandboxstore "github.com/kubernetes-incubator/cri-containerd/pkg/store/sandbox" @@ -79,8 +78,6 @@ type criContainerdService struct { eventService events.EventsClient // netPlugin is used to setup and teardown network when run/stop pod sandbox. netPlugin ocicni.CNIPlugin - // agentFactory is the factory to create agent used in the cri containerd service. - agentFactory agents.AgentFactory // client is an instance of the containerd client client *containerd.Client // streamServer is the streaming server serves container streaming request. @@ -111,7 +108,6 @@ func NewCRIContainerdService(containerdEndpoint, rootDir, networkPluginBinDir, n imageStoreService: client.ImageService(), eventService: client.EventService(), contentStoreService: client.ContentStore(), - agentFactory: agents.NewAgentFactory(), client: client, } diff --git a/pkg/server/service_test.go b/pkg/server/service_test.go index 7d92dc3fc..5ad55b88d 100644 --- a/pkg/server/service_test.go +++ b/pkg/server/service_test.go @@ -17,24 +17,14 @@ limitations under the License. package server import ( - "io" - ostesting "github.com/kubernetes-incubator/cri-containerd/pkg/os/testing" "github.com/kubernetes-incubator/cri-containerd/pkg/registrar" - agentstesting "github.com/kubernetes-incubator/cri-containerd/pkg/server/agents/testing" servertesting "github.com/kubernetes-incubator/cri-containerd/pkg/server/testing" containerstore "github.com/kubernetes-incubator/cri-containerd/pkg/store/container" imagestore "github.com/kubernetes-incubator/cri-containerd/pkg/store/image" sandboxstore "github.com/kubernetes-incubator/cri-containerd/pkg/store/sandbox" ) -type nopReadWriteCloser struct{} - -// Return error directly to avoid read/write. -func (nopReadWriteCloser) Read(p []byte) (n int, err error) { return 0, io.EOF } -func (nopReadWriteCloser) Write(p []byte) (n int, err error) { return 0, io.ErrShortWrite } -func (nopReadWriteCloser) Close() error { return nil } - const ( testRootDir = "/test/rootfs" // Use an image id as test sandbox image to avoid image name resolve. @@ -55,6 +45,5 @@ func newTestCRIContainerdService() *criContainerdService { containerStore: containerstore.NewStore(), containerNameIndex: registrar.NewRegistrar(), netPlugin: servertesting.NewFakeCNIPlugin(), - agentFactory: agentstesting.NewFakeAgentFactory(), } } diff --git a/pkg/server/streaming.go b/pkg/server/streaming.go index 9cdd36ab1..d089db848 100644 --- a/pkg/server/streaming.go +++ b/pkg/server/streaming.go @@ -17,7 +17,6 @@ limitations under the License. package server import ( - "errors" "fmt" "io" "math" @@ -79,7 +78,7 @@ func (s *streamRuntime) Exec(containerID string, cmd []string, stdin io.Reader, func (s *streamRuntime) Attach(containerID string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error { - return errors.New("not implemented") + return s.c.attachContainer(context.Background(), containerID, in, out, err, tty, resize) } func (s *streamRuntime) PortForward(podSandboxID string, port int32, stream io.ReadWriteCloser) error { diff --git a/pkg/store/container/container.go b/pkg/store/container/container.go index 33cfa5e45..057b1dd70 100644 --- a/pkg/store/container/container.go +++ b/pkg/store/container/container.go @@ -21,6 +21,7 @@ import ( "github.com/containerd/containerd" + cio "github.com/kubernetes-incubator/cri-containerd/pkg/server/io" "github.com/kubernetes-incubator/cri-containerd/pkg/store" ) @@ -33,6 +34,9 @@ type Container struct { Status StatusStorage // Containerd container Container containerd.Container + // Container IO + IO *cio.ContainerIO + // TODO(random-liu): Add stop channel to get rid of stop poll waiting. } // Opts sets specific information to newly created Container. @@ -45,6 +49,13 @@ func WithContainer(cntr containerd.Container) Opts { } } +// WithContainerIO adds IO into the container. +func WithContainerIO(io *cio.ContainerIO) Opts { + return func(c *Container) { + c.IO = io + } +} + // NewContainer creates an internally used container type. func NewContainer(metadata Metadata, status Status, opts ...Opts) (Container, error) { s, err := StoreStatus(metadata.ID, status) @@ -55,7 +66,6 @@ func NewContainer(metadata Metadata, status Status, opts ...Opts) (Container, er Metadata: metadata, Status: s, } - for _, o := range opts { o(&c) } diff --git a/pkg/store/container/container_test.go b/pkg/store/container/container_test.go index 0e40b556e..176d29f61 100644 --- a/pkg/store/container/container_test.go +++ b/pkg/store/container/container_test.go @@ -23,6 +23,7 @@ import ( assertlib "github.com/stretchr/testify/assert" "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" + cio "github.com/kubernetes-incubator/cri-containerd/pkg/server/io" "github.com/kubernetes-incubator/cri-containerd/pkg/store" ) @@ -136,3 +137,36 @@ func TestContainerStore(t *testing.T) { assert.Equal(Container{}, c) assert.Equal(store.ErrNotExist, err) } + +func TestWithContainerIO(t *testing.T) { + meta := Metadata{ + ID: "1", + Name: "Container-1", + SandboxID: "Sandbox-1", + Config: &runtime.ContainerConfig{ + Metadata: &runtime.ContainerMetadata{ + Name: "TestPod-1", + Attempt: 1, + }, + }, + ImageRef: "TestImage-1", + } + status := Status{ + Pid: 1, + CreatedAt: time.Now().UnixNano(), + StartedAt: time.Now().UnixNano(), + FinishedAt: time.Now().UnixNano(), + ExitCode: 1, + Reason: "TestReason-1", + Message: "TestMessage-1", + } + assert := assertlib.New(t) + + c, err := NewContainer(meta, status) + assert.NoError(err) + assert.Nil(c.IO) + + c, err = NewContainer(meta, status, WithContainerIO(&cio.ContainerIO{})) + assert.NoError(err) + assert.NotNil(c.IO) +}