Add windows portforward support

Signed-off-by: Angela Li <yanhuil@google.com>
This commit is contained in:
Angela Li 2019-09-23 17:36:43 -07:00
parent b431316edd
commit dc413bd6d6
8 changed files with 124 additions and 54 deletions

View File

@ -17,7 +17,7 @@
ROOT="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"/..
# Not from vendor.conf.
CRITOOL_VERSION=84b8540c69e82671bfc244a0b0fe28a6a98381ce
CRITOOL_VERSION=ad1e8a4f1e0e44b22c1c371b9dc4db92eba6f157
CRITOOL_PKG=github.com/kubernetes-sigs/cri-tools
CRITOOL_REPO=github.com/kubernetes-sigs/cri-tools

View File

@ -54,7 +54,7 @@ func DefaultConfig() PluginConfig {
TLSKeyFile: "",
TLSCertFile: "",
},
SandboxImage: "mcr.microsoft.com/k8s/core/pause:1.0.0",
SandboxImage: "mcr.microsoft.com/k8s/core/pause:1.2.0",
StatsCollectPeriod: 10,
MaxContainerLogLineSize: 16 * 1024,
Registry: Registry{

View File

@ -70,38 +70,13 @@ type execOptions struct {
timeout time.Duration
}
// execInContainer executes a command inside the container synchronously, and
// redirects stdio stream properly.
// This function only returns when the exec process exits, this means that:
// 1) As long as the exec process is running, the goroutine in the cri plugin
// will be running and wait for the exit code;
// 2) `kubectl exec -it` will hang until the exec process exits, even after io
// is detached. This is different from dockershim, which leaves the exec process
// running in background after io is detached.
// https://github.com/kubernetes/kubernetes/blob/v1.15.0/pkg/kubelet/dockershim/exec.go#L127
// For example, if the `kubectl exec -it` process is killed, IO will be closed. In
// this case, the CRI plugin will still have a goroutine waiting for the exec process
// to exit and log the exit code, but dockershim won't.
func (c *criService) execInContainer(ctx context.Context, id string, opts execOptions) (*uint32, error) {
func (c *criService) execInternal(ctx context.Context, container containerd.Container, id string, opts execOptions) (*uint32, error) {
// Cancel the context before returning to ensure goroutines are stopped.
// This is important, because if `Start` returns error, `Wait` will hang
// forever unless we cancel the context.
ctx, cancel := context.WithCancel(ctx)
defer cancel()
// Get container from our container store.
cntr, err := c.containerStore.Get(id)
if err != nil {
return nil, errors.Wrapf(err, "failed to find container %q in store", id)
}
id = cntr.ID
state := cntr.Status.Get().State()
if state != runtime.ContainerState_CONTAINER_RUNNING {
return nil, errors.Errorf("container is in %s state", criContainerStateToString(state))
}
container := cntr.Container
spec, err := container.Spec(ctx)
if err != nil {
return nil, errors.Wrap(err, "failed to get container spec")
@ -205,3 +180,32 @@ func (c *criService) execInContainer(ctx context.Context, id string, opts execOp
return &code, nil
}
}
// execInContainer executes a command inside the container synchronously, and
// redirects stdio stream properly.
// This function only returns when the exec process exits, this means that:
// 1) As long as the exec process is running, the goroutine in the cri plugin
// will be running and wait for the exit code;
// 2) `kubectl exec -it` will hang until the exec process exits, even after io
// is detached. This is different from dockershim, which leaves the exec process
// running in background after io is detached.
// https://github.com/kubernetes/kubernetes/blob/v1.15.0/pkg/kubelet/dockershim/exec.go#L127
// For example, if the `kubectl exec -it` process is killed, IO will be closed. In
// this case, the CRI plugin will still have a goroutine waiting for the exec process
// to exit and log the exit code, but dockershim won't.
func (c *criService) execInContainer(ctx context.Context, id string, opts execOptions) (*uint32, error) {
// Get container from our container store.
cntr, err := c.containerStore.Get(id)
if err != nil {
return nil, errors.Wrapf(err, "failed to find container %q in store", id)
}
id = cntr.ID
state := cntr.Status.Get().State()
if state != runtime.ContainerState_CONTAINER_RUNNING {
return nil, errors.Errorf("container is in %s state", criContainerStateToString(state))
}
return c.execInternal(ctx, cntr.Container, id, opts)
}

View File

@ -0,0 +1,38 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package server
import (
"github.com/pkg/errors"
"golang.org/x/net/context"
runtime "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
sandboxstore "github.com/containerd/cri/pkg/store/sandbox"
)
// PortForward prepares a streaming endpoint to forward ports from a PodSandbox, and returns the address.
func (c *criService) PortForward(ctx context.Context, r *runtime.PortForwardRequest) (retRes *runtime.PortForwardResponse, retErr error) {
sandbox, err := c.sandboxStore.Get(r.GetPodSandboxId())
if err != nil {
return nil, errors.Wrapf(err, "failed to find sandbox %q", r.GetPodSandboxId())
}
if sandbox.Status.Get().State != sandboxstore.StateReady {
return nil, errors.New("sandbox container is not running")
}
// TODO(random-liu): Verify that ports are exposed.
return c.streamServer.GetPortForward(r)
}

View File

@ -31,23 +31,8 @@ import (
"github.com/sirupsen/logrus"
"golang.org/x/net/context"
runtime "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
sandboxstore "github.com/containerd/cri/pkg/store/sandbox"
)
// PortForward prepares a streaming endpoint to forward ports from a PodSandbox, and returns the address.
func (c *criService) PortForward(ctx context.Context, r *runtime.PortForwardRequest) (retRes *runtime.PortForwardResponse, retErr error) {
sandbox, err := c.sandboxStore.Get(r.GetPodSandboxId())
if err != nil {
return nil, errors.Wrapf(err, "failed to find sandbox %q", r.GetPodSandboxId())
}
if sandbox.Status.Get().State != sandboxstore.StateReady {
return nil, errors.New("sandbox container is not running")
}
// TODO(random-liu): Verify that ports are exposed.
return c.streamServer.GetPortForward(r)
}
// portForward requires `socat` on the node. It uses netns to enter the sandbox namespace,
// and run `socat` inside the namespace to forward stream for a specific port. The `socat`
// command keeps running until it exits or client disconnect.

View File

@ -19,19 +19,62 @@ limitations under the License.
package server
import (
"bytes"
"fmt"
"io"
"github.com/containerd/containerd/errdefs"
"github.com/pkg/errors"
"golang.org/x/net/context"
runtime "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
"k8s.io/utils/exec"
"github.com/containerd/cri/pkg/ioutil"
sandboxstore "github.com/containerd/cri/pkg/store/sandbox"
)
// PortForward prepares a streaming endpoint to forward ports from a PodSandbox, and returns the address.
// TODO(windows): Implement this for windows.
func (c *criService) PortForward(ctx context.Context, r *runtime.PortForwardRequest) (*runtime.PortForwardResponse, error) {
return nil, errdefs.ErrNotImplemented
func (c *criService) portForward(ctx context.Context, id string, port int32, stream io.ReadWriter) error {
stdout := ioutil.NewNopWriteCloser(stream)
stderrBuffer := new(bytes.Buffer)
stderr := ioutil.NewNopWriteCloser(stderrBuffer)
// localhost is resolved to 127.0.0.1 in ipv4, and ::1 in ipv6.
// Explicitly using ipv4 IP address in here to avoid flakiness.
cmd := []string{"wincat.exe", "127.0.0.1", fmt.Sprint(port)}
err := c.execInSandbox(ctx, id, cmd, stream, stdout, stderr)
if err != nil {
return errors.Wrapf(err, "failed to execute port forward in sandbox: %s", stderrBuffer.String())
}
return nil
}
func (c *criService) portForward(ctx context.Context, id string, port int32, stream io.ReadWriter) error {
return errdefs.ErrNotImplemented
func (c *criService) execInSandbox(ctx context.Context, sandboxID string, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser) error {
// Get sandbox from our sandbox store.
sb, err := c.sandboxStore.Get(sandboxID)
if err != nil {
return errors.Wrapf(err, "failed to find sandbox %q in store", sandboxID)
}
// Check the sandbox state
state := sb.Status.Get().State
if state != sandboxstore.StateReady {
return errors.Errorf("sandbox is in %s state", fmt.Sprint(state))
}
opts := execOptions{
cmd: cmd,
stdin: stdin,
stdout: stdout,
stderr: stderr,
tty: false,
resize: nil,
}
exitCode, err := c.execInternal(ctx, sb.Container, sandboxID, opts)
if err != nil {
return errors.Wrap(err, "failed to exec in sandbox")
}
if *exitCode == 0 {
return nil
}
return &exec.CodeExitError{
Err: errors.Errorf("error executing command %v, exit code %d", cmd, *exitCode),
Code: int(*exitCode),
}
}

View File

@ -25,7 +25,6 @@ import (
"time"
"github.com/pkg/errors"
"golang.org/x/net/context"
k8snet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/tools/remotecommand"
@ -156,7 +155,8 @@ func (s *streamRuntime) PortForward(podSandboxID string, port int32, stream io.R
if port <= 0 || port > math.MaxUint16 {
return errors.Errorf("invalid port %d", port)
}
return s.c.portForward(context.Background(), podSandboxID, port, stream)
ctx := ctrdutil.NamespacedContext()
return s.c.portForward(ctx, podSandboxID, port, stream)
}
// handleResizing spawns a goroutine that processes the resize channel, calling resizeFunc for each

View File

@ -21,7 +21,7 @@ set -o pipefail
export PATH="/c/Program Files/Containerd:$PATH"
REPO_TAR="${REPO_TAR:-"/c/cri.tar.gz"}"
FOCUS="${FOCUS:-"Conformance"}"
SKIP="${SKIP:-"portforward"}"
SKIP="${SKIP:-""}"
REPORT_DIR="${REPORT_DIR:-"/c/_artifacts"}"
repo="$GOPATH/src/github.com/containerd/cri"