Merge pull request #128 from Random-Liu/add-container-attach

Add container attach
This commit is contained in:
Lantao Liu 2017-08-23 17:20:04 -07:00 committed by GitHub
commit 60c7f5127e
25 changed files with 871 additions and 380 deletions

View File

@ -21,7 +21,7 @@ source $(dirname "${BASH_SOURCE[0]}")/test-utils.sh
# FOCUS focuses the test to run. # FOCUS focuses the test to run.
FOCUS=${FOCUS:-} FOCUS=${FOCUS:-}
# SKIP skips the test to skip. # SKIP skips the test to skip.
SKIP=${SKIP:-"attach|RunAsUser"} SKIP=${SKIP:-"RunAsUser"}
REPORT_DIR=${REPORT_DIR:-"/tmp/test-cri"} REPORT_DIR=${REPORT_DIR:-"/tmp/test-cri"}
if [[ -z "${GOPATH}" ]]; then if [[ -z "${GOPATH}" ]]; then

View File

@ -0,0 +1,68 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ioutil
import "io"
// writeCloseInformer wraps passed in write closer with a close channel.
// Caller could wait on the close channel for the write closer to be
// closed.
type writeCloseInformer struct {
close chan struct{}
wc io.WriteCloser
}
// NewWriteCloseInformer creates the writeCloseInformer from a write closer.
func NewWriteCloseInformer(wc io.WriteCloser) (io.WriteCloser, <-chan struct{}) {
close := make(chan struct{})
return &writeCloseInformer{
close: close,
wc: wc,
}, close
}
// Write passes through the data into the internal write closer.
func (w *writeCloseInformer) Write(p []byte) (int, error) {
return w.wc.Write(p)
}
// Close closes the internal write closer and inform the close channel.
func (w *writeCloseInformer) Close() error {
err := w.wc.Close()
close(w.close)
return err
}
// nopWriteCloser wraps passed in writer with a nop close function.
type nopWriteCloser struct {
w io.Writer
}
// NewNopWriteCloser creates the nopWriteCloser from a writer.
func NewNopWriteCloser(w io.Writer) io.WriteCloser {
return &nopWriteCloser{w: w}
}
// Write passes through the data into the internal writer.
func (n *nopWriteCloser) Write(p []byte) (int, error) {
return n.w.Write(p)
}
// Close is a nop close function.
func (n *nopWriteCloser) Close() error {
return nil
}

View File

@ -0,0 +1,49 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ioutil
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestWriteCloseInformer(t *testing.T) {
original := &writeCloser{}
wci, close := NewWriteCloseInformer(original)
data := "test"
n, err := wci.Write([]byte(data))
assert.Equal(t, len(data), n)
assert.Equal(t, data, original.buf.String())
assert.NoError(t, err)
select {
case <-close:
assert.Fail(t, "write closer closed")
default:
}
wci.Close()
assert.True(t, original.closed)
select {
case <-close:
default:
assert.Fail(t, "write closer not closed")
}
}

103
pkg/ioutil/writer_group.go Normal file
View File

@ -0,0 +1,103 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ioutil
import (
"errors"
"io"
"sync"
"github.com/golang/glog"
)
// WriterGroup is a group of writers. Writer could be dynamically
// added and removed.
type WriterGroup struct {
mu sync.Mutex
writers map[string]io.WriteCloser
closed bool
}
var _ io.Writer = &WriterGroup{}
// NewWriterGroup creates an empty writer group.
func NewWriterGroup() *WriterGroup {
return &WriterGroup{
writers: make(map[string]io.WriteCloser),
}
}
// Add adds a writer into the group, returns an error when writer
// group is closed.
func (g *WriterGroup) Add(key string, w io.WriteCloser) error {
g.mu.Lock()
defer g.mu.Unlock()
if g.closed {
return errors.New("wait group closed")
}
g.writers[key] = w
return nil
}
// Remove removes a writer from the group.
func (g *WriterGroup) Remove(key string) {
g.mu.Lock()
defer g.mu.Unlock()
w, ok := g.writers[key]
if !ok {
return
}
w.Close()
delete(g.writers, key)
}
// Write writes data into each writer. If a writer returns error,
// it will be closed and removed from the writer group. It returns
// error if writer group is empty.
func (g *WriterGroup) Write(p []byte) (int, error) {
g.mu.Lock()
defer g.mu.Unlock()
for k, w := range g.writers {
n, err := w.Write(p)
if err != nil {
glog.Errorf("Writer %q write error: %v", k, err)
} else if len(p) != n {
glog.Errorf("Writer %q short write error", k)
} else {
continue
}
// The writer is closed or in bad state, remove it.
w.Close()
delete(g.writers, k)
}
if len(g.writers) == 0 {
return 0, errors.New("writer group is empty")
}
return len(p), nil
}
// Close closes the writer group. Write or Add will return error
// after closed.
func (g *WriterGroup) Close() {
g.mu.Lock()
defer g.mu.Unlock()
for _, w := range g.writers {
w.Close()
}
g.writers = nil
g.closed = true
}

View File

@ -0,0 +1,94 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ioutil
import (
"bytes"
"testing"
"github.com/stretchr/testify/assert"
)
type writeCloser struct {
buf bytes.Buffer
closed bool
}
func (wc *writeCloser) Write(p []byte) (int, error) {
return wc.buf.Write(p)
}
func (wc *writeCloser) Close() error {
wc.closed = true
return nil
}
func TestEmptyWriterGroup(t *testing.T) {
wg := NewWriterGroup()
_, err := wg.Write([]byte("test"))
assert.Error(t, err)
}
func TestClosedWriterGroup(t *testing.T) {
wg := NewWriterGroup()
wc := &writeCloser{}
key, data := "test key", "test data"
err := wg.Add(key, wc)
assert.NoError(t, err)
n, err := wg.Write([]byte(data))
assert.Equal(t, len(data), n)
assert.Equal(t, data, wc.buf.String())
assert.NoError(t, err)
wg.Close()
assert.True(t, wc.closed)
err = wg.Add(key, &writeCloser{})
assert.Error(t, err)
_, err = wg.Write([]byte(data))
assert.Error(t, err)
}
func TestAddRemoveWriter(t *testing.T) {
wg := NewWriterGroup()
wc1, wc2 := &writeCloser{}, &writeCloser{}
key1, key2 := "test key 1", "test key 2"
err := wg.Add(key1, wc1)
assert.NoError(t, err)
_, err = wg.Write([]byte("test data 1"))
assert.NoError(t, err)
assert.Equal(t, "test data 1", wc1.buf.String())
err = wg.Add(key2, wc2)
assert.NoError(t, err)
_, err = wg.Write([]byte("test data 2"))
assert.NoError(t, err)
assert.Equal(t, "test data 1test data 2", wc1.buf.String())
assert.Equal(t, "test data 2", wc2.buf.String())
wg.Remove(key1)
_, err = wg.Write([]byte("test data 3"))
assert.NoError(t, err)
assert.Equal(t, "test data 1test data 2", wc1.buf.String())
assert.Equal(t, "test data 2test data 3", wc2.buf.String())
wg.Close()
}

View File

@ -1,51 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package agents
import "io"
// StreamType is the type of the stream, stdout/stderr.
type StreamType string
const (
// Stdout stream type.
Stdout StreamType = "stdout"
// Stderr stream type.
Stderr StreamType = "stderr"
)
// Agent is the a running agent perform a specific task, e.g. redirect and
// decorate log, redirect stream etc.
type Agent interface {
// Start starts the logger.
Start() error
}
// AgentFactory is the factory to create required agents.
type AgentFactory interface {
// NewSandboxLogger creates a sandbox logging agent.
NewSandboxLogger(io.ReadCloser) Agent
// NewContainerLogger creates a container logging agent.
NewContainerLogger(string, StreamType, io.ReadCloser) Agent
}
type agentFactory struct{}
// NewAgentFactory creates a new agent factory.
func NewAgentFactory() AgentFactory {
return &agentFactory{}
}

View File

@ -1,50 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package testing
import (
"io"
"github.com/kubernetes-incubator/cri-containerd/pkg/server/agents"
)
// FakeAgent is a fake agent for test.
type FakeAgent struct{}
// Start always return nil not.
// TODO(random-liu): Inject error and test error.
func (f *FakeAgent) Start() error {
return nil
}
// FakeAgentFactory is a fake agent factory for test.
type FakeAgentFactory struct{}
// NewFakeAgentFactory creates fake agent factory.
func NewFakeAgentFactory() agents.AgentFactory {
return &FakeAgentFactory{}
}
// NewSandboxLogger creates a fake agent as sandbox logger.
func (*FakeAgentFactory) NewSandboxLogger(rc io.ReadCloser) agents.Agent {
return &FakeAgent{}
}
// NewContainerLogger creates a fake agent as container logger.
func (*FakeAgentFactory) NewContainerLogger(string, agents.StreamType, io.ReadCloser) agents.Agent {
return &FakeAgent{}
}

View File

@ -17,14 +17,69 @@ limitations under the License.
package server package server
import ( import (
"errors" "fmt"
"io"
"github.com/containerd/containerd"
"github.com/golang/glog"
"golang.org/x/net/context" "golang.org/x/net/context"
"k8s.io/client-go/tools/remotecommand"
"k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
) )
// Attach prepares a streaming endpoint to attach to a running container, and returns the address. // Attach prepares a streaming endpoint to attach to a running container, and returns the address.
func (c *criContainerdService) Attach(ctx context.Context, r *runtime.AttachRequest) (*runtime.AttachResponse, error) { func (c *criContainerdService) Attach(ctx context.Context, r *runtime.AttachRequest) (retRes *runtime.AttachResponse, retErr error) {
return nil, errors.New("not implemented") glog.V(2).Infof("Attach for %q with tty %v and stdin %v", r.GetContainerId(), r.GetTty(), r.GetStdin())
defer func() {
if retErr == nil {
glog.V(2).Infof("Attach for %q returns URL %q", r.GetContainerId(), retRes.Url)
}
}()
cntr, err := c.containerStore.Get(r.GetContainerId())
if err != nil {
return nil, fmt.Errorf("failed to find container in store: %v", err)
}
state := cntr.Status.Get().State()
if state != runtime.ContainerState_CONTAINER_RUNNING {
return nil, fmt.Errorf("container is in %s state", criContainerStateToString(state))
}
return c.streamServer.GetAttach(r)
}
func (c *criContainerdService) attachContainer(ctx context.Context, id string, stdin io.Reader, stdout, stderr io.WriteCloser,
tty bool, resize <-chan remotecommand.TerminalSize) error {
// Get container from our container store.
cntr, err := c.containerStore.Get(id)
if err != nil {
return fmt.Errorf("failed to find container in store: %v", err)
}
id = cntr.ID
state := cntr.Status.Get().State()
if state != runtime.ContainerState_CONTAINER_RUNNING {
return fmt.Errorf("container is in %s state", criContainerStateToString(state))
}
task, err := cntr.Container.Task(ctx, nil)
if err != nil {
return fmt.Errorf("failed to load task: %v", err)
}
handleResizing(resize, func(size remotecommand.TerminalSize) {
if err := task.Resize(ctx, uint32(size.Width), uint32(size.Height)); err != nil {
glog.Errorf("Failed to resize task %q console: %v", id, err)
}
})
// TODO(random-liu): Figure out whether we need to support historical output.
if err := cntr.IO.Attach(stdin, stdout, stderr); err != nil {
return fmt.Errorf("failed to attach container: %v", err)
}
// Close stdin after first attach if StdinOnce is specified, otherwise stdin will
// be kept open until container exits.
if cntr.Config.StdinOnce {
task.CloseIO(ctx, containerd.WithStdinCloser)
}
return nil
} }

View File

@ -30,7 +30,9 @@ import (
"golang.org/x/net/context" "golang.org/x/net/context"
"k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
cio "github.com/kubernetes-incubator/cri-containerd/pkg/server/io"
containerstore "github.com/kubernetes-incubator/cri-containerd/pkg/store/container" containerstore "github.com/kubernetes-incubator/cri-containerd/pkg/store/container"
"github.com/kubernetes-incubator/cri-containerd/pkg/util"
) )
// CreateContainer creates a new container in the given PodSandbox. // CreateContainer creates a new container in the given PodSandbox.
@ -54,7 +56,7 @@ func (c *criContainerdService) CreateContainer(ctx context.Context, r *runtime.C
// Generate unique id and name for the container and reserve the name. // Generate unique id and name for the container and reserve the name.
// Reserve the container name to avoid concurrent `CreateContainer` request creating // Reserve the container name to avoid concurrent `CreateContainer` request creating
// the same container. // the same container.
id := generateID() id := util.GenerateID()
name := makeContainerName(config.GetMetadata(), sandboxConfig.GetMetadata()) name := makeContainerName(config.GetMetadata(), sandboxConfig.GetMetadata())
if err = c.containerNameIndex.Reserve(name, id); err != nil { if err = c.containerNameIndex.Reserve(name, id); err != nil {
return nil, fmt.Errorf("failed to reserve container name %q: %v", name, err) return nil, fmt.Errorf("failed to reserve container name %q: %v", name, err)
@ -118,6 +120,21 @@ func (c *criContainerdService) CreateContainer(ctx context.Context, r *runtime.C
} }
}() }()
containerIO, err := cio.NewContainerIO(id,
cio.WithStdin(config.GetStdin()),
cio.WithTerminal(config.GetTty()),
)
if err != nil {
return nil, fmt.Errorf("failed to create container io: %v", err)
}
defer func() {
if retErr != nil {
if err := containerIO.Close(); err != nil {
glog.Errorf("Failed to close container io %q : %v", id, err)
}
}
}()
metaBytes, err := meta.Encode() metaBytes, err := meta.Encode()
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to convert sandbox metadata: %+v, %v", meta, err) return nil, fmt.Errorf("failed to convert sandbox metadata: %+v, %v", meta, err)
@ -142,9 +159,11 @@ func (c *criContainerdService) CreateContainer(ctx context.Context, r *runtime.C
} }
}() }()
container, err := containerstore.NewContainer(meta, status := containerstore.Status{CreatedAt: time.Now().UnixNano()}
containerstore.Status{CreatedAt: time.Now().UnixNano()}, container, err := containerstore.NewContainer(meta, status,
containerstore.WithContainer(cntr)) containerstore.WithContainer(cntr),
containerstore.WithContainerIO(containerIO),
)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to create internal container object for %q: %v", return nil, fmt.Errorf("failed to create internal container object for %q: %v",
id, err) id, err)

View File

@ -20,6 +20,7 @@ import (
"bytes" "bytes"
"fmt" "fmt"
"io" "io"
"io/ioutil"
"time" "time"
"github.com/containerd/containerd" "github.com/containerd/containerd"
@ -29,6 +30,8 @@ import (
"golang.org/x/sys/unix" "golang.org/x/sys/unix"
"k8s.io/client-go/tools/remotecommand" "k8s.io/client-go/tools/remotecommand"
"k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
"github.com/kubernetes-incubator/cri-containerd/pkg/util"
) )
// ExecSync executes a command in the container, and returns the stdout output. // ExecSync executes a command in the container, and returns the stdout output.
@ -113,10 +116,15 @@ func (c *criContainerdService) execInContainer(ctx context.Context, id string, o
pspec.Terminal = opts.tty pspec.Terminal = opts.tty
if opts.stdin == nil { if opts.stdin == nil {
// Create empty buffer if stdin is nil.
opts.stdin = new(bytes.Buffer) opts.stdin = new(bytes.Buffer)
} }
execID := generateID() if opts.stdout == nil {
opts.stdout = ioutil.Discard
}
if opts.stderr == nil {
opts.stderr = ioutil.Discard
}
execID := util.GenerateID()
process, err := task.Exec(ctx, execID, pspec, containerd.NewIOWithTerminal( process, err := task.Exec(ctx, execID, pspec, containerd.NewIOWithTerminal(
opts.stdin, opts.stdin,
opts.stdout, opts.stdout,

View File

@ -17,7 +17,6 @@ limitations under the License.
package server package server
import ( import (
"bytes"
"fmt" "fmt"
"io" "io"
"path/filepath" "path/filepath"
@ -28,7 +27,7 @@ import (
"golang.org/x/net/context" "golang.org/x/net/context"
"k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
"github.com/kubernetes-incubator/cri-containerd/pkg/server/agents" cio "github.com/kubernetes-incubator/cri-containerd/pkg/server/io"
containerstore "github.com/kubernetes-incubator/cri-containerd/pkg/store/container" containerstore "github.com/kubernetes-incubator/cri-containerd/pkg/store/container"
) )
@ -45,19 +44,18 @@ func (c *criContainerdService) StartContainer(ctx context.Context, r *runtime.St
if err != nil { if err != nil {
return nil, fmt.Errorf("an error occurred when try to find container %q: %v", r.GetContainerId(), err) return nil, fmt.Errorf("an error occurred when try to find container %q: %v", r.GetContainerId(), err)
} }
id := container.ID
var startErr error var startErr error
// update container status in one transaction to avoid race with event monitor. // update container status in one transaction to avoid race with event monitor.
if err := container.Status.Update(func(status containerstore.Status) (containerstore.Status, error) { if err := container.Status.Update(func(status containerstore.Status) (containerstore.Status, error) {
// Always apply status change no matter startContainer fails or not. Because startContainer // Always apply status change no matter startContainer fails or not. Because startContainer
// may change container state no matter it fails or succeeds. // may change container state no matter it fails or succeeds.
startErr = c.startContainer(ctx, container.Container, container.Metadata, &status) startErr = c.startContainer(ctx, container, &status)
return status, nil return status, nil
}); startErr != nil { }); startErr != nil {
return nil, startErr return nil, startErr
} else if err != nil { } else if err != nil {
return nil, fmt.Errorf("failed to update container %q metadata: %v", id, err) return nil, fmt.Errorf("failed to update container %q metadata: %v", container.ID, err)
} }
return &runtime.StartContainerResponse{}, nil return &runtime.StartContainerResponse{}, nil
} }
@ -65,11 +63,12 @@ func (c *criContainerdService) StartContainer(ctx context.Context, r *runtime.St
// startContainer actually starts the container. The function needs to be run in one transaction. Any updates // startContainer actually starts the container. The function needs to be run in one transaction. Any updates
// to the status passed in will be applied no matter the function returns error or not. // to the status passed in will be applied no matter the function returns error or not.
func (c *criContainerdService) startContainer(ctx context.Context, func (c *criContainerdService) startContainer(ctx context.Context,
container containerd.Container, cntr containerstore.Container,
meta containerstore.Metadata,
status *containerstore.Status) (retErr error) { status *containerstore.Status) (retErr error) {
id := cntr.ID
meta := cntr.Metadata
container := cntr.Container
config := meta.Config config := meta.Config
id := container.ID()
// Return error if container is not in created state. // Return error if container is not in created state.
if status.State() != runtime.ContainerState_CONTAINER_CREATED { if status.State() != runtime.ContainerState_CONTAINER_CREATED {
@ -109,37 +108,49 @@ func (c *criContainerdService) startContainer(ctx context.Context,
if err != nil { if err != nil {
return fmt.Errorf("failed to get task status for sandbox container %q: %v", id, err) return fmt.Errorf("failed to get task status for sandbox container %q: %v", id, err)
} }
if taskStatus.Status != containerd.Running { if taskStatus.Status != containerd.Running {
return fmt.Errorf("sandbox container %q is not running", sandboxID) return fmt.Errorf("sandbox container %q is not running", sandboxID)
} }
// Redirect the stream to std for now. ioCreation := func(id string) (_ containerd.IO, err error) {
// TODO(random-liu): [P1] Support StdinOnce after container logging is added. var stdoutWC, stderrWC io.WriteCloser
rStdoutPipe, wStdoutPipe := io.Pipe() defer func() {
rStderrPipe, wStderrPipe := io.Pipe() if err != nil {
stdin := new(bytes.Buffer) if stdoutWC != nil {
defer func() { stdoutWC.Close()
if retErr != nil { }
rStdoutPipe.Close() if stderrWC != nil {
rStderrPipe.Close() stderrWC.Close()
} }
}()
if config.GetLogPath() != "" {
// Only generate container log when log path is specified.
logPath := filepath.Join(sandboxConfig.GetLogDirectory(), config.GetLogPath())
if err = c.agentFactory.NewContainerLogger(logPath, agents.Stdout, rStdoutPipe).Start(); err != nil {
return fmt.Errorf("failed to start container stdout logger: %v", err)
}
// Only redirect stderr when there is no tty.
if !config.GetTty() {
if err = c.agentFactory.NewContainerLogger(logPath, agents.Stderr, rStderrPipe).Start(); err != nil {
return fmt.Errorf("failed to start container stderr logger: %v", err)
} }
}()
if config.GetLogPath() != "" {
// Only generate container log when log path is specified.
logPath := filepath.Join(sandboxConfig.GetLogDirectory(), config.GetLogPath())
if stdoutWC, err = cio.NewCRILogger(logPath, cio.Stdout); err != nil {
return nil, fmt.Errorf("failed to start container stdout logger: %v", err)
}
// Only redirect stderr when there is no tty.
if !config.GetTty() {
if stderrWC, err = cio.NewCRILogger(logPath, cio.Stderr); err != nil {
return nil, fmt.Errorf("failed to start container stderr logger: %v", err)
}
}
} else {
stdoutWC = cio.NewDiscardLogger()
stderrWC = cio.NewDiscardLogger()
} }
if err := cio.WithOutput("log", stdoutWC, stderrWC)(cntr.IO); err != nil {
return nil, fmt.Errorf("failed to add container log: %v", err)
}
if err := cntr.IO.Pipe(); err != nil {
return nil, fmt.Errorf("failed to pipe container io: %v", err)
}
return cntr.IO, nil
} }
//TODO(Abhi): close stdin/pass a managed IOCreation
task, err := container.NewTask(ctx, containerd.NewIO(stdin, wStdoutPipe, wStderrPipe)) task, err := container.NewTask(ctx, ioCreation)
if err != nil { if err != nil {
return fmt.Errorf("failed to create containerd task: %v", err) return fmt.Errorf("failed to create containerd task: %v", err)
} }

View File

@ -110,11 +110,11 @@ func (c *criContainerdService) handleEvent(evt *events.Envelope) {
glog.Errorf("failed to stop container, task not found for container %q: %v", e.ContainerID, err) glog.Errorf("failed to stop container, task not found for container %q: %v", e.ContainerID, err)
return return
} }
} } else {
if task != nil { // TODO(random-liu): [P1] This may block the loop, we may want to spawn a worker
if _, err = task.Delete(context.Background()); err != nil { if _, err = task.Delete(context.Background()); err != nil {
// TODO(random-liu): [P0] Enqueue the event and retry.
if !errdefs.IsNotFound(err) { if !errdefs.IsNotFound(err) {
// TODO(random-liu): [P0] Enqueue the event and retry.
glog.Errorf("failed to stop container %q: %v", e.ContainerID, err) glog.Errorf("failed to stop container %q: %v", e.ContainerID, err)
return return
} }

View File

@ -19,17 +19,14 @@ package server
import ( import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"io"
"os" "os"
"path/filepath" "path/filepath"
"strconv" "strconv"
"strings" "strings"
"syscall"
"github.com/containerd/containerd/content" "github.com/containerd/containerd/content"
"github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/errdefs"
"github.com/docker/distribution/reference" "github.com/docker/distribution/reference"
"github.com/docker/docker/pkg/stringid"
imagedigest "github.com/opencontainers/go-digest" imagedigest "github.com/opencontainers/go-digest"
"github.com/opencontainers/image-spec/identity" "github.com/opencontainers/image-spec/identity"
imagespec "github.com/opencontainers/image-spec/specs-go/v1" imagespec "github.com/opencontainers/image-spec/specs-go/v1"
@ -102,11 +99,6 @@ const (
containerMetadataLabel = "io.cri-containerd.container.metadata" containerMetadataLabel = "io.cri-containerd.container.metadata"
) )
// generateID generates a random unique id.
func generateID() string {
return stringid.GenerateNonCryptoID()
}
// makeSandboxName generates sandbox name from sandbox metadata. The name // makeSandboxName generates sandbox name from sandbox metadata. The name
// generated is unique as long as sandbox metadata is unique. // generated is unique as long as sandbox metadata is unique.
func makeSandboxName(s *runtime.PodSandboxMetadata) string { func makeSandboxName(s *runtime.PodSandboxMetadata) string {
@ -163,37 +155,6 @@ func getSandboxDevShm(sandboxRootDir string) string {
return filepath.Join(sandboxRootDir, "shm") return filepath.Join(sandboxRootDir, "shm")
} }
// prepareStreamingPipes prepares stream named pipe for container. returns nil
// streaming handler if corresponding stream path is empty.
func (c *criContainerdService) prepareStreamingPipes(ctx context.Context, stdin, stdout, stderr string) (
i io.WriteCloser, o io.ReadCloser, e io.ReadCloser, retErr error) {
pipes := map[string]io.ReadWriteCloser{}
for t, stream := range map[string]struct {
path string
flag int
}{
"stdin": {stdin, syscall.O_WRONLY | syscall.O_CREAT | syscall.O_NONBLOCK},
"stdout": {stdout, syscall.O_RDONLY | syscall.O_CREAT | syscall.O_NONBLOCK},
"stderr": {stderr, syscall.O_RDONLY | syscall.O_CREAT | syscall.O_NONBLOCK},
} {
if stream.path == "" {
continue
}
s, err := c.os.OpenFifo(ctx, stream.path, stream.flag, 0700)
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to open named pipe %q: %v",
stream.path, err)
}
defer func(cl io.Closer) {
if retErr != nil {
cl.Close()
}
}(s)
pipes[t] = s
}
return pipes["stdin"], pipes["stdout"], pipes["stderr"], nil
}
// getNetworkNamespace returns the network namespace of a process. // getNetworkNamespace returns the network namespace of a process.
func getNetworkNamespace(pid uint32) string { func getNetworkNamespace(pid uint32) string {
return fmt.Sprintf(netNSFormat, pid) return fmt.Sprintf(netNSFormat, pid)

View File

@ -17,107 +17,13 @@ limitations under the License.
package server package server
import ( import (
"fmt"
"io"
"os"
"syscall"
"testing" "testing"
"github.com/containerd/containerd/reference" "github.com/containerd/containerd/reference"
imagedigest "github.com/opencontainers/go-digest" imagedigest "github.com/opencontainers/go-digest"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"golang.org/x/net/context"
ostesting "github.com/kubernetes-incubator/cri-containerd/pkg/os/testing"
) )
func TestPrepareStreamingPipes(t *testing.T) {
for desc, test := range map[string]struct {
stdin string
stdout string
stderr string
}{
"empty stdin": {
stdout: "/test/stdout",
stderr: "/test/stderr",
},
"empty stdout/stderr": {
stdin: "/test/stdin",
},
"non-empty stdio": {
stdin: "/test/stdin",
stdout: "/test/stdout",
stderr: "/test/stderr",
},
"empty stdio": {},
} {
t.Logf("TestCase %q", desc)
c := newTestCRIContainerdService()
fakeOS := c.os.(*ostesting.FakeOS)
fakeOS.OpenFifoFn = func(ctx context.Context, fn string, flag int, perm os.FileMode) (io.ReadWriteCloser, error) {
expectFlag := syscall.O_RDONLY | syscall.O_CREAT | syscall.O_NONBLOCK
if fn == test.stdin {
expectFlag = syscall.O_WRONLY | syscall.O_CREAT | syscall.O_NONBLOCK
}
assert.Equal(t, expectFlag, flag)
assert.Equal(t, os.FileMode(0700), perm)
return nopReadWriteCloser{}, nil
}
i, o, e, err := c.prepareStreamingPipes(context.Background(), test.stdin, test.stdout, test.stderr)
assert.NoError(t, err)
assert.Equal(t, test.stdin != "", i != nil)
assert.Equal(t, test.stdout != "", o != nil)
assert.Equal(t, test.stderr != "", e != nil)
}
}
type closeTestReadWriteCloser struct {
CloseFn func() error
nopReadWriteCloser
}
func (c closeTestReadWriteCloser) Close() error {
return c.CloseFn()
}
func TestPrepareStreamingPipesError(t *testing.T) {
stdin, stdout, stderr := "/test/stdin", "/test/stdout", "/test/stderr"
for desc, inject := range map[string]map[string]error{
"should cleanup on stdin error": {stdin: fmt.Errorf("stdin error")},
"should cleanup on stdout error": {stdout: fmt.Errorf("stdout error")},
"should cleanup on stderr error": {stderr: fmt.Errorf("stderr error")},
} {
t.Logf("TestCase %q", desc)
c := newTestCRIContainerdService()
fakeOS := c.os.(*ostesting.FakeOS)
openFlags := map[string]bool{
stdin: false,
stdout: false,
stderr: false,
}
fakeOS.OpenFifoFn = func(ctx context.Context, fn string, flag int, perm os.FileMode) (io.ReadWriteCloser, error) {
if inject[fn] != nil {
return nil, inject[fn]
}
openFlags[fn] = !openFlags[fn]
testCloser := closeTestReadWriteCloser{}
testCloser.CloseFn = func() error {
openFlags[fn] = !openFlags[fn]
return nil
}
return testCloser, nil
}
i, o, e, err := c.prepareStreamingPipes(context.Background(), stdin, stdout, stderr)
assert.Error(t, err)
assert.Nil(t, i)
assert.Nil(t, o)
assert.Nil(t, e)
assert.False(t, openFlags[stdin])
assert.False(t, openFlags[stdout])
assert.False(t, openFlags[stderr])
}
}
func TestNormalizeImageRef(t *testing.T) { func TestNormalizeImageRef(t *testing.T) {
for _, test := range []struct { for _, test := range []struct {
input string input string

317
pkg/server/io/io.go Normal file
View File

@ -0,0 +1,317 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package agents
import (
"errors"
"io"
"os"
"strings"
"sync"
"syscall"
"github.com/containerd/containerd"
"github.com/containerd/fifo"
"github.com/golang/glog"
"golang.org/x/net/context"
"github.com/kubernetes-incubator/cri-containerd/pkg/ioutil"
"github.com/kubernetes-incubator/cri-containerd/pkg/util"
)
// StreamType is the type of the stream, stdout/stderr.
type StreamType string
const (
// Stdin stream type.
Stdin StreamType = "stdin"
// Stdout stream type.
Stdout StreamType = "stdout"
// Stderr stream type.
Stderr StreamType = "stderr"
)
type wgCloser struct {
ctx context.Context
wg *sync.WaitGroup
set []io.Closer
cancel context.CancelFunc
}
func (g *wgCloser) Wait() {
g.wg.Wait()
}
func (g *wgCloser) Close() {
for _, f := range g.set {
f.Close()
}
}
func (g *wgCloser) Cancel() {
g.cancel()
}
// streamKey generates a key for the stream.
func streamKey(id, name string, stream StreamType) string {
return strings.Join([]string{id, name, string(stream)}, "-")
}
// ContainerIO holds the container io.
type ContainerIO struct {
dir string
stdinPath string
stdoutPath string
stderrPath string
id string
tty bool
stdin bool
stdout *ioutil.WriterGroup
stderr *ioutil.WriterGroup
closer *wgCloser
}
var _ containerd.IO = &ContainerIO{}
// Opts sets specific information to newly created ContainerIO.
type Opts func(*ContainerIO) error
// WithStdin enables stdin of the container io.
func WithStdin(stdin bool) Opts {
return func(c *ContainerIO) error {
c.stdin = stdin
return nil
}
}
// WithOutput adds output stream to the container io.
func WithOutput(name string, stdout, stderr io.WriteCloser) Opts {
return func(c *ContainerIO) error {
if stdout != nil {
if err := c.stdout.Add(streamKey(c.id, name, Stdout), stdout); err != nil {
return err
}
}
if stderr != nil {
if err := c.stderr.Add(streamKey(c.id, name, Stderr), stderr); err != nil {
return err
}
}
return nil
}
}
// WithTerminal enables tty of the container io.
func WithTerminal(tty bool) Opts {
return func(c *ContainerIO) error {
c.tty = tty
return nil
}
}
// NewContainerIO creates container io.
func NewContainerIO(id string, opts ...Opts) (*ContainerIO, error) {
fifos, err := containerd.NewFifos(id)
if err != nil {
return nil, err
}
c := &ContainerIO{
id: id,
dir: fifos.Dir,
stdoutPath: fifos.Out,
stderrPath: fifos.Err,
stdout: ioutil.NewWriterGroup(),
stderr: ioutil.NewWriterGroup(),
}
for _, opt := range opts {
if err := opt(c); err != nil {
return nil, err
}
}
if c.stdin {
c.stdinPath = fifos.In
}
return c, nil
}
// Config returns io config.
func (c *ContainerIO) Config() containerd.IOConfig {
return containerd.IOConfig{
Terminal: c.tty,
Stdin: c.stdinPath,
Stdout: c.stdoutPath,
Stderr: c.stderrPath,
}
}
// Pipe creates container fifos and pipe container output
// to output stream.
func (c *ContainerIO) Pipe() (err error) {
var (
f io.ReadWriteCloser
set []io.Closer
ctx, cancel = context.WithCancel(context.Background())
wg = &sync.WaitGroup{}
)
defer func() {
if err != nil {
for _, f := range set {
f.Close()
}
cancel()
}
}()
if c.stdinPath != "" {
// Just create the stdin, only open it when used.
if f, err = fifo.OpenFifo(ctx, c.stdinPath, syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
return err
}
f.Close()
}
if f, err = fifo.OpenFifo(ctx, c.stdoutPath, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
return err
}
set = append(set, f)
wg.Add(1)
go func(r io.ReadCloser) {
if _, err := io.Copy(c.stdout, r); err != nil {
glog.Errorf("Failed to redirect stdout of container %q: %v", c.id, err)
}
r.Close()
c.stdout.Close()
wg.Done()
glog.V(2).Infof("Finish piping stdout of container %q", c.id)
}(f)
if f, err = fifo.OpenFifo(ctx, c.stderrPath, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
return err
}
set = append(set, f)
if !c.tty {
wg.Add(1)
go func(r io.ReadCloser) {
if _, err := io.Copy(c.stderr, r); err != nil {
glog.Errorf("Failed to redirect stderr of container %q: %v", c.id, err)
}
r.Close()
c.stderr.Close()
wg.Done()
glog.V(2).Infof("Finish piping stderr of container %q", c.id)
}(f)
}
c.closer = &wgCloser{
wg: wg,
set: set,
ctx: ctx,
cancel: cancel,
}
return nil
}
// Attach attaches container stdio.
func (c *ContainerIO) Attach(stdin io.Reader, stdout, stderr io.WriteCloser) error {
if c.closer == nil {
return errors.New("container io is not initialized")
}
var wg sync.WaitGroup
key := util.GenerateID()
stdinKey := streamKey(c.id, "attach-"+key, Stdin)
stdoutKey := streamKey(c.id, "attach-"+key, Stdout)
stderrKey := streamKey(c.id, "attach-"+key, Stderr)
var stdinCloser io.Closer
if c.stdinPath != "" && stdin != nil {
f, err := fifo.OpenFifo(c.closer.ctx, c.stdinPath, syscall.O_WRONLY|syscall.O_NONBLOCK, 0700)
if err != nil {
return err
}
// Also increase wait group here, so that `closer.Wait` will
// also wait for this fifo to be closed.
c.closer.wg.Add(1)
wg.Add(1)
go func(w io.WriteCloser) {
if _, err := io.Copy(w, stdin); err != nil {
glog.Errorf("Failed to redirect stdin for container attach %q: %v", c.id, err)
}
w.Close()
glog.V(2).Infof("Attach stream %q closed", stdinKey)
// No matter what, when stdin is closed (io.Copy unblock), close stdout and stderr
if stdout != nil {
c.stdout.Remove(stdoutKey)
}
if stderr != nil {
c.stderr.Remove(stderrKey)
}
wg.Done()
c.closer.wg.Done()
}(f)
stdinCloser = f
}
attachStream := func(key string, close <-chan struct{}) {
<-close
glog.V(2).Infof("Attach stream %q closed", key)
// Make sure stdin gets closed.
if stdinCloser != nil {
stdinCloser.Close()
}
wg.Done()
}
if stdout != nil {
wg.Add(1)
wc, close := ioutil.NewWriteCloseInformer(stdout)
if err := c.stdout.Add(stdoutKey, wc); err != nil {
return err
}
go attachStream(stdoutKey, close)
}
if !c.tty && stderr != nil {
wg.Add(1)
wc, close := ioutil.NewWriteCloseInformer(stderr)
if err := c.stderr.Add(stderrKey, wc); err != nil {
return err
}
go attachStream(stderrKey, close)
}
wg.Wait()
return nil
}
// Cancel cancels container io.
func (c *ContainerIO) Cancel() {
c.closer.Cancel()
}
// Wait waits container io to finish.
func (c *ContainerIO) Wait() {
c.closer.Wait()
}
// Close closes all FIFOs.
func (c *ContainerIO) Close() error {
if c.closer != nil {
c.closer.Close()
}
if c.dir != "" {
return os.RemoveAll(c.dir)
}
return nil
}

View File

@ -26,6 +26,8 @@ import (
"time" "time"
"github.com/golang/glog" "github.com/golang/glog"
cioutil "github.com/kubernetes-incubator/cri-containerd/pkg/ioutil"
) )
const ( const (
@ -42,67 +44,39 @@ const (
bufSize = pipeBufSize - len(timestampFormat) - len(Stdout) - 2 /*2 delimiter*/ - 1 /*eol*/ bufSize = pipeBufSize - len(timestampFormat) - len(Stdout) - 2 /*2 delimiter*/ - 1 /*eol*/
) )
// sandboxLogger is the log agent used for sandbox. // NewDiscardLogger creates logger which discards all the input.
// It discards sandbox all output for now. func NewDiscardLogger() io.WriteCloser {
type sandboxLogger struct { return cioutil.NewNopWriteCloser(ioutil.Discard)
rc io.ReadCloser
} }
func (*agentFactory) NewSandboxLogger(rc io.ReadCloser) Agent { // NewCRILogger returns a write closer which redirect container log into
return &sandboxLogger{rc: rc} // log file, and decorate the log line into CRI defined format.
} func NewCRILogger(path string, stream StreamType) (io.WriteCloser, error) {
glog.V(4).Infof("Start reading log file %q", path)
func (s *sandboxLogger) Start() error { prc, pwc := io.Pipe()
go func() { f, err := os.OpenFile(path, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0640)
// Discard the output for now.
io.Copy(ioutil.Discard, s.rc) // nolint: errcheck
s.rc.Close()
}()
return nil
}
// containerLogger is the log agent used for container.
// It redirect container log into CRI log file, and decorate the log
// line into CRI defined format.
type containerLogger struct {
path string
stream StreamType
rc io.ReadCloser
}
func (*agentFactory) NewContainerLogger(path string, stream StreamType, rc io.ReadCloser) Agent {
return &containerLogger{
path: path,
stream: stream,
rc: rc,
}
}
func (c *containerLogger) Start() error {
glog.V(4).Infof("Start reading log file %q", c.path)
wc, err := os.OpenFile(c.path, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0640)
if err != nil { if err != nil {
return fmt.Errorf("failed to open log file %q: %v", c.path, err) return nil, fmt.Errorf("failed to open log file: %v", err)
} }
go c.redirectLogs(wc) go redirectLogs(path, prc, f, stream)
return nil return pwc, nil
} }
func (c *containerLogger) redirectLogs(wc io.WriteCloser) { func redirectLogs(path string, rc io.ReadCloser, wc io.WriteCloser, stream StreamType) {
defer c.rc.Close() defer rc.Close()
defer wc.Close() defer wc.Close()
streamBytes := []byte(c.stream) streamBytes := []byte(stream)
delimiterBytes := []byte{delimiter} delimiterBytes := []byte{delimiter}
r := bufio.NewReaderSize(c.rc, bufSize) r := bufio.NewReaderSize(rc, bufSize)
for { for {
// TODO(random-liu): Better define CRI log format, and escape newline in log. // TODO(random-liu): Better define CRI log format, and escape newline in log.
lineBytes, _, err := r.ReadLine() lineBytes, _, err := r.ReadLine()
if err == io.EOF { if err == io.EOF {
glog.V(4).Infof("Finish redirecting log file %q", c.path) glog.V(4).Infof("Finish redirecting log file %q", path)
return return
} }
if err != nil { if err != nil {
glog.Errorf("An error occurred when redirecting log file %q: %v", c.path, err) glog.Errorf("An error occurred when redirecting log file %q: %v", path, err)
return return
} }
timestampBytes := time.Now().AppendFormat(nil, time.RFC3339Nano) timestampBytes := time.Now().AppendFormat(nil, time.RFC3339Nano)

View File

@ -25,20 +25,11 @@ import (
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
cioutil "github.com/kubernetes-incubator/cri-containerd/pkg/ioutil"
) )
// writeCloserBuffer is a writecloser wrapper for bytes.Buffer
// with a nop closer.
type writeCloserBuffer struct {
*bytes.Buffer
}
// nop close
func (*writeCloserBuffer) Close() error { return nil }
func TestRedirectLogs(t *testing.T) { func TestRedirectLogs(t *testing.T) {
f := NewAgentFactory()
// f.NewContainerLogger(
for desc, test := range map[string]struct { for desc, test := range map[string]struct {
input string input string
stream StreamType stream StreamType
@ -71,10 +62,10 @@ func TestRedirectLogs(t *testing.T) {
} { } {
t.Logf("TestCase %q", desc) t.Logf("TestCase %q", desc)
rc := ioutil.NopCloser(strings.NewReader(test.input)) rc := ioutil.NopCloser(strings.NewReader(test.input))
c := f.NewContainerLogger("test-path", test.stream, rc).(*containerLogger) buf := bytes.NewBuffer(nil)
wc := &writeCloserBuffer{bytes.NewBuffer(nil)} wc := cioutil.NewNopWriteCloser(buf)
c.redirectLogs(wc) redirectLogs("test-path", rc, wc, test.stream)
output := wc.String() output := buf.String()
lines := strings.Split(output, "\n") lines := strings.Split(output, "\n")
lines = lines[:len(lines)-1] // Discard empty string after last \n lines = lines[:len(lines)-1] // Discard empty string after last \n
assert.Len(t, lines, len(test.content)) assert.Len(t, lines, len(test.content))

View File

@ -17,9 +17,7 @@ limitations under the License.
package server package server
import ( import (
"bytes"
"fmt" "fmt"
"io"
"os" "os"
"strings" "strings"
"time" "time"
@ -35,6 +33,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
sandboxstore "github.com/kubernetes-incubator/cri-containerd/pkg/store/sandbox" sandboxstore "github.com/kubernetes-incubator/cri-containerd/pkg/store/sandbox"
"github.com/kubernetes-incubator/cri-containerd/pkg/util"
) )
// RunPodSandbox creates and starts a pod-level sandbox. Runtimes should ensure // RunPodSandbox creates and starts a pod-level sandbox. Runtimes should ensure
@ -50,7 +49,7 @@ func (c *criContainerdService) RunPodSandbox(ctx context.Context, r *runtime.Run
config := r.GetConfig() config := r.GetConfig()
// Generate unique id and name for the sandbox and reserve the name. // Generate unique id and name for the sandbox and reserve the name.
id := generateID() id := util.GenerateID()
name := makeSandboxName(config.GetMetadata()) name := makeSandboxName(config.GetMetadata())
// Reserve the sandbox name to avoid concurrent `RunPodSandbox` request starting the // Reserve the sandbox name to avoid concurrent `RunPodSandbox` request starting the
// same sandbox. // same sandbox.
@ -135,22 +134,6 @@ func (c *criContainerdService) RunPodSandbox(ctx context.Context, r *runtime.Run
} }
}() }()
// Discard sandbox container output because we don't care about it.
rStdoutPipe, wStdoutPipe := io.Pipe()
rStderrPipe, wStderrPipe := io.Pipe()
defer func() {
if retErr != nil {
rStdoutPipe.Close()
rStderrPipe.Close()
}
}()
if err := c.agentFactory.NewSandboxLogger(rStdoutPipe).Start(); err != nil {
return nil, fmt.Errorf("failed to start sandbox stdout logger: %v", err)
}
if err := c.agentFactory.NewSandboxLogger(rStderrPipe).Start(); err != nil {
return nil, fmt.Errorf("failed to start sandbox stderr logger: %v", err)
}
// Setup sandbox /dev/shm, /etc/hosts and /etc/resolv.conf. // Setup sandbox /dev/shm, /etc/hosts and /etc/resolv.conf.
if err = c.setupSandboxFiles(sandboxRootDir, config); err != nil { if err = c.setupSandboxFiles(sandboxRootDir, config); err != nil {
return nil, fmt.Errorf("failed to setup sandbox files: %v", err) return nil, fmt.Errorf("failed to setup sandbox files: %v", err)
@ -167,8 +150,8 @@ func (c *criContainerdService) RunPodSandbox(ctx context.Context, r *runtime.Run
// Create sandbox task in containerd. // Create sandbox task in containerd.
glog.V(5).Infof("Create sandbox container (id=%q, name=%q).", glog.V(5).Infof("Create sandbox container (id=%q, name=%q).",
id, name) id, name)
//TODO(Abhi): close the stdin or pass newIOCreation with /dev/null stdin // We don't need stdio for sandbox container.
task, err := container.NewTask(ctx, containerd.NewIO(new(bytes.Buffer), wStdoutPipe, wStderrPipe)) task, err := container.NewTask(ctx, containerd.NullIO)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to create task for sandbox %q: %v", id, err) return nil, fmt.Errorf("failed to create task for sandbox %q: %v", id, err)
} }

View File

@ -107,5 +107,6 @@ func (c *criContainerdService) stopSandboxContainer(ctx context.Context, contain
if err != nil && !errdefs.IsNotFound(err) { if err != nil && !errdefs.IsNotFound(err) {
return fmt.Errorf("failed to delete sandbox container: %v", err) return fmt.Errorf("failed to delete sandbox container: %v", err)
} }
return nil return nil
} }

View File

@ -31,7 +31,6 @@ import (
osinterface "github.com/kubernetes-incubator/cri-containerd/pkg/os" osinterface "github.com/kubernetes-incubator/cri-containerd/pkg/os"
"github.com/kubernetes-incubator/cri-containerd/pkg/registrar" "github.com/kubernetes-incubator/cri-containerd/pkg/registrar"
"github.com/kubernetes-incubator/cri-containerd/pkg/server/agents"
containerstore "github.com/kubernetes-incubator/cri-containerd/pkg/store/container" containerstore "github.com/kubernetes-incubator/cri-containerd/pkg/store/container"
imagestore "github.com/kubernetes-incubator/cri-containerd/pkg/store/image" imagestore "github.com/kubernetes-incubator/cri-containerd/pkg/store/image"
sandboxstore "github.com/kubernetes-incubator/cri-containerd/pkg/store/sandbox" sandboxstore "github.com/kubernetes-incubator/cri-containerd/pkg/store/sandbox"
@ -79,8 +78,6 @@ type criContainerdService struct {
eventService events.EventsClient eventService events.EventsClient
// netPlugin is used to setup and teardown network when run/stop pod sandbox. // netPlugin is used to setup and teardown network when run/stop pod sandbox.
netPlugin ocicni.CNIPlugin netPlugin ocicni.CNIPlugin
// agentFactory is the factory to create agent used in the cri containerd service.
agentFactory agents.AgentFactory
// client is an instance of the containerd client // client is an instance of the containerd client
client *containerd.Client client *containerd.Client
// streamServer is the streaming server serves container streaming request. // streamServer is the streaming server serves container streaming request.
@ -111,7 +108,6 @@ func NewCRIContainerdService(containerdEndpoint, rootDir, networkPluginBinDir, n
imageStoreService: client.ImageService(), imageStoreService: client.ImageService(),
eventService: client.EventService(), eventService: client.EventService(),
contentStoreService: client.ContentStore(), contentStoreService: client.ContentStore(),
agentFactory: agents.NewAgentFactory(),
client: client, client: client,
} }

View File

@ -17,24 +17,14 @@ limitations under the License.
package server package server
import ( import (
"io"
ostesting "github.com/kubernetes-incubator/cri-containerd/pkg/os/testing" ostesting "github.com/kubernetes-incubator/cri-containerd/pkg/os/testing"
"github.com/kubernetes-incubator/cri-containerd/pkg/registrar" "github.com/kubernetes-incubator/cri-containerd/pkg/registrar"
agentstesting "github.com/kubernetes-incubator/cri-containerd/pkg/server/agents/testing"
servertesting "github.com/kubernetes-incubator/cri-containerd/pkg/server/testing" servertesting "github.com/kubernetes-incubator/cri-containerd/pkg/server/testing"
containerstore "github.com/kubernetes-incubator/cri-containerd/pkg/store/container" containerstore "github.com/kubernetes-incubator/cri-containerd/pkg/store/container"
imagestore "github.com/kubernetes-incubator/cri-containerd/pkg/store/image" imagestore "github.com/kubernetes-incubator/cri-containerd/pkg/store/image"
sandboxstore "github.com/kubernetes-incubator/cri-containerd/pkg/store/sandbox" sandboxstore "github.com/kubernetes-incubator/cri-containerd/pkg/store/sandbox"
) )
type nopReadWriteCloser struct{}
// Return error directly to avoid read/write.
func (nopReadWriteCloser) Read(p []byte) (n int, err error) { return 0, io.EOF }
func (nopReadWriteCloser) Write(p []byte) (n int, err error) { return 0, io.ErrShortWrite }
func (nopReadWriteCloser) Close() error { return nil }
const ( const (
testRootDir = "/test/rootfs" testRootDir = "/test/rootfs"
// Use an image id as test sandbox image to avoid image name resolve. // Use an image id as test sandbox image to avoid image name resolve.
@ -55,6 +45,5 @@ func newTestCRIContainerdService() *criContainerdService {
containerStore: containerstore.NewStore(), containerStore: containerstore.NewStore(),
containerNameIndex: registrar.NewRegistrar(), containerNameIndex: registrar.NewRegistrar(),
netPlugin: servertesting.NewFakeCNIPlugin(), netPlugin: servertesting.NewFakeCNIPlugin(),
agentFactory: agentstesting.NewFakeAgentFactory(),
} }
} }

View File

@ -17,7 +17,6 @@ limitations under the License.
package server package server
import ( import (
"errors"
"fmt" "fmt"
"io" "io"
"math" "math"
@ -79,7 +78,7 @@ func (s *streamRuntime) Exec(containerID string, cmd []string, stdin io.Reader,
func (s *streamRuntime) Attach(containerID string, in io.Reader, out, err io.WriteCloser, tty bool, func (s *streamRuntime) Attach(containerID string, in io.Reader, out, err io.WriteCloser, tty bool,
resize <-chan remotecommand.TerminalSize) error { resize <-chan remotecommand.TerminalSize) error {
return errors.New("not implemented") return s.c.attachContainer(context.Background(), containerID, in, out, err, tty, resize)
} }
func (s *streamRuntime) PortForward(podSandboxID string, port int32, stream io.ReadWriteCloser) error { func (s *streamRuntime) PortForward(podSandboxID string, port int32, stream io.ReadWriteCloser) error {

View File

@ -21,6 +21,7 @@ import (
"github.com/containerd/containerd" "github.com/containerd/containerd"
cio "github.com/kubernetes-incubator/cri-containerd/pkg/server/io"
"github.com/kubernetes-incubator/cri-containerd/pkg/store" "github.com/kubernetes-incubator/cri-containerd/pkg/store"
) )
@ -33,6 +34,9 @@ type Container struct {
Status StatusStorage Status StatusStorage
// Containerd container // Containerd container
Container containerd.Container Container containerd.Container
// Container IO
IO *cio.ContainerIO
// TODO(random-liu): Add stop channel to get rid of stop poll waiting.
} }
// Opts sets specific information to newly created Container. // Opts sets specific information to newly created Container.
@ -45,6 +49,13 @@ func WithContainer(cntr containerd.Container) Opts {
} }
} }
// WithContainerIO adds IO into the container.
func WithContainerIO(io *cio.ContainerIO) Opts {
return func(c *Container) {
c.IO = io
}
}
// NewContainer creates an internally used container type. // NewContainer creates an internally used container type.
func NewContainer(metadata Metadata, status Status, opts ...Opts) (Container, error) { func NewContainer(metadata Metadata, status Status, opts ...Opts) (Container, error) {
s, err := StoreStatus(metadata.ID, status) s, err := StoreStatus(metadata.ID, status)
@ -55,7 +66,6 @@ func NewContainer(metadata Metadata, status Status, opts ...Opts) (Container, er
Metadata: metadata, Metadata: metadata,
Status: s, Status: s,
} }
for _, o := range opts { for _, o := range opts {
o(&c) o(&c)
} }

View File

@ -23,6 +23,7 @@ import (
assertlib "github.com/stretchr/testify/assert" assertlib "github.com/stretchr/testify/assert"
"k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
cio "github.com/kubernetes-incubator/cri-containerd/pkg/server/io"
"github.com/kubernetes-incubator/cri-containerd/pkg/store" "github.com/kubernetes-incubator/cri-containerd/pkg/store"
) )
@ -136,3 +137,36 @@ func TestContainerStore(t *testing.T) {
assert.Equal(Container{}, c) assert.Equal(Container{}, c)
assert.Equal(store.ErrNotExist, err) assert.Equal(store.ErrNotExist, err)
} }
func TestWithContainerIO(t *testing.T) {
meta := Metadata{
ID: "1",
Name: "Container-1",
SandboxID: "Sandbox-1",
Config: &runtime.ContainerConfig{
Metadata: &runtime.ContainerMetadata{
Name: "TestPod-1",
Attempt: 1,
},
},
ImageRef: "TestImage-1",
}
status := Status{
Pid: 1,
CreatedAt: time.Now().UnixNano(),
StartedAt: time.Now().UnixNano(),
FinishedAt: time.Now().UnixNano(),
ExitCode: 1,
Reason: "TestReason-1",
Message: "TestMessage-1",
}
assert := assertlib.New(t)
c, err := NewContainer(meta, status)
assert.NoError(err)
assert.Nil(c.IO)
c, err = NewContainer(meta, status, WithContainerIO(&cio.ContainerIO{}))
assert.NoError(err)
assert.NotNil(c.IO)
}

24
pkg/util/id.go Normal file
View File

@ -0,0 +1,24 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import "github.com/docker/docker/pkg/stringid"
// GenerateID generates a random unique id.
func GenerateID() string {
return stringid.GenerateNonCryptoID()
}