Cleanup watch encoding (remove dupe Encoding)
Move standard watch encode / decode streams to use runtime.RawExtension and embed API decoding based on a provided codec.
This commit is contained in:
69
pkg/watch/json/decoder.go
Normal file
69
pkg/watch/json/decoder.go
Normal file
@@ -0,0 +1,69 @@
|
||||
/*
|
||||
Copyright 2014 Google Inc. All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package json
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
|
||||
)
|
||||
|
||||
// Decoder implements the watch.Decoder interface for io.ReadClosers that
|
||||
// have contents which consist of a series of watchEvent objects encoded via JSON.
|
||||
// It will decode any object registered in the supplied codec.
|
||||
type Decoder struct {
|
||||
r io.ReadCloser
|
||||
decoder *json.Decoder
|
||||
codec runtime.Codec
|
||||
}
|
||||
|
||||
// NewDecoder creates an Decoder for the given writer and codec.
|
||||
func NewDecoder(r io.ReadCloser, codec runtime.Codec) *Decoder {
|
||||
return &Decoder{
|
||||
r: r,
|
||||
decoder: json.NewDecoder(r),
|
||||
codec: codec,
|
||||
}
|
||||
}
|
||||
|
||||
// Decode blocks until it can return the next object in the writer. Returns an error
|
||||
// if the writer is closed or an object can't be decoded.
|
||||
func (d *Decoder) Decode() (watch.EventType, runtime.Object, error) {
|
||||
var got watchEvent
|
||||
if err := d.decoder.Decode(&got); err != nil {
|
||||
return "", nil, err
|
||||
}
|
||||
switch got.Type {
|
||||
case watch.Added, watch.Modified, watch.Deleted:
|
||||
default:
|
||||
return "", nil, fmt.Errorf("got invalid watch event type: %v", got.Type)
|
||||
}
|
||||
|
||||
obj, err := d.codec.Decode(got.Object.RawJSON)
|
||||
if err != nil {
|
||||
return "", nil, fmt.Errorf("unable to decode watch event: %v", err)
|
||||
}
|
||||
return got.Type, obj, nil
|
||||
}
|
||||
|
||||
// Close closes the underlying r.
|
||||
func (d *Decoder) Close() {
|
||||
d.r.Close()
|
||||
}
|
101
pkg/watch/json/decoder_test.go
Normal file
101
pkg/watch/json/decoder_test.go
Normal file
@@ -0,0 +1,101 @@
|
||||
/*
|
||||
Copyright 2014 Google Inc. All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package json
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"io"
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/v1beta1"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
|
||||
)
|
||||
|
||||
func TestDecoder(t *testing.T) {
|
||||
out, in := io.Pipe()
|
||||
decoder := NewDecoder(out, v1beta1.Codec)
|
||||
|
||||
expect := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}}
|
||||
encoder := json.NewEncoder(in)
|
||||
go func() {
|
||||
data, err := v1beta1.Codec.Encode(expect)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error %v", err)
|
||||
}
|
||||
if err := encoder.Encode(&watchEvent{watch.Added, runtime.RawExtension{json.RawMessage(data)}}); err != nil {
|
||||
t.Errorf("Unexpected error %v", err)
|
||||
}
|
||||
in.Close()
|
||||
}()
|
||||
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
action, got, err := decoder.Decode()
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error %v", err)
|
||||
}
|
||||
if e, a := watch.Added, action; e != a {
|
||||
t.Errorf("Expected %v, got %v", e, a)
|
||||
}
|
||||
if e, a := expect, got; !reflect.DeepEqual(e, a) {
|
||||
t.Errorf("Expected %v, got %v", e, a)
|
||||
}
|
||||
t.Logf("Exited read")
|
||||
close(done)
|
||||
}()
|
||||
<-done
|
||||
|
||||
done = make(chan struct{})
|
||||
go func() {
|
||||
_, _, err := decoder.Decode()
|
||||
if err == nil {
|
||||
t.Errorf("Unexpected nil error")
|
||||
}
|
||||
close(done)
|
||||
}()
|
||||
<-done
|
||||
|
||||
decoder.Close()
|
||||
}
|
||||
|
||||
func TestDecoder_SourceClose(t *testing.T) {
|
||||
out, in := io.Pipe()
|
||||
decoder := NewDecoder(out, v1beta1.Codec)
|
||||
|
||||
done := make(chan struct{})
|
||||
|
||||
go func() {
|
||||
_, _, err := decoder.Decode()
|
||||
if err == nil {
|
||||
t.Errorf("Unexpected nil error")
|
||||
}
|
||||
close(done)
|
||||
}()
|
||||
|
||||
in.Close()
|
||||
|
||||
select {
|
||||
case <-done:
|
||||
break
|
||||
case <-time.After(10 * time.Second):
|
||||
t.Error("Timeout")
|
||||
}
|
||||
}
|
19
pkg/watch/json/doc.go
Normal file
19
pkg/watch/json/doc.go
Normal file
@@ -0,0 +1,19 @@
|
||||
/*
|
||||
Copyright 2014 Google Inc. All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
// Package json implements a simple encoder and decoder for streams
|
||||
// of watch events over io.Writer/Readers
|
||||
package json
|
53
pkg/watch/json/encoder.go
Normal file
53
pkg/watch/json/encoder.go
Normal file
@@ -0,0 +1,53 @@
|
||||
/*
|
||||
Copyright 2014 Google Inc. All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package json
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"io"
|
||||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
|
||||
)
|
||||
|
||||
// Encoder implements the json.Encoder interface for io.Writers that
|
||||
// should serialize watchEvent objects into JSON. It will encode any object
|
||||
// registered in the supplied codec and return an error otherwies.
|
||||
type Encoder struct {
|
||||
w io.Writer
|
||||
encoder *json.Encoder
|
||||
codec runtime.Codec
|
||||
}
|
||||
|
||||
// NewEncoder creates an Encoder for the given writer and codec
|
||||
func NewEncoder(w io.Writer, codec runtime.Codec) *Encoder {
|
||||
return &Encoder{
|
||||
w: w,
|
||||
encoder: json.NewEncoder(w),
|
||||
codec: codec,
|
||||
}
|
||||
}
|
||||
|
||||
// Encode writes an event to the writer. Returns an error
|
||||
// if the writer is closed or an object can't be encoded.
|
||||
func (e *Encoder) Encode(event *watch.Event) error {
|
||||
obj, err := Object(e.codec, event)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return e.encoder.Encode(obj)
|
||||
}
|
76
pkg/watch/json/encoder_test.go
Normal file
76
pkg/watch/json/encoder_test.go
Normal file
@@ -0,0 +1,76 @@
|
||||
/*
|
||||
Copyright 2014 Google Inc. All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package json
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"io/ioutil"
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/v1beta1"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/v1beta2"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
|
||||
)
|
||||
|
||||
func TestEncodeDecodeRoundTrip(t *testing.T) {
|
||||
testCases := []struct {
|
||||
Type watch.EventType
|
||||
Object runtime.Object
|
||||
Codec runtime.Codec
|
||||
}{
|
||||
{
|
||||
watch.Added,
|
||||
&api.Pod{JSONBase: api.JSONBase{ID: "foo"}},
|
||||
v1beta1.Codec,
|
||||
},
|
||||
{
|
||||
watch.Modified,
|
||||
&api.Pod{JSONBase: api.JSONBase{ID: "foo"}},
|
||||
v1beta2.Codec,
|
||||
},
|
||||
{
|
||||
watch.Deleted,
|
||||
&api.Pod{JSONBase: api.JSONBase{ID: "foo"}},
|
||||
api.Codec,
|
||||
},
|
||||
}
|
||||
for i, testCase := range testCases {
|
||||
buf := &bytes.Buffer{}
|
||||
|
||||
encoder := NewEncoder(buf, testCase.Codec)
|
||||
if err := encoder.Encode(&watch.Event{Type: testCase.Type, Object: testCase.Object}); err != nil {
|
||||
t.Errorf("%d: unexpected error: %v", i, err)
|
||||
continue
|
||||
}
|
||||
|
||||
decoder := NewDecoder(ioutil.NopCloser(buf), testCase.Codec)
|
||||
event, obj, err := decoder.Decode()
|
||||
if err != nil {
|
||||
t.Errorf("%d: unexpected error: %v", i, err)
|
||||
continue
|
||||
}
|
||||
if !reflect.DeepEqual(testCase.Object, obj) {
|
||||
t.Errorf("%d: expected %#v, got %#v", i, testCase.Object, obj)
|
||||
}
|
||||
if event != testCase.Type {
|
||||
t.Errorf("%d: unexpected type: %#v", i, event)
|
||||
}
|
||||
}
|
||||
}
|
50
pkg/watch/json/types.go
Normal file
50
pkg/watch/json/types.go
Normal file
@@ -0,0 +1,50 @@
|
||||
/*
|
||||
Copyright 2014 Google Inc. All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package json
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"reflect"
|
||||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
|
||||
)
|
||||
|
||||
// watchEvent objects are streamed from the api server in response to a watch request.
|
||||
// These are not API objects and are unversioned today.
|
||||
type watchEvent struct {
|
||||
// The type of the watch event; added, modified, or deleted.
|
||||
Type watch.EventType `json:"type,omitempty" yaml:"type,omitempty"`
|
||||
|
||||
// For added or modified objects, this is the new object; for deleted objects,
|
||||
// it's the state of the object immediately prior to its deletion.
|
||||
Object runtime.RawExtension `json:"object,omitempty" yaml:"object,omitempty"`
|
||||
}
|
||||
|
||||
// Object converts a watch.Event into an appropriately serializable JSON object
|
||||
func Object(codec runtime.Codec, event *watch.Event) (interface{}, error) {
|
||||
obj, ok := event.Object.(runtime.Object)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("The event object cannot be safely converted to JSON: %v", reflect.TypeOf(event.Object).Name())
|
||||
}
|
||||
data, err := codec.Encode(obj)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &watchEvent{event.Type, runtime.RawExtension{json.RawMessage(data)}}, nil
|
||||
}
|
Reference in New Issue
Block a user