commit
1f3a73d79e
@ -19,9 +19,10 @@ package metadata
|
|||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
|
||||||
"github.com/kubernetes-incubator/cri-containerd/pkg/metadata/store"
|
runtimespec "github.com/opencontainers/runtime-spec/specs-go"
|
||||||
|
|
||||||
runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1"
|
runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1"
|
||||||
|
|
||||||
|
"github.com/kubernetes-incubator/cri-containerd/pkg/metadata/store"
|
||||||
)
|
)
|
||||||
|
|
||||||
// The code is very similar with sandbox.go, but there is no template support
|
// The code is very similar with sandbox.go, but there is no template support
|
||||||
@ -71,8 +72,13 @@ type ContainerMetadata struct {
|
|||||||
// In fact, this field doesn't need to be checkpointed.
|
// In fact, this field doesn't need to be checkpointed.
|
||||||
// TODO(random-liu): Skip this during serialization when we put object
|
// TODO(random-liu): Skip this during serialization when we put object
|
||||||
// into the store directly.
|
// into the store directly.
|
||||||
// TODO(random-liu): Reset this field to false during state recoverry.
|
// TODO(random-liu): Reset this field to false during state recovery.
|
||||||
Removing bool
|
Removing bool
|
||||||
|
// TODO(random-liu): Remove following field after switching to new containerd
|
||||||
|
// client.
|
||||||
|
// Not including them in unit test now because they will be removed soon.
|
||||||
|
// Spec is the oci runtime spec used to run the container.
|
||||||
|
Spec *runtimespec.Spec
|
||||||
}
|
}
|
||||||
|
|
||||||
// State returns current state of the container based on the metadata.
|
// State returns current state of the container based on the metadata.
|
||||||
|
@ -160,6 +160,7 @@ func (c *criContainerdService) CreateContainer(ctx context.Context, r *runtime.C
|
|||||||
|
|
||||||
// Update container CreatedAt.
|
// Update container CreatedAt.
|
||||||
meta.CreatedAt = time.Now().UnixNano()
|
meta.CreatedAt = time.Now().UnixNano()
|
||||||
|
meta.Spec = spec
|
||||||
// Add container into container store.
|
// Add container into container store.
|
||||||
if err := c.containerStore.Create(meta); err != nil {
|
if err := c.containerStore.Create(meta); err != nil {
|
||||||
return nil, fmt.Errorf("failed to add container metadata %+v into store: %v",
|
return nil, fmt.Errorf("failed to add container metadata %+v into store: %v",
|
||||||
|
@ -565,13 +565,6 @@ func TestCreateContainer(t *testing.T) {
|
|||||||
id := resp.GetContainerId()
|
id := resp.GetContainerId()
|
||||||
assert.True(t, rootExists)
|
assert.True(t, rootExists)
|
||||||
assert.Equal(t, getContainerRootDir(c.rootDir, id), rootPath, "root directory should be created")
|
assert.Equal(t, getContainerRootDir(c.rootDir, id), rootPath, "root directory should be created")
|
||||||
meta, err := c.containerStore.Get(id)
|
|
||||||
assert.NoError(t, err)
|
|
||||||
require.NotNil(t, meta)
|
|
||||||
test.expectMeta.ID = id
|
|
||||||
// TODO(random-liu): Use fake clock to test CreatedAt.
|
|
||||||
test.expectMeta.CreatedAt = meta.CreatedAt
|
|
||||||
assert.Equal(t, test.expectMeta, meta, "container metadata should be created")
|
|
||||||
|
|
||||||
// Check runtime spec
|
// Check runtime spec
|
||||||
containersCalls := fake.GetCalledDetails()
|
containersCalls := fake.GetCalledDetails()
|
||||||
@ -593,5 +586,14 @@ func TestCreateContainer(t *testing.T) {
|
|||||||
Key: id,
|
Key: id,
|
||||||
Parent: testChainID,
|
Parent: testChainID,
|
||||||
}, prepareOpts, "prepare request should be correct")
|
}, prepareOpts, "prepare request should be correct")
|
||||||
|
|
||||||
|
meta, err := c.containerStore.Get(id)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
require.NotNil(t, meta)
|
||||||
|
test.expectMeta.ID = id
|
||||||
|
// TODO(random-liu): Use fake clock to test CreatedAt.
|
||||||
|
test.expectMeta.CreatedAt = meta.CreatedAt
|
||||||
|
test.expectMeta.Spec = spec
|
||||||
|
assert.Equal(t, test.expectMeta, meta, "container metadata should be created")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -17,15 +17,136 @@ limitations under the License.
|
|||||||
package server
|
package server
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
"bytes"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"io/ioutil"
|
||||||
|
|
||||||
|
"github.com/containerd/containerd/api/services/execution"
|
||||||
|
"github.com/containerd/containerd/api/types/task"
|
||||||
|
prototypes "github.com/gogo/protobuf/types"
|
||||||
|
"github.com/golang/glog"
|
||||||
|
runtimespec "github.com/opencontainers/runtime-spec/specs-go"
|
||||||
"golang.org/x/net/context"
|
"golang.org/x/net/context"
|
||||||
|
|
||||||
runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1"
|
runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1"
|
||||||
)
|
)
|
||||||
|
|
||||||
// ExecSync executes a command in the container, and returns the stdout output.
|
// ExecSync executes a command in the container, and returns the stdout output.
|
||||||
// If command exits with a non-zero exit code, an error is returned.
|
// If command exits with a non-zero exit code, an error is returned.
|
||||||
func (c *criContainerdService) ExecSync(ctx context.Context, r *runtime.ExecSyncRequest) (*runtime.ExecSyncResponse, error) {
|
func (c *criContainerdService) ExecSync(ctx context.Context, r *runtime.ExecSyncRequest) (retRes *runtime.ExecSyncResponse, retErr error) {
|
||||||
return nil, errors.New("not implemented")
|
glog.V(2).Infof("ExecSync for %q with command %+v and timeout %d (s)", r.GetContainerId(), r.GetCmd(), r.GetTimeout())
|
||||||
|
defer func() {
|
||||||
|
if retErr == nil {
|
||||||
|
glog.V(2).Infof("ExecSync for %q returns with exit code %d", r.GetContainerId(), retRes.GetExitCode())
|
||||||
|
glog.V(4).Infof("ExecSync for %q outputs - stdout: %q, stderr: %q", r.GetContainerId(),
|
||||||
|
retRes.GetStdout(), retRes.GetStderr())
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Get container config from container store.
|
||||||
|
meta, err := c.containerStore.Get(r.GetContainerId())
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("an error occurred when try to find container %q: %v", r.GetContainerId(), err)
|
||||||
|
}
|
||||||
|
id := meta.ID
|
||||||
|
|
||||||
|
if meta.State() != runtime.ContainerState_CONTAINER_RUNNING {
|
||||||
|
return nil, fmt.Errorf("container %q is in %s state", id, criContainerStateToString(meta.State()))
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO(random-liu): Replace the following logic with containerd client and add unit test.
|
||||||
|
// Prepare streaming pipes.
|
||||||
|
execDir, err := ioutil.TempDir(getContainerRootDir(c.rootDir, id), "exec")
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to create exec streaming directory: %v", err)
|
||||||
|
}
|
||||||
|
defer func() {
|
||||||
|
if err = c.os.RemoveAll(execDir); err != nil {
|
||||||
|
glog.Errorf("Failed to remove exec streaming directory %q: %v", execDir, err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
_, stdout, stderr := getStreamingPipes(execDir)
|
||||||
|
_, stdoutPipe, stderrPipe, err := c.prepareStreamingPipes(ctx, "", stdout, stderr)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to prepare streaming pipes: %v", err)
|
||||||
|
}
|
||||||
|
defer stdoutPipe.Close()
|
||||||
|
defer stderrPipe.Close()
|
||||||
|
|
||||||
|
// Start redirecting exec output.
|
||||||
|
stdoutBuf, stderrBuf := new(bytes.Buffer), new(bytes.Buffer)
|
||||||
|
go io.Copy(stdoutBuf, stdoutPipe) // nolint: errcheck
|
||||||
|
go io.Copy(stderrBuf, stderrPipe) // nolint: errcheck
|
||||||
|
|
||||||
|
// Get containerd event client first, so that we won't miss any events.
|
||||||
|
// TODO(random-liu): Handle this in event handler. Create an events client for
|
||||||
|
// each exec introduces unnecessary overhead.
|
||||||
|
cancellable, cancel := context.WithCancel(ctx)
|
||||||
|
events, err := c.taskService.Events(cancellable, &execution.EventsRequest{})
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to get containerd event: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
spec := &meta.Spec.Process
|
||||||
|
spec.Args = r.GetCmd()
|
||||||
|
rawSpec, err := json.Marshal(spec)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to marshal oci spec %+v: %v", spec, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
resp, err := c.taskService.Exec(ctx, &execution.ExecRequest{
|
||||||
|
ContainerID: id,
|
||||||
|
Terminal: false,
|
||||||
|
Stdout: stdout,
|
||||||
|
Stderr: stderr,
|
||||||
|
Spec: &prototypes.Any{
|
||||||
|
TypeUrl: runtimespec.Version,
|
||||||
|
Value: rawSpec,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to exec in container %q: %v", id, err)
|
||||||
|
}
|
||||||
|
exitCode, err := waitContainerExec(cancel, events, id, resp.Pid, r.GetTimeout())
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to wait for exec in container %q to finish: %v", id, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO(random-liu): Make sure stdout/stderr are drained.
|
||||||
|
return &runtime.ExecSyncResponse{
|
||||||
|
Stdout: stdoutBuf.Bytes(),
|
||||||
|
Stderr: stderrBuf.Bytes(),
|
||||||
|
ExitCode: int32(exitCode),
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// waitContainerExec waits for container exec to finish and returns the exit code.
|
||||||
|
func waitContainerExec(cancel context.CancelFunc, events execution.Tasks_EventsClient, id string,
|
||||||
|
pid uint32, timeout int64) (uint32, error) {
|
||||||
|
// TODO(random-liu): [P1] Support ExecSync timeout.
|
||||||
|
// TODO(random-liu): Delete process after containerd upgrade.
|
||||||
|
defer func() {
|
||||||
|
// Stop events and drain the event channel. grpc-go#188
|
||||||
|
cancel()
|
||||||
|
for {
|
||||||
|
_, err := events.Recv()
|
||||||
|
if err != nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
for {
|
||||||
|
e, err := events.Recv()
|
||||||
|
if err != nil {
|
||||||
|
// Return non-zero exit code just in case.
|
||||||
|
return unknownExitCode, err
|
||||||
|
}
|
||||||
|
if e.Type != task.Event_EXIT {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if e.ID == id && e.Pid == pid {
|
||||||
|
return e.ExitStatus, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -50,6 +50,8 @@ const (
|
|||||||
completeExitReason = "Completed"
|
completeExitReason = "Completed"
|
||||||
// errorExitReason is the exit reason when container exits with code non-zero.
|
// errorExitReason is the exit reason when container exits with code non-zero.
|
||||||
errorExitReason = "Error"
|
errorExitReason = "Error"
|
||||||
|
// unknownExitCode is the exit code when exit reason is unknown.
|
||||||
|
unknownExitCode = 255
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
Loading…
Reference in New Issue
Block a user