Fix exec/attach test flakes
Ensure that stream reply frames are enqueued prior to any goaway frames.
This commit is contained in:
@@ -606,6 +606,15 @@ func standardShellChannels(stdin, stdout, stderr bool) []wsstream.ChannelType {
|
||||
return channels
|
||||
}
|
||||
|
||||
// streamAndReply holds both a Stream and a channel that is closed when the stream's reply frame is
|
||||
// enqueued. Consumers can wait for replySent to be closed prior to proceeding, to ensure that the
|
||||
// replyFrame is enqueued before the connection's goaway frame is sent (e.g. if a stream was
|
||||
// received and right after, the connection gets closed).
|
||||
type streamAndReply struct {
|
||||
httpstream.Stream
|
||||
replySent <-chan struct{}
|
||||
}
|
||||
|
||||
func (s *Server) createStreams(request *restful.Request, response *restful.Response) (io.Reader, io.WriteCloser, io.WriteCloser, io.WriteCloser, Closer, bool, bool) {
|
||||
tty := request.QueryParameter(api.ExecTTYParam) == "1"
|
||||
stdin := request.QueryParameter(api.ExecStdinParam) == "1"
|
||||
@@ -664,11 +673,11 @@ func (s *Server) createStreams(request *restful.Request, response *restful.Respo
|
||||
return nil, nil, nil, nil, nil, false, false
|
||||
}
|
||||
|
||||
streamCh := make(chan httpstream.Stream)
|
||||
streamCh := make(chan streamAndReply)
|
||||
|
||||
upgrader := spdy.NewResponseUpgrader()
|
||||
conn := upgrader.UpgradeResponse(response.ResponseWriter, request.Request, func(stream httpstream.Stream) error {
|
||||
streamCh <- stream
|
||||
conn := upgrader.UpgradeResponse(response.ResponseWriter, request.Request, func(stream httpstream.Stream, replySent <-chan struct{}) error {
|
||||
streamCh <- streamAndReply{Stream: stream, replySent: replySent}
|
||||
return nil
|
||||
})
|
||||
// from this point on, we can no longer call methods on response
|
||||
@@ -686,6 +695,9 @@ func (s *Server) createStreams(request *restful.Request, response *restful.Respo
|
||||
|
||||
var errorStream, stdinStream, stdoutStream, stderrStream httpstream.Stream
|
||||
receivedStreams := 0
|
||||
replyChan := make(chan struct{})
|
||||
stop := make(chan struct{})
|
||||
defer close(stop)
|
||||
WaitForStreams:
|
||||
for {
|
||||
select {
|
||||
@@ -694,19 +706,21 @@ WaitForStreams:
|
||||
switch streamType {
|
||||
case api.StreamTypeError:
|
||||
errorStream = stream
|
||||
receivedStreams++
|
||||
go waitStreamReply(stream.replySent, replyChan, stop)
|
||||
case api.StreamTypeStdin:
|
||||
stdinStream = stream
|
||||
receivedStreams++
|
||||
go waitStreamReply(stream.replySent, replyChan, stop)
|
||||
case api.StreamTypeStdout:
|
||||
stdoutStream = stream
|
||||
receivedStreams++
|
||||
go waitStreamReply(stream.replySent, replyChan, stop)
|
||||
case api.StreamTypeStderr:
|
||||
stderrStream = stream
|
||||
receivedStreams++
|
||||
go waitStreamReply(stream.replySent, replyChan, stop)
|
||||
default:
|
||||
glog.Errorf("Unexpected stream type: '%s'", streamType)
|
||||
}
|
||||
case <-replyChan:
|
||||
receivedStreams++
|
||||
if receivedStreams == expectedStreams {
|
||||
break WaitForStreams
|
||||
}
|
||||
@@ -721,6 +735,16 @@ WaitForStreams:
|
||||
return stdinStream, stdoutStream, stderrStream, errorStream, conn, tty, true
|
||||
}
|
||||
|
||||
// waitStreamReply waits until either replySent or stop is closed. If replySent is closed, it sends
|
||||
// an empty struct to the notify channel.
|
||||
func waitStreamReply(replySent <-chan struct{}, notify chan<- struct{}, stop <-chan struct{}) {
|
||||
select {
|
||||
case <-replySent:
|
||||
notify <- struct{}{}
|
||||
case <-stop:
|
||||
}
|
||||
}
|
||||
|
||||
func getPodCoordinates(request *restful.Request) (namespace, pod string, uid types.UID) {
|
||||
namespace = request.PathParameter("podNamespace")
|
||||
pod = request.PathParameter("podID")
|
||||
@@ -796,8 +820,8 @@ func ServePortForward(w http.ResponseWriter, req *http.Request, portForwarder Po
|
||||
// forward streams. It checks each stream's port and stream type headers,
|
||||
// rejecting any streams that with missing or invalid values. Each valid
|
||||
// stream is sent to the streams channel.
|
||||
func portForwardStreamReceived(streams chan httpstream.Stream) func(httpstream.Stream) error {
|
||||
return func(stream httpstream.Stream) error {
|
||||
func portForwardStreamReceived(streams chan httpstream.Stream) func(httpstream.Stream, <-chan struct{}) error {
|
||||
return func(stream httpstream.Stream, replySent <-chan struct{}) error {
|
||||
// make sure it has a valid port header
|
||||
portString := stream.Headers().Get(api.PortHeader)
|
||||
if len(portString) == 0 {
|
||||
|
Reference in New Issue
Block a user