Make runtime less global for Codec

* Make Codec separate from Scheme
* Move EncodeOrDie off Scheme to take a Codec
* Make Copy work without a Codec
* Create a "latest" package that imports all versions and
  sets global defaults for "most recent encoding"
  * v1beta1 is the current "latest", v1beta2 exists
  * Kill DefaultCodec, replace it with "latest.Codec"
  * This updates the client and etcd to store the latest known version
* EmbeddedObject is per schema and per package now
* Move runtime.DefaultScheme to api.Scheme
* Split out WatchEvent since it's not an API object today, treat it
like a special object in api
* Kill DefaultResourceVersioner, instead place it on "latest" (as the
  package that understands all packages)
* Move objDiff to runtime.ObjectDiff
This commit is contained in:
Clayton Coleman
2014-09-11 13:02:53 -04:00
parent 154a91cd33
commit 61e3ce7ddc
58 changed files with 944 additions and 389 deletions

View File

@@ -1,62 +0,0 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package tools
import (
"encoding/json"
"fmt"
"io"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
)
// APIEventDecoder implements the watch.Decoder interface for io.ReadClosers that
// have contents which consist of a series of api.WatchEvent objects encoded via JSON.
type APIEventDecoder struct {
stream io.ReadCloser
decoder *json.Decoder
}
// NewAPIEventDecoder creates an APIEventDecoder for the given stream.
func NewAPIEventDecoder(stream io.ReadCloser) *APIEventDecoder {
return &APIEventDecoder{
stream: stream,
decoder: json.NewDecoder(stream),
}
}
// Decode blocks until it can return the next object in the stream. Returns an error
// if the stream is closed or an object can't be decoded.
func (d *APIEventDecoder) Decode() (action watch.EventType, object runtime.Object, err error) {
var got api.WatchEvent
err = d.decoder.Decode(&got)
if err != nil {
return action, nil, err
}
switch got.Type {
case watch.Added, watch.Modified, watch.Deleted:
return got.Type, got.Object.Object, err
}
return action, nil, fmt.Errorf("got invalid watch event type: %v", got.Type)
}
// Close closes the underlying stream.
func (d *APIEventDecoder) Close() {
d.stream.Close()
}

View File

@@ -1,108 +0,0 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package tools
import (
"encoding/json"
"io"
"reflect"
"testing"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
_ "github.com/GoogleCloudPlatform/kubernetes/pkg/api/v1beta1"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
)
func TestDecoder(t *testing.T) {
out, in := io.Pipe()
encoder := json.NewEncoder(in)
decoder := NewAPIEventDecoder(out)
expect := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}}
go func() {
err := encoder.Encode(api.WatchEvent{watch.Added, runtime.EmbeddedObject{expect}})
if err != nil {
t.Errorf("Unexpected error %v", err)
}
}()
done := make(chan struct{})
go func() {
action, got, err := decoder.Decode()
if err != nil {
t.Errorf("Unexpected error %v", err)
}
if e, a := watch.Added, action; e != a {
t.Errorf("Expected %v, got %v", e, a)
}
if e, a := expect, got; !reflect.DeepEqual(e, a) {
t.Errorf("Expected %v, got %v", e, a)
}
close(done)
}()
select {
case <-done:
break
case <-time.After(10 * time.Second):
t.Error("Timeout")
}
done = make(chan struct{})
go func() {
_, _, err := decoder.Decode()
if err == nil {
t.Errorf("Unexpected nil error")
}
close(done)
}()
decoder.Close()
select {
case <-done:
break
case <-time.After(10 * time.Second):
t.Error("Timeout")
}
}
func TestDecoder_SourceClose(t *testing.T) {
out, in := io.Pipe()
decoder := NewAPIEventDecoder(out)
done := make(chan struct{})
go func() {
_, _, err := decoder.Decode()
if err == nil {
t.Errorf("Unexpected nil error")
}
close(done)
}()
in.Close()
select {
case <-done:
break
case <-time.After(10 * time.Second):
t.Error("Timeout")
}
}

View File

@@ -24,6 +24,7 @@ import (
"testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/coreos/go-etcd/etcd"
@@ -42,13 +43,14 @@ type TestResource struct {
func (*TestResource) IsAnAPIObject() {}
var scheme *runtime.Scheme
var codec = latest.Codec
var versioner = runtime.DefaultResourceVersioner
var codec runtime.Codec
var versioner = runtime.NewJSONBaseResourceVersioner()
func init() {
scheme = runtime.NewScheme("", "v1beta1")
scheme = runtime.NewScheme()
scheme.AddKnownTypes("", &TestResource{})
scheme.AddKnownTypes("v1beta1", &TestResource{})
codec = runtime.CodecFor(scheme, "v1beta1")
}
func TestIsEtcdNotFound(t *testing.T) {
@@ -93,7 +95,7 @@ func TestExtractList(t *testing.T) {
}
var got []api.Pod
helper := EtcdHelper{fakeClient, codec, versioner}
helper := EtcdHelper{fakeClient, latest.Codec, versioner}
resourceVersion := uint64(0)
err := helper.ExtractList("/some/key", &got, &resourceVersion)
if err != nil {
@@ -114,7 +116,7 @@ func TestExtractObj(t *testing.T) {
fakeClient := NewFakeEtcdClient(t)
expect := api.Pod{JSONBase: api.JSONBase{ID: "foo"}}
fakeClient.Set("/some/key", util.EncodeJSON(expect), 0)
helper := EtcdHelper{fakeClient, codec, versioner}
helper := EtcdHelper{fakeClient, latest.Codec, versioner}
var got api.Pod
err := helper.ExtractObj("/some/key", &got, false)
if err != nil {
@@ -168,12 +170,12 @@ func TestExtractObjNotFoundErr(t *testing.T) {
func TestSetObj(t *testing.T) {
obj := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}}
fakeClient := NewFakeEtcdClient(t)
helper := EtcdHelper{fakeClient, codec, versioner}
helper := EtcdHelper{fakeClient, latest.Codec, versioner}
err := helper.SetObj("/some/key", obj)
if err != nil {
t.Errorf("Unexpected error %#v", err)
}
data, err := codec.Encode(obj)
data, err := latest.Codec.Encode(obj)
if err != nil {
t.Errorf("Unexpected error %#v", err)
}
@@ -191,18 +193,18 @@ func TestSetObjWithVersion(t *testing.T) {
fakeClient.Data["/some/key"] = EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
Value: latest.Codec.EncodeOrDie(obj),
Value: runtime.EncodeOrDie(latest.Codec, obj),
ModifiedIndex: 1,
},
},
}
helper := EtcdHelper{fakeClient, codec, versioner}
helper := EtcdHelper{fakeClient, latest.Codec, versioner}
err := helper.SetObj("/some/key", obj)
if err != nil {
t.Fatalf("Unexpected error %#v", err)
}
data, err := codec.Encode(obj)
data, err := latest.Codec.Encode(obj)
if err != nil {
t.Fatalf("Unexpected error %#v", err)
}
@@ -216,12 +218,12 @@ func TestSetObjWithVersion(t *testing.T) {
func TestSetObjWithoutResourceVersioner(t *testing.T) {
obj := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}}
fakeClient := NewFakeEtcdClient(t)
helper := EtcdHelper{fakeClient, codec, nil}
helper := EtcdHelper{fakeClient, latest.Codec, nil}
err := helper.SetObj("/some/key", obj)
if err != nil {
t.Errorf("Unexpected error %#v", err)
}
data, err := codec.Encode(obj)
data, err := latest.Codec.Encode(obj)
if err != nil {
t.Errorf("Unexpected error %#v", err)
}
@@ -235,7 +237,6 @@ func TestSetObjWithoutResourceVersioner(t *testing.T) {
func TestAtomicUpdate(t *testing.T) {
fakeClient := NewFakeEtcdClient(t)
fakeClient.TestIndex = true
codec := scheme
helper := EtcdHelper{fakeClient, codec, runtime.NewJSONBaseResourceVersioner()}
// Create a new node.
@@ -290,7 +291,7 @@ func TestAtomicUpdate(t *testing.T) {
func TestAtomicUpdateNoChange(t *testing.T) {
fakeClient := NewFakeEtcdClient(t)
fakeClient.TestIndex = true
helper := EtcdHelper{fakeClient, scheme, runtime.NewJSONBaseResourceVersioner()}
helper := EtcdHelper{fakeClient, codec, runtime.NewJSONBaseResourceVersioner()}
// Create a new node.
fakeClient.ExpectNotFoundGet("/some/key")
@@ -321,7 +322,6 @@ func TestAtomicUpdateNoChange(t *testing.T) {
func TestAtomicUpdate_CreateCollision(t *testing.T) {
fakeClient := NewFakeEtcdClient(t)
fakeClient.TestIndex = true
codec := scheme
helper := EtcdHelper{fakeClient, codec, runtime.NewJSONBaseResourceVersioner()}
fakeClient.ExpectNotFoundGet("/some/key")

View File

@@ -23,12 +23,14 @@ import (
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
"github.com/coreos/go-etcd/etcd"
)
func TestWatchInterpretations(t *testing.T) {
codec := latest.Codec
// Declare some pods to make the test cases compact.
podFoo := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}}
podBar := &api.Pod{JSONBase: api.JSONBase{ID: "bar"}}
@@ -48,62 +50,62 @@ func TestWatchInterpretations(t *testing.T) {
}{
"create": {
actions: []string{"create", "get"},
nodeValue: latest.Codec.EncodeOrDie(podBar),
nodeValue: runtime.EncodeOrDie(codec, podBar),
expectEmit: true,
expectType: watch.Added,
expectObject: podBar,
},
"create but filter blocks": {
actions: []string{"create", "get"},
nodeValue: latest.Codec.EncodeOrDie(podFoo),
nodeValue: runtime.EncodeOrDie(codec, podFoo),
expectEmit: false,
},
"delete": {
actions: []string{"delete"},
prevNodeValue: latest.Codec.EncodeOrDie(podBar),
prevNodeValue: runtime.EncodeOrDie(codec, podBar),
expectEmit: true,
expectType: watch.Deleted,
expectObject: podBar,
},
"delete but filter blocks": {
actions: []string{"delete"},
nodeValue: latest.Codec.EncodeOrDie(podFoo),
nodeValue: runtime.EncodeOrDie(codec, podFoo),
expectEmit: false,
},
"modify appears to create 1": {
actions: []string{"set", "compareAndSwap"},
nodeValue: latest.Codec.EncodeOrDie(podBar),
nodeValue: runtime.EncodeOrDie(codec, podBar),
expectEmit: true,
expectType: watch.Added,
expectObject: podBar,
},
"modify appears to create 2": {
actions: []string{"set", "compareAndSwap"},
prevNodeValue: latest.Codec.EncodeOrDie(podFoo),
nodeValue: latest.Codec.EncodeOrDie(podBar),
prevNodeValue: runtime.EncodeOrDie(codec, podFoo),
nodeValue: runtime.EncodeOrDie(codec, podBar),
expectEmit: true,
expectType: watch.Added,
expectObject: podBar,
},
"modify appears to delete": {
actions: []string{"set", "compareAndSwap"},
prevNodeValue: latest.Codec.EncodeOrDie(podBar),
nodeValue: latest.Codec.EncodeOrDie(podFoo),
prevNodeValue: runtime.EncodeOrDie(codec, podBar),
nodeValue: runtime.EncodeOrDie(codec, podFoo),
expectEmit: true,
expectType: watch.Deleted,
expectObject: podBar, // Should return last state that passed the filter!
},
"modify modifies": {
actions: []string{"set", "compareAndSwap"},
prevNodeValue: latest.Codec.EncodeOrDie(podBar),
nodeValue: latest.Codec.EncodeOrDie(podBaz),
prevNodeValue: runtime.EncodeOrDie(codec, podBar),
nodeValue: runtime.EncodeOrDie(codec, podBaz),
expectEmit: true,
expectType: watch.Modified,
expectObject: podBaz,
},
"modify ignores": {
actions: []string{"set", "compareAndSwap"},
nodeValue: latest.Codec.EncodeOrDie(podFoo),
nodeValue: runtime.EncodeOrDie(codec, podFoo),
expectEmit: false,
},
}
@@ -197,6 +199,7 @@ func TestWatchInterpretation_ResponseBadData(t *testing.T) {
}
func TestWatch(t *testing.T) {
codec := latest.Codec
fakeClient := NewFakeEtcdClient(t)
fakeClient.expectNotFoundGetSet["/some/key"] = struct{}{}
h := EtcdHelper{fakeClient, codec, versioner}
@@ -243,6 +246,7 @@ func TestWatch(t *testing.T) {
}
func TestWatchEtcdState(t *testing.T) {
codec := latest.Codec
type T struct {
Type watch.EventType
Endpoints []string
@@ -259,7 +263,7 @@ func TestWatchEtcdState(t *testing.T) {
{
Action: "create",
Node: &etcd.Node{
Value: string(latest.Codec.EncodeOrDie(&api.Endpoints{JSONBase: api.JSONBase{ID: "foo"}, Endpoints: []string{}})),
Value: string(runtime.EncodeOrDie(codec, &api.Endpoints{JSONBase: api.JSONBase{ID: "foo"}, Endpoints: []string{}})),
},
},
},
@@ -273,12 +277,12 @@ func TestWatchEtcdState(t *testing.T) {
{
Action: "compareAndSwap",
Node: &etcd.Node{
Value: string(latest.Codec.EncodeOrDie(&api.Endpoints{JSONBase: api.JSONBase{ID: "foo"}, Endpoints: []string{"127.0.0.1:9000"}})),
Value: string(runtime.EncodeOrDie(codec, &api.Endpoints{JSONBase: api.JSONBase{ID: "foo"}, Endpoints: []string{"127.0.0.1:9000"}})),
CreatedIndex: 1,
ModifiedIndex: 2,
},
PrevNode: &etcd.Node{
Value: string(latest.Codec.EncodeOrDie(&api.Endpoints{JSONBase: api.JSONBase{ID: "foo"}, Endpoints: []string{}})),
Value: string(runtime.EncodeOrDie(codec, &api.Endpoints{JSONBase: api.JSONBase{ID: "foo"}, Endpoints: []string{}})),
CreatedIndex: 1,
ModifiedIndex: 1,
},
@@ -295,7 +299,7 @@ func TestWatchEtcdState(t *testing.T) {
R: &etcd.Response{
Action: "get",
Node: &etcd.Node{
Value: string(latest.Codec.EncodeOrDie(&api.Endpoints{JSONBase: api.JSONBase{ID: "foo"}, Endpoints: []string{}})),
Value: string(runtime.EncodeOrDie(codec, &api.Endpoints{JSONBase: api.JSONBase{ID: "foo"}, Endpoints: []string{}})),
CreatedIndex: 1,
ModifiedIndex: 1,
},
@@ -308,12 +312,12 @@ func TestWatchEtcdState(t *testing.T) {
{
Action: "compareAndSwap",
Node: &etcd.Node{
Value: string(latest.Codec.EncodeOrDie(&api.Endpoints{JSONBase: api.JSONBase{ID: "foo"}, Endpoints: []string{"127.0.0.1:9000"}})),
Value: string(runtime.EncodeOrDie(codec, &api.Endpoints{JSONBase: api.JSONBase{ID: "foo"}, Endpoints: []string{"127.0.0.1:9000"}})),
CreatedIndex: 1,
ModifiedIndex: 2,
},
PrevNode: &etcd.Node{
Value: string(latest.Codec.EncodeOrDie(&api.Endpoints{JSONBase: api.JSONBase{ID: "foo"}, Endpoints: []string{}})),
Value: string(runtime.EncodeOrDie(codec, &api.Endpoints{JSONBase: api.JSONBase{ID: "foo"}, Endpoints: []string{}})),
CreatedIndex: 1,
ModifiedIndex: 1,
},
@@ -359,6 +363,7 @@ func TestWatchEtcdState(t *testing.T) {
}
func TestWatchFromZeroIndex(t *testing.T) {
codec := latest.Codec
pod := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}}
testCases := map[string]struct {
@@ -370,7 +375,7 @@ func TestWatchFromZeroIndex(t *testing.T) {
EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
Value: latest.Codec.EncodeOrDie(pod),
Value: runtime.EncodeOrDie(codec, pod),
CreatedIndex: 1,
ModifiedIndex: 1,
},
@@ -385,7 +390,7 @@ func TestWatchFromZeroIndex(t *testing.T) {
EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
Value: latest.Codec.EncodeOrDie(pod),
Value: runtime.EncodeOrDie(codec, pod),
CreatedIndex: 1,
ModifiedIndex: 2,
},
@@ -434,6 +439,7 @@ func TestWatchFromZeroIndex(t *testing.T) {
}
func TestWatchListFromZeroIndex(t *testing.T) {
codec := latest.Codec
pod := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}}
fakeClient := NewFakeEtcdClient(t)
@@ -443,13 +449,13 @@ func TestWatchListFromZeroIndex(t *testing.T) {
Dir: true,
Nodes: etcd.Nodes{
&etcd.Node{
Value: latest.Codec.EncodeOrDie(pod),
Value: runtime.EncodeOrDie(codec, pod),
CreatedIndex: 1,
ModifiedIndex: 1,
Nodes: etcd.Nodes{},
},
&etcd.Node{
Value: latest.Codec.EncodeOrDie(pod),
Value: runtime.EncodeOrDie(codec, pod),
CreatedIndex: 2,
ModifiedIndex: 2,
Nodes: etcd.Nodes{},