Split the storage and negotiation parts of Codecs

The codec factory should support two distinct interfaces - negotiating
for a serializer with a client, vs reading or writing data to a storage
form (etcd, disk, etc). Make the EncodeForVersion and DecodeToVersion
methods only take Encoder and Decoder, and slight refactoring elsewhere.

In the storage factory, use a content type to control what serializer to
pick, and use the universal deserializer. This ensures that storage can
read JSON (which might be from older objects) while only writing
protobuf. Add exceptions for those resources that may not be able to
write to protobuf (specifically third party resources, but potentially
others in the future).
This commit is contained in:
Clayton Coleman 2016-04-23 15:00:28 -04:00
parent 5622c8a471
commit e0ebcf4216
No known key found for this signature in database
GPG Key ID: 3D16906B4F1C5CB3
47 changed files with 543 additions and 240 deletions

View File

@ -44,6 +44,7 @@ type APIServer struct {
AuthorizationMode string
AuthorizationConfig apiserver.AuthorizationConfig
BasicAuthFile string
DefaultStorageMediaType string
DeleteCollectionWorkers int
DeprecatedStorageVersion string
EtcdServersOverrides []string
@ -76,6 +77,7 @@ func NewAPIServer() *APIServer {
ServerRunOptions: genericapiserver.NewServerRunOptions(),
AdmissionControl: "AlwaysAdmit",
AuthorizationMode: "AlwaysAllow",
DefaultStorageMediaType: "application/json",
DeleteCollectionWorkers: 1,
EventTTL: 1 * time.Hour,
MasterServiceNamespace: api.NamespaceDefault,
@ -153,6 +155,7 @@ func (s *APIServer) AddFlags(fs *pflag.FlagSet) {
"In the case where objects are moved from one group to the other, you may specify the format \"group1=group2/v1beta1,group3/v1beta1,...\". "+
"You only need to pass the groups you wish to change from the defaults. "+
"It defaults to a list of preferred versions of all registered groups, which is derived from the KUBE_API_VERSIONS environment variable.")
fs.StringVar(&s.DefaultStorageMediaType, "storage-media-type", s.DefaultStorageMediaType, "The media type to use to store objects in storage. Defaults to application/json. Some resources may only support a specific media type and will ignore this setting.")
fs.DurationVar(&s.EventTTL, "event-ttl", s.EventTTL, "Amount of time to retain events. Default 1 hour.")
fs.StringVar(&s.BasicAuthFile, "basic-auth-file", s.BasicAuthFile, "If set, the file that will be used to admit requests to the secure port of the API server via http basic authentication.")
fs.StringVar(&s.TokenAuthFile, "token-auth-file", s.TokenAuthFile, "If set, the file that will be used to secure the secure port of the API server via token authentication.")

View File

@ -167,7 +167,9 @@ func Run(s *options.APIServer) error {
resourceEncoding.SetVersionEncoding(group, storageEncodingVersion, unversioned.GroupVersion{Group: group, Version: runtime.APIVersionInternal})
}
storageFactory := genericapiserver.NewDefaultStorageFactory(s.StorageConfig, api.Codecs, resourceEncoding, apiResourceConfigSource)
storageFactory := genericapiserver.NewDefaultStorageFactory(s.StorageConfig, s.DefaultStorageMediaType, api.Codecs, resourceEncoding, apiResourceConfigSource)
// third party resources are always serialized to storage using JSON
storageFactory.SetSerializer(extensions.Resource("thirdpartyresources"), "application/json", nil)
storageFactory.AddCohabitatingResources(batch.Resource("jobs"), extensions.Resource("jobs"))
storageFactory.AddCohabitatingResources(autoscaling.Resource("horizontalpodautoscalers"), extensions.Resource("horizontalpodautoscalers"))
for _, override := range s.EtcdServersOverrides {

View File

@ -110,6 +110,7 @@ kube-apiserver
--ssh-keyfile="": If non-empty, use secure SSH proxy to the nodes, using this user keyfile
--ssh-user="": If non-empty, use secure SSH proxy to the nodes, using this user name
--storage-backend="": The storage backend for persistence. Options: 'etcd2' (default), 'etcd3'.
--storage-media-type="application/json": The media type to use to store objects in storage. Defaults to application/json. Some resources may only support a specific media type and will ignore this setting.
--storage-versions="apps/v1alpha1,authorization.k8s.io/v1beta1,autoscaling/v1,batch/v1,componentconfig/v1alpha1,extensions/v1beta1,metrics/v1alpha1,v1": The per-group version to store resources in. Specified in the format "group1/version1,group2/version2,...". In the case where objects are moved from one group to the other, you may specify the format "group1=group2/v1beta1,group3/v1beta1,...". You only need to pass the groups you wish to change from the defaults. It defaults to a list of preferred versions of all registered groups, which is derived from the KUBE_API_VERSIONS environment variable.
--tls-cert-file="": File containing x509 Certificate for HTTPS. (CA cert, if any, concatenated after server cert). If HTTPS serving is enabled, and --tls-cert-file and --tls-private-key-file are not provided, a self-signed certificate and key are generated for the public address and saved to /var/run/kubernetes.
--tls-private-key-file="": File containing x509 private key matching --tls-cert-file.

View File

@ -45,7 +45,7 @@ func newStorageFactory() genericapiserver.StorageFactory {
Prefix: genericapiserver.DefaultEtcdPathPrefix,
ServerList: []string{"http://127.0.0.1:4001"},
}
storageFactory := genericapiserver.NewDefaultStorageFactory(config, api.Codecs, genericapiserver.NewDefaultResourceEncodingConfig(), genericapiserver.NewResourceConfig())
storageFactory := genericapiserver.NewDefaultStorageFactory(config, "application/json", api.Codecs, genericapiserver.NewDefaultResourceEncodingConfig(), genericapiserver.NewResourceConfig())
return storageFactory
}

View File

@ -44,6 +44,7 @@ type APIServer struct {
AuthorizationMode string
AuthorizationConfig apiserver.AuthorizationConfig
BasicAuthFile string
DefaultStorageMediaType string
DeleteCollectionWorkers int
DeprecatedStorageVersion string
EtcdServersOverrides []string
@ -76,6 +77,7 @@ func NewAPIServer() *APIServer {
ServerRunOptions: genericapiserver.NewServerRunOptions(),
AdmissionControl: "AlwaysAdmit",
AuthorizationMode: "AlwaysAllow",
DefaultStorageMediaType: "application/json",
DeleteCollectionWorkers: 1,
EventTTL: 1 * time.Hour,
MasterServiceNamespace: api.NamespaceDefault,
@ -153,6 +155,7 @@ func (s *APIServer) AddFlags(fs *pflag.FlagSet) {
"In the case where objects are moved from one group to the other, you may specify the format \"group1=group2/v1beta1,group3/v1beta1,...\". "+
"You only need to pass the groups you wish to change from the defaults. "+
"It defaults to a list of preferred versions of all registered groups, which is derived from the KUBE_API_VERSIONS environment variable.")
fs.StringVar(&s.DefaultStorageMediaType, "storage-media-type", s.DefaultStorageMediaType, "The media type to use to store objects in storage. Defaults to application/json. Some resources may only support a specific media type and will ignore this setting.")
fs.DurationVar(&s.EventTTL, "event-ttl", s.EventTTL, "Amount of time to retain events. Default 1 hour.")
fs.StringVar(&s.BasicAuthFile, "basic-auth-file", s.BasicAuthFile, "If set, the file that will be used to admit requests to the secure port of the API server via http basic authentication.")
fs.StringVar(&s.TokenAuthFile, "token-auth-file", s.TokenAuthFile, "If set, the file that will be used to secure the secure port of the API server via token authentication.")

View File

@ -76,7 +76,7 @@ func Run(s *options.APIServer) error {
resourceEncoding.SetVersionEncoding(group, storageEncodingVersion, unversioned.GroupVersion{Group: group, Version: runtime.APIVersionInternal})
}
storageFactory := genericapiserver.NewDefaultStorageFactory(s.StorageConfig, api.Codecs, resourceEncoding, apiResourceConfigSource)
storageFactory := genericapiserver.NewDefaultStorageFactory(s.StorageConfig, s.DefaultStorageMediaType, api.Codecs, resourceEncoding, apiResourceConfigSource)
for _, override := range s.EtcdServersOverrides {
tokens := strings.Split(override, "#")
if len(tokens) != 2 {

View File

@ -192,6 +192,7 @@ ADMISSION_CONTROL="NamespaceLifecycle,LimitRanger,ResourceQuota"
--public-address-override="127.0.0.1" \
--kubelet-port=${KUBELET_PORT} \
--runtime-config=api/v1 \
--storage-media-type="${KUBE_TEST_API_STORAGE_TYPE-}" \
--cert-dir="${TMPDIR:-/tmp/}" \
--service-cluster-ip-range="10.0.0.0/24" 1>&2 &
APISERVER_PID=$!
@ -202,6 +203,7 @@ kube::util::wait_for_url "http://127.0.0.1:${API_PORT}/healthz" "apiserver"
kube::log::status "Starting controller-manager"
"${KUBE_OUTPUT_HOSTBIN}/kube-controller-manager" \
--port="${CTLRMGR_PORT}" \
--kube-api-content-type="${KUBE_TEST_API_TYPE-}" \
--master="127.0.0.1:${API_PORT}" 1>&2 &
CTLRMGR_PID=$!

View File

@ -394,6 +394,7 @@ static-pods-config
stats-port
stop-services
storage-backend
storage-media-type
storage-version
storage-versions
streaming-connection-idle-timeout

View File

@ -38,7 +38,7 @@ import (
func init() {
codecsToTest = append(codecsToTest, func(version unversioned.GroupVersion, item runtime.Object) (runtime.Codec, error) {
s := protobuf.NewSerializer(api.Scheme, runtime.ObjectTyperToTyper(api.Scheme), "application/arbitrary.content.type")
return api.Codecs.CodecForVersions(s, testapi.ExternalGroupVersions(), nil), nil
return api.Codecs.CodecForVersions(s, s, testapi.ExternalGroupVersions(), nil), nil
})
}

View File

@ -284,18 +284,14 @@ func TestObjectWatchFraming(t *testing.T) {
converted, _ := api.Scheme.ConvertToVersion(secret, "v1")
v1secret := converted.(*v1.Secret)
for _, streamingMediaType := range api.Codecs.SupportedStreamingMediaTypes() {
s, framer, mediaType, _ := api.Codecs.StreamingSerializerForMediaType(streamingMediaType, nil)
// TODO: remove this when the runtime.SerializerInfo PR lands
if mediaType == "application/vnd.kubernetes.protobuf;stream=watch" {
mediaType = "application/vnd.kubernetes.protobuf"
}
embedded, ok := api.Codecs.SerializerForMediaType(mediaType, nil)
if !ok {
t.Logf("no embedded serializer for %s", mediaType)
embedded = s
s, _ := api.Codecs.StreamingSerializerForMediaType(streamingMediaType, nil)
framer := s.Framer
embedded := s.Embedded.Serializer
if embedded == nil {
t.Errorf("no embedded serializer for %s", streamingMediaType)
continue
}
innerDecode := api.Codecs.DecoderToVersion(embedded, api.SchemeGroupVersion)
//innerEncode := api.Codecs.EncoderForVersion(embedded, api.SchemeGroupVersion)
// write a single object through the framer and back out
obj := &bytes.Buffer{}

View File

@ -19,6 +19,7 @@ package testapi
import (
"fmt"
"mime"
"os"
"reflect"
"strings"
@ -33,6 +34,7 @@ import (
"k8s.io/kubernetes/pkg/apis/batch"
"k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/runtime/serializer/recognizer"
_ "k8s.io/kubernetes/federation/apis/federation/install"
_ "k8s.io/kubernetes/pkg/api/install"
@ -45,14 +47,16 @@ import (
)
var (
Groups = make(map[string]TestGroup)
Default TestGroup
Autoscaling TestGroup
Batch TestGroup
Extensions TestGroup
Apps TestGroup
Federation TestGroup
NegotiatedSerializer = api.Codecs
Groups = make(map[string]TestGroup)
Default TestGroup
Autoscaling TestGroup
Batch TestGroup
Extensions TestGroup
Apps TestGroup
Federation TestGroup
serializer runtime.SerializerInfo
storageSerializer runtime.SerializerInfo
)
type TestGroup struct {
@ -62,6 +66,30 @@ type TestGroup struct {
}
func init() {
if apiMediaType := os.Getenv("KUBE_TEST_API_TYPE"); len(apiMediaType) > 0 {
var ok bool
mediaType, options, err := mime.ParseMediaType(apiMediaType)
if err != nil {
panic(err)
}
serializer, ok = api.Codecs.SerializerForMediaType(mediaType, options)
if !ok {
panic(fmt.Sprintf("no serializer for %s", apiMediaType))
}
}
if storageMediaType := StorageMediaType(); len(storageMediaType) > 0 {
var ok bool
mediaType, options, err := mime.ParseMediaType(storageMediaType)
if err != nil {
panic(err)
}
storageSerializer, ok = api.Codecs.SerializerForMediaType(mediaType, options)
if !ok {
panic(fmt.Sprintf("no serializer for %s", storageMediaType))
}
}
kubeTestAPI := os.Getenv("KUBE_TEST_API")
if len(kubeTestAPI) != 0 {
testGroupVersions := strings.Split(kubeTestAPI, ",")
@ -173,9 +201,40 @@ func (g TestGroup) InternalTypes() map[string]reflect.Type {
}
// Codec returns the codec for the API version to test against, as set by the
// KUBE_TEST_API env var.
// KUBE_TEST_API_TYPE env var.
func (g TestGroup) Codec() runtime.Codec {
return api.Codecs.LegacyCodec(g.externalGroupVersion)
if serializer.Serializer == nil {
return api.Codecs.LegacyCodec(g.externalGroupVersion)
}
return api.Codecs.CodecForVersions(serializer, api.Codecs.UniversalDeserializer(), []unversioned.GroupVersion{g.externalGroupVersion}, nil)
}
// NegotiatedSerializer returns the negotiated serializer for the server.
func (g TestGroup) NegotiatedSerializer() runtime.NegotiatedSerializer {
return api.Codecs
}
func StorageMediaType() string {
return os.Getenv("KUBE_TEST_API_STORAGE_TYPE")
}
// StorageCodec returns the codec for the API version to store in etcd, as set by the
// KUBE_TEST_API_STORAGE_TYPE env var.
func (g TestGroup) StorageCodec() runtime.Codec {
s := storageSerializer.Serializer
if s == nil {
return api.Codecs.LegacyCodec(g.externalGroupVersion)
}
// etcd2 only supports string data - we must wrap any result before returning
// TODO: remove for etcd3 / make parameterizable
if !storageSerializer.EncodesAsText {
s = runtime.NewBase64Serializer(s)
}
ds := recognizer.NewDecoder(s, api.Codecs.UniversalDeserializer())
return api.Codecs.CodecForVersions(s, ds, []unversioned.GroupVersion{g.externalGroupVersion}, nil)
}
// Converter returns the api.Scheme for the API version to test against, as set by the

View File

@ -274,9 +274,16 @@ type StripVersionNegotiatedSerializer struct {
runtime.NegotiatedSerializer
}
func (n StripVersionNegotiatedSerializer) EncoderForVersion(serializer runtime.Serializer, gv unversioned.GroupVersion) runtime.Encoder {
encoder := n.NegotiatedSerializer.EncoderForVersion(serializer, gv)
return stripVersionEncoder{encoder, serializer}
func (n StripVersionNegotiatedSerializer) EncoderForVersion(encoder runtime.Encoder, gv unversioned.GroupVersion) runtime.Encoder {
serializer, ok := encoder.(runtime.Serializer)
if !ok {
// The stripVersionEncoder needs both an encoder and decoder, but is called from a context that doesn't have access to the
// decoder. We do a best effort cast here (since this code path is only for backwards compatibility) to get access to the caller's
// decoder.
panic(fmt.Sprintf("Unable to extract serializer from %#v", encoder))
}
versioned := n.NegotiatedSerializer.EncoderForVersion(encoder, gv)
return stripVersionEncoder{versioned, serializer}
}
func keepUnversioned(group string) bool {
@ -422,14 +429,14 @@ func write(statusCode int, gv unversioned.GroupVersion, s runtime.NegotiatedSeri
// writeNegotiated renders an object in the content type negotiated by the client
func writeNegotiated(s runtime.NegotiatedSerializer, gv unversioned.GroupVersion, w http.ResponseWriter, req *http.Request, statusCode int, object runtime.Object) {
serializer, contentType, err := negotiateOutputSerializer(req, s)
serializer, err := negotiateOutputSerializer(req, s)
if err != nil {
status := errToAPIStatus(err)
writeRawJSON(int(status.Code), status, w)
return
}
w.Header().Set("Content-Type", contentType)
w.Header().Set("Content-Type", serializer.MediaType)
w.WriteHeader(statusCode)
encoder := s.EncoderForVersion(serializer, gv)

View File

@ -50,31 +50,31 @@ func negotiateOutput(req *http.Request, supported []string) (string, map[string]
return mediaType, accept.Params, nil
}
func negotiateOutputSerializer(req *http.Request, ns runtime.NegotiatedSerializer) (runtime.Serializer, string, error) {
func negotiateOutputSerializer(req *http.Request, ns runtime.NegotiatedSerializer) (runtime.SerializerInfo, error) {
supported := ns.SupportedMediaTypes()
mediaType, params, err := negotiateOutput(req, supported)
if err != nil {
return nil, "", err
return runtime.SerializerInfo{}, err
}
if s, ok := ns.SerializerForMediaType(mediaType, params); ok {
return s, mediaType, nil
return s, nil
}
return nil, "", errNotAcceptable{supported}
return runtime.SerializerInfo{}, errNotAcceptable{supported}
}
func negotiateOutputStreamSerializer(req *http.Request, ns runtime.NegotiatedSerializer) (runtime.Serializer, runtime.Framer, string, string, error) {
func negotiateOutputStreamSerializer(req *http.Request, ns runtime.NegotiatedSerializer) (runtime.StreamSerializerInfo, error) {
supported := ns.SupportedMediaTypes()
mediaType, params, err := negotiateOutput(req, supported)
if err != nil {
return nil, nil, "", "", err
return runtime.StreamSerializerInfo{}, err
}
if s, f, exactMediaType, ok := ns.StreamingSerializerForMediaType(mediaType, params); ok {
return s, f, mediaType, exactMediaType, nil
if s, ok := ns.StreamingSerializerForMediaType(mediaType, params); ok {
return s, nil
}
return nil, nil, "", "", errNotAcceptable{supported}
return runtime.StreamSerializerInfo{}, errNotAcceptable{supported}
}
func negotiateInputSerializer(req *http.Request, s runtime.NegotiatedSerializer) (runtime.Serializer, error) {
func negotiateInputSerializer(req *http.Request, s runtime.NegotiatedSerializer) (runtime.SerializerInfo, error) {
supported := s.SupportedMediaTypes()
mediaType := req.Header.Get("Content-Type")
if len(mediaType) == 0 {
@ -82,11 +82,11 @@ func negotiateInputSerializer(req *http.Request, s runtime.NegotiatedSerializer)
}
mediaType, options, err := mime.ParseMediaType(mediaType)
if err != nil {
return nil, errUnsupportedMediaType{supported}
return runtime.SerializerInfo{}, errUnsupportedMediaType{supported}
}
out, ok := s.SerializerForMediaType(mediaType, options)
if !ok {
return nil, errUnsupportedMediaType{supported}
return runtime.SerializerInfo{}, errUnsupportedMediaType{supported}
}
return out, nil
}

View File

@ -41,27 +41,34 @@ func (n *fakeNegotiater) SupportedStreamingMediaTypes() []string {
return n.streamTypes
}
func (n *fakeNegotiater) SerializerForMediaType(mediaType string, options map[string]string) (runtime.Serializer, bool) {
func (n *fakeNegotiater) SerializerForMediaType(mediaType string, options map[string]string) (runtime.SerializerInfo, bool) {
n.mediaType = mediaType
if len(options) > 0 {
n.options = options
}
return n.serializer, n.serializer != nil
return runtime.SerializerInfo{Serializer: n.serializer, MediaType: n.mediaType, EncodesAsText: true}, n.serializer != nil
}
func (n *fakeNegotiater) StreamingSerializerForMediaType(mediaType string, options map[string]string) (runtime.Serializer, runtime.Framer, string, bool) {
func (n *fakeNegotiater) StreamingSerializerForMediaType(mediaType string, options map[string]string) (runtime.StreamSerializerInfo, bool) {
n.streamMediaType = mediaType
if len(options) > 0 {
n.streamOptions = options
}
return n.streamSerializer, n.framer, mediaType, n.streamSerializer != nil
return runtime.StreamSerializerInfo{
SerializerInfo: runtime.SerializerInfo{
Serializer: n.serializer,
MediaType: mediaType,
EncodesAsText: true,
},
Framer: n.framer,
}, n.streamSerializer != nil
}
func (n *fakeNegotiater) EncoderForVersion(serializer runtime.Serializer, gv unversioned.GroupVersion) runtime.Encoder {
func (n *fakeNegotiater) EncoderForVersion(serializer runtime.Encoder, gv unversioned.GroupVersion) runtime.Encoder {
return n.serializer
}
func (n *fakeNegotiater) DecoderToVersion(serializer runtime.Serializer, gv unversioned.GroupVersion) runtime.Decoder {
func (n *fakeNegotiater) DecoderToVersion(serializer runtime.Decoder, gv unversioned.GroupVersion) runtime.Decoder {
return n.serializer
}
@ -228,7 +235,7 @@ func TestNegotiate(t *testing.T) {
req = &http.Request{Header: http.Header{}}
req.Header.Set("Accept", test.accept)
}
s, contentType, err := negotiateOutputSerializer(req, test.ns)
s, err := negotiateOutputSerializer(req, test.ns)
switch {
case err == nil && test.errFn != nil:
t.Errorf("%d: failed: expected error", i)
@ -251,11 +258,11 @@ func TestNegotiate(t *testing.T) {
}
continue
}
if test.contentType != contentType {
t.Errorf("%d: unexpected %s %s", i, test.contentType, contentType)
if test.contentType != s.MediaType {
t.Errorf("%d: unexpected %s %s", i, test.contentType, s.MediaType)
}
if s != test.serializer {
t.Errorf("%d: unexpected %s %s", i, test.serializer, s)
if s.Serializer != test.serializer {
t.Errorf("%d: unexpected %s %s", i, test.serializer, s.Serializer)
}
if !reflect.DeepEqual(test.params, test.ns.options) {
t.Errorf("%d: unexpected %#v %#v", i, test.params, test.ns.options)

View File

@ -59,47 +59,33 @@ func (w *realTimeoutFactory) TimeoutCh() (<-chan time.Time, func() bool) {
return t.C, t.Stop
}
type textEncodable interface {
// EncodesAsText should return true if objects should be transmitted as a WebSocket Text
// frame (otherwise, they will be sent as a Binary frame).
EncodesAsText() bool
}
// serveWatch handles serving requests to the server
// TODO: the functionality in this method and in WatchServer.Serve is not cleanly decoupled.
func serveWatch(watcher watch.Interface, scope RequestScope, req *restful.Request, res *restful.Response, timeout time.Duration) {
// negotiate for the stream serializer
serializer, framer, mediaType, exactMediaType, err := negotiateOutputStreamSerializer(req.Request, scope.Serializer)
serializer, err := negotiateOutputStreamSerializer(req.Request, scope.Serializer)
if err != nil {
scope.err(err, res.ResponseWriter, req.Request)
return
}
if framer == nil {
scope.err(fmt.Errorf("no framer defined for %q available for embedded encoding", mediaType), res.ResponseWriter, req.Request)
if serializer.Framer == nil {
scope.err(fmt.Errorf("no framer defined for %q available for embedded encoding", serializer.MediaType), res.ResponseWriter, req.Request)
return
}
encoder := scope.Serializer.EncoderForVersion(serializer, scope.Kind.GroupVersion())
encoder := scope.Serializer.EncoderForVersion(serializer.Serializer, scope.Kind.GroupVersion())
useTextFraming := false
if encodable, ok := serializer.(textEncodable); ok && encodable.EncodesAsText() {
useTextFraming = true
}
useTextFraming := serializer.EncodesAsText
// find the embedded serializer matching the media type
embeddedSerializer, ok := scope.Serializer.SerializerForMediaType(mediaType, nil)
if !ok {
scope.err(fmt.Errorf("no serializer defined for %q available for embedded encoding", mediaType), res.ResponseWriter, req.Request)
return
}
embeddedEncoder := scope.Serializer.EncoderForVersion(embeddedSerializer, scope.Kind.GroupVersion())
embeddedEncoder := scope.Serializer.EncoderForVersion(serializer.Embedded.Serializer, scope.Kind.GroupVersion())
server := &WatchServer{
watching: watcher,
scope: scope,
useTextFraming: useTextFraming,
mediaType: exactMediaType,
framer: framer,
mediaType: serializer.MediaType,
framer: serializer.Framer,
encoder: encoder,
embeddedEncoder: embeddedEncoder,
fixup: func(obj runtime.Object) {

View File

@ -222,9 +222,9 @@ func TestWatchRead(t *testing.T) {
for _, protocol := range protocols {
for _, test := range testCases {
serializer, framer, _, ok := api.Codecs.StreamingSerializerForMediaType(test.MediaType, nil)
serializer, ok := api.Codecs.StreamingSerializerForMediaType(test.MediaType, nil)
if !ok {
t.Fatal(framer)
t.Fatal(serializer)
}
r, contentType := protocol.fn(test.Accept)
@ -235,13 +235,13 @@ func TestWatchRead(t *testing.T) {
}
objectSerializer, ok := api.Codecs.SerializerForMediaType(test.MediaType, nil)
if !ok {
t.Fatal(framer)
t.Fatal(objectSerializer)
}
objectCodec := api.Codecs.DecoderToVersion(objectSerializer, testInternalGroupVersion)
var fr io.ReadCloser = r
if !protocol.selfFraming {
fr = framer.NewFrameReader(r)
fr = serializer.Framer.NewFrameReader(r)
}
d := streaming.NewDecoder(fr, serializer)
@ -495,9 +495,9 @@ func TestWatchHTTPTimeout(t *testing.T) {
timeoutCh := make(chan time.Time)
done := make(chan struct{})
_, framer, _, ok := api.Codecs.StreamingSerializerForMediaType("application/json", nil)
serializer, ok := api.Codecs.StreamingSerializerForMediaType("application/json", nil)
if !ok {
t.Fatal(framer)
t.Fatal(serializer)
}
// Setup a new watchserver
@ -505,7 +505,7 @@ func TestWatchHTTPTimeout(t *testing.T) {
watching: watcher,
mediaType: "testcase/json",
framer: framer,
framer: serializer.Framer,
encoder: newCodec,
embeddedEncoder: newCodec,

View File

@ -139,26 +139,23 @@ func readExpBackoffConfig() BackoffManager {
func createSerializers(config ContentConfig) (*Serializers, error) {
negotiated := config.NegotiatedSerializer
contentType := config.ContentType
serializer, ok := negotiated.SerializerForMediaType(contentType, nil)
info, ok := negotiated.SerializerForMediaType(contentType, nil)
if !ok {
return nil, fmt.Errorf("serializer for %s not registered", contentType)
}
streamingSerializer, framer, _, ok := negotiated.StreamingSerializerForMediaType(contentType, nil)
streamInfo, ok := negotiated.StreamingSerializerForMediaType(contentType, nil)
if !ok {
return nil, fmt.Errorf("streaming serializer for %s not registered", contentType)
}
if framer == nil {
return nil, fmt.Errorf("no framer for %s", contentType)
}
internalGV := unversioned.GroupVersion{
Group: config.GroupVersion.Group,
Version: runtime.APIVersionInternal,
}
return &Serializers{
Encoder: negotiated.EncoderForVersion(serializer, *config.GroupVersion),
Decoder: negotiated.DecoderToVersion(serializer, internalGV),
StreamingSerializer: streamingSerializer,
Framer: framer,
Encoder: negotiated.EncoderForVersion(info.Serializer, *config.GroupVersion),
Decoder: negotiated.DecoderToVersion(info.Serializer, internalGV),
StreamingSerializer: streamInfo.Serializer,
Framer: streamInfo.Framer,
}, nil
}

View File

@ -47,7 +47,7 @@ func TestDoRequestSuccess(t *testing.T) {
Host: testServer.URL,
ContentConfig: ContentConfig{
GroupVersion: testapi.Default.GroupVersion(),
NegotiatedSerializer: testapi.NegotiatedSerializer,
NegotiatedSerializer: testapi.Default.NegotiatedSerializer(),
},
Username: "user",
Password: "pass",
@ -92,7 +92,7 @@ func TestDoRequestFailed(t *testing.T) {
Host: testServer.URL,
ContentConfig: ContentConfig{
GroupVersion: testapi.Default.GroupVersion(),
NegotiatedSerializer: testapi.NegotiatedSerializer,
NegotiatedSerializer: testapi.Default.NegotiatedSerializer(),
},
})
if err != nil {
@ -130,7 +130,7 @@ func TestDoRequestCreated(t *testing.T) {
Host: testServer.URL,
ContentConfig: ContentConfig{
GroupVersion: testapi.Default.GroupVersion(),
NegotiatedSerializer: testapi.NegotiatedSerializer,
NegotiatedSerializer: testapi.Default.NegotiatedSerializer(),
},
Username: "user",
Password: "pass",

View File

@ -87,13 +87,13 @@ func TestSetKubernetesDefaultsUserAgent(t *testing.T) {
}
func TestRESTClientRequires(t *testing.T) {
if _, err := RESTClientFor(&Config{Host: "127.0.0.1", ContentConfig: ContentConfig{NegotiatedSerializer: testapi.NegotiatedSerializer}}); err == nil {
if _, err := RESTClientFor(&Config{Host: "127.0.0.1", ContentConfig: ContentConfig{NegotiatedSerializer: testapi.Default.NegotiatedSerializer()}}); err == nil {
t.Errorf("unexpected non-error")
}
if _, err := RESTClientFor(&Config{Host: "127.0.0.1", ContentConfig: ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}); err == nil {
t.Errorf("unexpected non-error")
}
if _, err := RESTClientFor(&Config{Host: "127.0.0.1", ContentConfig: ContentConfig{GroupVersion: testapi.Default.GroupVersion(), NegotiatedSerializer: testapi.NegotiatedSerializer}}); err != nil {
if _, err := RESTClientFor(&Config{Host: "127.0.0.1", ContentConfig: ContentConfig{GroupVersion: testapi.Default.GroupVersion(), NegotiatedSerializer: testapi.Default.NegotiatedSerializer()}}); err != nil {
t.Errorf("unexpected error: %v", err)
}
}

View File

@ -639,6 +639,10 @@ func (r *Request) Watch() (watch.Interface, error) {
if r.err != nil {
return nil, r.err
}
if r.serializers.Framer == nil {
return nil, fmt.Errorf("watching resources is not possible with this client (content-type: %s)", r.content.ContentType)
}
url := r.URL().String()
req, err := http.NewRequest(r.verb, url, r.body)
if err != nil {

View File

@ -247,7 +247,7 @@ func defaultContentConfig() ContentConfig {
return ContentConfig{
GroupVersion: testapi.Default.GroupVersion(),
Codec: testapi.Default.Codec(),
NegotiatedSerializer: testapi.NegotiatedSerializer,
NegotiatedSerializer: testapi.Default.NegotiatedSerializer(),
}
}
@ -549,6 +549,7 @@ func TestRequestWatch(t *testing.T) {
},
{
Request: &Request{
serializers: defaultSerializers(),
client: clientFunc(func(req *http.Request) (*http.Response, error) {
return nil, io.EOF
}),
@ -558,6 +559,7 @@ func TestRequestWatch(t *testing.T) {
},
{
Request: &Request{
serializers: defaultSerializers(),
client: clientFunc(func(req *http.Request) (*http.Response, error) {
return nil, &url.Error{Err: io.EOF}
}),
@ -567,6 +569,7 @@ func TestRequestWatch(t *testing.T) {
},
{
Request: &Request{
serializers: defaultSerializers(),
client: clientFunc(func(req *http.Request) (*http.Response, error) {
return nil, errors.New("http: can't write HTTP request on broken connection")
}),
@ -576,6 +579,7 @@ func TestRequestWatch(t *testing.T) {
},
{
Request: &Request{
serializers: defaultSerializers(),
client: clientFunc(func(req *http.Request) (*http.Response, error) {
return nil, errors.New("foo: connection reset by peer")
}),

View File

@ -215,7 +215,10 @@ func setDiscoveryDefaults(config *restclient.Config) error {
config.APIPath = ""
config.GroupVersion = nil
codec := runtime.NoopEncoder{Decoder: api.Codecs.UniversalDecoder()}
config.NegotiatedSerializer = serializer.NegotiatedSerializerWrapper(codec, codec, runtime.DefaultFramer)
config.NegotiatedSerializer = serializer.NegotiatedSerializerWrapper(
runtime.SerializerInfo{Serializer: codec},
runtime.StreamSerializerInfo{},
)
if len(config.UserAgent) == 0 {
config.UserAgent = restclient.DefaultKubernetesUserAgent()
}

View File

@ -33,7 +33,6 @@ import (
"k8s.io/kubernetes/pkg/conversion/queryparams"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/runtime/serializer"
serializerjson "k8s.io/kubernetes/pkg/runtime/serializer/json"
"k8s.io/kubernetes/pkg/watch"
)
@ -51,8 +50,10 @@ func NewClient(conf *restclient.Config) (*Client, error) {
conf = &confCopy
codec := dynamicCodec{}
legacyCodec := api.Codecs.LegacyCodec(v1.SchemeGroupVersion)
conf.NegotiatedSerializer = serializer.NegotiatedSerializerWrapper(codec, legacyCodec, serializerjson.Framer)
// TODO: it's questionable that this should be using anything other than unstructured schema and JSON
streamingInfo, _ := api.Codecs.StreamingSerializerForMediaType("application/json;stream=watch", nil)
conf.NegotiatedSerializer = serializer.NegotiatedSerializerWrapper(runtime.SerializerInfo{Serializer: codec}, streamingInfo)
if conf.APIPath == "" {
conf.APIPath = "/api"

View File

@ -42,7 +42,7 @@ func TestSetKubernetesDefaults(t *testing.T) {
ContentConfig: restclient.ContentConfig{
GroupVersion: testapi.Default.GroupVersion(),
Codec: testapi.Default.Codec(),
NegotiatedSerializer: testapi.NegotiatedSerializer,
NegotiatedSerializer: testapi.Default.NegotiatedSerializer(),
},
QPS: 5,
Burst: 10,
@ -126,7 +126,7 @@ func TestHelperGetServerAPIVersions(t *testing.T) {
w.Write(output)
}))
defer server.Close()
got, err := restclient.ServerAPIVersions(&restclient.Config{Host: server.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &unversioned.GroupVersion{Group: "invalid version", Version: "one"}, NegotiatedSerializer: testapi.NegotiatedSerializer}})
got, err := restclient.ServerAPIVersions(&restclient.Config{Host: server.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &unversioned.GroupVersion{Group: "invalid version", Version: "one"}, NegotiatedSerializer: testapi.Default.NegotiatedSerializer()}})
if err != nil {
t.Fatalf("unexpected encoding error: %v", err)
}

View File

@ -215,7 +215,7 @@ func TestStream(t *testing.T) {
url, _ := url.ParseRequestURI(server.URL)
config := restclient.ContentConfig{
GroupVersion: &unversioned.GroupVersion{Group: "x"},
NegotiatedSerializer: testapi.NegotiatedSerializer,
NegotiatedSerializer: testapi.Default.NegotiatedSerializer(),
}
c, err := restclient.NewRESTClient(url, "", config, -1, -1, nil, nil)
if err != nil {

View File

@ -18,9 +18,11 @@ package genericapiserver
import (
"fmt"
"mime"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/runtime/serializer/recognizer"
"k8s.io/kubernetes/pkg/runtime/serializer/versioning"
"k8s.io/kubernetes/pkg/storage"
"k8s.io/kubernetes/pkg/storage/storagebackend"
@ -50,19 +52,25 @@ type DefaultStorageFactory struct {
Overrides map[unversioned.GroupResource]groupResourceOverrides
// DefaultMediaType is the media type used to store resources. If it is not set, "application/json" is used.
DefaultMediaType string
// DefaultSerializer is used to create encoders and decoders for the storage.Interface.
DefaultSerializer runtime.NegotiatedSerializer
DefaultSerializer runtime.StorageSerializer
// ResourceEncodingConfig describes how to encode a particular GroupVersionResource
ResourceEncodingConfig ResourceEncodingConfig
// APIResourceConfigSource indicates whether the *storage* is enabled, NOT the API
// This is discrete from resource enablement because those are separate concerns. How it is surfaced to the user via flags
// or config is up to whoever is building this.
// This is discrete from resource enablement because those are separate concerns. How this source is configured
// is left to the caller.
APIResourceConfigSource APIResourceConfigSource
// newEtcdFn exists to be overwritten for unit testing. You should never set this in a normal world.
newEtcdFn func(ns runtime.NegotiatedSerializer, storageVersion, memoryVersion unversioned.GroupVersion, config storagebackend.Config) (etcdStorage storage.Interface, err error)
// newStorageCodecFn exists to be overwritten for unit testing.
newStorageCodecFn func(storageMediaType string, ns runtime.StorageSerializer, storageVersion, memoryVersion unversioned.GroupVersion, config storagebackend.Config) (codec runtime.Codec, err error)
// newStorageFn exists to be overwritten for unit testing.
newStorageFn func(config storagebackend.Config) (etcdStorage storage.Interface, err error)
}
type groupResourceOverrides struct {
@ -71,8 +79,10 @@ type groupResourceOverrides struct {
etcdLocation []string
// etcdPrefix contains the list of "special" prefixes for a GroupResource. Resource=* means for the entire group
etcdPrefix string
// mediaType is the desired serializer to choose. If empty, the default is chosen.
mediaType string
// serializer contains the list of "special" serializers for a GroupResource. Resource=* means for the entire group
serializer runtime.NegotiatedSerializer
serializer runtime.StorageSerializer
// cohabitatingResources keeps track of which resources must be stored together. This happens when we have multiple ways
// of exposing one set of concepts. autoscaling.HPA and extensions.HPA as a for instance
// The order of the slice matters! It is the priority order of lookup for finding a storage location
@ -83,15 +93,20 @@ var _ StorageFactory = &DefaultStorageFactory{}
const AllResources = "*"
func NewDefaultStorageFactory(config storagebackend.Config, defaultSerializer runtime.NegotiatedSerializer, resourceEncodingConfig ResourceEncodingConfig, resourceConfig APIResourceConfigSource) *DefaultStorageFactory {
func NewDefaultStorageFactory(config storagebackend.Config, defaultMediaType string, defaultSerializer runtime.StorageSerializer, resourceEncodingConfig ResourceEncodingConfig, resourceConfig APIResourceConfigSource) *DefaultStorageFactory {
if len(defaultMediaType) == 0 {
defaultMediaType = runtime.ContentTypeJSON
}
return &DefaultStorageFactory{
StorageConfig: config,
Overrides: map[unversioned.GroupResource]groupResourceOverrides{},
DefaultMediaType: defaultMediaType,
DefaultSerializer: defaultSerializer,
ResourceEncodingConfig: resourceEncodingConfig,
APIResourceConfigSource: resourceConfig,
newEtcdFn: newEtcd,
newStorageCodecFn: NewStorageCodec,
newStorageFn: newStorage,
}
}
@ -107,8 +122,9 @@ func (s *DefaultStorageFactory) SetEtcdPrefix(groupResource unversioned.GroupRes
s.Overrides[groupResource] = overrides
}
func (s *DefaultStorageFactory) SetSerializer(groupResource unversioned.GroupResource, serializer runtime.NegotiatedSerializer) {
func (s *DefaultStorageFactory) SetSerializer(groupResource unversioned.GroupResource, mediaType string, serializer runtime.StorageSerializer) {
overrides := s.Overrides[groupResource]
overrides.mediaType = mediaType
overrides.serializer = serializer
s.Overrides[groupResource] = overrides
}
@ -160,6 +176,14 @@ func (s *DefaultStorageFactory) New(groupResource unversioned.GroupResource) (st
etcdPrefix = exactResourceOverride.etcdPrefix
}
etcdMediaType := s.DefaultMediaType
if len(groupOverride.mediaType) != 0 {
etcdMediaType = groupOverride.mediaType
}
if len(exactResourceOverride.mediaType) != 0 {
etcdMediaType = exactResourceOverride.mediaType
}
etcdSerializer := s.DefaultSerializer
if groupOverride.serializer != nil {
etcdSerializer = groupOverride.serializer
@ -183,27 +207,19 @@ func (s *DefaultStorageFactory) New(groupResource unversioned.GroupResource) (st
return nil, err
}
codec, err := s.newStorageCodecFn(etcdMediaType, etcdSerializer, storageEncodingVersion, internalVersion, config)
if err != nil {
return nil, err
}
config.Codec = codec
glog.V(3).Infof("storing %v in %v, reading as %v from %v", groupResource, storageEncodingVersion, internalVersion, config)
return s.newEtcdFn(etcdSerializer, storageEncodingVersion, internalVersion, config)
return s.newStorageFn(config)
}
func newEtcd(ns runtime.NegotiatedSerializer, storageVersion, memoryVersion unversioned.GroupVersion, config storagebackend.Config) (etcdStorage storage.Interface, err error) {
s, ok := ns.SerializerForMediaType("application/json", nil)
if !ok {
return nil, fmt.Errorf("unable to find serializer for JSON")
}
encoder := ns.EncoderForVersion(s, storageVersion)
decoder := ns.DecoderToVersion(s, memoryVersion)
if memoryVersion.Group != storageVersion.Group {
// Allow this codec to translate between groups.
if err = versioning.EnableCrossGroupEncoding(encoder, memoryVersion.Group, storageVersion.Group); err != nil {
return nil, fmt.Errorf("error setting up encoder from %v to %v: %v", memoryVersion, storageVersion, err)
}
if err = versioning.EnableCrossGroupDecoding(decoder, storageVersion.Group, memoryVersion.Group); err != nil {
return nil, fmt.Errorf("error setting up decoder from %v to %v: %v", storageVersion, memoryVersion, err)
}
}
config.Codec = runtime.NewCodec(encoder, decoder)
// newStorage is the default implementation for creating a storage backend.
func newStorage(config storagebackend.Config) (etcdStorage storage.Interface, err error) {
return storagebackend.Create(config)
}
@ -217,3 +233,39 @@ func (s *DefaultStorageFactory) Backends() []string {
}
return backends.List()
}
// NewStorageCodec assembles a storage codec for the provided storage media type, the provided serializer, and the requested
// storage and memory versions.
func NewStorageCodec(storageMediaType string, ns runtime.StorageSerializer, storageVersion, memoryVersion unversioned.GroupVersion, config storagebackend.Config) (runtime.Codec, error) {
mediaType, options, err := mime.ParseMediaType(storageMediaType)
if err != nil {
return nil, fmt.Errorf("%q is not a valid mime-type", storageMediaType)
}
serializer, ok := ns.SerializerForMediaType(mediaType, options)
if !ok {
return nil, fmt.Errorf("unable to find serializer for %q", storageMediaType)
}
s := serializer.Serializer
// etcd2 only supports string data - we must wrap any result before returning
// TODO: storagebackend should return a boolean indicating whether it supports binary data
if !serializer.EncodesAsText && (config.Type == storagebackend.StorageTypeUnset || config.Type == storagebackend.StorageTypeETCD2) {
glog.V(4).Infof("Wrapping the underlying binary storage serializer with a base64 encoding for etcd2")
s = runtime.NewBase64Serializer(s)
}
ds := recognizer.NewDecoder(s, ns.UniversalDeserializer())
encoder := ns.EncoderForVersion(s, storageVersion)
decoder := ns.DecoderToVersion(ds, memoryVersion)
if memoryVersion.Group != storageVersion.Group {
// Allow this codec to translate between groups.
if err := versioning.EnableCrossGroupEncoding(encoder, memoryVersion.Group, storageVersion.Group); err != nil {
return nil, fmt.Errorf("error setting up encoder from %v to %v: %v", memoryVersion, storageVersion, err)
}
if err := versioning.EnableCrossGroupDecoding(decoder, storageVersion.Group, memoryVersion.Group); err != nil {
return nil, fmt.Errorf("error setting up decoder from %v to %v: %v", storageVersion, memoryVersion, err)
}
}
return runtime.NewCodec(encoder, decoder), nil
}

View File

@ -23,7 +23,6 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
"k8s.io/kubernetes/pkg/storage/storagebackend"
)
@ -50,7 +49,7 @@ func TestUpdateEtcdOverrides(t *testing.T) {
defaultEtcdLocation := []string{"http://127.0.0.1"}
for i, test := range testCases {
actualConfig := storagebackend.Config{}
newEtcdFn := func(ns runtime.NegotiatedSerializer, storageVersion, memoryVersion unversioned.GroupVersion, config storagebackend.Config) (etcdStorage storage.Interface, err error) {
newStorageFn := func(config storagebackend.Config) (_ storage.Interface, err error) {
actualConfig = config
return nil, nil
}
@ -59,8 +58,8 @@ func TestUpdateEtcdOverrides(t *testing.T) {
Prefix: DefaultEtcdPathPrefix,
ServerList: defaultEtcdLocation,
}
storageFactory := NewDefaultStorageFactory(defaultConfig, api.Codecs, NewDefaultResourceEncodingConfig(), NewResourceConfig())
storageFactory.newEtcdFn = newEtcdFn
storageFactory := NewDefaultStorageFactory(defaultConfig, "", api.Codecs, NewDefaultResourceEncodingConfig(), NewResourceConfig())
storageFactory.newStorageFn = newStorageFn
storageFactory.SetEtcdLocation(test.resource, test.servers)
var err error

View File

@ -55,7 +55,7 @@ func (m *Mapper) InfoForData(data []byte, source string) (*Info, error) {
var obj runtime.Object
var versioned runtime.Object
if registered.IsThirdPartyAPIGroupVersion(gvk.GroupVersion()) {
obj, err = runtime.Decode(thirdpartyresourcedata.NewCodec(nil, gvk.Kind), data)
obj, err = runtime.Decode(thirdpartyresourcedata.NewDecoder(nil, gvk.Kind), data)
versioned = obj
} else {
obj, versioned = versions.Last(), versions.First()

View File

@ -677,7 +677,14 @@ func (m *Master) InstallThirdPartyResource(rsrc *extensions.ThirdPartyResource)
func (m *Master) thirdpartyapi(group, kind, version string) *apiserver.APIGroupVersion {
resourceStorage := thirdpartyresourcedataetcd.NewREST(
generic.RESTOptions{Storage: m.thirdPartyStorage, Decorator: generic.UndecoratedStorage, DeleteCollectionWorkers: m.deleteCollectionWorkers}, group, kind)
generic.RESTOptions{
Storage: m.thirdPartyStorage,
Decorator: generic.UndecoratedStorage,
DeleteCollectionWorkers: m.deleteCollectionWorkers,
},
group,
kind,
)
apiRoot := makeThirdPartyPath("")

View File

@ -88,7 +88,7 @@ func setUp(t *testing.T) (*Master, *etcdtesting.EtcdTestServer, Config, *assert.
resourceEncoding.SetVersionEncoding(batch.GroupName, *testapi.Batch.GroupVersion(), unversioned.GroupVersion{Group: batch.GroupName, Version: runtime.APIVersionInternal})
resourceEncoding.SetVersionEncoding(apps.GroupName, *testapi.Apps.GroupVersion(), unversioned.GroupVersion{Group: apps.GroupName, Version: runtime.APIVersionInternal})
resourceEncoding.SetVersionEncoding(extensions.GroupName, *testapi.Extensions.GroupVersion(), unversioned.GroupVersion{Group: extensions.GroupName, Version: runtime.APIVersionInternal})
storageFactory := genericapiserver.NewDefaultStorageFactory(storageConfig, api.Codecs, resourceEncoding, DefaultAPIResourceConfigSource())
storageFactory := genericapiserver.NewDefaultStorageFactory(storageConfig, testapi.StorageMediaType(), api.Codecs, resourceEncoding, DefaultAPIResourceConfigSource())
config.StorageFactory = storageFactory
config.APIResourceConfigSource = DefaultAPIResourceConfigSource()

View File

@ -90,7 +90,7 @@ func hasCreated(t *testing.T, pod *api.Pod) func(runtime.Object) bool {
func NewTestGenericStoreRegistry(t *testing.T) (*etcdtesting.EtcdTestServer, *Store) {
podPrefix := "/pods"
server := etcdtesting.NewEtcdTestClientServer(t)
s := etcdstorage.NewEtcdStorage(server.Client, testapi.Default.Codec(), etcdtest.PathPrefix(), false, etcdtest.DeserializationCacheSize)
s := etcdstorage.NewEtcdStorage(server.Client, testapi.Default.StorageCodec(), etcdtest.PathPrefix(), false, etcdtest.DeserializationCacheSize)
strategy := &testRESTStrategy{api.Scheme, api.SimpleNameGenerator, true, false, true}
return server, &Store{

View File

@ -38,7 +38,7 @@ import (
func NewEtcdStorage(t *testing.T, group string) (storage.Interface, *etcdtesting.EtcdTestServer) {
server := etcdtesting.NewEtcdTestClientServer(t)
storage := etcdstorage.NewEtcdStorage(server.Client, testapi.Groups[group].Codec(), etcdtest.PathPrefix(), false, etcdtest.DeserializationCacheSize)
storage := etcdstorage.NewEtcdStorage(server.Client, testapi.Groups[group].StorageCodec(), etcdtest.PathPrefix(), false, etcdtest.DeserializationCacheSize)
return storage, server
}

View File

@ -33,6 +33,7 @@ import (
"k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/sets"
)
type thirdPartyObjectConverter struct {
@ -183,31 +184,39 @@ func NewNegotiatedSerializer(s runtime.NegotiatedSerializer, kind string, encode
}
}
func (t *thirdPartyResourceDataCodecFactory) EncoderForVersion(s runtime.Serializer, gv unversioned.GroupVersion) runtime.Encoder {
return NewCodec(runtime.NewCodec(
t.NegotiatedSerializer.EncoderForVersion(s, gv),
t.NegotiatedSerializer.DecoderToVersion(s, t.decodeGV),
), t.kind)
func (t *thirdPartyResourceDataCodecFactory) SupportedMediaTypes() []string {
supported := sets.NewString(t.NegotiatedSerializer.SupportedMediaTypes()...)
return supported.Intersection(sets.NewString("application/json", "application/yaml")).List()
}
func (t *thirdPartyResourceDataCodecFactory) DecoderToVersion(s runtime.Serializer, gv unversioned.GroupVersion) runtime.Decoder {
return NewCodec(runtime.NewCodec(
t.NegotiatedSerializer.EncoderForVersion(s, t.encodeGV),
t.NegotiatedSerializer.DecoderToVersion(s, gv),
), t.kind)
func (t *thirdPartyResourceDataCodecFactory) SupportedStreamingMediaTypes() []string {
supported := sets.NewString(t.NegotiatedSerializer.SupportedStreamingMediaTypes()...)
return supported.Intersection(sets.NewString("application/json", "application/json;stream=watch")).List()
}
type thirdPartyResourceDataCodec struct {
delegate runtime.Codec
func (t *thirdPartyResourceDataCodecFactory) EncoderForVersion(s runtime.Encoder, gv unversioned.GroupVersion) runtime.Encoder {
return &thirdPartyResourceDataEncoder{delegate: t.NegotiatedSerializer.EncoderForVersion(s, gv), kind: t.kind}
}
func (t *thirdPartyResourceDataCodecFactory) DecoderToVersion(s runtime.Decoder, gv unversioned.GroupVersion) runtime.Decoder {
return NewDecoder(t.NegotiatedSerializer.DecoderToVersion(s, gv), t.kind)
}
func NewCodec(delegate runtime.Codec, kind string) runtime.Codec {
return runtime.NewCodec(NewEncoder(delegate, kind), NewDecoder(delegate, kind))
}
type thirdPartyResourceDataDecoder struct {
delegate runtime.Decoder
kind string
}
var _ runtime.Codec = &thirdPartyResourceDataCodec{}
func NewCodec(codec runtime.Codec, kind string) runtime.Codec {
return &thirdPartyResourceDataCodec{codec, kind}
func NewDecoder(delegate runtime.Decoder, kind string) runtime.Decoder {
return &thirdPartyResourceDataDecoder{delegate: delegate, kind: kind}
}
var _ runtime.Decoder = &thirdPartyResourceDataDecoder{}
func parseObject(data []byte) (map[string]interface{}, error) {
var obj interface{}
if err := json.Unmarshal(data, &obj); err != nil {
@ -221,7 +230,7 @@ func parseObject(data []byte) (map[string]interface{}, error) {
return mapObj, nil
}
func (t *thirdPartyResourceDataCodec) populate(data []byte) (runtime.Object, error) {
func (t *thirdPartyResourceDataDecoder) populate(data []byte) (runtime.Object, error) {
mapObj, err := parseObject(data)
if err != nil {
return nil, err
@ -229,7 +238,7 @@ func (t *thirdPartyResourceDataCodec) populate(data []byte) (runtime.Object, err
return t.populateFromObject(mapObj, data)
}
func (t *thirdPartyResourceDataCodec) populateFromObject(mapObj map[string]interface{}, data []byte) (runtime.Object, error) {
func (t *thirdPartyResourceDataDecoder) populateFromObject(mapObj map[string]interface{}, data []byte) (runtime.Object, error) {
typeMeta := unversioned.TypeMeta{}
if err := json.Unmarshal(data, &typeMeta); err != nil {
return nil, err
@ -252,7 +261,7 @@ func (t *thirdPartyResourceDataCodec) populateFromObject(mapObj map[string]inter
}
}
func (t *thirdPartyResourceDataCodec) populateResource(objIn *extensions.ThirdPartyResourceData, mapObj map[string]interface{}, data []byte) error {
func (t *thirdPartyResourceDataDecoder) populateResource(objIn *extensions.ThirdPartyResourceData, mapObj map[string]interface{}, data []byte) error {
metadata, ok := mapObj["metadata"].(map[string]interface{})
if !ok {
return fmt.Errorf("unexpected object for metadata: %#v", mapObj["metadata"])
@ -274,7 +283,7 @@ func (t *thirdPartyResourceDataCodec) populateResource(objIn *extensions.ThirdPa
return nil
}
func (t *thirdPartyResourceDataCodec) Decode(data []byte, gvk *unversioned.GroupVersionKind, into runtime.Object) (runtime.Object, *unversioned.GroupVersionKind, error) {
func (t *thirdPartyResourceDataDecoder) Decode(data []byte, gvk *unversioned.GroupVersionKind, into runtime.Object) (runtime.Object, *unversioned.GroupVersionKind, error) {
if into == nil {
obj, err := t.populate(data)
if err != nil {
@ -347,7 +356,7 @@ func (t *thirdPartyResourceDataCodec) Decode(data []byte, gvk *unversioned.Group
return thirdParty, actual, nil
}
func (t *thirdPartyResourceDataCodec) populateListResource(objIn *extensions.ThirdPartyResourceDataList, mapObj map[string]interface{}) error {
func (t *thirdPartyResourceDataDecoder) populateListResource(objIn *extensions.ThirdPartyResourceDataList, mapObj map[string]interface{}) error {
items, ok := mapObj["items"].([]interface{})
if !ok {
return fmt.Errorf("unexpected object for items: %#v", mapObj["items"])
@ -374,6 +383,17 @@ const template = `{
"items": [ %s ]
}`
type thirdPartyResourceDataEncoder struct {
delegate runtime.Encoder
kind string
}
func NewEncoder(delegate runtime.Encoder, kind string) runtime.Encoder {
return &thirdPartyResourceDataEncoder{delegate: delegate, kind: kind}
}
var _ runtime.Encoder = &thirdPartyResourceDataEncoder{}
func encodeToJSON(obj *extensions.ThirdPartyResourceData, stream io.Writer) error {
var objOut interface{}
if err := json.Unmarshal(obj.Data, &objOut); err != nil {
@ -388,7 +408,7 @@ func encodeToJSON(obj *extensions.ThirdPartyResourceData, stream io.Writer) erro
return encoder.Encode(objMap)
}
func (t *thirdPartyResourceDataCodec) EncodeToStream(obj runtime.Object, stream io.Writer, overrides ...unversioned.GroupVersion) (err error) {
func (t *thirdPartyResourceDataEncoder) EncodeToStream(obj runtime.Object, stream io.Writer, overrides ...unversioned.GroupVersion) (err error) {
switch obj := obj.(type) {
case *extensions.ThirdPartyResourceData:
return encodeToJSON(obj, stream)

View File

@ -87,13 +87,14 @@ func TestCodec(t *testing.T) {
},
}
for _, test := range tests {
codec := &thirdPartyResourceDataCodec{kind: "Foo", delegate: testapi.Extensions.Codec()}
d := &thirdPartyResourceDataDecoder{kind: "Foo", delegate: testapi.Extensions.Codec()}
e := &thirdPartyResourceDataEncoder{kind: "Foo", delegate: testapi.Extensions.Codec()}
data, err := json.Marshal(test.obj)
if err != nil {
t.Errorf("[%s] unexpected error: %v", test.name, err)
continue
}
obj, err := runtime.Decode(codec, data)
obj, err := runtime.Decode(d, data)
if err != nil && !test.expectErr {
t.Errorf("[%s] unexpected error: %v", test.name, err)
continue
@ -121,7 +122,7 @@ func TestCodec(t *testing.T) {
t.Errorf("[%s]\nexpected\n%v\nsaw\n%v\n", test.name, test.obj, &output)
}
data, err = runtime.Encode(codec, rsrcObj)
data, err = runtime.Encode(e, rsrcObj)
if err != nil {
t.Errorf("[%s] unexpected error: %v", test.name, err)
}

View File

@ -18,6 +18,7 @@ package runtime
import (
"bytes"
"encoding/base64"
"fmt"
"io"
"net/url"
@ -169,3 +170,27 @@ func (c *parameterCodec) EncodeParameters(obj Object, to unversioned.GroupVersio
}
return queryparams.Convert(obj)
}
type base64Serializer struct {
Serializer
}
func NewBase64Serializer(s Serializer) Serializer {
return &base64Serializer{s}
}
func (s base64Serializer) EncodeToStream(obj Object, stream io.Writer, overrides ...unversioned.GroupVersion) error {
e := base64.NewEncoder(base64.StdEncoding, stream)
err := s.Serializer.EncodeToStream(obj, e, overrides...)
e.Close()
return err
}
func (s base64Serializer) Decode(data []byte, defaults *unversioned.GroupVersionKind, into Object) (Object, *unversioned.GroupVersionKind, error) {
out := make([]byte, base64.StdEncoding.DecodedLen(len(data)))
n, err := base64.StdEncoding.Decode(out, data)
if err != nil {
return nil, nil, err
}
return s.Serializer.Decode(out[:n], defaults, into)
}

View File

@ -118,3 +118,33 @@ func DeepCopy_runtime_Scheme(in Scheme, out *Scheme, c *conversion.Cloner) error
}
return nil
}
func DeepCopy_runtime_SerializerInfo(in SerializerInfo, out *SerializerInfo, c *conversion.Cloner) error {
if in.Serializer == nil {
out.Serializer = nil
} else if newVal, err := c.DeepCopy(in.Serializer); err != nil {
return err
} else {
out.Serializer = newVal.(Serializer)
}
out.EncodesAsText = in.EncodesAsText
out.MediaType = in.MediaType
return nil
}
func DeepCopy_runtime_StreamSerializerInfo(in StreamSerializerInfo, out *StreamSerializerInfo, c *conversion.Cloner) error {
if err := DeepCopy_runtime_SerializerInfo(in.SerializerInfo, &out.SerializerInfo, c); err != nil {
return err
}
if in.Framer == nil {
out.Framer = nil
} else if newVal, err := c.DeepCopy(in.Framer); err != nil {
return err
} else {
out.Framer = newVal.(Framer)
}
if err := DeepCopy_runtime_SerializerInfo(in.Embedded, &out.Embedded, c); err != nil {
return err
}
return nil
}

View File

@ -86,32 +86,75 @@ type Framer interface {
NewFrameWriter(w io.Writer) io.Writer
}
// SerializerInfo contains information about a specific serialization format
type SerializerInfo struct {
Serializer
// EncodesAsText indicates this serializer can be encoded to UTF-8 safely.
EncodesAsText bool
// MediaType is the value that represents this serializer over the wire.
MediaType string
}
// StreamSerializerInfo contains information about a specific stream serialization format
type StreamSerializerInfo struct {
SerializerInfo
// Framer is the factory for retrieving streams that separate objects on the wire
Framer
// Embedded is the type of the nested serialization that should be used.
Embedded SerializerInfo
}
// NegotiatedSerializer is an interface used for obtaining encoders, decoders, and serializers
// for multiple supported media types.
// for multiple supported media types. This would commonly be accepted by a server component
// that performs HTTP content negotiation to accept multiple formats.
type NegotiatedSerializer interface {
// SupportedMediaTypes is the media types supported for reading and writing single objects.
SupportedMediaTypes() []string
// SerializerForMediaType returns a serializer for the provided media type. Options is a set of
// parameters applied to the media type that may modify the resulting output.
SerializerForMediaType(mediaType string, options map[string]string) (Serializer, bool)
// SerializerForMediaType returns a serializer for the provided media type. params is the set of
// parameters applied to the media type that may modify the resulting output. ok will be false
// if no serializer matched the media type.
SerializerForMediaType(mediaType string, params map[string]string) (s SerializerInfo, ok bool)
// SupportedStreamingMediaTypes returns the media types of the supported streaming serializers.
// Streaming serializers control how multiple objects are written to a stream output.
SupportedStreamingMediaTypes() []string
// StreamingSerializerForMediaType returns a serializer for the provided media type that supports
// reading and writing multiple objects to a stream. It returns a framer and serializer, or an
// error if no such serializer can be created. Options is a set of parameters applied to the
// media type that may modify the resulting output.
StreamingSerializerForMediaType(mediaType string, options map[string]string) (Serializer, Framer, string, bool)
// error if no such serializer can be created. Params is the set of parameters applied to the
// media type that may modify the resulting output. ok will be false if no serializer matched
// the media type.
StreamingSerializerForMediaType(mediaType string, params map[string]string) (s StreamSerializerInfo, ok bool)
// EncoderForVersion returns an encoder that ensures objects being written to the provided
// serializer are in the provided group version.
// TODO: take multiple group versions
EncoderForVersion(serializer Serializer, gv unversioned.GroupVersion) Encoder
EncoderForVersion(serializer Encoder, gv unversioned.GroupVersion) Encoder
// DecoderForVersion returns a decoder that ensures objects being read by the provided
// serializer are in the provided group version by default.
// TODO: take multiple group versions
DecoderToVersion(serializer Serializer, gv unversioned.GroupVersion) Decoder
DecoderToVersion(serializer Decoder, gv unversioned.GroupVersion) Decoder
}
// StorageSerializer is an interface used for obtaining encoders, decoders, and serializers
// that can read and write data at rest. This would commonly be used by client tools that must
// read files, or server side storage interfaces that persist restful objects.
type StorageSerializer interface {
// SerializerForMediaType returns a serializer for the provided media type. Options is a set of
// parameters applied to the media type that may modify the resulting output.
SerializerForMediaType(mediaType string, options map[string]string) (SerializerInfo, bool)
// UniversalDeserializer returns a Serializer that can read objects in multiple supported formats
// by introspecting the data at rest.
UniversalDeserializer() Decoder
// EncoderForVersion returns an encoder that ensures objects being written to the provided
// serializer are in the provided group version.
// TODO: take multiple group versions
EncoderForVersion(serializer Encoder, gv unversioned.GroupVersion) Encoder
// DecoderForVersion returns a decoder that ensures objects being read by the provided
// serializer are in the provided group version by default.
// TODO: take multiple group versions
DecoderToVersion(serializer Decoder, gv unversioned.GroupVersion) Decoder
}
///////////////////////////////////////////////////////////////////////////////

View File

@ -31,6 +31,8 @@ type serializerType struct {
AcceptContentTypes []string
ContentType string
FileExtensions []string
// EncodesAsText should be true if this content type can be represented safely in UTF-8
EncodesAsText bool
Serializer runtime.Serializer
PrettySerializer runtime.Serializer
@ -65,6 +67,7 @@ func newSerializersForScheme(scheme *runtime.Scheme, mf json.MetaFactory) []seri
AcceptContentTypes: []string{"application/json"},
ContentType: "application/json",
FileExtensions: []string{"json"},
EncodesAsText: true,
Serializer: jsonSerializer,
PrettySerializer: jsonPrettySerializer,
@ -77,6 +80,7 @@ func newSerializersForScheme(scheme *runtime.Scheme, mf json.MetaFactory) []seri
AcceptContentTypes: []string{"application/yaml"},
ContentType: "application/yaml",
FileExtensions: []string{"yaml"},
EncodesAsText: true,
Serializer: yamlSerializer,
// TODO: requires runtime.RawExtension to properly distinguish when the nested content is
@ -206,62 +210,91 @@ func (f CodecFactory) UniversalDeserializer() runtime.Decoder {
//
// TODO: the decoder will eventually be removed in favor of dealing with objects in their versioned form
func (f CodecFactory) UniversalDecoder(versions ...unversioned.GroupVersion) runtime.Decoder {
return f.CodecForVersions(runtime.NoopEncoder{Decoder: f.universal}, nil, versions)
return f.CodecForVersions(nil, f.universal, nil, versions)
}
// CodecFor creates a codec with the provided serializer. If an object is decoded and its group is not in the list,
// it will default to runtime.APIVersionInternal. If encode is not specified for an object's group, the object is not
// converted. If encode or decode are nil, no conversion is performed.
func (f CodecFactory) CodecForVersions(serializer runtime.Serializer, encode []unversioned.GroupVersion, decode []unversioned.GroupVersion) runtime.Codec {
return versioning.NewCodecForScheme(f.scheme, serializer, serializer, encode, decode)
func (f CodecFactory) CodecForVersions(encoder runtime.Encoder, decoder runtime.Decoder, encode []unversioned.GroupVersion, decode []unversioned.GroupVersion) runtime.Codec {
return versioning.NewCodecForScheme(f.scheme, encoder, decoder, encode, decode)
}
// DecoderToVersion returns a decoder that targets the provided group version.
func (f CodecFactory) DecoderToVersion(serializer runtime.Serializer, gv unversioned.GroupVersion) runtime.Decoder {
return f.CodecForVersions(serializer, nil, []unversioned.GroupVersion{gv})
func (f CodecFactory) DecoderToVersion(decoder runtime.Decoder, gv unversioned.GroupVersion) runtime.Decoder {
return f.CodecForVersions(nil, decoder, nil, []unversioned.GroupVersion{gv})
}
// EncoderForVersion returns an encoder that targets the provided group version.
func (f CodecFactory) EncoderForVersion(serializer runtime.Serializer, gv unversioned.GroupVersion) runtime.Encoder {
return f.CodecForVersions(serializer, []unversioned.GroupVersion{gv}, nil)
func (f CodecFactory) EncoderForVersion(encoder runtime.Encoder, gv unversioned.GroupVersion) runtime.Encoder {
return f.CodecForVersions(encoder, nil, []unversioned.GroupVersion{gv}, nil)
}
// SerializerForMediaType returns a serializer that matches the provided RFC2046 mediaType, or false if no such
// serializer exists
func (f CodecFactory) SerializerForMediaType(mediaType string, options map[string]string) (runtime.Serializer, bool) {
func (f CodecFactory) SerializerForMediaType(mediaType string, params map[string]string) (runtime.SerializerInfo, bool) {
for _, s := range f.serializers {
for _, accepted := range s.AcceptContentTypes {
if accepted == mediaType {
if s.Specialize != nil && len(options) > 0 {
serializer, ok := s.Specialize(options)
return serializer, ok
// specialization abstracts variants to the content type
if s.Specialize != nil && len(params) > 0 {
serializer, ok := s.Specialize(params)
// TODO: return formatted mediaType+params
return runtime.SerializerInfo{Serializer: serializer, MediaType: s.ContentType, EncodesAsText: s.EncodesAsText}, ok
}
if v, ok := options["pretty"]; ok && v == "1" && s.PrettySerializer != nil {
return s.PrettySerializer, true
// legacy support for ?pretty=1 continues, but this is more formally defined
if v, ok := params["pretty"]; ok && v == "1" && s.PrettySerializer != nil {
return runtime.SerializerInfo{Serializer: s.PrettySerializer, MediaType: s.ContentType, EncodesAsText: s.EncodesAsText}, true
}
return s.Serializer, true
// return the base variant
return runtime.SerializerInfo{Serializer: s.Serializer, MediaType: s.ContentType, EncodesAsText: s.EncodesAsText}, true
}
}
}
return nil, false
return runtime.SerializerInfo{}, false
}
// StreamingSerializerForMediaType returns a serializer that matches the provided RFC2046 mediaType, or false if no such
// serializer exists
func (f CodecFactory) StreamingSerializerForMediaType(mediaType string, options map[string]string) (runtime.Serializer, runtime.Framer, string, bool) {
func (f CodecFactory) StreamingSerializerForMediaType(mediaType string, params map[string]string) (runtime.StreamSerializerInfo, bool) {
for _, s := range f.serializers {
for _, accepted := range s.AcceptStreamContentTypes {
if accepted == mediaType {
if s.StreamSpecialize != nil && len(options) > 0 {
serializer, ok := s.StreamSpecialize(options)
// TODO: have StreamSpecialize return exact content type
return serializer, s.Framer, s.StreamContentType, ok
// TODO: accept params
nested, ok := f.SerializerForMediaType(s.ContentType, nil)
if !ok {
panic("no serializer defined for internal content type")
}
return s.StreamSerializer, s.Framer, s.StreamContentType, true
if s.StreamSpecialize != nil && len(params) > 0 {
serializer, ok := s.StreamSpecialize(params)
// TODO: return formatted mediaType+params
return runtime.StreamSerializerInfo{
SerializerInfo: runtime.SerializerInfo{
Serializer: serializer,
MediaType: s.StreamContentType,
EncodesAsText: s.EncodesAsText,
},
Framer: s.Framer,
Embedded: nested,
}, ok
}
return runtime.StreamSerializerInfo{
SerializerInfo: runtime.SerializerInfo{
Serializer: s.StreamSerializer,
MediaType: s.StreamContentType,
EncodesAsText: s.EncodesAsText,
},
Framer: s.Framer,
Embedded: nested,
}, true
}
}
}
return nil, nil, "", false
return runtime.StreamSerializerInfo{}, false
}
// SerializerForFileExtension returns a serializer for the provided extension, or false if no serializer matches.

View File

@ -267,7 +267,7 @@ func TestVersionedEncoding(t *testing.T) {
encoder, _ := cf.SerializerForFileExtension("json")
// codec that is unversioned uses the target version
unversionedCodec := cf.CodecForVersions(encoder, nil, nil)
unversionedCodec := cf.CodecForVersions(encoder, nil, nil, nil)
_, err = runtime.Encode(unversionedCodec, &TestType1{}, unversioned.GroupVersion{Version: "v3"})
if err == nil || !runtime.IsNotRegisteredError(err) {
t.Fatal(err)

View File

@ -195,14 +195,6 @@ func (s *Serializer) RecognizesData(peek io.Reader) (ok, unknown bool, err error
return ok, false, nil
}
// EncodesAsText returns true because both JSON and YAML are considered textual representations
// of data. This is used to determine whether the serialized object should be transmitted over
// a WebSocket Text or Binary frame. This must remain true for legacy compatibility with v1.1
// watch over websocket implementations.
func (s *Serializer) EncodesAsText() bool {
return true
}
// Framer is the default JSON framing behavior, with newlines delimiting individual objects.
var Framer = jsonFramer{}

View File

@ -24,35 +24,34 @@ import (
// TODO: We should figure out what happens when someone asks
// encoder for version and it conflicts with the raw serializer.
type negotiatedSerializerWrapper struct {
serializer runtime.Serializer
streamingSerializer runtime.Serializer
framer runtime.Framer
info runtime.SerializerInfo
streamInfo runtime.StreamSerializerInfo
}
func NegotiatedSerializerWrapper(serializer, streamingSerializer runtime.Serializer, framer runtime.Framer) runtime.NegotiatedSerializer {
return &negotiatedSerializerWrapper{serializer, streamingSerializer, framer}
func NegotiatedSerializerWrapper(info runtime.SerializerInfo, streamInfo runtime.StreamSerializerInfo) runtime.NegotiatedSerializer {
return &negotiatedSerializerWrapper{info, streamInfo}
}
func (n *negotiatedSerializerWrapper) SupportedMediaTypes() []string {
return []string{}
}
func (n *negotiatedSerializerWrapper) SerializerForMediaType(mediaType string, options map[string]string) (runtime.Serializer, bool) {
return n.serializer, true
func (n *negotiatedSerializerWrapper) SerializerForMediaType(mediaType string, options map[string]string) (runtime.SerializerInfo, bool) {
return n.info, true
}
func (n *negotiatedSerializerWrapper) SupportedStreamingMediaTypes() []string {
return []string{}
}
func (n *negotiatedSerializerWrapper) StreamingSerializerForMediaType(mediaType string, options map[string]string) (runtime.Serializer, runtime.Framer, string, bool) {
return n.streamingSerializer, n.framer, "", true
func (n *negotiatedSerializerWrapper) StreamingSerializerForMediaType(mediaType string, options map[string]string) (runtime.StreamSerializerInfo, bool) {
return n.streamInfo, true
}
func (n *negotiatedSerializerWrapper) EncoderForVersion(serializer runtime.Serializer, gv unversioned.GroupVersion) runtime.Encoder {
return n.serializer
func (n *negotiatedSerializerWrapper) EncoderForVersion(e runtime.Encoder, _ unversioned.GroupVersion) runtime.Encoder {
return e
}
func (n *negotiatedSerializerWrapper) DecoderToVersion(serializer runtime.Serializer, gv unversioned.GroupVersion) runtime.Decoder {
return n.serializer
func (n *negotiatedSerializerWrapper) DecoderToVersion(d runtime.Decoder, _gv unversioned.GroupVersion) runtime.Decoder {
return d
}

View File

@ -297,6 +297,18 @@ func TestDecodeObjects(t *testing.T) {
t.Fatal(err)
}
unk2 := &runtime.Unknown{
TypeMeta: runtime.TypeMeta{Kind: "Pod", APIVersion: "v1"},
}
wire2 := make([]byte, len(wire1)*2)
n, err := unk2.NestedMarshalTo(wire2, obj1, uint64(obj1.Size()))
if err != nil {
t.Fatal(err)
}
if n != len(wire1) || !bytes.Equal(wire1, wire2[:n]) {
t.Fatalf("unexpected wire:\n%s", hex.Dump(wire2[:n]))
}
wire1 = append([]byte{0x6b, 0x38, 0x73, 0x00}, wire1...)
testCases := []struct {

View File

@ -17,6 +17,7 @@ limitations under the License.
package recognizer
import (
"bufio"
"bytes"
"fmt"
"io"

View File

@ -27,6 +27,7 @@ import (
// EnableCrossGroupDecoding modifies the given decoder in place, if it is a codec
// from this package. It allows objects from one group to be auto-decoded into
// another group. 'destGroup' must already exist in the codec.
// TODO: this is an encapsulation violation and should be refactored
func EnableCrossGroupDecoding(d runtime.Decoder, sourceGroup, destGroup string) error {
internal, ok := d.(*codec)
if !ok {
@ -45,6 +46,7 @@ func EnableCrossGroupDecoding(d runtime.Decoder, sourceGroup, destGroup string)
// EnableCrossGroupEncoding modifies the given encoder in place, if it is a codec
// from this package. It allows objects from one group to be auto-decoded into
// another group. 'destGroup' must already exist in the codec.
// TODO: this is an encapsulation violation and should be refactored
func EnableCrossGroupEncoding(e runtime.Encoder, sourceGroup, destGroup string) error {
internal, ok := e.(*codec)
if !ok {

View File

@ -86,9 +86,13 @@ func New(kubeConfigFile string) (*WebhookAuthorizer, error) {
if err != nil {
return nil, err
}
serializer := json.NewSerializer(json.DefaultMetaFactory, api.Scheme, runtime.ObjectTyperToTyper(api.Scheme), false)
codec := versioning.NewCodecForScheme(api.Scheme, serializer, serializer, encodeVersions, decodeVersions)
clientConfig.ContentConfig.NegotiatedSerializer = runtimeserializer.NegotiatedSerializerWrapper(codec, codec, json.Framer)
clientConfig.ContentConfig.NegotiatedSerializer = runtimeserializer.NegotiatedSerializerWrapper(
runtime.SerializerInfo{Serializer: codec},
runtime.StreamSerializerInfo{},
)
restClient, err := restclient.UnversionedRESTClientFor(clientConfig)
if err != nil {

View File

@ -155,24 +155,29 @@ func NewMasterConfig() *master.Config {
Prefix: etcdtest.PathPrefix(),
}
negotiatedSerializer := NewSingleContentTypeSerializer(api.Scheme, testapi.Default.Codec(), "application/json")
negotiatedSerializer := NewSingleContentTypeSerializer(api.Scheme, testapi.Default.Codec(), runtime.ContentTypeJSON)
storageFactory := genericapiserver.NewDefaultStorageFactory(config, negotiatedSerializer, genericapiserver.NewDefaultResourceEncodingConfig(), master.DefaultAPIResourceConfigSource())
storageFactory := genericapiserver.NewDefaultStorageFactory(config, runtime.ContentTypeJSON, negotiatedSerializer, genericapiserver.NewDefaultResourceEncodingConfig(), master.DefaultAPIResourceConfigSource())
storageFactory.SetSerializer(
unversioned.GroupResource{Group: api.GroupName, Resource: genericapiserver.AllResources},
NewSingleContentTypeSerializer(api.Scheme, testapi.Default.Codec(), "application/json"))
"",
NewSingleContentTypeSerializer(api.Scheme, testapi.Default.Codec(), runtime.ContentTypeJSON))
storageFactory.SetSerializer(
unversioned.GroupResource{Group: autoscaling.GroupName, Resource: genericapiserver.AllResources},
NewSingleContentTypeSerializer(api.Scheme, testapi.Autoscaling.Codec(), "application/json"))
"",
NewSingleContentTypeSerializer(api.Scheme, testapi.Autoscaling.Codec(), runtime.ContentTypeJSON))
storageFactory.SetSerializer(
unversioned.GroupResource{Group: batch.GroupName, Resource: genericapiserver.AllResources},
NewSingleContentTypeSerializer(api.Scheme, testapi.Batch.Codec(), "application/json"))
"",
NewSingleContentTypeSerializer(api.Scheme, testapi.Batch.Codec(), runtime.ContentTypeJSON))
storageFactory.SetSerializer(
unversioned.GroupResource{Group: apps.GroupName, Resource: genericapiserver.AllResources},
NewSingleContentTypeSerializer(api.Scheme, testapi.Apps.Codec(), "application/json"))
"",
NewSingleContentTypeSerializer(api.Scheme, testapi.Apps.Codec(), runtime.ContentTypeJSON))
storageFactory.SetSerializer(
unversioned.GroupResource{Group: extensions.GroupName, Resource: genericapiserver.AllResources},
NewSingleContentTypeSerializer(api.Scheme, testapi.Extensions.Codec(), "application/json"))
"",
NewSingleContentTypeSerializer(api.Scheme, testapi.Extensions.Codec(), runtime.ContentTypeJSON))
return &master.Config{
Config: &genericapiserver.Config{

View File

@ -23,7 +23,7 @@ import (
)
// NewSingleContentTypeSerializer wraps a serializer in a NegotiatedSerializer that handles one content type
func NewSingleContentTypeSerializer(scheme *runtime.Scheme, serializer runtime.Serializer, contentType string) runtime.NegotiatedSerializer {
func NewSingleContentTypeSerializer(scheme *runtime.Scheme, serializer runtime.Serializer, contentType string) runtime.StorageSerializer {
return &wrappedSerializer{
scheme: scheme,
serializer: serializer,
@ -37,29 +37,31 @@ type wrappedSerializer struct {
contentType string
}
var _ runtime.NegotiatedSerializer = &wrappedSerializer{}
var _ runtime.StorageSerializer = &wrappedSerializer{}
func (s *wrappedSerializer) SupportedMediaTypes() []string {
return []string{s.contentType}
}
func (s *wrappedSerializer) SerializerForMediaType(mediaType string, options map[string]string) (runtime.Serializer, bool) {
func (s *wrappedSerializer) SerializerForMediaType(mediaType string, options map[string]string) (runtime.SerializerInfo, bool) {
if mediaType != s.contentType {
return nil, false
return runtime.SerializerInfo{}, false
}
return s.serializer, true
}
func (s *wrappedSerializer) SupportedStreamingMediaTypes() []string {
return nil
}
func (s *wrappedSerializer) StreamingSerializerForMediaType(mediaType string, options map[string]string) (runtime.Serializer, runtime.Framer, string, bool) {
return nil, nil, "", false
return runtime.SerializerInfo{
Serializer: s.serializer,
MediaType: mediaType,
EncodesAsText: true, // TODO: this should be parameterized
}, true
}
func (s *wrappedSerializer) EncoderForVersion(serializer runtime.Serializer, gv unversioned.GroupVersion) runtime.Encoder {
return versioning.NewCodec(s.serializer, s.serializer, s.scheme, s.scheme, s.scheme, runtime.ObjectTyperToTyper(s.scheme), []unversioned.GroupVersion{gv}, nil)
func (s *wrappedSerializer) UniversalDeserializer() runtime.Decoder {
return s.serializer
}
func (s *wrappedSerializer) DecoderToVersion(serializer runtime.Serializer, gv unversioned.GroupVersion) runtime.Decoder {
return versioning.NewCodec(s.serializer, s.serializer, s.scheme, s.scheme, s.scheme, runtime.ObjectTyperToTyper(s.scheme), nil, []unversioned.GroupVersion{gv})
func (s *wrappedSerializer) EncoderForVersion(encoder runtime.Encoder, gv unversioned.GroupVersion) runtime.Encoder {
return versioning.NewCodec(encoder, nil, s.scheme, s.scheme, s.scheme, runtime.ObjectTyperToTyper(s.scheme), []unversioned.GroupVersion{gv}, nil)
}
func (s *wrappedSerializer) DecoderToVersion(decoder runtime.Decoder, gv unversioned.GroupVersion) runtime.Decoder {
return versioning.NewCodec(nil, decoder, s.scheme, s.scheme, s.scheme, runtime.ObjectTyperToTyper(s.scheme), nil, []unversioned.GroupVersion{gv})
}