Implement streaming CRI methods in dockershim

This commit is contained in:
Tim St. Clair 2016-10-26 16:34:45 -07:00
parent bc7ae399f8
commit c60db99536
No known key found for this signature in database
GPG Key ID: 434D16BCEF479EAB
20 changed files with 432 additions and 64 deletions

View File

@ -17,7 +17,7 @@ limitations under the License.
package api package api
import ( import (
"io" "time"
runtimeApi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime" runtimeApi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime"
) )
@ -43,8 +43,13 @@ type ContainerManager interface {
ListContainers(filter *runtimeApi.ContainerFilter) ([]*runtimeApi.Container, error) ListContainers(filter *runtimeApi.ContainerFilter) ([]*runtimeApi.Container, error)
// ContainerStatus returns the status of the container. // ContainerStatus returns the status of the container.
ContainerStatus(containerID string) (*runtimeApi.ContainerStatus, error) ContainerStatus(containerID string) (*runtimeApi.ContainerStatus, error)
// Exec executes a command in the container. // ExecSync executes a command in the container, and returns the stdout output.
Exec(containerID string, cmd []string, tty bool, stdin io.Reader, stdout, stderr io.WriteCloser) error // If command exits with a non-zero exit code, an error is returned.
ExecSync(containerID string, cmd []string, timeout time.Duration) (stdout []byte, stderr []byte, err error)
// Exec prepares a streaming endpoint to execute a command in the container, and returns the address.
Exec(*runtimeApi.ExecRequest) (*runtimeApi.ExecResponse, error)
// Attach prepares a streaming endpoint to attach to a running container, and returns the address.
Attach(req *runtimeApi.AttachRequest) (*runtimeApi.AttachResponse, error)
} }
// PodSandboxManager contains methods for operating on PodSandboxes. The methods // PodSandboxManager contains methods for operating on PodSandboxes. The methods
@ -63,6 +68,8 @@ type PodSandboxManager interface {
PodSandboxStatus(podSandboxID string) (*runtimeApi.PodSandboxStatus, error) PodSandboxStatus(podSandboxID string) (*runtimeApi.PodSandboxStatus, error)
// ListPodSandbox returns a list of Sandbox. // ListPodSandbox returns a list of Sandbox.
ListPodSandbox(filter *runtimeApi.PodSandboxFilter) ([]*runtimeApi.PodSandbox, error) ListPodSandbox(filter *runtimeApi.PodSandboxFilter) ([]*runtimeApi.PodSandbox, error)
// PortForward prepares a streaming endpoint to forward ports from a PodSandbox, and returns the address.
PortForward(*runtimeApi.PortForwardRequest) (*runtimeApi.PortForwardResponse, error)
} }
// RuntimeService interface should be implemented by a container runtime. // RuntimeService interface should be implemented by a container runtime.

View File

@ -18,7 +18,6 @@ package testing
import ( import (
"fmt" "fmt"
"io"
"reflect" "reflect"
"sync" "sync"
"time" "time"
@ -214,6 +213,14 @@ func (r *FakeRuntimeService) ListPodSandbox(filter *runtimeApi.PodSandboxFilter)
return result, nil return result, nil
} }
func (r *FakeRuntimeService) PortForward(*runtimeApi.PortForwardRequest) (*runtimeApi.PortForwardResponse, error) {
r.Lock()
defer r.Unlock()
r.Called = append(r.Called, "PortForward")
return &runtimeApi.PortForwardResponse{}, nil
}
func (r *FakeRuntimeService) CreateContainer(podSandboxID string, config *runtimeApi.ContainerConfig, sandboxConfig *runtimeApi.PodSandboxConfig) (string, error) { func (r *FakeRuntimeService) CreateContainer(podSandboxID string, config *runtimeApi.ContainerConfig, sandboxConfig *runtimeApi.PodSandboxConfig) (string, error) {
r.Lock() r.Lock()
defer r.Unlock() defer r.Unlock()
@ -349,12 +356,28 @@ func (r *FakeRuntimeService) ContainerStatus(containerID string) (*runtimeApi.Co
return &status, nil return &status, nil
} }
func (r *FakeRuntimeService) Exec(containerID string, cmd []string, tty bool, stdin io.Reader, stdout, stderr io.WriteCloser) error { func (r *FakeRuntimeService) ExecSync(containerID string, cmd []string, timeout time.Duration) (stdout []byte, stderr []byte, err error) {
r.Lock()
defer r.Unlock()
r.Called = append(r.Called, "ExecSync")
return nil, nil, nil
}
func (r *FakeRuntimeService) Exec(*runtimeApi.ExecRequest) (*runtimeApi.ExecResponse, error) {
r.Lock() r.Lock()
defer r.Unlock() defer r.Unlock()
r.Called = append(r.Called, "Exec") r.Called = append(r.Called, "Exec")
return nil return &runtimeApi.ExecResponse{}, nil
}
func (r *FakeRuntimeService) Attach(req *runtimeApi.AttachRequest) (*runtimeApi.AttachResponse, error) {
r.Lock()
defer r.Unlock()
r.Called = append(r.Called, "Attach")
return &runtimeApi.AttachResponse{}, nil
} }
func (r *FakeRuntimeService) UpdateRuntimeConfig(runtimeCOnfig *runtimeApi.RuntimeConfig) error { func (r *FakeRuntimeService) UpdateRuntimeConfig(runtimeCOnfig *runtimeApi.RuntimeConfig) error {

View File

@ -19,6 +19,7 @@ go_library(
"docker_image.go", "docker_image.go",
"docker_sandbox.go", "docker_sandbox.go",
"docker_service.go", "docker_service.go",
"docker_streaming.go",
"helpers.go", "helpers.go",
"legacy.go", "legacy.go",
"naming.go", "naming.go",
@ -32,7 +33,9 @@ go_library(
"//pkg/kubelet/dockertools:go_default_library", "//pkg/kubelet/dockertools:go_default_library",
"//pkg/kubelet/leaky:go_default_library", "//pkg/kubelet/leaky:go_default_library",
"//pkg/kubelet/qos:go_default_library", "//pkg/kubelet/qos:go_default_library",
"//pkg/kubelet/server/streaming:go_default_library",
"//pkg/kubelet/types:go_default_library", "//pkg/kubelet/types:go_default_library",
"//pkg/kubelet/util/ioutils:go_default_library",
"//pkg/util/term:go_default_library", "//pkg/util/term:go_default_library",
"//vendor:github.com/docker/engine-api/types", "//vendor:github.com/docker/engine-api/types",
"//vendor:github.com/docker/engine-api/types/container", "//vendor:github.com/docker/engine-api/types/container",

View File

@ -18,7 +18,6 @@ package dockershim
import ( import (
"fmt" "fmt"
"io"
"os" "os"
"path/filepath" "path/filepath"
"time" "time"
@ -388,10 +387,3 @@ func (ds *dockerService) ContainerStatus(containerID string) (*runtimeApi.Contai
Annotations: annotations, Annotations: annotations,
}, nil }, nil
} }
// Exec execute a command in the container.
// TODO: Need to handle terminal resizing before implementing this function.
// https://github.com/kubernetes/kubernetes/issues/29579.
func (ds *dockerService) Exec(containerID string, cmd []string, tty bool, stdin io.Reader, stdout, stderr io.WriteCloser) error {
return fmt.Errorf("not implemented")
}

View File

@ -25,6 +25,7 @@ import (
runtimeApi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime" runtimeApi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/dockertools" "k8s.io/kubernetes/pkg/kubelet/dockertools"
"k8s.io/kubernetes/pkg/kubelet/server/streaming"
"k8s.io/kubernetes/pkg/util/term" "k8s.io/kubernetes/pkg/util/term"
) )
@ -55,13 +56,27 @@ const (
var internalLabelKeys []string = []string{containerTypeLabelKey, containerLogPathLabelKey, sandboxIDLabelKey} var internalLabelKeys []string = []string{containerTypeLabelKey, containerLogPathLabelKey, sandboxIDLabelKey}
// NOTE: Anything passed to DockerService should be eventually handled in another way when we switch to running the shim as a different process. // NOTE: Anything passed to DockerService should be eventually handled in another way when we switch to running the shim as a different process.
func NewDockerService(client dockertools.DockerInterface, seccompProfileRoot string, podSandboxImage string) DockerService { func NewDockerService(client dockertools.DockerInterface, seccompProfileRoot string, podSandboxImage string, streamingConfig *streaming.Config) (DockerService, error) {
return &dockerService{ ds := &dockerService{
seccompProfileRoot: seccompProfileRoot, seccompProfileRoot: seccompProfileRoot,
client: dockertools.NewInstrumentedDockerInterface(client), client: dockertools.NewInstrumentedDockerInterface(client),
os: kubecontainer.RealOS{}, os: kubecontainer.RealOS{},
podSandboxImage: podSandboxImage, podSandboxImage: podSandboxImage,
streamingRuntime: &streamingRuntime{
client: client,
// Only the native exec handling is supported for now.
// TODO(#35747) - Either deprecate nsenter exec handling, or add support for it here.
execHandler: &dockertools.NativeExecHandler{},
},
} }
if streamingConfig != nil {
var err error
ds.streamingServer, err = streaming.NewServer(*streamingConfig, ds.streamingRuntime)
if err != nil {
return nil, err
}
}
return ds, nil
} }
// DockerService is an interface that embeds both the new RuntimeService and // DockerService is an interface that embeds both the new RuntimeService and
@ -78,11 +93,9 @@ type DockerService interface {
type DockerLegacyService interface { type DockerLegacyService interface {
// Supporting legacy methods for docker. // Supporting legacy methods for docker.
GetContainerLogs(pod *api.Pod, containerID kubecontainer.ContainerID, logOptions *api.PodLogOptions, stdout, stderr io.Writer) (err error) GetContainerLogs(pod *api.Pod, containerID kubecontainer.ContainerID, logOptions *api.PodLogOptions, stdout, stderr io.Writer) (err error)
kubecontainer.ContainerAttacher LegacyExec(containerID kubecontainer.ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error
PortForward(sandboxID string, port uint16, stream io.ReadWriteCloser) error LegacyAttach(id kubecontainer.ContainerID, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error
LegacyPortForward(sandboxID string, port uint16, stream io.ReadWriteCloser) error
// TODO: Remove this once exec is properly defined in CRI.
ExecInContainer(containerID kubecontainer.ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error
} }
type dockerService struct { type dockerService struct {
@ -90,6 +103,8 @@ type dockerService struct {
client dockertools.DockerInterface client dockertools.DockerInterface
os kubecontainer.OSInterface os kubecontainer.OSInterface
podSandboxImage string podSandboxImage string
streamingRuntime *streamingRuntime
streamingServer streaming.Server
} }
// Version returns the runtime name, runtime version and runtime API version // Version returns the runtime name, runtime version and runtime API version

View File

@ -0,0 +1,149 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package dockershim
import (
"bytes"
"fmt"
"io"
"math"
"time"
dockertypes "github.com/docker/engine-api/types"
runtimeapi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime"
"k8s.io/kubernetes/pkg/kubelet/dockertools"
"k8s.io/kubernetes/pkg/kubelet/server/streaming"
"k8s.io/kubernetes/pkg/kubelet/util/ioutils"
"k8s.io/kubernetes/pkg/util/term"
)
type streamingRuntime struct {
client dockertools.DockerInterface
execHandler dockertools.ExecHandler
}
var _ streaming.Runtime = &streamingRuntime{}
func (r *streamingRuntime) Exec(containerID string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan term.Size) error {
return r.exec(containerID, cmd, in, out, err, tty, resize, 0)
}
// Internal version of Exec adds a timeout.
func (r *streamingRuntime) exec(containerID string, cmd []string, in io.Reader, out, errw io.WriteCloser, tty bool, resize <-chan term.Size, timeout time.Duration) error {
container, err := checkContainerStatus(r.client, containerID)
if err != nil {
return err
}
// TODO(timstclair): Clean this up once PR#33366 merges.
if timeout <= 0 {
// Run until command exits.
return r.execHandler.ExecInContainer(r.client, container, cmd, in, out, errw, tty, resize)
}
errCh := make(chan error)
go func() {
errCh <- r.execHandler.ExecInContainer(r.client, container, cmd, in, out, errw, tty, resize)
}()
select {
case err := <-errCh:
return err
case <-time.After(timeout):
return streaming.ErrorTimeout("exec", timeout)
}
}
func (r *streamingRuntime) Attach(containerID string, in io.Reader, out, errw io.WriteCloser, resize <-chan term.Size) error {
container, err := checkContainerStatus(r.client, containerID)
if err != nil {
return err
}
tty := container.Config.Tty
return dockertools.AttachContainer(r.client, containerID, in, out, errw, tty, resize)
}
func (r *streamingRuntime) PortForward(podSandboxID string, port int32, stream io.ReadWriteCloser) error {
if port < 0 || port > math.MaxUint16 {
return fmt.Errorf("invalid port %d", port)
}
return dockertools.PortForward(r.client, podSandboxID, uint16(port), stream)
}
// ExecSync executes a command in the container, and returns the stdout output.
// If command exits with a non-zero exit code, an error is returned.
func (ds *dockerService) ExecSync(containerID string, cmd []string, timeout time.Duration) (stdout []byte, stderr []byte, err error) {
var stdoutBuffer, stderrBuffer bytes.Buffer
err = ds.streamingRuntime.exec(containerID, cmd,
nil, // in
ioutils.WriteCloserWrapper(&stdoutBuffer),
ioutils.WriteCloserWrapper(&stderrBuffer),
false, // tty
nil, // resize
timeout)
return stdoutBuffer.Bytes(), stderrBuffer.Bytes(), err
}
// Exec prepares a streaming endpoint to execute a command in the container, and returns the address.
func (ds *dockerService) Exec(req *runtimeapi.ExecRequest) (*runtimeapi.ExecResponse, error) {
if ds.streamingServer == nil {
return nil, streaming.ErrorStreamingDisabled("exec")
}
_, err := checkContainerStatus(ds.client, req.GetContainerId())
if err != nil {
return nil, err
}
return ds.streamingServer.GetExec(req)
}
// Attach prepares a streaming endpoint to attach to a running container, and returns the address.
func (ds *dockerService) Attach(req *runtimeapi.AttachRequest) (*runtimeapi.AttachResponse, error) {
if ds.streamingServer == nil {
return nil, streaming.ErrorStreamingDisabled("attach")
}
container, err := checkContainerStatus(ds.client, req.GetContainerId())
if err != nil {
return nil, err
}
tty := container.Config.Tty
return ds.streamingServer.GetAttach(req, tty)
}
// PortForward prepares a streaming endpoint to forward ports from a PodSandbox, and returns the address.
func (ds *dockerService) PortForward(req *runtimeapi.PortForwardRequest) (*runtimeapi.PortForwardResponse, error) {
if ds.streamingServer == nil {
return nil, streaming.ErrorStreamingDisabled("port forward")
}
_, err := checkContainerStatus(ds.client, req.GetPodSandboxId())
if err != nil {
return nil, err
}
// TODO(timstclair): Verify that ports are exposed.
return ds.streamingServer.GetPortForward(req)
}
func checkContainerStatus(client dockertools.DockerInterface, containerID string) (*dockertypes.ContainerJSON, error) {
container, err := client.InspectContainer(containerID)
if err != nil {
return nil, err
}
if !container.State.Running {
return nil, fmt.Errorf("container not running (%s)", container.ID)
}
return container, nil
}

View File

@ -17,7 +17,6 @@ limitations under the License.
package dockershim package dockershim
import ( import (
"fmt"
"io" "io"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
@ -31,8 +30,8 @@ import (
// directly. // directly.
// TODO: implement the methods in this file. // TODO: implement the methods in this file.
func (ds *dockerService) AttachContainer(id kubecontainer.ContainerID, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) (err error) { func (ds *dockerService) LegacyAttach(id kubecontainer.ContainerID, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) (err error) {
return dockertools.AttachContainer(ds.client, id, stdin, stdout, stderr, tty, resize) return ds.streamingRuntime.Attach(id.ID, stdin, stdout, stderr, resize)
} }
func (ds *dockerService) GetContainerLogs(pod *api.Pod, containerID kubecontainer.ContainerID, logOptions *api.PodLogOptions, stdout, stderr io.Writer) (err error) { func (ds *dockerService) GetContainerLogs(pod *api.Pod, containerID kubecontainer.ContainerID, logOptions *api.PodLogOptions, stdout, stderr io.Writer) (err error) {
@ -43,19 +42,10 @@ func (ds *dockerService) GetContainerLogs(pod *api.Pod, containerID kubecontaine
return dockertools.GetContainerLogs(ds.client, pod, containerID, logOptions, stdout, stderr, container.Config.Tty) return dockertools.GetContainerLogs(ds.client, pod, containerID, logOptions, stdout, stderr, container.Config.Tty)
} }
func (ds *dockerService) PortForward(sandboxID string, port uint16, stream io.ReadWriteCloser) error { func (ds *dockerService) LegacyPortForward(sandboxID string, port uint16, stream io.ReadWriteCloser) error {
return dockertools.PortForward(ds.client, sandboxID, port, stream) return ds.streamingRuntime.PortForward(sandboxID, int32(port), stream)
} }
func (ds *dockerService) ExecInContainer(containerID kubecontainer.ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error { func (ds *dockerService) LegacyExec(containerID kubecontainer.ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error {
container, err := ds.client.InspectContainer(containerID.ID) return ds.streamingRuntime.Exec(containerID.ID, cmd, stdin, stdout, stderr, tty, resize)
if err != nil {
return err
}
if !container.State.Running {
return fmt.Errorf("container not running (%s)", container.ID)
}
handler := &dockertools.NativeExecHandler{}
return handler.ExecInContainer(ds.client, container, cmd, stdin, stdout, stderr, tty, resize)
} }

View File

@ -1269,16 +1269,16 @@ func (dm *DockerManager) ExecInContainer(containerID kubecontainer.ContainerID,
} }
func (dm *DockerManager) AttachContainer(containerID kubecontainer.ContainerID, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error { func (dm *DockerManager) AttachContainer(containerID kubecontainer.ContainerID, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error {
return AttachContainer(dm.client, containerID, stdin, stdout, stderr, tty, resize) return AttachContainer(dm.client, containerID.ID, stdin, stdout, stderr, tty, resize)
} }
// Temporarily export this function to share with dockershim. // Temporarily export this function to share with dockershim.
// TODO: clean this up. // TODO: clean this up.
func AttachContainer(client DockerInterface, containerID kubecontainer.ContainerID, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error { func AttachContainer(client DockerInterface, containerID string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error {
// Have to start this before the call to client.AttachToContainer because client.AttachToContainer is a blocking // Have to start this before the call to client.AttachToContainer because client.AttachToContainer is a blocking
// call :-( Otherwise, resize events don't get processed and the terminal never resizes. // call :-( Otherwise, resize events don't get processed and the terminal never resizes.
kubecontainer.HandleResizing(resize, func(size term.Size) { kubecontainer.HandleResizing(resize, func(size term.Size) {
client.ResizeContainerTTY(containerID.ID, int(size.Height), int(size.Width)) client.ResizeContainerTTY(containerID, int(size.Height), int(size.Width))
}) })
// TODO(random-liu): Do we really use the *Logs* field here? // TODO(random-liu): Do we really use the *Logs* field here?
@ -1294,7 +1294,7 @@ func AttachContainer(client DockerInterface, containerID kubecontainer.Container
ErrorStream: stderr, ErrorStream: stderr,
RawTerminal: tty, RawTerminal: tty,
} }
return client.AttachToContainer(containerID.ID, opts, sopts) return client.AttachToContainer(containerID, opts, sopts)
} }
func noPodInfraContainerError(podName, podNamespace string) error { func noPodInfraContainerError(podName, podNamespace string) error {

View File

@ -489,7 +489,10 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub
case "cri": case "cri":
// Use the new CRI shim for docker. This is needed for testing the // Use the new CRI shim for docker. This is needed for testing the
// docker integration through CRI, and may be removed in the future. // docker integration through CRI, and may be removed in the future.
dockerService := dockershim.NewDockerService(klet.dockerClient, kubeCfg.SeccompProfileRoot, kubeCfg.PodInfraContainerImage) dockerService, err := dockershim.NewDockerService(klet.dockerClient, kubeCfg.SeccompProfileRoot, kubeCfg.PodInfraContainerImage, nil)
if err != nil {
return nil, err
}
runtimeService := dockerService.(internalApi.RuntimeService) runtimeService := dockerService.(internalApi.RuntimeService)
imageService := dockerService.(internalApi.ImageManagerService) imageService := dockerService.(internalApi.ImageManagerService)

View File

@ -17,7 +17,6 @@ limitations under the License.
package kuberuntime package kuberuntime
import ( import (
"io"
"time" "time"
internalApi "k8s.io/kubernetes/pkg/kubelet/api" internalApi "k8s.io/kubernetes/pkg/kubelet/api"
@ -123,13 +122,31 @@ func (in instrumentedRuntimeService) ContainerStatus(containerID string) (*runti
return out, err return out, err
} }
func (in instrumentedRuntimeService) Exec(containerID string, cmd []string, tty bool, stdin io.Reader, stdout, stderr io.WriteCloser) error { func (in instrumentedRuntimeService) ExecSync(containerID string, cmd []string, timeout time.Duration) ([]byte, []byte, error) {
const operation = "exec_sync"
defer recordOperation(operation, time.Now())
stdout, stderr, err := in.service.ExecSync(containerID, cmd, timeout)
recordError(operation, err)
return stdout, stderr, err
}
func (in instrumentedRuntimeService) Exec(req *runtimeApi.ExecRequest) (*runtimeApi.ExecResponse, error) {
const operation = "exec" const operation = "exec"
defer recordOperation(operation, time.Now()) defer recordOperation(operation, time.Now())
err := in.service.Exec(containerID, cmd, tty, stdin, stdout, stderr) resp, err := in.service.Exec(req)
recordError(operation, err) recordError(operation, err)
return err return resp, err
}
func (in instrumentedRuntimeService) Attach(req *runtimeApi.AttachRequest) (*runtimeApi.AttachResponse, error) {
const operation = "attach"
defer recordOperation(operation, time.Now())
resp, err := in.service.Attach(req)
recordError(operation, err)
return resp, err
} }
func (in instrumentedRuntimeService) RunPodSandbox(config *runtimeApi.PodSandboxConfig) (string, error) { func (in instrumentedRuntimeService) RunPodSandbox(config *runtimeApi.PodSandboxConfig) (string, error) {
@ -177,6 +194,15 @@ func (in instrumentedRuntimeService) ListPodSandbox(filter *runtimeApi.PodSandbo
return out, err return out, err
} }
func (in instrumentedRuntimeService) PortForward(req *runtimeApi.PortForwardRequest) (*runtimeApi.PortForwardResponse, error) {
const operation = "port_forward"
defer recordOperation(operation, time.Now())
resp, err := in.service.PortForward(req)
recordError(operation, err)
return resp, err
}
func (in instrumentedRuntimeService) UpdateRuntimeConfig(runtimeConfig *runtimeApi.RuntimeConfig) error { func (in instrumentedRuntimeService) UpdateRuntimeConfig(runtimeConfig *runtimeApi.RuntimeConfig) error {
const operation = "update_runtime_config" const operation = "update_runtime_config"
defer recordOperation(operation, time.Now()) defer recordOperation(operation, time.Now())

View File

@ -668,7 +668,7 @@ func (m *kubeGenericRuntimeManager) AttachContainer(id kubecontainer.ContainerID
// now to unblock other tests. // now to unblock other tests.
// TODO: remove this hack after attach is defined in CRI. // TODO: remove this hack after attach is defined in CRI.
if ds, ok := m.runtimeService.(dockershim.DockerLegacyService); ok { if ds, ok := m.runtimeService.(dockershim.DockerLegacyService); ok {
return ds.AttachContainer(id, stdin, stdout, stderr, tty, resize) return ds.LegacyAttach(id, stdin, stdout, stderr, tty, resize)
} }
return fmt.Errorf("not implemented") return fmt.Errorf("not implemented")
} }
@ -694,7 +694,7 @@ func (m *kubeGenericRuntimeManager) ExecInContainer(containerID kubecontainer.Co
// now to unblock other tests. // now to unblock other tests.
// TODO: remove this hack after exec is defined in CRI. // TODO: remove this hack after exec is defined in CRI.
if ds, ok := m.runtimeService.(dockershim.DockerLegacyService); ok { if ds, ok := m.runtimeService.(dockershim.DockerLegacyService); ok {
return ds.ExecInContainer(containerID, cmd, stdin, stdout, stderr, tty, resize) return ds.LegacyExec(containerID, cmd, stdin, stdout, stderr, tty, resize)
} }
return fmt.Errorf("not implemented") return fmt.Errorf("not implemented")
} }

View File

@ -1001,7 +1001,7 @@ func (m *kubeGenericRuntimeManager) PortForward(pod *kubecontainer.Pod, port uin
// now to unblock other tests. // now to unblock other tests.
// TODO: remove this hack after portforward is defined in CRI. // TODO: remove this hack after portforward is defined in CRI.
if ds, ok := m.runtimeService.(dockershim.DockerLegacyService); ok { if ds, ok := m.runtimeService.(dockershim.DockerLegacyService); ok {
return ds.PortForward(pod.Sandboxes[0].ID.ID, port, stream) return ds.LegacyPortForward(pod.Sandboxes[0].ID.ID, port, stream)
} }
return fmt.Errorf("not implemented") return fmt.Errorf("not implemented")

View File

@ -22,6 +22,7 @@ go_library(
deps = [ deps = [
"//pkg/kubelet/api:go_default_library", "//pkg/kubelet/api:go_default_library",
"//pkg/kubelet/api/v1alpha1/runtime:go_default_library", "//pkg/kubelet/api/v1alpha1/runtime:go_default_library",
"//pkg/util/exec:go_default_library",
"//vendor:github.com/golang/glog", "//vendor:github.com/golang/glog",
"//vendor:golang.org/x/net/context", "//vendor:golang.org/x/net/context",
"//vendor:google.golang.org/grpc", "//vendor:google.golang.org/grpc",

View File

@ -17,14 +17,15 @@ limitations under the License.
package remote package remote
import ( import (
"io" "fmt"
"strings"
"time" "time"
"fmt"
"github.com/golang/glog" "github.com/golang/glog"
"google.golang.org/grpc" "google.golang.org/grpc"
internalApi "k8s.io/kubernetes/pkg/kubelet/api" internalApi "k8s.io/kubernetes/pkg/kubelet/api"
runtimeApi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime" runtimeApi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime"
utilexec "k8s.io/kubernetes/pkg/util/exec"
) )
// RemoteRuntimeService is a gRPC implementation of internalApi.RuntimeService. // RemoteRuntimeService is a gRPC implementation of internalApi.RuntimeService.
@ -247,10 +248,75 @@ func (r *RemoteRuntimeService) ContainerStatus(containerID string) (*runtimeApi.
return resp.Status, nil return resp.Status, nil
} }
// Exec executes a command in the container. // ExecSync executes a command in the container, and returns the stdout output.
// TODO: support terminal resizing for exec, refer https://github.com/kubernetes/kubernetes/issues/29579. // If command exits with a non-zero exit code, an error is returned.
func (r *RemoteRuntimeService) Exec(containerID string, cmd []string, tty bool, stdin io.Reader, stdout, stderr io.WriteCloser) error { func (r *RemoteRuntimeService) ExecSync(containerID string, cmd []string, timeout time.Duration) (stdout []byte, stderr []byte, err error) {
return fmt.Errorf("Not implemented") ctx, cancel := getContextWithTimeout(r.timeout)
defer cancel()
timeoutSeconds := int64(timeout.Seconds())
req := &runtimeApi.ExecSyncRequest{
ContainerId: &containerID,
Cmd: cmd,
Timeout: &timeoutSeconds,
}
resp, err := r.runtimeClient.ExecSync(ctx, req)
if err != nil {
glog.Errorf("ExecSync %s '%s' from runtime service failed: %v", containerID, strings.Join(cmd, " "), err)
return nil, nil, err
}
err = nil
if resp.GetExitCode() != 0 {
err = utilexec.CodeExitError{
Err: fmt.Errorf("command '%s' exited with %d: %s", strings.Join(cmd, " "), resp.GetExitCode(), resp.GetStderr()),
Code: int(resp.GetExitCode()),
}
}
return resp.GetStdout(), resp.GetStderr(), err
}
// Exec prepares a streaming endpoint to execute a command in the container, and returns the address.
func (r *RemoteRuntimeService) Exec(req *runtimeApi.ExecRequest) (*runtimeApi.ExecResponse, error) {
ctx, cancel := getContextWithTimeout(r.timeout)
defer cancel()
resp, err := r.runtimeClient.Exec(ctx, req)
if err != nil {
glog.Errorf("Exec %s '%s' from runtime service failed: %v", req.GetContainerId(), strings.Join(req.GetCmd(), " "), err)
return nil, err
}
return resp, nil
}
// Attach prepares a streaming endpoint to attach to a running container, and returns the address.
func (r *RemoteRuntimeService) Attach(req *runtimeApi.AttachRequest) (*runtimeApi.AttachResponse, error) {
ctx, cancel := getContextWithTimeout(r.timeout)
defer cancel()
resp, err := r.runtimeClient.Attach(ctx, req)
if err != nil {
glog.Errorf("Attach %s from runtime service failed: %v", req.GetContainerId(), err)
return nil, err
}
return resp, nil
}
// PortForward prepares a streaming endpoint to forward ports from a PodSandbox, and returns the address.
func (r *RemoteRuntimeService) PortForward(req *runtimeApi.PortForwardRequest) (*runtimeApi.PortForwardResponse, error) {
ctx, cancel := getContextWithTimeout(r.timeout)
defer cancel()
resp, err := r.runtimeClient.PortForward(ctx, req)
if err != nil {
glog.Errorf("PortForward %s from runtime service failed: %v", req.GetPodSandboxId(), err)
return nil, err
}
return resp, nil
} }
func (r *RemoteRuntimeService) UpdateRuntimeConfig(runtimeConfig *runtimeApi.RuntimeConfig) error { func (r *RemoteRuntimeService) UpdateRuntimeConfig(runtimeConfig *runtimeApi.RuntimeConfig) error {

View File

@ -23,6 +23,7 @@ go_library(
deps = [ deps = [
"//pkg/kubelet/api:go_default_library", "//pkg/kubelet/api:go_default_library",
"//pkg/kubelet/api/v1alpha1/runtime:go_default_library", "//pkg/kubelet/api/v1alpha1/runtime:go_default_library",
"//pkg/kubelet/util/ioutils:go_default_library",
], ],
) )

View File

@ -17,7 +17,7 @@ limitations under the License.
package rktshim package rktshim
import ( import (
"io" "time"
kubeletApi "k8s.io/kubernetes/pkg/kubelet/api" kubeletApi "k8s.io/kubernetes/pkg/kubelet/api"
runtimeApi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime" runtimeApi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime"
@ -65,7 +65,18 @@ func (*Runtime) ContainerStatus(string) (*runtimeApi.ContainerStatus, error) {
panic("not implemented") panic("not implemented")
} }
// Exec executes a command inside an app running inside a pod sanbox. // ExecSync executes a command in the container, and returns the stdout output.
func (*Runtime) Exec(string, []string, bool, io.Reader, io.WriteCloser, io.WriteCloser) error { // If command exits with a non-zero exit code, an error is returned.
func (*Runtime) ExecSync(containerID string, cmd []string, timeout time.Duration) (stdout []byte, stderr []byte, err error) {
panic("not implemented")
}
// Exec prepares a streaming endpoint to execute a command in the container, and returns the address.
func (*Runtime) Exec(*runtimeApi.ExecRequest) (*runtimeApi.ExecResponse, error) {
panic("not implemented")
}
// Attach prepares a streaming endpoint to attach to a running container, and returns the address.
func (*Runtime) Attach(req *runtimeApi.AttachRequest) (*runtimeApi.AttachResponse, error) {
panic("not implemented") panic("not implemented")
} }

View File

@ -17,6 +17,7 @@ limitations under the License.
package rktshim package rktshim
import ( import (
"bytes"
"errors" "errors"
"io" "io"
"math/rand" "math/rand"
@ -24,6 +25,12 @@ import (
kubeletApi "k8s.io/kubernetes/pkg/kubelet/api" kubeletApi "k8s.io/kubernetes/pkg/kubelet/api"
runtimeApi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime" runtimeApi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime"
"k8s.io/kubernetes/pkg/kubelet/util/ioutils"
)
const (
FakeStreamingHost = "localhost"
FakeStreamingPort = "12345"
) )
func init() { func init() {
@ -200,16 +207,34 @@ func (r *FakeRuntime) ContainerStatus(id string) (*runtimeApi.ContainerStatus, e
return &c.Status, nil return &c.Status, nil
} }
func (r *FakeRuntime) Exec(id string, cmd []string, tty bool, in io.Reader, out, err io.WriteCloser) error { func (r *FakeRuntime) ExecSync(containerID string, cmd []string, timeout time.Duration) (stdout []byte, stderr []byte, err error) {
c, ok := r.Containers[id] c, ok := r.Containers[containerID]
if !ok { if !ok {
return ErrContainerNotFound return nil, nil, ErrContainerNotFound
} }
// TODO(tmrts): Validate the assumption that container has to be running for exec to work. // TODO(tmrts): Validate the assumption that container has to be running for exec to work.
if c.State != runtimeApi.ContainerState_RUNNING { if c.State != runtimeApi.ContainerState_RUNNING {
return ErrInvalidContainerStateTransition return nil, nil, ErrInvalidContainerStateTransition
} }
return c.Exec(cmd, in, out, err) var stdoutBuffer, stderrBuffer bytes.Buffer
err = c.Exec(cmd, nil,
ioutils.WriteCloserWrapper(&stdoutBuffer),
ioutils.WriteCloserWrapper(&stderrBuffer))
return stdoutBuffer.Bytes(), stderrBuffer.Bytes(), err
}
func (r *FakeRuntime) Exec(req *runtimeApi.ExecRequest) (*runtimeApi.ExecResponse, error) {
url := "http://" + FakeStreamingHost + ":" + FakeStreamingPort + "/exec/" + req.GetContainerId()
return &runtimeApi.ExecResponse{
Url: &url,
}, nil
}
func (r *FakeRuntime) Attach(req *runtimeApi.AttachRequest) (*runtimeApi.AttachResponse, error) {
url := "http://" + FakeStreamingHost + ":" + FakeStreamingPort + "/attach/" + req.GetContainerId()
return &runtimeApi.AttachResponse{
Url: &url,
}, nil
} }

View File

@ -57,3 +57,8 @@ func (*PodSandboxManager) PodSandboxStatus(string) (*runtimeApi.PodSandboxStatus
func (*PodSandboxManager) ListPodSandbox(*runtimeApi.PodSandboxFilter) ([]*runtimeApi.PodSandbox, error) { func (*PodSandboxManager) ListPodSandbox(*runtimeApi.PodSandboxFilter) ([]*runtimeApi.PodSandbox, error) {
panic("not implemented") panic("not implemented")
} }
// PortForward prepares a streaming endpoint to forward ports from a PodSandbox, and returns the address.
func (*PodSandboxManager) PortForward(*runtimeApi.PortForwardRequest) (*runtimeApi.PortForwardResponse, error) {
panic("not implemented")
}

View File

@ -12,7 +12,10 @@ load(
go_library( go_library(
name = "go_default_library", name = "go_default_library",
srcs = ["server.go"], srcs = [
"errors.go",
"server.go",
],
tags = ["automanaged"], tags = ["automanaged"],
deps = [ deps = [
"//pkg/kubelet/api/v1alpha1/runtime:go_default_library", "//pkg/kubelet/api/v1alpha1/runtime:go_default_library",
@ -21,6 +24,7 @@ go_library(
"//pkg/types:go_default_library", "//pkg/types:go_default_library",
"//pkg/util/term:go_default_library", "//pkg/util/term:go_default_library",
"//vendor:github.com/emicklei/go-restful", "//vendor:github.com/emicklei/go-restful",
"//vendor:google.golang.org/grpc/codes",
"//vendor:k8s.io/client-go/pkg/api", "//vendor:k8s.io/client-go/pkg/api",
], ],
) )

View File

@ -0,0 +1,47 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package streaming
import (
"fmt"
"time"
"google.golang.org/grpc/codes"
)
type ResponseError struct {
Err string
Code codes.Code
}
func (e *ResponseError) Error() string {
return e.Err
}
func ErrorStreamingDisabled(method string) error {
return &ResponseError{
Err: fmt.Sprintf("streaming method %s disabled", method),
Code: codes.NotFound,
}
}
func ErrorTimeout(op string, timeout time.Duration) error {
return &ResponseError{
Err: fmt.Sprintf("%s timed out after %s", op, timeout.String()),
Code: codes.DeadlineExceeded,
}
}