Support content-type negotiation in the API server
A NegotiatedSerializer is passed into the API installer (and ParameterCodec, which abstracts conversion of query params) that can be used to negotiate client/server request/response serialization. All error paths are now negotiation aware, and are at least minimally version aware. Watch is specially coded to only allow application/json - a follow up change will convert it to use negotiation. Ensure the swagger scheme will include supported serializations - this now includes application/yaml as a negotiated option.
This commit is contained in:
@@ -65,23 +65,34 @@ func (w *realTimeoutFactory) TimeoutCh() (<-chan time.Time, func() bool) {
|
||||
}
|
||||
|
||||
// serveWatch handles serving requests to the server
|
||||
func serveWatch(watcher watch.Interface, scope RequestScope, w http.ResponseWriter, req *restful.Request, timeout time.Duration) {
|
||||
watchServer := &WatchServer{watcher, scope.Codec, func(obj runtime.Object) {
|
||||
func serveWatch(watcher watch.Interface, scope RequestScope, req *restful.Request, res *restful.Response, timeout time.Duration) {
|
||||
s, mediaType, err := negotiateOutputSerializer(req.Request, scope.Serializer)
|
||||
if err != nil {
|
||||
scope.err(err, req, res)
|
||||
return
|
||||
}
|
||||
// TODO: replace with typed serialization
|
||||
if mediaType != "application/json" {
|
||||
writeRawJSON(http.StatusNotAcceptable, (errNotAcceptable{[]string{"application/json"}}).Status(), res.ResponseWriter)
|
||||
return
|
||||
}
|
||||
encoder := scope.Serializer.EncoderForVersion(s, scope.Kind.GroupVersion())
|
||||
watchServer := &WatchServer{watcher, encoder, func(obj runtime.Object) {
|
||||
if err := setSelfLink(obj, req, scope.Namer); err != nil {
|
||||
glog.V(5).Infof("Failed to set self link for object %v: %v", reflect.TypeOf(obj), err)
|
||||
}
|
||||
}, &realTimeoutFactory{timeout}}
|
||||
if isWebsocketRequest(req.Request) {
|
||||
websocket.Handler(watchServer.HandleWS).ServeHTTP(httplog.Unlogged(w), req.Request)
|
||||
websocket.Handler(watchServer.HandleWS).ServeHTTP(httplog.Unlogged(res.ResponseWriter), req.Request)
|
||||
} else {
|
||||
watchServer.ServeHTTP(w, req.Request)
|
||||
watchServer.ServeHTTP(res.ResponseWriter, req.Request)
|
||||
}
|
||||
}
|
||||
|
||||
// WatchServer serves a watch.Interface over a websocket or vanilla HTTP.
|
||||
type WatchServer struct {
|
||||
watching watch.Interface
|
||||
codec runtime.Codec
|
||||
encoder runtime.Encoder
|
||||
fixup func(runtime.Object)
|
||||
t timeoutFactory
|
||||
}
|
||||
@@ -108,7 +119,7 @@ func (w *WatchServer) HandleWS(ws *websocket.Conn) {
|
||||
return
|
||||
}
|
||||
w.fixup(event.Object)
|
||||
obj, err := watchjson.Object(w.codec, &event)
|
||||
obj, err := watchjson.Object(w.encoder, &event)
|
||||
if err != nil {
|
||||
// Client disconnect.
|
||||
w.watching.Stop()
|
||||
@@ -134,20 +145,21 @@ func (self *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
||||
|
||||
cn, ok := w.(http.CloseNotifier)
|
||||
if !ok {
|
||||
loggedW.Addf("unable to get CloseNotifier")
|
||||
loggedW.Addf("unable to get CloseNotifier: %#v", w)
|
||||
http.NotFound(w, req)
|
||||
return
|
||||
}
|
||||
flusher, ok := w.(http.Flusher)
|
||||
if !ok {
|
||||
loggedW.Addf("unable to get Flusher")
|
||||
loggedW.Addf("unable to get Flusher: %#v", w)
|
||||
http.NotFound(w, req)
|
||||
return
|
||||
}
|
||||
w.Header().Set("Transfer-Encoding", "chunked")
|
||||
w.WriteHeader(http.StatusOK)
|
||||
flusher.Flush()
|
||||
encoder := watchjson.NewEncoder(w, self.codec)
|
||||
// TODO: use arbitrary serialization on watch
|
||||
encoder := watchjson.NewEncoder(w, self.encoder)
|
||||
for {
|
||||
select {
|
||||
case <-cn.CloseNotify():
|
||||
|
Reference in New Issue
Block a user