Merge pull request #11999 from brendandburns/attach2
Add support for attach to kubectl
This commit is contained in:
@@ -25,6 +25,7 @@ import (
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/conversion/queryparams"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/httpstream"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/httpstream/spdy"
|
||||
"github.com/golang/glog"
|
||||
@@ -40,32 +41,74 @@ func (u *defaultUpgrader) upgrade(req *client.Request, config *client.Config) (h
|
||||
return req.Upgrade(config, spdy.NewRoundTripper)
|
||||
}
|
||||
|
||||
// Executor executes a command on a pod container
|
||||
type Executor struct {
|
||||
req *client.Request
|
||||
config *client.Config
|
||||
command []string
|
||||
stdin io.Reader
|
||||
stdout io.Writer
|
||||
stderr io.Writer
|
||||
tty bool
|
||||
type Streamer struct {
|
||||
req *client.Request
|
||||
config *client.Config
|
||||
stdin io.Reader
|
||||
stdout io.Writer
|
||||
stderr io.Writer
|
||||
tty bool
|
||||
|
||||
upgrader upgrader
|
||||
}
|
||||
|
||||
// Executor executes a command on a pod container
|
||||
type Executor struct {
|
||||
Streamer
|
||||
command []string
|
||||
}
|
||||
|
||||
// New creates a new RemoteCommandExecutor
|
||||
func New(req *client.Request, config *client.Config, command []string, stdin io.Reader, stdout, stderr io.Writer, tty bool) *Executor {
|
||||
return &Executor{
|
||||
req: req,
|
||||
config: config,
|
||||
command: command,
|
||||
stdin: stdin,
|
||||
stdout: stdout,
|
||||
stderr: stderr,
|
||||
tty: tty,
|
||||
Streamer: Streamer{
|
||||
req: req,
|
||||
config: config,
|
||||
stdin: stdin,
|
||||
stdout: stdout,
|
||||
stderr: stderr,
|
||||
tty: tty,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
type Attach struct {
|
||||
Streamer
|
||||
}
|
||||
|
||||
// NewAttach creates a new RemoteAttach
|
||||
func NewAttach(req *client.Request, config *client.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool) *Attach {
|
||||
return &Attach{
|
||||
Streamer: Streamer{
|
||||
req: req,
|
||||
config: config,
|
||||
stdin: stdin,
|
||||
stdout: stdout,
|
||||
stderr: stderr,
|
||||
tty: tty,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// Execute sends a remote command execution request, upgrading the
|
||||
// connection and creating streams to represent stdin/stdout/stderr. Data is
|
||||
// copied between these streams and the supplied stdin/stdout/stderr parameters.
|
||||
func (e *Attach) Execute() error {
|
||||
opts := api.PodAttachOptions{
|
||||
Stdin: (e.stdin != nil),
|
||||
Stdout: (e.stdout != nil),
|
||||
Stderr: (!e.tty && e.stderr != nil),
|
||||
TTY: e.tty,
|
||||
}
|
||||
|
||||
if err := e.setupRequestParameters(&opts); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return e.doStream()
|
||||
}
|
||||
|
||||
// Execute sends a remote command execution request, upgrading the
|
||||
// connection and creating streams to represent stdin/stdout/stderr. Data is
|
||||
// copied between these streams and the supplied stdin/stdout/stderr parameters.
|
||||
@@ -78,7 +121,15 @@ func (e *Executor) Execute() error {
|
||||
Command: e.command,
|
||||
}
|
||||
|
||||
versioned, err := api.Scheme.ConvertToVersion(&opts, e.config.Version)
|
||||
if err := e.setupRequestParameters(&opts); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return e.doStream()
|
||||
}
|
||||
|
||||
func (e *Streamer) setupRequestParameters(obj runtime.Object) error {
|
||||
versioned, err := api.Scheme.ConvertToVersion(obj, e.config.Version)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -91,7 +142,10 @@ func (e *Executor) Execute() error {
|
||||
e.req.Param(k, vv)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *Streamer) doStream() error {
|
||||
if e.upgrader == nil {
|
||||
e.upgrader = &defaultUpgrader{}
|
||||
}
|
||||
@@ -134,7 +188,7 @@ func (e *Executor) Execute() error {
|
||||
}()
|
||||
defer errorStream.Reset()
|
||||
|
||||
if opts.Stdin {
|
||||
if e.stdin != nil {
|
||||
headers.Set(api.StreamType, api.StreamTypeStdin)
|
||||
remoteStdin, err := conn.CreateStream(headers)
|
||||
if err != nil {
|
||||
@@ -151,7 +205,7 @@ func (e *Executor) Execute() error {
|
||||
waitCount := 0
|
||||
completedStreams := 0
|
||||
|
||||
if opts.Stdout {
|
||||
if e.stdout != nil {
|
||||
waitCount++
|
||||
headers.Set(api.StreamType, api.StreamTypeStdout)
|
||||
remoteStdout, err := conn.CreateStream(headers)
|
||||
@@ -162,7 +216,7 @@ func (e *Executor) Execute() error {
|
||||
go cp(api.StreamTypeStdout, e.stdout, remoteStdout)
|
||||
}
|
||||
|
||||
if opts.Stderr && !e.tty {
|
||||
if e.stderr != nil && !e.tty {
|
||||
waitCount++
|
||||
headers.Set(api.StreamType, api.StreamTypeStderr)
|
||||
remoteStderr, err := conn.CreateStream(headers)
|
||||
|
@@ -190,3 +190,80 @@ func TestRequestExecuteRemoteCommand(t *testing.T) {
|
||||
server.Close()
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: this test is largely cut and paste, refactor to share code
|
||||
func TestRequestAttachRemoteCommand(t *testing.T) {
|
||||
testCases := []struct {
|
||||
Stdin string
|
||||
Stdout string
|
||||
Stderr string
|
||||
Error string
|
||||
Tty bool
|
||||
}{
|
||||
{
|
||||
Error: "bail",
|
||||
},
|
||||
{
|
||||
Stdin: "a",
|
||||
Stdout: "b",
|
||||
Stderr: "c",
|
||||
},
|
||||
{
|
||||
Stdin: "a",
|
||||
Stdout: "b",
|
||||
Tty: true,
|
||||
},
|
||||
}
|
||||
|
||||
for i, testCase := range testCases {
|
||||
localOut := &bytes.Buffer{}
|
||||
localErr := &bytes.Buffer{}
|
||||
|
||||
server := httptest.NewServer(fakeExecServer(t, i, testCase.Stdin, testCase.Stdout, testCase.Stderr, testCase.Error, testCase.Tty))
|
||||
|
||||
url, _ := url.ParseRequestURI(server.URL)
|
||||
c := client.NewRESTClient(url, "x", nil, -1, -1)
|
||||
req := c.Post().Resource("testing")
|
||||
|
||||
conf := &client.Config{
|
||||
Host: server.URL,
|
||||
}
|
||||
e := NewAttach(req, conf, strings.NewReader(testCase.Stdin), localOut, localErr, testCase.Tty)
|
||||
//e.upgrader = testCase.Upgrader
|
||||
err := e.Execute()
|
||||
hasErr := err != nil
|
||||
|
||||
if len(testCase.Error) > 0 {
|
||||
if !hasErr {
|
||||
t.Errorf("%d: expected an error", i)
|
||||
} else {
|
||||
if e, a := testCase.Error, err.Error(); !strings.Contains(a, e) {
|
||||
t.Errorf("%d: expected error stream read '%v', got '%v'", i, e, a)
|
||||
}
|
||||
}
|
||||
|
||||
server.Close()
|
||||
continue
|
||||
}
|
||||
|
||||
if hasErr {
|
||||
t.Errorf("%d: unexpected error: %v", i, err)
|
||||
server.Close()
|
||||
continue
|
||||
}
|
||||
|
||||
if len(testCase.Stdout) > 0 {
|
||||
if e, a := testCase.Stdout, localOut; e != a.String() {
|
||||
t.Errorf("%d: expected stdout data '%s', got '%s'", i, e, a)
|
||||
}
|
||||
}
|
||||
|
||||
if testCase.Stderr != "" {
|
||||
if e, a := testCase.Stderr, localErr; e != a.String() {
|
||||
t.Errorf("%d: expected stderr data '%s', got '%s'", i, e, a)
|
||||
}
|
||||
}
|
||||
|
||||
server.Close()
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user