178 lines
		
	
	
		
			5.1 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			178 lines
		
	
	
		
			5.1 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
/*
Copyright 2015 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
 | 
						|
 | 
						|
package wsstream
 | 
						|
 | 
						|
import (
 | 
						|
	"encoding/base64"
 | 
						|
	"io"
 | 
						|
	"net/http"
 | 
						|
	"sync"
 | 
						|
	"time"
 | 
						|
 | 
						|
	"golang.org/x/net/websocket"
 | 
						|
 | 
						|
	"k8s.io/apimachinery/pkg/util/runtime"
 | 
						|
)
 | 
						|
 | 
						|
// WebSocket subprotocols understood by Reader. Both are one-directional: the
// server only sends messages to the client and ignores anything the client
// sends. Zero byte messages are possible with either subprotocol.
const (
	// binaryWebSocketProtocol ("binary.k8s.io") delivers the exact bytes
	// written to the stream.
	binaryWebSocketProtocol = "binary.k8s.io"

	// base64BinaryWebSocketProtocol ("base64.binary.k8s.io") delivers a
	// base64 version of the bytes written to the stream.
	base64BinaryWebSocketProtocol = "base64.binary.k8s.io"
)
 | 
						|
 | 
						|
// ReaderProtocolConfig describes how the single stream of a websocket
// subprotocol is encoded on the wire.
type ReaderProtocolConfig struct {
	// Binary selects raw byte messages when true. When false, each message
	// is sent as a base64-encoded string instead.
	Binary bool
}
 | 
						|
 | 
						|
// NewDefaultReaderProtocols returns a stream protocol map with the
 | 
						|
// subprotocols "", "channel.k8s.io", "base64.channel.k8s.io".
 | 
						|
func NewDefaultReaderProtocols() map[string]ReaderProtocolConfig {
 | 
						|
	return map[string]ReaderProtocolConfig{
 | 
						|
		"": {Binary: true},
 | 
						|
		binaryWebSocketProtocol:       {Binary: true},
 | 
						|
		base64BinaryWebSocketProtocol: {Binary: false},
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// Reader supports returning an arbitrary byte stream over a websocket channel.
 | 
						|
type Reader struct {
 | 
						|
	err              chan error
 | 
						|
	r                io.Reader
 | 
						|
	ping             bool
 | 
						|
	timeout          time.Duration
 | 
						|
	protocols        map[string]ReaderProtocolConfig
 | 
						|
	selectedProtocol string
 | 
						|
 | 
						|
	handleCrash func() // overridable for testing
 | 
						|
}
 | 
						|
 | 
						|
// NewReader creates a WebSocket pipe that will copy the contents of r to a provided
 | 
						|
// WebSocket connection. If ping is true, a zero length message will be sent to the client
 | 
						|
// before the stream begins reading.
 | 
						|
//
 | 
						|
// The protocols parameter maps subprotocol names to StreamProtocols. The empty string
 | 
						|
// subprotocol name is used if websocket.Config.Protocol is empty.
 | 
						|
func NewReader(r io.Reader, ping bool, protocols map[string]ReaderProtocolConfig) *Reader {
 | 
						|
	return &Reader{
 | 
						|
		r:           r,
 | 
						|
		err:         make(chan error),
 | 
						|
		ping:        ping,
 | 
						|
		protocols:   protocols,
 | 
						|
		handleCrash: func() { runtime.HandleCrash() },
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// SetIdleTimeout sets the interval for both reads and writes before timeout. If not specified,
 | 
						|
// there is no timeout on the reader.
 | 
						|
func (r *Reader) SetIdleTimeout(duration time.Duration) {
 | 
						|
	r.timeout = duration
 | 
						|
}
 | 
						|
 | 
						|
func (r *Reader) handshake(config *websocket.Config, req *http.Request) error {
 | 
						|
	supportedProtocols := make([]string, 0, len(r.protocols))
 | 
						|
	for p := range r.protocols {
 | 
						|
		supportedProtocols = append(supportedProtocols, p)
 | 
						|
	}
 | 
						|
	return handshake(config, req, supportedProtocols)
 | 
						|
}
 | 
						|
 | 
						|
// Copy the reader to the response. The created WebSocket is closed after this
 | 
						|
// method completes.
 | 
						|
func (r *Reader) Copy(w http.ResponseWriter, req *http.Request) error {
 | 
						|
	go func() {
 | 
						|
		defer r.handleCrash()
 | 
						|
		websocket.Server{Handshake: r.handshake, Handler: r.handle}.ServeHTTP(w, req)
 | 
						|
	}()
 | 
						|
	return <-r.err
 | 
						|
}
 | 
						|
 | 
						|
// handle implements a WebSocket handler.
 | 
						|
func (r *Reader) handle(ws *websocket.Conn) {
 | 
						|
	// Close the connection when the client requests it, or when we finish streaming, whichever happens first
 | 
						|
	closeConnOnce := &sync.Once{}
 | 
						|
	closeConn := func() {
 | 
						|
		closeConnOnce.Do(func() {
 | 
						|
			ws.Close()
 | 
						|
		})
 | 
						|
	}
 | 
						|
 | 
						|
	negotiated := ws.Config().Protocol
 | 
						|
	r.selectedProtocol = negotiated[0]
 | 
						|
	defer close(r.err)
 | 
						|
	defer closeConn()
 | 
						|
 | 
						|
	go func() {
 | 
						|
		defer runtime.HandleCrash()
 | 
						|
		// This blocks until the connection is closed.
 | 
						|
		// Client should not send anything.
 | 
						|
		IgnoreReceives(ws, r.timeout)
 | 
						|
		// Once the client closes, we should also close
 | 
						|
		closeConn()
 | 
						|
	}()
 | 
						|
 | 
						|
	r.err <- messageCopy(ws, r.r, !r.protocols[r.selectedProtocol].Binary, r.ping, r.timeout)
 | 
						|
}
 | 
						|
 | 
						|
func resetTimeout(ws *websocket.Conn, timeout time.Duration) {
 | 
						|
	if timeout > 0 {
 | 
						|
		ws.SetDeadline(time.Now().Add(timeout))
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func messageCopy(ws *websocket.Conn, r io.Reader, base64Encode, ping bool, timeout time.Duration) error {
 | 
						|
	buf := make([]byte, 2048)
 | 
						|
	if ping {
 | 
						|
		resetTimeout(ws, timeout)
 | 
						|
		if base64Encode {
 | 
						|
			if err := websocket.Message.Send(ws, ""); err != nil {
 | 
						|
				return err
 | 
						|
			}
 | 
						|
		} else {
 | 
						|
			if err := websocket.Message.Send(ws, []byte{}); err != nil {
 | 
						|
				return err
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}
 | 
						|
	for {
 | 
						|
		resetTimeout(ws, timeout)
 | 
						|
		n, err := r.Read(buf)
 | 
						|
		if err != nil {
 | 
						|
			if err == io.EOF {
 | 
						|
				return nil
 | 
						|
			}
 | 
						|
			return err
 | 
						|
		}
 | 
						|
		if n > 0 {
 | 
						|
			if base64Encode {
 | 
						|
				if err := websocket.Message.Send(ws, base64.StdEncoding.EncodeToString(buf[:n])); err != nil {
 | 
						|
					return err
 | 
						|
				}
 | 
						|
			} else {
 | 
						|
				if err := websocket.Message.Send(ws, buf[:n]); err != nil {
 | 
						|
					return err
 | 
						|
				}
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 |