diff --git a/pkg/kubelet/api/services.go b/pkg/kubelet/api/services.go index ed1e5510097..0111e7fc354 100644 --- a/pkg/kubelet/api/services.go +++ b/pkg/kubelet/api/services.go @@ -17,7 +17,7 @@ limitations under the License. package api import ( - "io" + "time" runtimeApi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime" ) @@ -43,8 +43,13 @@ type ContainerManager interface { ListContainers(filter *runtimeApi.ContainerFilter) ([]*runtimeApi.Container, error) // ContainerStatus returns the status of the container. ContainerStatus(containerID string) (*runtimeApi.ContainerStatus, error) - // Exec executes a command in the container. - Exec(containerID string, cmd []string, tty bool, stdin io.Reader, stdout, stderr io.WriteCloser) error + // ExecSync executes a command in the container, and returns the stdout output. + // If command exits with a non-zero exit code, an error is returned. + ExecSync(containerID string, cmd []string, timeout time.Duration) (stdout []byte, stderr []byte, err error) + // Exec prepares a streaming endpoint to execute a command in the container, and returns the address. + Exec(*runtimeApi.ExecRequest) (*runtimeApi.ExecResponse, error) + // Attach prepares a streaming endpoint to attach to a running container, and returns the address. + Attach(req *runtimeApi.AttachRequest) (*runtimeApi.AttachResponse, error) } // PodSandboxManager contains methods for operating on PodSandboxes. The methods @@ -63,6 +68,8 @@ type PodSandboxManager interface { PodSandboxStatus(podSandboxID string) (*runtimeApi.PodSandboxStatus, error) // ListPodSandbox returns a list of Sandbox. ListPodSandbox(filter *runtimeApi.PodSandboxFilter) ([]*runtimeApi.PodSandbox, error) + // PortForward prepares a streaming endpoint to forward ports from a PodSandbox, and returns the address. + PortForward(*runtimeApi.PortForwardRequest) (*runtimeApi.PortForwardResponse, error) } // RuntimeService interface should be implemented by a container runtime. diff --git a/pkg/kubelet/api/testing/fake_runtime_service.go b/pkg/kubelet/api/testing/fake_runtime_service.go index 40764ab237a..51a0d1c04c7 100644 --- a/pkg/kubelet/api/testing/fake_runtime_service.go +++ b/pkg/kubelet/api/testing/fake_runtime_service.go @@ -18,7 +18,6 @@ package testing import ( "fmt" - "io" "reflect" "sync" "time" @@ -214,6 +213,14 @@ func (r *FakeRuntimeService) ListPodSandbox(filter *runtimeApi.PodSandboxFilter) return result, nil } +func (r *FakeRuntimeService) PortForward(*runtimeApi.PortForwardRequest) (*runtimeApi.PortForwardResponse, error) { + r.Lock() + defer r.Unlock() + + r.Called = append(r.Called, "PortForward") + return &runtimeApi.PortForwardResponse{}, nil +} + func (r *FakeRuntimeService) CreateContainer(podSandboxID string, config *runtimeApi.ContainerConfig, sandboxConfig *runtimeApi.PodSandboxConfig) (string, error) { r.Lock() defer r.Unlock() @@ -349,12 +356,28 @@ func (r *FakeRuntimeService) ContainerStatus(containerID string) (*runtimeApi.Co return &status, nil } -func (r *FakeRuntimeService) Exec(containerID string, cmd []string, tty bool, stdin io.Reader, stdout, stderr io.WriteCloser) error { +func (r *FakeRuntimeService) ExecSync(containerID string, cmd []string, timeout time.Duration) (stdout []byte, stderr []byte, err error) { + r.Lock() + defer r.Unlock() + + r.Called = append(r.Called, "ExecSync") + return nil, nil, nil +} + +func (r *FakeRuntimeService) Exec(*runtimeApi.ExecRequest) (*runtimeApi.ExecResponse, error) { r.Lock() defer r.Unlock() r.Called = append(r.Called, "Exec") - return nil + return &runtimeApi.ExecResponse{}, nil +} + +func (r *FakeRuntimeService) Attach(req *runtimeApi.AttachRequest) (*runtimeApi.AttachResponse, error) { + r.Lock() + defer r.Unlock() + + r.Called = append(r.Called, "Attach") + return &runtimeApi.AttachResponse{}, nil } func (r *FakeRuntimeService) UpdateRuntimeConfig(runtimeCOnfig *runtimeApi.RuntimeConfig) error { diff --git a/pkg/kubelet/dockershim/BUILD b/pkg/kubelet/dockershim/BUILD index e9a37bcefac..fcc0428a797 100644 --- a/pkg/kubelet/dockershim/BUILD +++ b/pkg/kubelet/dockershim/BUILD @@ -19,6 +19,7 @@ go_library( "docker_image.go", "docker_sandbox.go", "docker_service.go", + "docker_streaming.go", "helpers.go", "legacy.go", "naming.go", @@ -32,7 +33,9 @@ go_library( "//pkg/kubelet/dockertools:go_default_library", "//pkg/kubelet/leaky:go_default_library", "//pkg/kubelet/qos:go_default_library", + "//pkg/kubelet/server/streaming:go_default_library", "//pkg/kubelet/types:go_default_library", + "//pkg/kubelet/util/ioutils:go_default_library", "//pkg/util/term:go_default_library", "//vendor:github.com/docker/engine-api/types", "//vendor:github.com/docker/engine-api/types/container", diff --git a/pkg/kubelet/dockershim/docker_container.go b/pkg/kubelet/dockershim/docker_container.go index 08022326db0..74d3abf4a12 100644 --- a/pkg/kubelet/dockershim/docker_container.go +++ b/pkg/kubelet/dockershim/docker_container.go @@ -18,7 +18,6 @@ package dockershim import ( "fmt" - "io" "os" "path/filepath" "time" @@ -388,10 +387,3 @@ func (ds *dockerService) ContainerStatus(containerID string) (*runtimeApi.Contai Annotations: annotations, }, nil } - -// Exec execute a command in the container. -// TODO: Need to handle terminal resizing before implementing this function. -// https://github.com/kubernetes/kubernetes/issues/29579. -func (ds *dockerService) Exec(containerID string, cmd []string, tty bool, stdin io.Reader, stdout, stderr io.WriteCloser) error { - return fmt.Errorf("not implemented") -} diff --git a/pkg/kubelet/dockershim/docker_service.go b/pkg/kubelet/dockershim/docker_service.go index d04f69ac49d..b37a714ec03 100644 --- a/pkg/kubelet/dockershim/docker_service.go +++ b/pkg/kubelet/dockershim/docker_service.go @@ -25,6 +25,7 @@ import ( runtimeApi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/dockertools" + "k8s.io/kubernetes/pkg/kubelet/server/streaming" "k8s.io/kubernetes/pkg/util/term" ) @@ -55,13 +56,27 @@ const ( var internalLabelKeys []string = []string{containerTypeLabelKey, containerLogPathLabelKey, sandboxIDLabelKey} // NOTE: Anything passed to DockerService should be eventually handled in another way when we switch to running the shim as a different process. -func NewDockerService(client dockertools.DockerInterface, seccompProfileRoot string, podSandboxImage string) DockerService { - return &dockerService{ +func NewDockerService(client dockertools.DockerInterface, seccompProfileRoot string, podSandboxImage string, streamingConfig *streaming.Config) (DockerService, error) { + ds := &dockerService{ seccompProfileRoot: seccompProfileRoot, client: dockertools.NewInstrumentedDockerInterface(client), os: kubecontainer.RealOS{}, podSandboxImage: podSandboxImage, + streamingRuntime: &streamingRuntime{ + client: client, + // Only the native exec handling is supported for now. + // TODO(#35747) - Either deprecate nsenter exec handling, or add support for it here. + execHandler: &dockertools.NativeExecHandler{}, + }, } + if streamingConfig != nil { + var err error + ds.streamingServer, err = streaming.NewServer(*streamingConfig, ds.streamingRuntime) + if err != nil { + return nil, err + } + } + return ds, nil } // DockerService is an interface that embeds both the new RuntimeService and @@ -78,11 +93,9 @@ type DockerService interface { type DockerLegacyService interface { // Supporting legacy methods for docker. GetContainerLogs(pod *api.Pod, containerID kubecontainer.ContainerID, logOptions *api.PodLogOptions, stdout, stderr io.Writer) (err error) - kubecontainer.ContainerAttacher - PortForward(sandboxID string, port uint16, stream io.ReadWriteCloser) error - - // TODO: Remove this once exec is properly defined in CRI. - ExecInContainer(containerID kubecontainer.ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error + LegacyExec(containerID kubecontainer.ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error + LegacyAttach(id kubecontainer.ContainerID, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error + LegacyPortForward(sandboxID string, port uint16, stream io.ReadWriteCloser) error } type dockerService struct { @@ -90,6 +103,8 @@ type dockerService struct { client dockertools.DockerInterface os kubecontainer.OSInterface podSandboxImage string + streamingRuntime *streamingRuntime + streamingServer streaming.Server } // Version returns the runtime name, runtime version and runtime API version diff --git a/pkg/kubelet/dockershim/docker_streaming.go b/pkg/kubelet/dockershim/docker_streaming.go new file mode 100644 index 00000000000..da1d0a6e249 --- /dev/null +++ b/pkg/kubelet/dockershim/docker_streaming.go @@ -0,0 +1,149 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package dockershim + +import ( + "bytes" + "fmt" + "io" + "math" + "time" + + dockertypes "github.com/docker/engine-api/types" + runtimeapi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime" + "k8s.io/kubernetes/pkg/kubelet/dockertools" + "k8s.io/kubernetes/pkg/kubelet/server/streaming" + "k8s.io/kubernetes/pkg/kubelet/util/ioutils" + "k8s.io/kubernetes/pkg/util/term" +) + +type streamingRuntime struct { + client dockertools.DockerInterface + execHandler dockertools.ExecHandler +} + +var _ streaming.Runtime = &streamingRuntime{} + +func (r *streamingRuntime) Exec(containerID string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan term.Size) error { + return r.exec(containerID, cmd, in, out, err, tty, resize, 0) +} + +// Internal version of Exec adds a timeout. +func (r *streamingRuntime) exec(containerID string, cmd []string, in io.Reader, out, errw io.WriteCloser, tty bool, resize <-chan term.Size, timeout time.Duration) error { + container, err := checkContainerStatus(r.client, containerID) + if err != nil { + return err + } + + // TODO(timstclair): Clean this up once PR#33366 merges. + if timeout <= 0 { + // Run until command exits. + return r.execHandler.ExecInContainer(r.client, container, cmd, in, out, errw, tty, resize) + } + + errCh := make(chan error) + go func() { + errCh <- r.execHandler.ExecInContainer(r.client, container, cmd, in, out, errw, tty, resize) + }() + + select { + case err := <-errCh: + return err + case <-time.After(timeout): + return streaming.ErrorTimeout("exec", timeout) + } +} + +func (r *streamingRuntime) Attach(containerID string, in io.Reader, out, errw io.WriteCloser, resize <-chan term.Size) error { + container, err := checkContainerStatus(r.client, containerID) + if err != nil { + return err + } + + tty := container.Config.Tty + return dockertools.AttachContainer(r.client, containerID, in, out, errw, tty, resize) +} + +func (r *streamingRuntime) PortForward(podSandboxID string, port int32, stream io.ReadWriteCloser) error { + if port < 0 || port > math.MaxUint16 { + return fmt.Errorf("invalid port %d", port) + } + return dockertools.PortForward(r.client, podSandboxID, uint16(port), stream) +} + +// ExecSync executes a command in the container, and returns the stdout output. +// If command exits with a non-zero exit code, an error is returned. +func (ds *dockerService) ExecSync(containerID string, cmd []string, timeout time.Duration) (stdout []byte, stderr []byte, err error) { + var stdoutBuffer, stderrBuffer bytes.Buffer + err = ds.streamingRuntime.exec(containerID, cmd, + nil, // in + ioutils.WriteCloserWrapper(&stdoutBuffer), + ioutils.WriteCloserWrapper(&stderrBuffer), + false, // tty + nil, // resize + timeout) + return stdoutBuffer.Bytes(), stderrBuffer.Bytes(), err +} + +// Exec prepares a streaming endpoint to execute a command in the container, and returns the address. +func (ds *dockerService) Exec(req *runtimeapi.ExecRequest) (*runtimeapi.ExecResponse, error) { + if ds.streamingServer == nil { + return nil, streaming.ErrorStreamingDisabled("exec") + } + _, err := checkContainerStatus(ds.client, req.GetContainerId()) + if err != nil { + return nil, err + } + return ds.streamingServer.GetExec(req) +} + +// Attach prepares a streaming endpoint to attach to a running container, and returns the address. +func (ds *dockerService) Attach(req *runtimeapi.AttachRequest) (*runtimeapi.AttachResponse, error) { + if ds.streamingServer == nil { + return nil, streaming.ErrorStreamingDisabled("attach") + } + container, err := checkContainerStatus(ds.client, req.GetContainerId()) + if err != nil { + return nil, err + } + tty := container.Config.Tty + return ds.streamingServer.GetAttach(req, tty) +} + +// PortForward prepares a streaming endpoint to forward ports from a PodSandbox, and returns the address. +func (ds *dockerService) PortForward(req *runtimeapi.PortForwardRequest) (*runtimeapi.PortForwardResponse, error) { + if ds.streamingServer == nil { + return nil, streaming.ErrorStreamingDisabled("port forward") + } + _, err := checkContainerStatus(ds.client, req.GetPodSandboxId()) + if err != nil { + return nil, err + } + // TODO(timstclair): Verify that ports are exposed. + return ds.streamingServer.GetPortForward(req) +} + +func checkContainerStatus(client dockertools.DockerInterface, containerID string) (*dockertypes.ContainerJSON, error) { + container, err := client.InspectContainer(containerID) + if err != nil { + return nil, err + } + if !container.State.Running { + return nil, fmt.Errorf("container not running (%s)", container.ID) + } + return container, nil +} diff --git a/pkg/kubelet/dockershim/legacy.go b/pkg/kubelet/dockershim/legacy.go index 1f5068c33ca..a3d14b4524e 100644 --- a/pkg/kubelet/dockershim/legacy.go +++ b/pkg/kubelet/dockershim/legacy.go @@ -17,7 +17,6 @@ limitations under the License. package dockershim import ( - "fmt" "io" "k8s.io/kubernetes/pkg/api" @@ -31,8 +30,8 @@ import ( // directly. // TODO: implement the methods in this file. -func (ds *dockerService) AttachContainer(id kubecontainer.ContainerID, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) (err error) { - return dockertools.AttachContainer(ds.client, id, stdin, stdout, stderr, tty, resize) +func (ds *dockerService) LegacyAttach(id kubecontainer.ContainerID, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) (err error) { + return ds.streamingRuntime.Attach(id.ID, stdin, stdout, stderr, resize) } func (ds *dockerService) GetContainerLogs(pod *api.Pod, containerID kubecontainer.ContainerID, logOptions *api.PodLogOptions, stdout, stderr io.Writer) (err error) { @@ -43,19 +42,10 @@ func (ds *dockerService) GetContainerLogs(pod *api.Pod, containerID kubecontaine return dockertools.GetContainerLogs(ds.client, pod, containerID, logOptions, stdout, stderr, container.Config.Tty) } -func (ds *dockerService) PortForward(sandboxID string, port uint16, stream io.ReadWriteCloser) error { - return dockertools.PortForward(ds.client, sandboxID, port, stream) +func (ds *dockerService) LegacyPortForward(sandboxID string, port uint16, stream io.ReadWriteCloser) error { + return ds.streamingRuntime.PortForward(sandboxID, int32(port), stream) } -func (ds *dockerService) ExecInContainer(containerID kubecontainer.ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error { - container, err := ds.client.InspectContainer(containerID.ID) - if err != nil { - return err - } - if !container.State.Running { - return fmt.Errorf("container not running (%s)", container.ID) - } - - handler := &dockertools.NativeExecHandler{} - return handler.ExecInContainer(ds.client, container, cmd, stdin, stdout, stderr, tty, resize) +func (ds *dockerService) LegacyExec(containerID kubecontainer.ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error { + return ds.streamingRuntime.Exec(containerID.ID, cmd, stdin, stdout, stderr, tty, resize) } diff --git a/pkg/kubelet/dockertools/docker_manager.go b/pkg/kubelet/dockertools/docker_manager.go index 123e57a229c..90ece063cc6 100644 --- a/pkg/kubelet/dockertools/docker_manager.go +++ b/pkg/kubelet/dockertools/docker_manager.go @@ -1269,16 +1269,16 @@ func (dm *DockerManager) ExecInContainer(containerID kubecontainer.ContainerID, } func (dm *DockerManager) AttachContainer(containerID kubecontainer.ContainerID, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error { - return AttachContainer(dm.client, containerID, stdin, stdout, stderr, tty, resize) + return AttachContainer(dm.client, containerID.ID, stdin, stdout, stderr, tty, resize) } // Temporarily export this function to share with dockershim. // TODO: clean this up. -func AttachContainer(client DockerInterface, containerID kubecontainer.ContainerID, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error { +func AttachContainer(client DockerInterface, containerID string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error { // Have to start this before the call to client.AttachToContainer because client.AttachToContainer is a blocking // call :-( Otherwise, resize events don't get processed and the terminal never resizes. kubecontainer.HandleResizing(resize, func(size term.Size) { - client.ResizeContainerTTY(containerID.ID, int(size.Height), int(size.Width)) + client.ResizeContainerTTY(containerID, int(size.Height), int(size.Width)) }) // TODO(random-liu): Do we really use the *Logs* field here? @@ -1294,7 +1294,7 @@ func AttachContainer(client DockerInterface, containerID kubecontainer.Container ErrorStream: stderr, RawTerminal: tty, } - return client.AttachToContainer(containerID.ID, opts, sopts) + return client.AttachToContainer(containerID, opts, sopts) } func noPodInfraContainerError(podName, podNamespace string) error { diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index ef93a1b63a1..372e8577ddd 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -489,7 +489,10 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub case "cri": // Use the new CRI shim for docker. This is needed for testing the // docker integration through CRI, and may be removed in the future. - dockerService := dockershim.NewDockerService(klet.dockerClient, kubeCfg.SeccompProfileRoot, kubeCfg.PodInfraContainerImage) + dockerService, err := dockershim.NewDockerService(klet.dockerClient, kubeCfg.SeccompProfileRoot, kubeCfg.PodInfraContainerImage, nil) + if err != nil { + return nil, err + } runtimeService := dockerService.(internalApi.RuntimeService) imageService := dockerService.(internalApi.ImageManagerService) diff --git a/pkg/kubelet/kuberuntime/instrumented_services.go b/pkg/kubelet/kuberuntime/instrumented_services.go index 4aaed743d6a..0275bd09f6c 100644 --- a/pkg/kubelet/kuberuntime/instrumented_services.go +++ b/pkg/kubelet/kuberuntime/instrumented_services.go @@ -17,7 +17,6 @@ limitations under the License. package kuberuntime import ( - "io" "time" internalApi "k8s.io/kubernetes/pkg/kubelet/api" @@ -123,13 +122,31 @@ func (in instrumentedRuntimeService) ContainerStatus(containerID string) (*runti return out, err } -func (in instrumentedRuntimeService) Exec(containerID string, cmd []string, tty bool, stdin io.Reader, stdout, stderr io.WriteCloser) error { +func (in instrumentedRuntimeService) ExecSync(containerID string, cmd []string, timeout time.Duration) ([]byte, []byte, error) { + const operation = "exec_sync" + defer recordOperation(operation, time.Now()) + + stdout, stderr, err := in.service.ExecSync(containerID, cmd, timeout) + recordError(operation, err) + return stdout, stderr, err +} + +func (in instrumentedRuntimeService) Exec(req *runtimeApi.ExecRequest) (*runtimeApi.ExecResponse, error) { const operation = "exec" defer recordOperation(operation, time.Now()) - err := in.service.Exec(containerID, cmd, tty, stdin, stdout, stderr) + resp, err := in.service.Exec(req) recordError(operation, err) - return err + return resp, err +} + +func (in instrumentedRuntimeService) Attach(req *runtimeApi.AttachRequest) (*runtimeApi.AttachResponse, error) { + const operation = "attach" + defer recordOperation(operation, time.Now()) + + resp, err := in.service.Attach(req) + recordError(operation, err) + return resp, err } func (in instrumentedRuntimeService) RunPodSandbox(config *runtimeApi.PodSandboxConfig) (string, error) { @@ -177,6 +194,15 @@ func (in instrumentedRuntimeService) ListPodSandbox(filter *runtimeApi.PodSandbo return out, err } +func (in instrumentedRuntimeService) PortForward(req *runtimeApi.PortForwardRequest) (*runtimeApi.PortForwardResponse, error) { + const operation = "port_forward" + defer recordOperation(operation, time.Now()) + + resp, err := in.service.PortForward(req) + recordError(operation, err) + return resp, err +} + func (in instrumentedRuntimeService) UpdateRuntimeConfig(runtimeConfig *runtimeApi.RuntimeConfig) error { const operation = "update_runtime_config" defer recordOperation(operation, time.Now()) diff --git a/pkg/kubelet/kuberuntime/kuberuntime_container.go b/pkg/kubelet/kuberuntime/kuberuntime_container.go index 83727d56884..71c50dce022 100644 --- a/pkg/kubelet/kuberuntime/kuberuntime_container.go +++ b/pkg/kubelet/kuberuntime/kuberuntime_container.go @@ -668,7 +668,7 @@ func (m *kubeGenericRuntimeManager) AttachContainer(id kubecontainer.ContainerID // now to unblock other tests. // TODO: remove this hack after attach is defined in CRI. if ds, ok := m.runtimeService.(dockershim.DockerLegacyService); ok { - return ds.AttachContainer(id, stdin, stdout, stderr, tty, resize) + return ds.LegacyAttach(id, stdin, stdout, stderr, tty, resize) } return fmt.Errorf("not implemented") } @@ -694,7 +694,7 @@ func (m *kubeGenericRuntimeManager) ExecInContainer(containerID kubecontainer.Co // now to unblock other tests. // TODO: remove this hack after exec is defined in CRI. if ds, ok := m.runtimeService.(dockershim.DockerLegacyService); ok { - return ds.ExecInContainer(containerID, cmd, stdin, stdout, stderr, tty, resize) + return ds.LegacyExec(containerID, cmd, stdin, stdout, stderr, tty, resize) } return fmt.Errorf("not implemented") } diff --git a/pkg/kubelet/kuberuntime/kuberuntime_manager.go b/pkg/kubelet/kuberuntime/kuberuntime_manager.go index d4bd574db94..51f4c3e4baf 100644 --- a/pkg/kubelet/kuberuntime/kuberuntime_manager.go +++ b/pkg/kubelet/kuberuntime/kuberuntime_manager.go @@ -1001,7 +1001,7 @@ func (m *kubeGenericRuntimeManager) PortForward(pod *kubecontainer.Pod, port uin // now to unblock other tests. // TODO: remove this hack after portforward is defined in CRI. if ds, ok := m.runtimeService.(dockershim.DockerLegacyService); ok { - return ds.PortForward(pod.Sandboxes[0].ID.ID, port, stream) + return ds.LegacyPortForward(pod.Sandboxes[0].ID.ID, port, stream) } return fmt.Errorf("not implemented") diff --git a/pkg/kubelet/remote/BUILD b/pkg/kubelet/remote/BUILD index 951d444eed0..da072bb896c 100644 --- a/pkg/kubelet/remote/BUILD +++ b/pkg/kubelet/remote/BUILD @@ -22,6 +22,7 @@ go_library( deps = [ "//pkg/kubelet/api:go_default_library", "//pkg/kubelet/api/v1alpha1/runtime:go_default_library", + "//pkg/util/exec:go_default_library", "//vendor:github.com/golang/glog", "//vendor:golang.org/x/net/context", "//vendor:google.golang.org/grpc", diff --git a/pkg/kubelet/remote/remote_runtime.go b/pkg/kubelet/remote/remote_runtime.go index e9ec276702b..c87c1284516 100644 --- a/pkg/kubelet/remote/remote_runtime.go +++ b/pkg/kubelet/remote/remote_runtime.go @@ -17,14 +17,15 @@ limitations under the License. package remote import ( - "io" + "fmt" + "strings" "time" - "fmt" "github.com/golang/glog" "google.golang.org/grpc" internalApi "k8s.io/kubernetes/pkg/kubelet/api" runtimeApi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime" + utilexec "k8s.io/kubernetes/pkg/util/exec" ) // RemoteRuntimeService is a gRPC implementation of internalApi.RuntimeService. @@ -247,10 +248,75 @@ func (r *RemoteRuntimeService) ContainerStatus(containerID string) (*runtimeApi. return resp.Status, nil } -// Exec executes a command in the container. -// TODO: support terminal resizing for exec, refer https://github.com/kubernetes/kubernetes/issues/29579. -func (r *RemoteRuntimeService) Exec(containerID string, cmd []string, tty bool, stdin io.Reader, stdout, stderr io.WriteCloser) error { - return fmt.Errorf("Not implemented") +// ExecSync executes a command in the container, and returns the stdout output. +// If command exits with a non-zero exit code, an error is returned. +func (r *RemoteRuntimeService) ExecSync(containerID string, cmd []string, timeout time.Duration) (stdout []byte, stderr []byte, err error) { + ctx, cancel := getContextWithTimeout(r.timeout) + defer cancel() + + timeoutSeconds := int64(timeout.Seconds()) + req := &runtimeApi.ExecSyncRequest{ + ContainerId: &containerID, + Cmd: cmd, + Timeout: &timeoutSeconds, + } + resp, err := r.runtimeClient.ExecSync(ctx, req) + if err != nil { + glog.Errorf("ExecSync %s '%s' from runtime service failed: %v", containerID, strings.Join(cmd, " "), err) + return nil, nil, err + } + + err = nil + if resp.GetExitCode() != 0 { + err = utilexec.CodeExitError{ + Err: fmt.Errorf("command '%s' exited with %d: %s", strings.Join(cmd, " "), resp.GetExitCode(), resp.GetStderr()), + Code: int(resp.GetExitCode()), + } + } + + return resp.GetStdout(), resp.GetStderr(), err +} + +// Exec prepares a streaming endpoint to execute a command in the container, and returns the address. +func (r *RemoteRuntimeService) Exec(req *runtimeApi.ExecRequest) (*runtimeApi.ExecResponse, error) { + ctx, cancel := getContextWithTimeout(r.timeout) + defer cancel() + + resp, err := r.runtimeClient.Exec(ctx, req) + if err != nil { + glog.Errorf("Exec %s '%s' from runtime service failed: %v", req.GetContainerId(), strings.Join(req.GetCmd(), " "), err) + return nil, err + } + + return resp, nil +} + +// Attach prepares a streaming endpoint to attach to a running container, and returns the address. +func (r *RemoteRuntimeService) Attach(req *runtimeApi.AttachRequest) (*runtimeApi.AttachResponse, error) { + ctx, cancel := getContextWithTimeout(r.timeout) + defer cancel() + + resp, err := r.runtimeClient.Attach(ctx, req) + if err != nil { + glog.Errorf("Attach %s from runtime service failed: %v", req.GetContainerId(), err) + return nil, err + } + + return resp, nil +} + +// PortForward prepares a streaming endpoint to forward ports from a PodSandbox, and returns the address. +func (r *RemoteRuntimeService) PortForward(req *runtimeApi.PortForwardRequest) (*runtimeApi.PortForwardResponse, error) { + ctx, cancel := getContextWithTimeout(r.timeout) + defer cancel() + + resp, err := r.runtimeClient.PortForward(ctx, req) + if err != nil { + glog.Errorf("PortForward %s from runtime service failed: %v", req.GetPodSandboxId(), err) + return nil, err + } + + return resp, nil } func (r *RemoteRuntimeService) UpdateRuntimeConfig(runtimeConfig *runtimeApi.RuntimeConfig) error { diff --git a/pkg/kubelet/rktshim/BUILD b/pkg/kubelet/rktshim/BUILD index d26c82c88fd..2db9d0409d7 100644 --- a/pkg/kubelet/rktshim/BUILD +++ b/pkg/kubelet/rktshim/BUILD @@ -23,6 +23,7 @@ go_library( deps = [ "//pkg/kubelet/api:go_default_library", "//pkg/kubelet/api/v1alpha1/runtime:go_default_library", + "//pkg/kubelet/util/ioutils:go_default_library", ], ) diff --git a/pkg/kubelet/rktshim/app-interface.go b/pkg/kubelet/rktshim/app-interface.go index cb45a7ba731..f3e60d5110e 100644 --- a/pkg/kubelet/rktshim/app-interface.go +++ b/pkg/kubelet/rktshim/app-interface.go @@ -17,7 +17,7 @@ limitations under the License. package rktshim import ( - "io" + "time" kubeletApi "k8s.io/kubernetes/pkg/kubelet/api" runtimeApi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime" @@ -65,7 +65,18 @@ func (*Runtime) ContainerStatus(string) (*runtimeApi.ContainerStatus, error) { panic("not implemented") } -// Exec executes a command inside an app running inside a pod sanbox. -func (*Runtime) Exec(string, []string, bool, io.Reader, io.WriteCloser, io.WriteCloser) error { +// ExecSync executes a command in the container, and returns the stdout output. +// If command exits with a non-zero exit code, an error is returned. +func (*Runtime) ExecSync(containerID string, cmd []string, timeout time.Duration) (stdout []byte, stderr []byte, err error) { + panic("not implemented") +} + +// Exec prepares a streaming endpoint to execute a command in the container, and returns the address. +func (*Runtime) Exec(*runtimeApi.ExecRequest) (*runtimeApi.ExecResponse, error) { + panic("not implemented") +} + +// Attach prepares a streaming endpoint to attach to a running container, and returns the address. +func (*Runtime) Attach(req *runtimeApi.AttachRequest) (*runtimeApi.AttachResponse, error) { panic("not implemented") } diff --git a/pkg/kubelet/rktshim/fake-app-interface.go b/pkg/kubelet/rktshim/fake-app-interface.go index 23ffefd3f86..433f52e9c8f 100644 --- a/pkg/kubelet/rktshim/fake-app-interface.go +++ b/pkg/kubelet/rktshim/fake-app-interface.go @@ -17,6 +17,7 @@ limitations under the License. package rktshim import ( + "bytes" "errors" "io" "math/rand" @@ -24,6 +25,12 @@ import ( kubeletApi "k8s.io/kubernetes/pkg/kubelet/api" runtimeApi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime" + "k8s.io/kubernetes/pkg/kubelet/util/ioutils" +) + +const ( + FakeStreamingHost = "localhost" + FakeStreamingPort = "12345" ) func init() { @@ -200,16 +207,34 @@ func (r *FakeRuntime) ContainerStatus(id string) (*runtimeApi.ContainerStatus, e return &c.Status, nil } -func (r *FakeRuntime) Exec(id string, cmd []string, tty bool, in io.Reader, out, err io.WriteCloser) error { - c, ok := r.Containers[id] +func (r *FakeRuntime) ExecSync(containerID string, cmd []string, timeout time.Duration) (stdout []byte, stderr []byte, err error) { + c, ok := r.Containers[containerID] if !ok { - return ErrContainerNotFound + return nil, nil, ErrContainerNotFound } // TODO(tmrts): Validate the assumption that container has to be running for exec to work. if c.State != runtimeApi.ContainerState_RUNNING { - return ErrInvalidContainerStateTransition + return nil, nil, ErrInvalidContainerStateTransition } - return c.Exec(cmd, in, out, err) + var stdoutBuffer, stderrBuffer bytes.Buffer + err = c.Exec(cmd, nil, + ioutils.WriteCloserWrapper(&stdoutBuffer), + ioutils.WriteCloserWrapper(&stderrBuffer)) + return stdoutBuffer.Bytes(), stderrBuffer.Bytes(), err +} + +func (r *FakeRuntime) Exec(req *runtimeApi.ExecRequest) (*runtimeApi.ExecResponse, error) { + url := "http://" + FakeStreamingHost + ":" + FakeStreamingPort + "/exec/" + req.GetContainerId() + return &runtimeApi.ExecResponse{ + Url: &url, + }, nil +} + +func (r *FakeRuntime) Attach(req *runtimeApi.AttachRequest) (*runtimeApi.AttachResponse, error) { + url := "http://" + FakeStreamingHost + ":" + FakeStreamingPort + "/attach/" + req.GetContainerId() + return &runtimeApi.AttachResponse{ + Url: &url, + }, nil } diff --git a/pkg/kubelet/rktshim/pod-level-interface.go b/pkg/kubelet/rktshim/pod-level-interface.go index 48433604a12..4d4931caaec 100644 --- a/pkg/kubelet/rktshim/pod-level-interface.go +++ b/pkg/kubelet/rktshim/pod-level-interface.go @@ -57,3 +57,8 @@ func (*PodSandboxManager) PodSandboxStatus(string) (*runtimeApi.PodSandboxStatus func (*PodSandboxManager) ListPodSandbox(*runtimeApi.PodSandboxFilter) ([]*runtimeApi.PodSandbox, error) { panic("not implemented") } + +// PortForward prepares a streaming endpoint to forward ports from a PodSandbox, and returns the address. +func (*PodSandboxManager) PortForward(*runtimeApi.PortForwardRequest) (*runtimeApi.PortForwardResponse, error) { + panic("not implemented") +} diff --git a/pkg/kubelet/server/streaming/BUILD b/pkg/kubelet/server/streaming/BUILD index a5a8469926a..5e818a6a397 100644 --- a/pkg/kubelet/server/streaming/BUILD +++ b/pkg/kubelet/server/streaming/BUILD @@ -12,7 +12,10 @@ load( go_library( name = "go_default_library", - srcs = ["server.go"], + srcs = [ + "errors.go", + "server.go", + ], tags = ["automanaged"], deps = [ "//pkg/kubelet/api/v1alpha1/runtime:go_default_library", @@ -21,6 +24,7 @@ go_library( "//pkg/types:go_default_library", "//pkg/util/term:go_default_library", "//vendor:github.com/emicklei/go-restful", + "//vendor:google.golang.org/grpc/codes", "//vendor:k8s.io/client-go/pkg/api", ], ) diff --git a/pkg/kubelet/server/streaming/errors.go b/pkg/kubelet/server/streaming/errors.go new file mode 100644 index 00000000000..1fdd8dfab0b --- /dev/null +++ b/pkg/kubelet/server/streaming/errors.go @@ -0,0 +1,47 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package streaming + +import ( + "fmt" + "time" + + "google.golang.org/grpc/codes" +) + +type ResponseError struct { + Err string + Code codes.Code +} + +func (e *ResponseError) Error() string { + return e.Err +} + +func ErrorStreamingDisabled(method string) error { + return &ResponseError{ + Err: fmt.Sprintf("streaming method %s disabled", method), + Code: codes.NotFound, + } +} + +func ErrorTimeout(op string, timeout time.Duration) error { + return &ResponseError{ + Err: fmt.Sprintf("%s timed out after %s", op, timeout.String()), + Code: codes.DeadlineExceeded, + } +}