Merge pull request #35661 from timstclair/dockershim
Automatic merge from submit-queue Implement streaming CRI methods in dockershim *NOTE: Temporarily includes commit from https://github.com/kubernetes/kubernetes/pull/35330 - only review the second commit.* Builds on https://github.com/kubernetes/kubernetes/pull/35330, using the library to implement the streaming methods in various CRI shims. This does not actually wire up the new streaming methods in the kubelet (that will be my next PR). Once the new methods are wired up, I will delete the `Legacy{Exec,Attach,PortForward}` methods. /cc @kubernetes/sig-node @feiskyer
This commit is contained in:
@@ -17,7 +17,7 @@ limitations under the License.
|
||||
package api
|
||||
|
||||
import (
|
||||
"io"
|
||||
"time"
|
||||
|
||||
runtimeApi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime"
|
||||
)
|
||||
@@ -43,8 +43,13 @@ type ContainerManager interface {
|
||||
ListContainers(filter *runtimeApi.ContainerFilter) ([]*runtimeApi.Container, error)
|
||||
// ContainerStatus returns the status of the container.
|
||||
ContainerStatus(containerID string) (*runtimeApi.ContainerStatus, error)
|
||||
// Exec executes a command in the container.
|
||||
Exec(containerID string, cmd []string, tty bool, stdin io.Reader, stdout, stderr io.WriteCloser) error
|
||||
// ExecSync executes a command in the container, and returns the stdout output.
|
||||
// If command exits with a non-zero exit code, an error is returned.
|
||||
ExecSync(containerID string, cmd []string, timeout time.Duration) (stdout []byte, stderr []byte, err error)
|
||||
// Exec prepares a streaming endpoint to execute a command in the container, and returns the address.
|
||||
Exec(*runtimeApi.ExecRequest) (*runtimeApi.ExecResponse, error)
|
||||
// Attach prepares a streaming endpoint to attach to a running container, and returns the address.
|
||||
Attach(req *runtimeApi.AttachRequest) (*runtimeApi.AttachResponse, error)
|
||||
}
|
||||
|
||||
// PodSandboxManager contains methods for operating on PodSandboxes. The methods
|
||||
@@ -63,6 +68,8 @@ type PodSandboxManager interface {
|
||||
PodSandboxStatus(podSandboxID string) (*runtimeApi.PodSandboxStatus, error)
|
||||
// ListPodSandbox returns a list of Sandbox.
|
||||
ListPodSandbox(filter *runtimeApi.PodSandboxFilter) ([]*runtimeApi.PodSandbox, error)
|
||||
// PortForward prepares a streaming endpoint to forward ports from a PodSandbox, and returns the address.
|
||||
PortForward(*runtimeApi.PortForwardRequest) (*runtimeApi.PortForwardResponse, error)
|
||||
}
|
||||
|
||||
// RuntimeService interface should be implemented by a container runtime.
|
||||
|
@@ -18,7 +18,6 @@ package testing
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"reflect"
|
||||
"sync"
|
||||
"time"
|
||||
@@ -214,6 +213,14 @@ func (r *FakeRuntimeService) ListPodSandbox(filter *runtimeApi.PodSandboxFilter)
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (r *FakeRuntimeService) PortForward(*runtimeApi.PortForwardRequest) (*runtimeApi.PortForwardResponse, error) {
|
||||
r.Lock()
|
||||
defer r.Unlock()
|
||||
|
||||
r.Called = append(r.Called, "PortForward")
|
||||
return &runtimeApi.PortForwardResponse{}, nil
|
||||
}
|
||||
|
||||
func (r *FakeRuntimeService) CreateContainer(podSandboxID string, config *runtimeApi.ContainerConfig, sandboxConfig *runtimeApi.PodSandboxConfig) (string, error) {
|
||||
r.Lock()
|
||||
defer r.Unlock()
|
||||
@@ -349,12 +356,28 @@ func (r *FakeRuntimeService) ContainerStatus(containerID string) (*runtimeApi.Co
|
||||
return &status, nil
|
||||
}
|
||||
|
||||
func (r *FakeRuntimeService) Exec(containerID string, cmd []string, tty bool, stdin io.Reader, stdout, stderr io.WriteCloser) error {
|
||||
func (r *FakeRuntimeService) ExecSync(containerID string, cmd []string, timeout time.Duration) (stdout []byte, stderr []byte, err error) {
|
||||
r.Lock()
|
||||
defer r.Unlock()
|
||||
|
||||
r.Called = append(r.Called, "ExecSync")
|
||||
return nil, nil, nil
|
||||
}
|
||||
|
||||
func (r *FakeRuntimeService) Exec(*runtimeApi.ExecRequest) (*runtimeApi.ExecResponse, error) {
|
||||
r.Lock()
|
||||
defer r.Unlock()
|
||||
|
||||
r.Called = append(r.Called, "Exec")
|
||||
return nil
|
||||
return &runtimeApi.ExecResponse{}, nil
|
||||
}
|
||||
|
||||
func (r *FakeRuntimeService) Attach(req *runtimeApi.AttachRequest) (*runtimeApi.AttachResponse, error) {
|
||||
r.Lock()
|
||||
defer r.Unlock()
|
||||
|
||||
r.Called = append(r.Called, "Attach")
|
||||
return &runtimeApi.AttachResponse{}, nil
|
||||
}
|
||||
|
||||
func (r *FakeRuntimeService) UpdateRuntimeConfig(runtimeCOnfig *runtimeApi.RuntimeConfig) error {
|
||||
|
@@ -19,6 +19,7 @@ go_library(
|
||||
"docker_image.go",
|
||||
"docker_sandbox.go",
|
||||
"docker_service.go",
|
||||
"docker_streaming.go",
|
||||
"helpers.go",
|
||||
"legacy.go",
|
||||
"naming.go",
|
||||
@@ -32,7 +33,9 @@ go_library(
|
||||
"//pkg/kubelet/dockertools:go_default_library",
|
||||
"//pkg/kubelet/leaky:go_default_library",
|
||||
"//pkg/kubelet/qos:go_default_library",
|
||||
"//pkg/kubelet/server/streaming:go_default_library",
|
||||
"//pkg/kubelet/types:go_default_library",
|
||||
"//pkg/kubelet/util/ioutils:go_default_library",
|
||||
"//pkg/util/term:go_default_library",
|
||||
"//vendor:github.com/docker/engine-api/types",
|
||||
"//vendor:github.com/docker/engine-api/types/container",
|
||||
|
@@ -18,7 +18,6 @@ package dockershim
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"time"
|
||||
@@ -388,10 +387,3 @@ func (ds *dockerService) ContainerStatus(containerID string) (*runtimeApi.Contai
|
||||
Annotations: annotations,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Exec execute a command in the container.
|
||||
// TODO: Need to handle terminal resizing before implementing this function.
|
||||
// https://github.com/kubernetes/kubernetes/issues/29579.
|
||||
func (ds *dockerService) Exec(containerID string, cmd []string, tty bool, stdin io.Reader, stdout, stderr io.WriteCloser) error {
|
||||
return fmt.Errorf("not implemented")
|
||||
}
|
||||
|
@@ -25,6 +25,7 @@ import (
|
||||
runtimeApi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime"
|
||||
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
|
||||
"k8s.io/kubernetes/pkg/kubelet/dockertools"
|
||||
"k8s.io/kubernetes/pkg/kubelet/server/streaming"
|
||||
"k8s.io/kubernetes/pkg/util/term"
|
||||
)
|
||||
|
||||
@@ -55,13 +56,27 @@ const (
|
||||
var internalLabelKeys []string = []string{containerTypeLabelKey, containerLogPathLabelKey, sandboxIDLabelKey}
|
||||
|
||||
// NOTE: Anything passed to DockerService should be eventually handled in another way when we switch to running the shim as a different process.
|
||||
func NewDockerService(client dockertools.DockerInterface, seccompProfileRoot string, podSandboxImage string) DockerService {
|
||||
return &dockerService{
|
||||
func NewDockerService(client dockertools.DockerInterface, seccompProfileRoot string, podSandboxImage string, streamingConfig *streaming.Config) (DockerService, error) {
|
||||
ds := &dockerService{
|
||||
seccompProfileRoot: seccompProfileRoot,
|
||||
client: dockertools.NewInstrumentedDockerInterface(client),
|
||||
os: kubecontainer.RealOS{},
|
||||
podSandboxImage: podSandboxImage,
|
||||
streamingRuntime: &streamingRuntime{
|
||||
client: client,
|
||||
// Only the native exec handling is supported for now.
|
||||
// TODO(#35747) - Either deprecate nsenter exec handling, or add support for it here.
|
||||
execHandler: &dockertools.NativeExecHandler{},
|
||||
},
|
||||
}
|
||||
if streamingConfig != nil {
|
||||
var err error
|
||||
ds.streamingServer, err = streaming.NewServer(*streamingConfig, ds.streamingRuntime)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
return ds, nil
|
||||
}
|
||||
|
||||
// DockerService is an interface that embeds both the new RuntimeService and
|
||||
@@ -78,11 +93,9 @@ type DockerService interface {
|
||||
type DockerLegacyService interface {
|
||||
// Supporting legacy methods for docker.
|
||||
GetContainerLogs(pod *api.Pod, containerID kubecontainer.ContainerID, logOptions *api.PodLogOptions, stdout, stderr io.Writer) (err error)
|
||||
kubecontainer.ContainerAttacher
|
||||
PortForward(sandboxID string, port uint16, stream io.ReadWriteCloser) error
|
||||
|
||||
// TODO: Remove this once exec is properly defined in CRI.
|
||||
ExecInContainer(containerID kubecontainer.ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error
|
||||
LegacyExec(containerID kubecontainer.ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error
|
||||
LegacyAttach(id kubecontainer.ContainerID, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error
|
||||
LegacyPortForward(sandboxID string, port uint16, stream io.ReadWriteCloser) error
|
||||
}
|
||||
|
||||
type dockerService struct {
|
||||
@@ -90,6 +103,8 @@ type dockerService struct {
|
||||
client dockertools.DockerInterface
|
||||
os kubecontainer.OSInterface
|
||||
podSandboxImage string
|
||||
streamingRuntime *streamingRuntime
|
||||
streamingServer streaming.Server
|
||||
}
|
||||
|
||||
// Version returns the runtime name, runtime version and runtime API version
|
||||
|
149
pkg/kubelet/dockershim/docker_streaming.go
Normal file
149
pkg/kubelet/dockershim/docker_streaming.go
Normal file
@@ -0,0 +1,149 @@
|
||||
/*
|
||||
Copyright 2016 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package dockershim
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"io"
|
||||
"math"
|
||||
"time"
|
||||
|
||||
dockertypes "github.com/docker/engine-api/types"
|
||||
runtimeapi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime"
|
||||
"k8s.io/kubernetes/pkg/kubelet/dockertools"
|
||||
"k8s.io/kubernetes/pkg/kubelet/server/streaming"
|
||||
"k8s.io/kubernetes/pkg/kubelet/util/ioutils"
|
||||
"k8s.io/kubernetes/pkg/util/term"
|
||||
)
|
||||
|
||||
type streamingRuntime struct {
|
||||
client dockertools.DockerInterface
|
||||
execHandler dockertools.ExecHandler
|
||||
}
|
||||
|
||||
var _ streaming.Runtime = &streamingRuntime{}
|
||||
|
||||
func (r *streamingRuntime) Exec(containerID string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan term.Size) error {
|
||||
return r.exec(containerID, cmd, in, out, err, tty, resize, 0)
|
||||
}
|
||||
|
||||
// Internal version of Exec adds a timeout.
|
||||
func (r *streamingRuntime) exec(containerID string, cmd []string, in io.Reader, out, errw io.WriteCloser, tty bool, resize <-chan term.Size, timeout time.Duration) error {
|
||||
container, err := checkContainerStatus(r.client, containerID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// TODO(timstclair): Clean this up once PR#33366 merges.
|
||||
if timeout <= 0 {
|
||||
// Run until command exits.
|
||||
return r.execHandler.ExecInContainer(r.client, container, cmd, in, out, errw, tty, resize)
|
||||
}
|
||||
|
||||
errCh := make(chan error)
|
||||
go func() {
|
||||
errCh <- r.execHandler.ExecInContainer(r.client, container, cmd, in, out, errw, tty, resize)
|
||||
}()
|
||||
|
||||
select {
|
||||
case err := <-errCh:
|
||||
return err
|
||||
case <-time.After(timeout):
|
||||
return streaming.ErrorTimeout("exec", timeout)
|
||||
}
|
||||
}
|
||||
|
||||
func (r *streamingRuntime) Attach(containerID string, in io.Reader, out, errw io.WriteCloser, resize <-chan term.Size) error {
|
||||
container, err := checkContainerStatus(r.client, containerID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
tty := container.Config.Tty
|
||||
return dockertools.AttachContainer(r.client, containerID, in, out, errw, tty, resize)
|
||||
}
|
||||
|
||||
func (r *streamingRuntime) PortForward(podSandboxID string, port int32, stream io.ReadWriteCloser) error {
|
||||
if port < 0 || port > math.MaxUint16 {
|
||||
return fmt.Errorf("invalid port %d", port)
|
||||
}
|
||||
return dockertools.PortForward(r.client, podSandboxID, uint16(port), stream)
|
||||
}
|
||||
|
||||
// ExecSync executes a command in the container, and returns the stdout output.
|
||||
// If command exits with a non-zero exit code, an error is returned.
|
||||
func (ds *dockerService) ExecSync(containerID string, cmd []string, timeout time.Duration) (stdout []byte, stderr []byte, err error) {
|
||||
var stdoutBuffer, stderrBuffer bytes.Buffer
|
||||
err = ds.streamingRuntime.exec(containerID, cmd,
|
||||
nil, // in
|
||||
ioutils.WriteCloserWrapper(&stdoutBuffer),
|
||||
ioutils.WriteCloserWrapper(&stderrBuffer),
|
||||
false, // tty
|
||||
nil, // resize
|
||||
timeout)
|
||||
return stdoutBuffer.Bytes(), stderrBuffer.Bytes(), err
|
||||
}
|
||||
|
||||
// Exec prepares a streaming endpoint to execute a command in the container, and returns the address.
|
||||
func (ds *dockerService) Exec(req *runtimeapi.ExecRequest) (*runtimeapi.ExecResponse, error) {
|
||||
if ds.streamingServer == nil {
|
||||
return nil, streaming.ErrorStreamingDisabled("exec")
|
||||
}
|
||||
_, err := checkContainerStatus(ds.client, req.GetContainerId())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return ds.streamingServer.GetExec(req)
|
||||
}
|
||||
|
||||
// Attach prepares a streaming endpoint to attach to a running container, and returns the address.
|
||||
func (ds *dockerService) Attach(req *runtimeapi.AttachRequest) (*runtimeapi.AttachResponse, error) {
|
||||
if ds.streamingServer == nil {
|
||||
return nil, streaming.ErrorStreamingDisabled("attach")
|
||||
}
|
||||
container, err := checkContainerStatus(ds.client, req.GetContainerId())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
tty := container.Config.Tty
|
||||
return ds.streamingServer.GetAttach(req, tty)
|
||||
}
|
||||
|
||||
// PortForward prepares a streaming endpoint to forward ports from a PodSandbox, and returns the address.
|
||||
func (ds *dockerService) PortForward(req *runtimeapi.PortForwardRequest) (*runtimeapi.PortForwardResponse, error) {
|
||||
if ds.streamingServer == nil {
|
||||
return nil, streaming.ErrorStreamingDisabled("port forward")
|
||||
}
|
||||
_, err := checkContainerStatus(ds.client, req.GetPodSandboxId())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// TODO(timstclair): Verify that ports are exposed.
|
||||
return ds.streamingServer.GetPortForward(req)
|
||||
}
|
||||
|
||||
func checkContainerStatus(client dockertools.DockerInterface, containerID string) (*dockertypes.ContainerJSON, error) {
|
||||
container, err := client.InspectContainer(containerID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if !container.State.Running {
|
||||
return nil, fmt.Errorf("container not running (%s)", container.ID)
|
||||
}
|
||||
return container, nil
|
||||
}
|
@@ -17,7 +17,6 @@ limitations under the License.
|
||||
package dockershim
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
@@ -31,8 +30,8 @@ import (
|
||||
// directly.
|
||||
|
||||
// TODO: implement the methods in this file.
|
||||
func (ds *dockerService) AttachContainer(id kubecontainer.ContainerID, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) (err error) {
|
||||
return dockertools.AttachContainer(ds.client, id, stdin, stdout, stderr, tty, resize)
|
||||
func (ds *dockerService) LegacyAttach(id kubecontainer.ContainerID, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) (err error) {
|
||||
return ds.streamingRuntime.Attach(id.ID, stdin, stdout, stderr, resize)
|
||||
}
|
||||
|
||||
func (ds *dockerService) GetContainerLogs(pod *api.Pod, containerID kubecontainer.ContainerID, logOptions *api.PodLogOptions, stdout, stderr io.Writer) (err error) {
|
||||
@@ -43,19 +42,10 @@ func (ds *dockerService) GetContainerLogs(pod *api.Pod, containerID kubecontaine
|
||||
return dockertools.GetContainerLogs(ds.client, pod, containerID, logOptions, stdout, stderr, container.Config.Tty)
|
||||
}
|
||||
|
||||
func (ds *dockerService) PortForward(sandboxID string, port uint16, stream io.ReadWriteCloser) error {
|
||||
return dockertools.PortForward(ds.client, sandboxID, port, stream)
|
||||
func (ds *dockerService) LegacyPortForward(sandboxID string, port uint16, stream io.ReadWriteCloser) error {
|
||||
return ds.streamingRuntime.PortForward(sandboxID, int32(port), stream)
|
||||
}
|
||||
|
||||
func (ds *dockerService) ExecInContainer(containerID kubecontainer.ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error {
|
||||
container, err := ds.client.InspectContainer(containerID.ID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !container.State.Running {
|
||||
return fmt.Errorf("container not running (%s)", container.ID)
|
||||
}
|
||||
|
||||
handler := &dockertools.NativeExecHandler{}
|
||||
return handler.ExecInContainer(ds.client, container, cmd, stdin, stdout, stderr, tty, resize)
|
||||
func (ds *dockerService) LegacyExec(containerID kubecontainer.ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error {
|
||||
return ds.streamingRuntime.Exec(containerID.ID, cmd, stdin, stdout, stderr, tty, resize)
|
||||
}
|
||||
|
@@ -1269,16 +1269,16 @@ func (dm *DockerManager) ExecInContainer(containerID kubecontainer.ContainerID,
|
||||
}
|
||||
|
||||
func (dm *DockerManager) AttachContainer(containerID kubecontainer.ContainerID, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error {
|
||||
return AttachContainer(dm.client, containerID, stdin, stdout, stderr, tty, resize)
|
||||
return AttachContainer(dm.client, containerID.ID, stdin, stdout, stderr, tty, resize)
|
||||
}
|
||||
|
||||
// Temporarily export this function to share with dockershim.
|
||||
// TODO: clean this up.
|
||||
func AttachContainer(client DockerInterface, containerID kubecontainer.ContainerID, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error {
|
||||
func AttachContainer(client DockerInterface, containerID string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error {
|
||||
// Have to start this before the call to client.AttachToContainer because client.AttachToContainer is a blocking
|
||||
// call :-( Otherwise, resize events don't get processed and the terminal never resizes.
|
||||
kubecontainer.HandleResizing(resize, func(size term.Size) {
|
||||
client.ResizeContainerTTY(containerID.ID, int(size.Height), int(size.Width))
|
||||
client.ResizeContainerTTY(containerID, int(size.Height), int(size.Width))
|
||||
})
|
||||
|
||||
// TODO(random-liu): Do we really use the *Logs* field here?
|
||||
@@ -1294,7 +1294,7 @@ func AttachContainer(client DockerInterface, containerID kubecontainer.Container
|
||||
ErrorStream: stderr,
|
||||
RawTerminal: tty,
|
||||
}
|
||||
return client.AttachToContainer(containerID.ID, opts, sopts)
|
||||
return client.AttachToContainer(containerID, opts, sopts)
|
||||
}
|
||||
|
||||
func noPodInfraContainerError(podName, podNamespace string) error {
|
||||
|
@@ -489,7 +489,10 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub
|
||||
case "cri":
|
||||
// Use the new CRI shim for docker. This is needed for testing the
|
||||
// docker integration through CRI, and may be removed in the future.
|
||||
dockerService := dockershim.NewDockerService(klet.dockerClient, kubeCfg.SeccompProfileRoot, kubeCfg.PodInfraContainerImage)
|
||||
dockerService, err := dockershim.NewDockerService(klet.dockerClient, kubeCfg.SeccompProfileRoot, kubeCfg.PodInfraContainerImage, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
runtimeService := dockerService.(internalApi.RuntimeService)
|
||||
imageService := dockerService.(internalApi.ImageManagerService)
|
||||
|
||||
|
@@ -17,7 +17,6 @@ limitations under the License.
|
||||
package kuberuntime
|
||||
|
||||
import (
|
||||
"io"
|
||||
"time"
|
||||
|
||||
internalApi "k8s.io/kubernetes/pkg/kubelet/api"
|
||||
@@ -123,13 +122,31 @@ func (in instrumentedRuntimeService) ContainerStatus(containerID string) (*runti
|
||||
return out, err
|
||||
}
|
||||
|
||||
func (in instrumentedRuntimeService) Exec(containerID string, cmd []string, tty bool, stdin io.Reader, stdout, stderr io.WriteCloser) error {
|
||||
func (in instrumentedRuntimeService) ExecSync(containerID string, cmd []string, timeout time.Duration) ([]byte, []byte, error) {
|
||||
const operation = "exec_sync"
|
||||
defer recordOperation(operation, time.Now())
|
||||
|
||||
stdout, stderr, err := in.service.ExecSync(containerID, cmd, timeout)
|
||||
recordError(operation, err)
|
||||
return stdout, stderr, err
|
||||
}
|
||||
|
||||
func (in instrumentedRuntimeService) Exec(req *runtimeApi.ExecRequest) (*runtimeApi.ExecResponse, error) {
|
||||
const operation = "exec"
|
||||
defer recordOperation(operation, time.Now())
|
||||
|
||||
err := in.service.Exec(containerID, cmd, tty, stdin, stdout, stderr)
|
||||
resp, err := in.service.Exec(req)
|
||||
recordError(operation, err)
|
||||
return err
|
||||
return resp, err
|
||||
}
|
||||
|
||||
func (in instrumentedRuntimeService) Attach(req *runtimeApi.AttachRequest) (*runtimeApi.AttachResponse, error) {
|
||||
const operation = "attach"
|
||||
defer recordOperation(operation, time.Now())
|
||||
|
||||
resp, err := in.service.Attach(req)
|
||||
recordError(operation, err)
|
||||
return resp, err
|
||||
}
|
||||
|
||||
func (in instrumentedRuntimeService) RunPodSandbox(config *runtimeApi.PodSandboxConfig) (string, error) {
|
||||
@@ -177,6 +194,15 @@ func (in instrumentedRuntimeService) ListPodSandbox(filter *runtimeApi.PodSandbo
|
||||
return out, err
|
||||
}
|
||||
|
||||
func (in instrumentedRuntimeService) PortForward(req *runtimeApi.PortForwardRequest) (*runtimeApi.PortForwardResponse, error) {
|
||||
const operation = "port_forward"
|
||||
defer recordOperation(operation, time.Now())
|
||||
|
||||
resp, err := in.service.PortForward(req)
|
||||
recordError(operation, err)
|
||||
return resp, err
|
||||
}
|
||||
|
||||
func (in instrumentedRuntimeService) UpdateRuntimeConfig(runtimeConfig *runtimeApi.RuntimeConfig) error {
|
||||
const operation = "update_runtime_config"
|
||||
defer recordOperation(operation, time.Now())
|
||||
|
@@ -668,7 +668,7 @@ func (m *kubeGenericRuntimeManager) AttachContainer(id kubecontainer.ContainerID
|
||||
// now to unblock other tests.
|
||||
// TODO: remove this hack after attach is defined in CRI.
|
||||
if ds, ok := m.runtimeService.(dockershim.DockerLegacyService); ok {
|
||||
return ds.AttachContainer(id, stdin, stdout, stderr, tty, resize)
|
||||
return ds.LegacyAttach(id, stdin, stdout, stderr, tty, resize)
|
||||
}
|
||||
return fmt.Errorf("not implemented")
|
||||
}
|
||||
@@ -694,7 +694,7 @@ func (m *kubeGenericRuntimeManager) ExecInContainer(containerID kubecontainer.Co
|
||||
// now to unblock other tests.
|
||||
// TODO: remove this hack after exec is defined in CRI.
|
||||
if ds, ok := m.runtimeService.(dockershim.DockerLegacyService); ok {
|
||||
return ds.ExecInContainer(containerID, cmd, stdin, stdout, stderr, tty, resize)
|
||||
return ds.LegacyExec(containerID, cmd, stdin, stdout, stderr, tty, resize)
|
||||
}
|
||||
return fmt.Errorf("not implemented")
|
||||
}
|
||||
|
@@ -1001,7 +1001,7 @@ func (m *kubeGenericRuntimeManager) PortForward(pod *kubecontainer.Pod, port uin
|
||||
// now to unblock other tests.
|
||||
// TODO: remove this hack after portforward is defined in CRI.
|
||||
if ds, ok := m.runtimeService.(dockershim.DockerLegacyService); ok {
|
||||
return ds.PortForward(pod.Sandboxes[0].ID.ID, port, stream)
|
||||
return ds.LegacyPortForward(pod.Sandboxes[0].ID.ID, port, stream)
|
||||
}
|
||||
|
||||
return fmt.Errorf("not implemented")
|
||||
|
@@ -22,6 +22,7 @@ go_library(
|
||||
deps = [
|
||||
"//pkg/kubelet/api:go_default_library",
|
||||
"//pkg/kubelet/api/v1alpha1/runtime:go_default_library",
|
||||
"//pkg/util/exec:go_default_library",
|
||||
"//vendor:github.com/golang/glog",
|
||||
"//vendor:golang.org/x/net/context",
|
||||
"//vendor:google.golang.org/grpc",
|
||||
|
@@ -17,14 +17,15 @@ limitations under the License.
|
||||
package remote
|
||||
|
||||
import (
|
||||
"io"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"fmt"
|
||||
"github.com/golang/glog"
|
||||
"google.golang.org/grpc"
|
||||
internalApi "k8s.io/kubernetes/pkg/kubelet/api"
|
||||
runtimeApi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime"
|
||||
utilexec "k8s.io/kubernetes/pkg/util/exec"
|
||||
)
|
||||
|
||||
// RemoteRuntimeService is a gRPC implementation of internalApi.RuntimeService.
|
||||
@@ -247,10 +248,75 @@ func (r *RemoteRuntimeService) ContainerStatus(containerID string) (*runtimeApi.
|
||||
return resp.Status, nil
|
||||
}
|
||||
|
||||
// Exec executes a command in the container.
|
||||
// TODO: support terminal resizing for exec, refer https://github.com/kubernetes/kubernetes/issues/29579.
|
||||
func (r *RemoteRuntimeService) Exec(containerID string, cmd []string, tty bool, stdin io.Reader, stdout, stderr io.WriteCloser) error {
|
||||
return fmt.Errorf("Not implemented")
|
||||
// ExecSync executes a command in the container, and returns the stdout output.
|
||||
// If command exits with a non-zero exit code, an error is returned.
|
||||
func (r *RemoteRuntimeService) ExecSync(containerID string, cmd []string, timeout time.Duration) (stdout []byte, stderr []byte, err error) {
|
||||
ctx, cancel := getContextWithTimeout(r.timeout)
|
||||
defer cancel()
|
||||
|
||||
timeoutSeconds := int64(timeout.Seconds())
|
||||
req := &runtimeApi.ExecSyncRequest{
|
||||
ContainerId: &containerID,
|
||||
Cmd: cmd,
|
||||
Timeout: &timeoutSeconds,
|
||||
}
|
||||
resp, err := r.runtimeClient.ExecSync(ctx, req)
|
||||
if err != nil {
|
||||
glog.Errorf("ExecSync %s '%s' from runtime service failed: %v", containerID, strings.Join(cmd, " "), err)
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
err = nil
|
||||
if resp.GetExitCode() != 0 {
|
||||
err = utilexec.CodeExitError{
|
||||
Err: fmt.Errorf("command '%s' exited with %d: %s", strings.Join(cmd, " "), resp.GetExitCode(), resp.GetStderr()),
|
||||
Code: int(resp.GetExitCode()),
|
||||
}
|
||||
}
|
||||
|
||||
return resp.GetStdout(), resp.GetStderr(), err
|
||||
}
|
||||
|
||||
// Exec prepares a streaming endpoint to execute a command in the container, and returns the address.
|
||||
func (r *RemoteRuntimeService) Exec(req *runtimeApi.ExecRequest) (*runtimeApi.ExecResponse, error) {
|
||||
ctx, cancel := getContextWithTimeout(r.timeout)
|
||||
defer cancel()
|
||||
|
||||
resp, err := r.runtimeClient.Exec(ctx, req)
|
||||
if err != nil {
|
||||
glog.Errorf("Exec %s '%s' from runtime service failed: %v", req.GetContainerId(), strings.Join(req.GetCmd(), " "), err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
// Attach prepares a streaming endpoint to attach to a running container, and returns the address.
|
||||
func (r *RemoteRuntimeService) Attach(req *runtimeApi.AttachRequest) (*runtimeApi.AttachResponse, error) {
|
||||
ctx, cancel := getContextWithTimeout(r.timeout)
|
||||
defer cancel()
|
||||
|
||||
resp, err := r.runtimeClient.Attach(ctx, req)
|
||||
if err != nil {
|
||||
glog.Errorf("Attach %s from runtime service failed: %v", req.GetContainerId(), err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
// PortForward prepares a streaming endpoint to forward ports from a PodSandbox, and returns the address.
|
||||
func (r *RemoteRuntimeService) PortForward(req *runtimeApi.PortForwardRequest) (*runtimeApi.PortForwardResponse, error) {
|
||||
ctx, cancel := getContextWithTimeout(r.timeout)
|
||||
defer cancel()
|
||||
|
||||
resp, err := r.runtimeClient.PortForward(ctx, req)
|
||||
if err != nil {
|
||||
glog.Errorf("PortForward %s from runtime service failed: %v", req.GetPodSandboxId(), err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func (r *RemoteRuntimeService) UpdateRuntimeConfig(runtimeConfig *runtimeApi.RuntimeConfig) error {
|
||||
|
@@ -23,6 +23,7 @@ go_library(
|
||||
deps = [
|
||||
"//pkg/kubelet/api:go_default_library",
|
||||
"//pkg/kubelet/api/v1alpha1/runtime:go_default_library",
|
||||
"//pkg/kubelet/util/ioutils:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
|
@@ -17,7 +17,7 @@ limitations under the License.
|
||||
package rktshim
|
||||
|
||||
import (
|
||||
"io"
|
||||
"time"
|
||||
|
||||
kubeletApi "k8s.io/kubernetes/pkg/kubelet/api"
|
||||
runtimeApi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime"
|
||||
@@ -65,7 +65,18 @@ func (*Runtime) ContainerStatus(string) (*runtimeApi.ContainerStatus, error) {
|
||||
panic("not implemented")
|
||||
}
|
||||
|
||||
// Exec executes a command inside an app running inside a pod sanbox.
|
||||
func (*Runtime) Exec(string, []string, bool, io.Reader, io.WriteCloser, io.WriteCloser) error {
|
||||
// ExecSync executes a command in the container, and returns the stdout output.
|
||||
// If command exits with a non-zero exit code, an error is returned.
|
||||
func (*Runtime) ExecSync(containerID string, cmd []string, timeout time.Duration) (stdout []byte, stderr []byte, err error) {
|
||||
panic("not implemented")
|
||||
}
|
||||
|
||||
// Exec prepares a streaming endpoint to execute a command in the container, and returns the address.
|
||||
func (*Runtime) Exec(*runtimeApi.ExecRequest) (*runtimeApi.ExecResponse, error) {
|
||||
panic("not implemented")
|
||||
}
|
||||
|
||||
// Attach prepares a streaming endpoint to attach to a running container, and returns the address.
|
||||
func (*Runtime) Attach(req *runtimeApi.AttachRequest) (*runtimeApi.AttachResponse, error) {
|
||||
panic("not implemented")
|
||||
}
|
||||
|
@@ -17,6 +17,7 @@ limitations under the License.
|
||||
package rktshim
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"io"
|
||||
"math/rand"
|
||||
@@ -24,6 +25,12 @@ import (
|
||||
|
||||
kubeletApi "k8s.io/kubernetes/pkg/kubelet/api"
|
||||
runtimeApi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime"
|
||||
"k8s.io/kubernetes/pkg/kubelet/util/ioutils"
|
||||
)
|
||||
|
||||
const (
|
||||
FakeStreamingHost = "localhost"
|
||||
FakeStreamingPort = "12345"
|
||||
)
|
||||
|
||||
func init() {
|
||||
@@ -200,16 +207,34 @@ func (r *FakeRuntime) ContainerStatus(id string) (*runtimeApi.ContainerStatus, e
|
||||
return &c.Status, nil
|
||||
}
|
||||
|
||||
func (r *FakeRuntime) Exec(id string, cmd []string, tty bool, in io.Reader, out, err io.WriteCloser) error {
|
||||
c, ok := r.Containers[id]
|
||||
func (r *FakeRuntime) ExecSync(containerID string, cmd []string, timeout time.Duration) (stdout []byte, stderr []byte, err error) {
|
||||
c, ok := r.Containers[containerID]
|
||||
if !ok {
|
||||
return ErrContainerNotFound
|
||||
return nil, nil, ErrContainerNotFound
|
||||
}
|
||||
|
||||
// TODO(tmrts): Validate the assumption that container has to be running for exec to work.
|
||||
if c.State != runtimeApi.ContainerState_RUNNING {
|
||||
return ErrInvalidContainerStateTransition
|
||||
return nil, nil, ErrInvalidContainerStateTransition
|
||||
}
|
||||
|
||||
return c.Exec(cmd, in, out, err)
|
||||
var stdoutBuffer, stderrBuffer bytes.Buffer
|
||||
err = c.Exec(cmd, nil,
|
||||
ioutils.WriteCloserWrapper(&stdoutBuffer),
|
||||
ioutils.WriteCloserWrapper(&stderrBuffer))
|
||||
return stdoutBuffer.Bytes(), stderrBuffer.Bytes(), err
|
||||
}
|
||||
|
||||
func (r *FakeRuntime) Exec(req *runtimeApi.ExecRequest) (*runtimeApi.ExecResponse, error) {
|
||||
url := "http://" + FakeStreamingHost + ":" + FakeStreamingPort + "/exec/" + req.GetContainerId()
|
||||
return &runtimeApi.ExecResponse{
|
||||
Url: &url,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (r *FakeRuntime) Attach(req *runtimeApi.AttachRequest) (*runtimeApi.AttachResponse, error) {
|
||||
url := "http://" + FakeStreamingHost + ":" + FakeStreamingPort + "/attach/" + req.GetContainerId()
|
||||
return &runtimeApi.AttachResponse{
|
||||
Url: &url,
|
||||
}, nil
|
||||
}
|
||||
|
@@ -57,3 +57,8 @@ func (*PodSandboxManager) PodSandboxStatus(string) (*runtimeApi.PodSandboxStatus
|
||||
func (*PodSandboxManager) ListPodSandbox(*runtimeApi.PodSandboxFilter) ([]*runtimeApi.PodSandbox, error) {
|
||||
panic("not implemented")
|
||||
}
|
||||
|
||||
// PortForward prepares a streaming endpoint to forward ports from a PodSandbox, and returns the address.
|
||||
func (*PodSandboxManager) PortForward(*runtimeApi.PortForwardRequest) (*runtimeApi.PortForwardResponse, error) {
|
||||
panic("not implemented")
|
||||
}
|
||||
|
@@ -12,7 +12,10 @@ load(
|
||||
|
||||
go_library(
|
||||
name = "go_default_library",
|
||||
srcs = ["server.go"],
|
||||
srcs = [
|
||||
"errors.go",
|
||||
"server.go",
|
||||
],
|
||||
tags = ["automanaged"],
|
||||
deps = [
|
||||
"//pkg/kubelet/api/v1alpha1/runtime:go_default_library",
|
||||
@@ -21,6 +24,7 @@ go_library(
|
||||
"//pkg/types:go_default_library",
|
||||
"//pkg/util/term:go_default_library",
|
||||
"//vendor:github.com/emicklei/go-restful",
|
||||
"//vendor:google.golang.org/grpc/codes",
|
||||
"//vendor:k8s.io/client-go/pkg/api",
|
||||
],
|
||||
)
|
||||
|
47
pkg/kubelet/server/streaming/errors.go
Normal file
47
pkg/kubelet/server/streaming/errors.go
Normal file
@@ -0,0 +1,47 @@
|
||||
/*
|
||||
Copyright 2016 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package streaming
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"google.golang.org/grpc/codes"
|
||||
)
|
||||
|
||||
type ResponseError struct {
|
||||
Err string
|
||||
Code codes.Code
|
||||
}
|
||||
|
||||
func (e *ResponseError) Error() string {
|
||||
return e.Err
|
||||
}
|
||||
|
||||
func ErrorStreamingDisabled(method string) error {
|
||||
return &ResponseError{
|
||||
Err: fmt.Sprintf("streaming method %s disabled", method),
|
||||
Code: codes.NotFound,
|
||||
}
|
||||
}
|
||||
|
||||
func ErrorTimeout(op string, timeout time.Duration) error {
|
||||
return &ResponseError{
|
||||
Err: fmt.Sprintf("%s timed out after %s", op, timeout.String()),
|
||||
Code: codes.DeadlineExceeded,
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user