Add return code support to kubectl-exec and -run

This commit is contained in:
bindata-mockuser 2016-08-08 20:25:10 +02:00 committed by Dr. Stefan Schimanski
parent 6dcb0c9130
commit e792d4117d
19 changed files with 666 additions and 83 deletions

View File

@ -0,0 +1,55 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package remotecommand
import (
"fmt"
"io"
"io/ioutil"
"k8s.io/kubernetes/pkg/util/runtime"
)
// errorStreamDecoder interprets the data on the error channel and creates a go error object from it.
type errorStreamDecoder interface {
decode(message []byte) error
}
// watchErrorStream watches the errorStream for remote command error data,
// decodes it with the given errorStreamDecoder, sends the decoded error (or nil if the remote
// command exited successfully) to the returned error channel, and closes it.
// This function returns immediately.
func watchErrorStream(errorStream io.Reader, d errorStreamDecoder) chan error {
errorChan := make(chan error)
go func() {
defer runtime.HandleCrash()
message, err := ioutil.ReadAll(errorStream)
switch {
case err != nil && err != io.EOF:
errorChan <- fmt.Errorf("error reading from error stream: %s", err)
case len(message) > 0:
errorChan <- d.decode(message)
default:
errorChan <- nil
}
close(errorChan)
}()
return errorChan
}

View File

@ -162,6 +162,8 @@ func (e *streamExecutor) Stream(options StreamOptions) error {
var streamer streamProtocolHandler
switch protocol {
case remotecommand.StreamProtocolV4Name:
streamer = newStreamProtocolV4(options)
case remotecommand.StreamProtocolV3Name:
streamer = newStreamProtocolV3(options)
case remotecommand.StreamProtocolV2Name:

View File

@ -88,27 +88,6 @@ func (p *streamProtocolV2) createStreams(conn streamCreator) error {
return nil
}
func (p *streamProtocolV2) setupErrorStreamReading() chan error {
errorChan := make(chan error)
go func() {
defer runtime.HandleCrash()
message, err := ioutil.ReadAll(p.errorStream)
switch {
case err != nil && err != io.EOF:
errorChan <- fmt.Errorf("error reading from error stream: %s", err)
case len(message) > 0:
errorChan <- fmt.Errorf("error executing remote command: %s", message)
default:
errorChan <- nil
}
close(errorChan)
}()
return errorChan
}
func (p *streamProtocolV2) copyStdin() {
if p.Stdin != nil {
var once sync.Once
@ -193,7 +172,7 @@ func (p *streamProtocolV2) stream(conn streamCreator) error {
// now that all the streams have been created, proceed with reading & copying
errorChan := p.setupErrorStreamReading()
errorChan := watchErrorStream(p.errorStream, &errorDecoderV2{})
p.copyStdin()
@ -207,3 +186,10 @@ func (p *streamProtocolV2) stream(conn streamCreator) error {
// waits for errorStream to finish reading with an error or nil
return <-errorChan
}
// errorDecoderV2 interprets the error channel data as plain text.
type errorDecoderV2 struct{}
func (d *errorDecoderV2) decode(message []byte) error {
return fmt.Errorf("error executing remote command: %s", message)
}

View File

@ -199,7 +199,7 @@ func TestV2ErrorStreamReading(t *testing.T) {
h := newStreamProtocolV2(StreamOptions{}).(*streamProtocolV2)
h.errorStream = test.stream
ch := h.setupErrorStreamReading()
ch := watchErrorStream(h.errorStream, &errorDecoderV2{})
if ch == nil {
t.Fatalf("%s: unexpected nil channel", test.name)
}

View File

@ -90,7 +90,7 @@ func (p *streamProtocolV3) stream(conn streamCreator) error {
// now that all the streams have been created, proceed with reading & copying
errorChan := p.setupErrorStreamReading()
errorChan := watchErrorStream(p.errorStream, &errorDecoderV3{})
p.handleResizes()
@ -106,3 +106,7 @@ func (p *streamProtocolV3) stream(conn streamCreator) error {
// waits for errorStream to finish reading with an error or nil
return <-errorChan
}
type errorDecoderV3 struct {
errorDecoderV2
}

View File

@ -0,0 +1,119 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package remotecommand
import (
"encoding/json"
"errors"
"fmt"
"strconv"
"sync"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/kubelet/server/remotecommand"
"k8s.io/kubernetes/pkg/util/exec"
)
// streamProtocolV4 implements version 4 of the streaming protocol for attach
// and exec. This version adds support for exit codes on the error stream through
// the use of unversioned.Status instead of plain text messages.
type streamProtocolV4 struct {
*streamProtocolV3
}
var _ streamProtocolHandler = &streamProtocolV4{}
func newStreamProtocolV4(options StreamOptions) streamProtocolHandler {
return &streamProtocolV4{
streamProtocolV3: newStreamProtocolV3(options).(*streamProtocolV3),
}
}
func (p *streamProtocolV4) createStreams(conn streamCreator) error {
return p.streamProtocolV3.createStreams(conn)
}
func (p *streamProtocolV4) handleResizes() {
p.streamProtocolV3.handleResizes()
}
func (p *streamProtocolV4) stream(conn streamCreator) error {
if err := p.createStreams(conn); err != nil {
return err
}
// now that all the streams have been created, proceed with reading & copying
errorChan := watchErrorStream(p.errorStream, &errorDecoderV4{})
p.handleResizes()
p.copyStdin()
var wg sync.WaitGroup
p.copyStdout(&wg)
p.copyStderr(&wg)
// we're waiting for stdout/stderr to finish copying
wg.Wait()
// waits for errorStream to finish reading with an error or nil
return <-errorChan
}
// errorDecoderV4 interprets the json-marshaled unversioned.Status on the error channel
// and creates an exec.ExitError from it.
type errorDecoderV4 struct{}
func (d *errorDecoderV4) decode(message []byte) error {
status := unversioned.Status{}
err := json.Unmarshal(message, &status)
if err != nil {
return fmt.Errorf("error stream protocol error: %v in %q", err, string(message))
}
switch status.Status {
case unversioned.StatusSuccess:
return nil
case unversioned.StatusFailure:
if status.Reason == remotecommand.NonZeroExitCodeReason {
if status.Details == nil {
return errors.New("error stream protocol error: details must be set")
}
for i := range status.Details.Causes {
c := &status.Details.Causes[i]
if c.Type != remotecommand.ExitCodeCauseType {
continue
}
rc, err := strconv.ParseUint(c.Message, 10, 8)
if err != nil {
return fmt.Errorf("error stream protocol error: invalid exit code value %q", c.Message)
}
return exec.CodeExitError{
Err: fmt.Errorf("command terminated with exit code %d", rc),
Code: int(rc),
}
}
return fmt.Errorf("error stream protocol error: no %s cause given", remotecommand.ExitCodeCauseType)
}
default:
return errors.New("error stream protocol error: unknown error")
}
return fmt.Errorf(status.Message)
}

View File

@ -0,0 +1,71 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package remotecommand
import (
"fmt"
"testing"
)
func TestV4ErrorDecoder(t *testing.T) {
dec := errorDecoderV4{}
type Test struct {
message string
err string
}
for _, test := range []Test{
{
message: "{}",
err: "error stream protocol error: unknown error",
},
{
message: "{",
err: "error stream protocol error: unexpected end of JSON input in \"{\"",
},
{
message: `{"status": "Success" }`,
err: "",
},
{
message: `{"status": "Failure", "message": "foobar" }`,
err: "foobar",
},
{
message: `{"status": "Failure", "message": "foobar", "reason": "NonZeroExitCode", "details": {"causes": [{"reason": "foo"}] } }`,
err: "error stream protocol error: no ExitCode cause given",
},
{
message: `{"status": "Failure", "message": "foobar", "reason": "NonZeroExitCode", "details": {"causes": [{"reason": "ExitCode"}] } }`,
err: "error stream protocol error: invalid exit code value \"\"",
},
{
message: `{"status": "Failure", "message": "foobar", "reason": "NonZeroExitCode", "details": {"causes": [{"reason": "ExitCode", "message": "42"}] } }`,
err: "command terminated with exit code 42",
},
} {
err := dec.decode([]byte(test.message))
want := test.err
if want == "" {
want = "<nil>"
}
if got := fmt.Sprintf("%v", err); got != want {
t.Errorf("wrong error for message %q: want=%q, got=%q", test.message, want, got)
}
}
}

View File

@ -37,6 +37,8 @@ import (
cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
"k8s.io/kubernetes/pkg/kubectl/resource"
"k8s.io/kubernetes/pkg/runtime"
uexec "k8s.io/kubernetes/pkg/util/exec"
"k8s.io/kubernetes/pkg/watch"
)
var (
@ -114,7 +116,7 @@ func addRunFlags(cmd *cobra.Command) {
cmd.Flags().StringP("labels", "l", "", "Labels to apply to the pod(s).")
cmd.Flags().BoolP("stdin", "i", false, "Keep stdin open on the container(s) in the pod, even if nothing is attached.")
cmd.Flags().BoolP("tty", "t", false, "Allocated a TTY for each container in the pod.")
cmd.Flags().Bool("attach", false, "If true, wait for the Pod to start running, and then attach to the Pod as if 'kubectl attach ...' were called. Default false, unless '-i/--stdin' is set, in which case the default is true.")
cmd.Flags().Bool("attach", false, "If true, wait for the Pod to start running, and then attach to the Pod as if 'kubectl attach ...' were called. Default false, unless '-i/--stdin' is set, in which case the default is true. With '--restart=Never' the exit code of the container process is returned.")
cmd.Flags().Bool("leave-stdin-open", false, "If the pod is started in interactive mode or with stdin, leave stdin open after the first attach completes. By default, stdin will be closed after the first attach completes.")
cmd.Flags().String("restart", "Always", "The restart policy for this Pod. Legal values [Always, OnFailure, Never]. If set to 'Always' a deployment is created for this pod, if set to 'OnFailure', a job is created for this pod, if set to 'Never', a regular pod is created. For the latter two --replicas must be 1. Default 'Always'")
cmd.Flags().Bool("command", false, "If true and extra arguments are present, use them as the 'command' field in the container, rather than the 'args' field which is the default.")
@ -128,7 +130,6 @@ func addRunFlags(cmd *cobra.Command) {
}
func Run(f *cmdutil.Factory, cmdIn io.Reader, cmdOut, cmdErr io.Writer, cmd *cobra.Command, args []string, argsLenAtDash int) error {
quiet := cmdutil.GetFlagBool(cmd, "quiet")
if len(os.Args) > 1 && os.Args[1] == "run-container" {
printDeprecationWarning("run", "run-container")
}
@ -243,6 +244,7 @@ func Run(f *cmdutil.Factory, cmdIn io.Reader, cmdOut, cmdErr io.Writer, cmd *cob
}
if attach {
quiet := cmdutil.GetFlagBool(cmd, "quiet")
opts := &AttachOptions{
StreamOptions: StreamOptions{
In: cmdIn,
@ -273,11 +275,21 @@ func Run(f *cmdutil.Factory, cmdIn io.Reader, cmdOut, cmdErr io.Writer, cmd *cob
if err != nil {
return err
}
err = handleAttachPod(f, client, attachablePod, opts, quiet)
err = handleAttachPod(f, client, attachablePod.Namespace, attachablePod.Name, opts, quiet)
if err != nil {
return err
}
var pod *api.Pod
leaveStdinOpen := cmdutil.GetFlagBool(cmd, "leave-stdin-open")
waitForExitCode := !leaveStdinOpen && restartPolicy == api.RestartPolicyNever
if waitForExitCode {
pod, err = waitForPodTerminated(client, attachablePod.Namespace, attachablePod.Name, opts.Out, quiet)
if err != nil {
return err
}
}
if remove {
namespace, err = mapping.MetadataAccessor.Namespace(obj)
if err != nil {
@ -295,9 +307,37 @@ func Run(f *cmdutil.Factory, cmdIn io.Reader, cmdOut, cmdErr io.Writer, cmd *cob
ResourceNames(mapping.Resource, name).
Flatten().
Do()
return ReapResult(r, f, cmdOut, true, true, 0, -1, false, mapper, quiet)
err = ReapResult(r, f, cmdOut, true, true, 0, -1, false, mapper, quiet)
if err != nil {
return err
}
}
// after removal is done, return successfully if we are not interested in the exit code
if !waitForExitCode {
return nil
}
switch pod.Status.Phase {
case api.PodSucceeded:
return nil
case api.PodFailed:
unknownRcErr := fmt.Errorf("pod %s/%s failed with unknown exit code", pod.Namespace, pod.Name)
if len(pod.Status.ContainerStatuses) == 0 || pod.Status.ContainerStatuses[0].State.Terminated == nil {
return unknownRcErr
}
// assume here that we have at most one status because kubectl-run only creates one container per pod
rc := pod.Status.ContainerStatuses[0].State.Terminated.ExitCode
if rc == 0 {
return unknownRcErr
}
return uexec.CodeExitError{
Err: fmt.Errorf("pod %s/%s terminated", pod.Namespace, pod.Name),
Code: int(rc),
}
default:
return fmt.Errorf("pod %s/%s left in phase %s", pod.Namespace, pod.Name, pod.Status.Phase)
}
return nil
}
outputFormat := cmdutil.GetFlagString(cmd, "output")
@ -325,37 +365,91 @@ func contains(resourcesList map[string]*unversioned.APIResourceList, resource un
return false
}
func waitForPodRunning(c *client.Client, pod *api.Pod, out io.Writer, quiet bool) (status api.PodPhase, err error) {
for {
pod, err := c.Pods(pod.Namespace).Get(pod.Name)
if err != nil {
return api.PodUnknown, err
}
ready := false
if pod.Status.Phase == api.PodRunning {
ready = true
for _, status := range pod.Status.ContainerStatuses {
if !status.Ready {
ready = false
break
}
}
if ready {
return api.PodRunning, nil
}
}
if pod.Status.Phase == api.PodSucceeded || pod.Status.Phase == api.PodFailed {
return pod.Status.Phase, nil
}
if !quiet {
fmt.Fprintf(out, "Waiting for pod %s/%s to be running, status is %s, pod ready: %v\n", pod.Namespace, pod.Name, pod.Status.Phase, ready)
}
time.Sleep(2 * time.Second)
// waitForPod watches the given pod until the exitCondition is true. Each two seconds
// the tick function is called e.g. for progress output.
func waitForPod(c *client.Client, ns, name string, exitCondition func(*api.Pod) bool, tick func(*api.Pod)) (*api.Pod, error) {
pod, err := c.Pods(ns).Get(name)
if err != nil {
return nil, err
}
if exitCondition(pod) {
return pod, nil
}
tick(pod)
w, err := c.Pods(ns).Watch(api.SingleObject(api.ObjectMeta{Name: pod.Name, ResourceVersion: pod.ResourceVersion}))
if err != nil {
return nil, err
}
t := time.NewTicker(2 * time.Second)
defer t.Stop()
go func() {
for range t.C {
tick(pod)
}
}()
err = nil
result := pod
kubectl.WatchLoop(w, func(ev watch.Event) error {
switch ev.Type {
case watch.Added, watch.Modified:
pod = ev.Object.(*api.Pod)
if exitCondition(pod) {
result = pod
w.Stop()
}
case watch.Deleted:
w.Stop()
case watch.Error:
result = nil
err = fmt.Errorf("failed to watch pod %s/%s", ns, name)
w.Stop()
}
return nil
})
return result, err
}
func handleAttachPod(f *cmdutil.Factory, c *client.Client, pod *api.Pod, opts *AttachOptions, quiet bool) error {
status, err := waitForPodRunning(c, pod, opts.Out, quiet)
func waitForPodRunning(c *client.Client, ns, name string, out io.Writer, quiet bool) (*api.Pod, error) {
exitCondition := func(pod *api.Pod) bool {
switch pod.Status.Phase {
case api.PodRunning:
for _, status := range pod.Status.ContainerStatuses {
if !status.Ready {
return false
}
}
return true
case api.PodSucceeded, api.PodFailed:
return true
default:
return false
}
}
return waitForPod(c, ns, name, exitCondition, func(pod *api.Pod) {
if !quiet {
fmt.Fprintf(out, "Waiting for pod %s/%s to be running, status is %s, pod ready: false\n", pod.Namespace, pod.Name, pod.Status.Phase)
}
})
}
func waitForPodTerminated(c *client.Client, ns, name string, out io.Writer, quiet bool) (*api.Pod, error) {
exitCondition := func(pod *api.Pod) bool {
return pod.Status.Phase == api.PodSucceeded || pod.Status.Phase == api.PodFailed
}
return waitForPod(c, ns, name, exitCondition, func(pod *api.Pod) {
if !quiet {
fmt.Fprintf(out, "Waiting for pod %s/%s to terminate, status is %s\n", pod.Namespace, pod.Name, pod.Status.Phase)
}
})
}
func handleAttachPod(f *cmdutil.Factory, c *client.Client, ns, name string, opts *AttachOptions, quiet bool) error {
pod, err := waitForPodRunning(c, ns, name, opts.Out, quiet)
if err != nil {
return err
}
@ -363,7 +457,7 @@ func handleAttachPod(f *cmdutil.Factory, c *client.Client, pod *api.Pod, opts *A
if err != nil {
return err
}
if status == api.PodSucceeded || status == api.PodFailed {
if pod.Status.Phase == api.PodSucceeded || pod.Status.Phase == api.PodFailed {
req, err := f.LogsForObject(pod, &api.PodLogOptions{Container: ctrName})
if err != nil {
return err
@ -377,8 +471,8 @@ func handleAttachPod(f *cmdutil.Factory, c *client.Client, pod *api.Pod, opts *A
return err
}
opts.Client = c
opts.PodName = pod.Name
opts.Namespace = pod.Namespace
opts.PodName = name
opts.Namespace = ns
if err := opts.Run(); err != nil {
fmt.Fprintf(opts.Out, "Error attaching, falling back to logs: %v\n", err)
req, err := f.LogsForObject(pod, &api.PodLogOptions{Container: ctrName})

View File

@ -39,6 +39,7 @@ import (
"k8s.io/kubernetes/pkg/kubectl/resource"
"k8s.io/kubernetes/pkg/runtime"
utilerrors "k8s.io/kubernetes/pkg/util/errors"
utilexec "k8s.io/kubernetes/pkg/util/exec"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/util/strategicpatch"
@ -150,6 +151,9 @@ func checkErr(prefix string, err error, handleErr func(string, int)) {
}
case utilerrors.Aggregate:
handleErr(MultipleErrors(prefix, err.Errors()), DefaultErrorExitCode)
case utilexec.ExitError:
// do not print anything, only terminate with given error
handleErr("", err.ExitStatus())
default: // for any other error type
msg, ok := StandardErrorMessage(err)
if !ok {

View File

@ -34,6 +34,7 @@ import (
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/pkg/runtime"
uexec "k8s.io/kubernetes/pkg/util/exec"
"k8s.io/kubernetes/pkg/util/validation/field"
)
@ -269,6 +270,16 @@ func TestCheckNoResourceMatchError(t *testing.T) {
})
}
func TestCheckExitError(t *testing.T) {
testCheckError(t, []checkErrTestCase{
{
uexec.CodeExitError{Err: fmt.Errorf("pod foo/bar terminated"), Code: 42},
"",
42,
},
})
}
func testCheckError(t *testing.T, tests []checkErrTestCase) {
var errReturned string
var codeReturned int

View File

@ -26,6 +26,7 @@ import (
dockertypes "github.com/docker/engine-api/types"
"github.com/golang/glog"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
utilexec "k8s.io/kubernetes/pkg/util/exec"
"k8s.io/kubernetes/pkg/util/term"
)
@ -74,7 +75,7 @@ func (*NsenterExecHandler) ExecInContainer(client DockerInterface, container *do
go io.Copy(stdout, p)
}
return command.Wait()
err = command.Wait()
} else {
if stdin != nil {
// Use an os.Pipe here as it returns true *os.File objects.
@ -96,8 +97,13 @@ func (*NsenterExecHandler) ExecInContainer(client DockerInterface, container *do
command.Stderr = stderr
}
return command.Run()
err = command.Run()
}
if exitErr, ok := err.(*exec.ExitError); ok {
return &utilexec.ExitErrorWrapper{ExitError: exitErr}
}
return err
}
// NativeExecHandler executes commands in Docker containers using Docker's exec API.

View File

@ -17,12 +17,13 @@ limitations under the License.
package remotecommand
import (
"errors"
"fmt"
"io"
"net/http"
"time"
apierrors "k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/term"
@ -47,8 +48,12 @@ func ServeAttach(w http.ResponseWriter, req *http.Request, attacher Attacher, po
err := attacher.AttachContainer(podName, uid, container, ctx.stdinStream, ctx.stdoutStream, ctx.stderrStream, ctx.tty, ctx.resizeChan)
if err != nil {
msg := fmt.Sprintf("error attaching to container: %v", err)
runtime.HandleError(errors.New(msg))
fmt.Fprint(ctx.errorStream, msg)
err = fmt.Errorf("error attaching to container: %v", err)
runtime.HandleError(err)
ctx.writeStatus(apierrors.NewInternalError(err))
} else {
ctx.writeStatus(&apierrors.StatusError{ErrStatus: unversioned.Status{
Status: unversioned.StatusSuccess,
}})
}
}

View File

@ -36,6 +36,11 @@ const (
// attachment/execution. It is the third version of the subprotocol and
// adds support for resizing container terminals.
StreamProtocolV3Name = "v3.channel.k8s.io"
// The SPDY subprotocol "v4.channel.k8s.io" is used for remote command
// attachment/execution. It is the 4th version of the subprotocol and
// adds support for exit codes.
StreamProtocolV4Name = "v4.channel.k8s.io"
)
var SupportedStreamingProtocols = []string{StreamProtocolV3Name, StreamProtocolV2Name, StreamProtocolV1Name}
var SupportedStreamingProtocols = []string{StreamProtocolV4Name, StreamProtocolV3Name, StreamProtocolV2Name, StreamProtocolV1Name}

View File

@ -17,18 +17,25 @@ limitations under the License.
package remotecommand
import (
"errors"
"fmt"
"io"
"net/http"
"time"
"k8s.io/kubernetes/pkg/api"
apierrors "k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/types"
utilexec "k8s.io/kubernetes/pkg/util/exec"
"k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/term"
)
const (
NonZeroExitCodeReason = unversioned.StatusReason("NonZeroExitCode")
ExitCodeCauseType = unversioned.CauseType("ExitCode")
)
// Executor knows how to execute a command in a container in a pod.
type Executor interface {
// ExecInContainer executes a command in a container in the pod, copying data
@ -51,8 +58,29 @@ func ServeExec(w http.ResponseWriter, req *http.Request, executor Executor, podN
err := executor.ExecInContainer(podName, uid, container, cmd, ctx.stdinStream, ctx.stdoutStream, ctx.stderrStream, ctx.tty, ctx.resizeChan)
if err != nil {
msg := fmt.Sprintf("error executing command in container: %v", err)
runtime.HandleError(errors.New(msg))
fmt.Fprint(ctx.errorStream, msg)
if exitErr, ok := err.(utilexec.ExitError); ok && exitErr.Exited() {
rc := exitErr.ExitStatus()
ctx.writeStatus(&apierrors.StatusError{ErrStatus: unversioned.Status{
Status: unversioned.StatusFailure,
Reason: NonZeroExitCodeReason,
Details: &unversioned.StatusDetails{
Causes: []unversioned.StatusCause{
{
Type: ExitCodeCauseType,
Message: fmt.Sprintf("%d", rc),
},
},
},
Message: fmt.Sprintf("command terminated with non-zero exit code: %v", exitErr),
}})
} else {
err = fmt.Errorf("error executing command in container: %v", err)
runtime.HandleError(err)
ctx.writeStatus(apierrors.NewInternalError(err))
}
} else {
ctx.writeStatus(&apierrors.StatusError{ErrStatus: unversioned.Status{
Status: unversioned.StatusSuccess,
}})
}
}

View File

@ -25,6 +25,8 @@ import (
"time"
"k8s.io/kubernetes/pkg/api"
apierrors "k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/util/httpstream"
"k8s.io/kubernetes/pkg/util/httpstream/spdy"
"k8s.io/kubernetes/pkg/util/runtime"
@ -88,7 +90,7 @@ type context struct {
stdinStream io.ReadCloser
stdoutStream io.WriteCloser
stderrStream io.WriteCloser
errorStream io.WriteCloser
writeStatus func(status *apierrors.StatusError) error
resizeStream io.ReadCloser
resizeChan chan term.Size
tty bool
@ -168,6 +170,8 @@ func createHttpStreamStreams(req *http.Request, w http.ResponseWriter, opts *opt
var handler protocolHandler
switch protocol {
case StreamProtocolV4Name:
handler = &v4ProtocolHandler{}
case StreamProtocolV3Name:
handler = &v3ProtocolHandler{}
case StreamProtocolV2Name:
@ -206,6 +210,59 @@ type protocolHandler interface {
supportsTerminalResizing() bool
}
// v4ProtocolHandler implements the V4 protocol version for streaming command execution. It only differs
// in from v3 in the error stream format using an json-marshaled unversioned.Status which carries
// the process' exit code.
type v4ProtocolHandler struct{}
func (*v4ProtocolHandler) waitForStreams(streams <-chan streamAndReply, expectedStreams int, expired <-chan time.Time) (*context, error) {
ctx := &context{}
receivedStreams := 0
replyChan := make(chan struct{})
stop := make(chan struct{})
defer close(stop)
WaitForStreams:
for {
select {
case stream := <-streams:
streamType := stream.Headers().Get(api.StreamType)
switch streamType {
case api.StreamTypeError:
ctx.writeStatus = v4WriteStatusFunc(stream) // write json errors
go waitStreamReply(stream.replySent, replyChan, stop)
case api.StreamTypeStdin:
ctx.stdinStream = stream
go waitStreamReply(stream.replySent, replyChan, stop)
case api.StreamTypeStdout:
ctx.stdoutStream = stream
go waitStreamReply(stream.replySent, replyChan, stop)
case api.StreamTypeStderr:
ctx.stderrStream = stream
go waitStreamReply(stream.replySent, replyChan, stop)
case api.StreamTypeResize:
ctx.resizeStream = stream
go waitStreamReply(stream.replySent, replyChan, stop)
default:
runtime.HandleError(fmt.Errorf("Unexpected stream type: %q", streamType))
}
case <-replyChan:
receivedStreams++
if receivedStreams == expectedStreams {
break WaitForStreams
}
case <-expired:
// TODO find a way to return the error to the user. Maybe use a separate
// stream to report errors?
return nil, errors.New("timed out waiting for client to create streams")
}
}
return ctx, nil
}
// supportsTerminalResizing returns true because v4ProtocolHandler supports it
func (*v4ProtocolHandler) supportsTerminalResizing() bool { return true }
// v3ProtocolHandler implements the V3 protocol version for streaming command execution.
type v3ProtocolHandler struct{}
@ -222,7 +279,7 @@ WaitForStreams:
streamType := stream.Headers().Get(api.StreamType)
switch streamType {
case api.StreamTypeError:
ctx.errorStream = stream
ctx.writeStatus = v1WriteStatusFunc(stream)
go waitStreamReply(stream.replySent, replyChan, stop)
case api.StreamTypeStdin:
ctx.stdinStream = stream
@ -273,7 +330,7 @@ WaitForStreams:
streamType := stream.Headers().Get(api.StreamType)
switch streamType {
case api.StreamTypeError:
ctx.errorStream = stream
ctx.writeStatus = v1WriteStatusFunc(stream)
go waitStreamReply(stream.replySent, replyChan, stop)
case api.StreamTypeStdin:
ctx.stdinStream = stream
@ -321,7 +378,7 @@ WaitForStreams:
streamType := stream.Headers().Get(api.StreamType)
switch streamType {
case api.StreamTypeError:
ctx.errorStream = stream
ctx.writeStatus = v1WriteStatusFunc(stream)
// This defer statement shouldn't be here, but due to previous refactoring, it ended up in
// here. This is what 1.0.x kubelets do, so we're retaining that behavior. This is fixed in
@ -375,3 +432,26 @@ func handleResizeEvents(stream io.Reader, channel chan<- term.Size) {
channel <- size
}
}
func v1WriteStatusFunc(stream io.WriteCloser) func(status *apierrors.StatusError) error {
return func(status *apierrors.StatusError) error {
if status.Status().Status == unversioned.StatusSuccess {
return nil // send error messages
}
_, err := stream.Write([]byte(status.Error()))
return err
}
}
// v4WriteStatusFunc returns a WriteStatusFunc that marshals a given api Status
// as json in the error channel.
func v4WriteStatusFunc(stream io.WriteCloser) func(status *apierrors.StatusError) error {
return func(status *apierrors.StatusError) error {
bs, err := json.Marshal(status.Status())
if err != nil {
return err
}
_, err = stream.Write(bs)
return err
}
}

View File

@ -32,6 +32,11 @@ const (
stderrChannel
errorChannel
resizeChannel
preV4BinaryWebsocketProtocol = wsstream.ChannelWebSocketProtocol
preV4Base64WebsocketProtocol = wsstream.Base64ChannelWebSocketProtocol
v4BinaryWebsocketProtocol = "v4." + wsstream.ChannelWebSocketProtocol
v4Base64WebsocketProtocol = "v4." + wsstream.Base64ChannelWebSocketProtocol
)
// createChannels returns the standard channel types for a shell connection (STDIN 0, STDOUT 1, STDERR 2)
@ -67,9 +72,30 @@ func writeChannel(real bool) wsstream.ChannelType {
// streams needed to perform an exec or an attach.
func createWebSocketStreams(req *http.Request, w http.ResponseWriter, opts *options, idleTimeout time.Duration) (*context, bool) {
channels := createChannels(opts)
conn := wsstream.NewConn(channels...)
conn := wsstream.NewConn(map[string]wsstream.ChannelProtocolConfig{
"": {
Binary: true,
Channels: channels,
},
preV4BinaryWebsocketProtocol: {
Binary: true,
Channels: channels,
},
preV4Base64WebsocketProtocol: {
Binary: false,
Channels: channels,
},
v4BinaryWebsocketProtocol: {
Binary: true,
Channels: channels,
},
v4Base64WebsocketProtocol: {
Binary: false,
Channels: channels,
},
})
conn.SetIdleTimeout(idleTimeout)
streams, err := conn.Open(httplog.Unlogged(w), req)
negotiatedProtocol, streams, err := conn.Open(httplog.Unlogged(w), req)
if err != nil {
runtime.HandleError(fmt.Errorf("Unable to upgrade websocket connection: %v", err))
return nil, false
@ -86,13 +112,21 @@ func createWebSocketStreams(req *http.Request, w http.ResponseWriter, opts *opti
streams[errorChannel].Write([]byte{})
}
return &context{
ctx := &context{
conn: conn,
stdinStream: streams[stdinChannel],
stdoutStream: streams[stdoutChannel],
stderrStream: streams[stderrChannel],
errorStream: streams[errorChannel],
tty: opts.tty,
resizeStream: streams[resizeChannel],
}, true
}
switch negotiatedProtocol {
case v4BinaryWebsocketProtocol, v4Base64WebsocketProtocol:
ctx.writeStatus = v4WriteStatusFunc(streams[errorChannel])
default:
ctx.writeStatus = v1WriteStatusFunc(streams[errorChannel])
}
return ctx, true
}

View File

@ -140,3 +140,28 @@ func (eew ExitErrorWrapper) ExitStatus() int {
}
return ws.ExitStatus()
}
// CodeExitError is an implementation of ExitError consisting of an error object
// and an exit code (the upper bits of os.exec.ExitStatus).
type CodeExitError struct {
Err error
Code int
}
var _ ExitError = CodeExitError{}
func (e CodeExitError) Error() string {
return e.Err.Error()
}
func (e CodeExitError) String() string {
return e.Err.Error()
}
func (e CodeExitError) Exited() bool {
return true
}
func (e CodeExitError) ExitStatus() int {
return e.Code
}

View File

@ -36,6 +36,7 @@ import (
"strconv"
"strings"
"sync"
"syscall"
"time"
"k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_4"
@ -62,6 +63,7 @@ import (
"k8s.io/kubernetes/pkg/runtime"
sshutil "k8s.io/kubernetes/pkg/ssh"
"k8s.io/kubernetes/pkg/types"
uexec "k8s.io/kubernetes/pkg/util/exec"
labelsutil "k8s.io/kubernetes/pkg/util/labels"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/util/system"
@ -1996,7 +1998,7 @@ func (b kubectlBuilder) Exec() (string, error) {
Logf("Running '%s %s'", cmd.Path, strings.Join(cmd.Args[1:], " ")) // skip arg[0] as it is printed separately
if err := cmd.Start(); err != nil {
return "", fmt.Errorf("Error starting %v:\nCommand stdout:\n%v\nstderr:\n%v\nerror:\n%v\n", cmd, cmd.Stdout, cmd.Stderr, err)
return "", fmt.Errorf("error starting %v:\nCommand stdout:\n%v\nstderr:\n%v\nerror:\n%v\n", cmd, cmd.Stdout, cmd.Stderr, err)
}
errCh := make(chan error, 1)
go func() {
@ -2005,11 +2007,19 @@ func (b kubectlBuilder) Exec() (string, error) {
select {
case err := <-errCh:
if err != nil {
return "", fmt.Errorf("Error running %v:\nCommand stdout:\n%v\nstderr:\n%v\nerror:\n%v\n", cmd, cmd.Stdout, cmd.Stderr, err)
var rc int = 127
if ee, ok := err.(*exec.ExitError); ok {
Logf("rc: %d", rc)
rc = int(ee.Sys().(syscall.WaitStatus).ExitStatus())
}
return "", uexec.CodeExitError{
Err: fmt.Errorf("error running %v:\nCommand stdout:\n%v\nstderr:\n%v\nerror:\n%v\n", cmd, cmd.Stdout, cmd.Stderr, err),
Code: rc,
}
}
case <-b.timeout:
b.cmd.Process.Kill()
return "", fmt.Errorf("Timed out waiting for command %v:\nCommand stdout:\n%v\nstderr:\n%v\n", cmd, cmd.Stdout, cmd.Stderr)
return "", fmt.Errorf("timed out waiting for command %v:\nCommand stdout:\n%v\nstderr:\n%v\n", cmd, cmd.Stdout, cmd.Stderr)
}
Logf("stderr: %q", stderr.String())
return stdout.String(), nil

View File

@ -51,6 +51,7 @@ import (
"k8s.io/kubernetes/pkg/kubectl/cmd/util"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/registry/generic/registry"
uexec "k8s.io/kubernetes/pkg/util/exec"
utilnet "k8s.io/kubernetes/pkg/util/net"
"k8s.io/kubernetes/pkg/util/uuid"
"k8s.io/kubernetes/pkg/util/wait"
@ -348,6 +349,49 @@ var _ = framework.KubeDescribe("Kubectl client", func() {
}
})
It("should return command exit codes", func() {
nsFlag := fmt.Sprintf("--namespace=%v", ns)
By("execing into a container with a successful command")
_, err := framework.NewKubectlCommand(nsFlag, "exec", "nginx", "--", "/bin/sh", "-c", "exit 0").Exec()
ExpectNoError(err)
By("execing into a container with a failing command")
_, err = framework.NewKubectlCommand(nsFlag, "exec", "nginx", "--", "/bin/sh", "-c", "exit 42").Exec()
ee, ok := err.(uexec.ExitError)
Expect(ok).To(Equal(true))
Expect(ee.ExitStatus()).To(Equal(42))
By("running a successful command")
_, err = framework.NewKubectlCommand(nsFlag, "run", "-i", "--image="+busyboxImage, "--restart=Never", "success", "--", "/bin/sh", "-c", "exit 0").Exec()
ExpectNoError(err)
By("running a failing command")
_, err = framework.NewKubectlCommand(nsFlag, "run", "-i", "--image="+busyboxImage, "--restart=Never", "failure-1", "--", "/bin/sh", "-c", "exit 42").Exec()
ee, ok = err.(uexec.ExitError)
Expect(ok).To(Equal(true))
Expect(ee.ExitStatus()).To(Equal(42))
By("running a failing command without --restart=Never")
_, err = framework.NewKubectlCommand(nsFlag, "run", "-i", "--image="+busyboxImage, "--restart=OnFailure", "failure-2", "--", "/bin/sh", "-c", "cat && exit 42").
WithStdinData("abcd1234").
Exec()
ExpectNoError(err)
By("running a failing command without --restart=Never, but with --rm")
_, err = framework.NewKubectlCommand(nsFlag, "run", "-i", "--image="+busyboxImage, "--restart=OnFailure", "--rm", "failure-3", "--", "/bin/sh", "-c", "cat && exit 42").
WithStdinData("abcd1234").
Exec()
ExpectNoError(err)
framework.WaitForPodToDisappear(f.Client, ns, "failure-3", labels.Everything(), 2*time.Second, wait.ForeverTestTimeout)
By("running a failing command with --leave-stdin-open")
_, err = framework.NewKubectlCommand(nsFlag, "run", "-i", "--image="+busyboxImage, "--restart=Never", "failure-4", "--leave-stdin-open", "--", "/bin/sh", "-c", "exit 42").
WithStdinData("abcd1234").
Exec()
ExpectNoError(err)
})
It("should support inline execution and attach", func() {
framework.SkipIfContainerRuntimeIs("rkt") // #23335
framework.SkipUnlessServerVersionGTE(jobsVersion, c)