diff --git a/pkg/api/helper.go b/pkg/api/helper.go index 4f1ebc75ead..d12a3824fab 100644 --- a/pkg/api/helper.go +++ b/pkg/api/helper.go @@ -44,6 +44,8 @@ func init() { Status{}, ServerOpList{}, ServerOp{}, + ContainerManifestList{}, + Endpoints{}, ) AddKnownTypes("v1beta1", v1beta1.PodList{}, @@ -106,6 +108,15 @@ func FindJSONBaseRO(obj interface{}) (JSONBase, error) { return jsonBase.Interface().(JSONBase), nil } +// EncodeOrDie is a version of Encode which will panic instead of returning an error. For tests. +func EncodeOrDie(obj interface{}) string { + bytes, err := Encode(obj) + if err != nil { + panic(err) + } + return string(bytes) +} + // Encode turns the given api object into an appropriate JSON string. // Will return an error if the object doesn't have an embedded JSONBase. // Obj may be a pointer to a struct, or a struct. If a struct, a copy diff --git a/pkg/api/types.go b/pkg/api/types.go index 485254b1d9a..85cd42207c8 100644 --- a/pkg/api/types.go +++ b/pkg/api/types.go @@ -58,6 +58,12 @@ type ContainerManifest struct { Containers []Container `yaml:"containers" json:"containers"` } +// ContainerManifestList is used to communicate container manifests to kubelet. +type ContainerManifestList struct { + JSONBase `json:",inline" yaml:",inline"` + Items []ContainerManifest `json:"items,omitempty" yaml:"items,omitempty"` +} + // Volume represents a named volume in a pod that may be accessed by any containers in the pod. type Volume struct { // Required: This must be a DNS_LABEL. Each volume in a pod must have @@ -289,8 +295,8 @@ type Service struct { // Endpoints is a collection of endpoints that implement the actual service, for example: // Name: "mysql", Endpoints: ["10.10.1.1:1909", "10.10.2.2:8834"] type Endpoints struct { - Name string - Endpoints []string + JSONBase `json:",inline" yaml:",inline"` + Endpoints []string `json:"endpoints,omitempty" yaml:"endpoints,omitempty"` } // Minion is a worker node in Kubernetenes. diff --git a/pkg/kubelet/config/etcd.go b/pkg/kubelet/config/etcd.go index 3ab7a0cd084..ef3210f5936 100644 --- a/pkg/kubelet/config/etcd.go +++ b/pkg/kubelet/config/etcd.go @@ -9,7 +9,7 @@ You may obtain a copy of the License at Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or sied. +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ @@ -110,12 +110,12 @@ func responseToPods(response *etcd.Response) ([]kubelet.Pod, error) { return pods, fmt.Errorf("no nodes field: %v", response) } - manifests := []api.ContainerManifest{} + manifests := api.ContainerManifestList{} if err := yaml.Unmarshal([]byte(response.Node.Value), &manifests); err != nil { return pods, fmt.Errorf("could not unmarshal manifests: %v", err) } - for i, manifest := range manifests { + for i, manifest := range manifests.Items { name := manifest.ID if name == "" { name = fmt.Sprintf("_%d", i+1) diff --git a/pkg/kubelet/config/etcd_test.go b/pkg/kubelet/config/etcd_test.go index dbf7039a2b6..0a302f0978d 100644 --- a/pkg/kubelet/config/etcd_test.go +++ b/pkg/kubelet/config/etcd_test.go @@ -24,7 +24,6 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet" "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" - "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/coreos/go-etcd/etcd" ) @@ -36,7 +35,9 @@ func TestGetEtcdData(t *testing.T) { fakeClient.Data["/registry/hosts/machine/kubelet"] = tools.EtcdResponseWithError{ R: &etcd.Response{ Node: &etcd.Node{ - Value: util.MakeJSONString([]api.ContainerManifest{api.ContainerManifest{ID: "foo"}}), + Value: api.EncodeOrDie(&api.ContainerManifestList{ + Items: []api.ContainerManifest{{ID: "foo"}}, + }), ModifiedIndex: 1, }, }, @@ -76,7 +77,9 @@ func TestGetEtcd(t *testing.T) { fakeClient.Data["/registry/hosts/machine/kubelet"] = tools.EtcdResponseWithError{ R: &etcd.Response{ Node: &etcd.Node{ - Value: util.MakeJSONString([]api.ContainerManifest{api.ContainerManifest{ID: "foo"}}), + Value: api.EncodeOrDie(&api.ContainerManifestList{ + Items: []api.ContainerManifest{{ID: "foo"}}, + }), ModifiedIndex: 1, }, }, @@ -103,7 +106,7 @@ func TestWatchEtcd(t *testing.T) { fakeClient.Data["/registry/hosts/machine/kubelet"] = tools.EtcdResponseWithError{ R: &etcd.Response{ Node: &etcd.Node{ - Value: util.MakeJSONString([]api.Container{}), + Value: api.EncodeOrDie(&api.ContainerManifestList{}), ModifiedIndex: 2, }, }, diff --git a/pkg/proxy/config/config.go b/pkg/proxy/config/config.go index 5fab579b6cf..c106a9a91fb 100644 --- a/pkg/proxy/config/config.go +++ b/pkg/proxy/config/config.go @@ -127,19 +127,19 @@ func (s *endpointsStore) Merge(source string, change interface{}) error { case ADD: glog.Infof("Adding new endpoint from source %s : %v", source, update.Endpoints) for _, value := range update.Endpoints { - endpoints[value.Name] = value + endpoints[value.ID] = value } case REMOVE: glog.Infof("Removing an endpoint %v", update) for _, value := range update.Endpoints { - delete(endpoints, value.Name) + delete(endpoints, value.ID) } case SET: glog.Infof("Setting endpoints %v", update) // Clear the old map entries by just creating a new map endpoints = make(map[string]api.Endpoints) for _, value := range update.Endpoints { - endpoints[value.Name] = value + endpoints[value.ID] = value } default: glog.Infof("Received invalid update type: %v", update) diff --git a/pkg/proxy/config/config_test.go b/pkg/proxy/config/config_test.go index 755e02c19f8..38152457739 100644 --- a/pkg/proxy/config/config_test.go +++ b/pkg/proxy/config/config_test.go @@ -83,7 +83,7 @@ func (s sortedEndpoints) Swap(i, j int) { s[i], s[j] = s[j], s[i] } func (s sortedEndpoints) Less(i, j int) bool { - return s[i].Name < s[j].Name + return s[i].ID < s[j].ID } type EndpointsHandlerMock struct { @@ -216,8 +216,14 @@ func TestNewMultipleSourcesEndpointsMultipleHandlersAddedAndNotified(t *testing. handler2 := NewEndpointsHandlerMock() config.RegisterHandler(handler) config.RegisterHandler(handler2) - endpointsUpdate1 := CreateEndpointsUpdate(ADD, api.Endpoints{Name: "foo", Endpoints: []string{"endpoint1", "endpoint2"}}) - endpointsUpdate2 := CreateEndpointsUpdate(ADD, api.Endpoints{Name: "bar", Endpoints: []string{"endpoint3", "endpoint4"}}) + endpointsUpdate1 := CreateEndpointsUpdate(ADD, api.Endpoints{ + JSONBase: api.JSONBase{ID: "foo"}, + Endpoints: []string{"endpoint1", "endpoint2"}, + }) + endpointsUpdate2 := CreateEndpointsUpdate(ADD, api.Endpoints{ + JSONBase: api.JSONBase{ID: "bar"}, + Endpoints: []string{"endpoint3", "endpoint4"}, + }) handler.Wait(2) handler2.Wait(2) channelOne <- endpointsUpdate1 @@ -236,8 +242,14 @@ func TestNewMultipleSourcesEndpointsMultipleHandlersAddRemoveSetAndNotified(t *t handler2 := NewEndpointsHandlerMock() config.RegisterHandler(handler) config.RegisterHandler(handler2) - endpointsUpdate1 := CreateEndpointsUpdate(ADD, api.Endpoints{Name: "foo", Endpoints: []string{"endpoint1", "endpoint2"}}) - endpointsUpdate2 := CreateEndpointsUpdate(ADD, api.Endpoints{Name: "bar", Endpoints: []string{"endpoint3", "endpoint4"}}) + endpointsUpdate1 := CreateEndpointsUpdate(ADD, api.Endpoints{ + JSONBase: api.JSONBase{ID: "foo"}, + Endpoints: []string{"endpoint1", "endpoint2"}, + }) + endpointsUpdate2 := CreateEndpointsUpdate(ADD, api.Endpoints{ + JSONBase: api.JSONBase{ID: "bar"}, + Endpoints: []string{"endpoint3", "endpoint4"}, + }) handler.Wait(2) handler2.Wait(2) channelOne <- endpointsUpdate1 @@ -248,7 +260,10 @@ func TestNewMultipleSourcesEndpointsMultipleHandlersAddRemoveSetAndNotified(t *t handler2.ValidateEndpoints(t, endpoints) // Add one more - endpointsUpdate3 := CreateEndpointsUpdate(ADD, api.Endpoints{Name: "foobar", Endpoints: []string{"endpoint5", "endpoint6"}}) + endpointsUpdate3 := CreateEndpointsUpdate(ADD, api.Endpoints{ + JSONBase: api.JSONBase{ID: "foobar"}, + Endpoints: []string{"endpoint5", "endpoint6"}, + }) handler.Wait(1) handler2.Wait(1) channelTwo <- endpointsUpdate3 @@ -257,7 +272,10 @@ func TestNewMultipleSourcesEndpointsMultipleHandlersAddRemoveSetAndNotified(t *t handler2.ValidateEndpoints(t, endpoints) // Update the "foo" service with new endpoints - endpointsUpdate1 = CreateEndpointsUpdate(ADD, api.Endpoints{Name: "foo", Endpoints: []string{"endpoint77"}}) + endpointsUpdate1 = CreateEndpointsUpdate(ADD, api.Endpoints{ + JSONBase: api.JSONBase{ID: "foo"}, + Endpoints: []string{"endpoint77"}, + }) handler.Wait(1) handler2.Wait(1) channelOne <- endpointsUpdate1 @@ -266,7 +284,7 @@ func TestNewMultipleSourcesEndpointsMultipleHandlersAddRemoveSetAndNotified(t *t handler2.ValidateEndpoints(t, endpoints) // Remove "bar" service - endpointsUpdate2 = CreateEndpointsUpdate(REMOVE, api.Endpoints{Name: "bar"}) + endpointsUpdate2 = CreateEndpointsUpdate(REMOVE, api.Endpoints{JSONBase: api.JSONBase{ID: "bar"}}) handler.Wait(1) handler2.Wait(1) channelTwo <- endpointsUpdate2 diff --git a/pkg/proxy/config/etcd.go b/pkg/proxy/config/etcd.go index 93f63a165bf..e0451306916 100644 --- a/pkg/proxy/config/etcd.go +++ b/pkg/proxy/config/etcd.go @@ -34,7 +34,6 @@ limitations under the License. package config import ( - "encoding/json" "fmt" "strings" "time" @@ -127,7 +126,7 @@ func (s ConfigSourceEtcd) GetServices() ([]api.Service, []api.Endpoints, error) // and create a Service entry for it. for i, node := range response.Node.Nodes { var svc api.Service - err = json.Unmarshal([]byte(node.Value), &svc) + err = api.DecodeInto([]byte(node.Value), &svc) if err != nil { glog.Errorf("Failed to load Service: %s (%#v)", node.Value, err) continue @@ -154,7 +153,9 @@ func (s ConfigSourceEtcd) GetEndpoints(service string) (api.Endpoints, error) { return api.Endpoints{}, err } // Parse all the endpoint specifications in this value. - return parseEndpoints(response.Node.Value) + var e api.Endpoints + err = api.DecodeInto([]byte(response.Node.Value), &e) + return e, err } // etcdResponseToService takes an etcd response and pulls it apart to find service. @@ -163,19 +164,13 @@ func etcdResponseToService(response *etcd.Response) (*api.Service, error) { return nil, fmt.Errorf("invalid response from etcd: %#v", response) } var svc api.Service - err := json.Unmarshal([]byte(response.Node.Value), &svc) + err := api.DecodeInto([]byte(response.Node.Value), &svc) if err != nil { return nil, err } return &svc, err } -func parseEndpoints(jsonString string) (api.Endpoints, error) { - var e api.Endpoints - err := json.Unmarshal([]byte(jsonString), &e) - return e, err -} - func (s ConfigSourceEtcd) WatchForChanges() { glog.Info("Setting up a watch for new services") watchChannel := make(chan *etcd.Response) @@ -220,7 +215,7 @@ func (s ConfigSourceEtcd) ProcessChange(response *etcd.Response) { func (s ConfigSourceEtcd) ProcessEndpointResponse(response *etcd.Response) { glog.Infof("Processing a change in endpoint configuration... %s", *response) var endpoints api.Endpoints - err := json.Unmarshal([]byte(response.Node.Value), &endpoints) + err := api.DecodeInto([]byte(response.Node.Value), &endpoints) if err != nil { glog.Errorf("Failed to parse service out of etcd key: %v : %+v", response.Node.Value, err) return diff --git a/pkg/proxy/config/etcd_test.go b/pkg/proxy/config/etcd_test.go deleted file mode 100644 index 9b1dafbcccb..00000000000 --- a/pkg/proxy/config/etcd_test.go +++ /dev/null @@ -1,56 +0,0 @@ -/* -Copyright 2014 Google Inc. All rights reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package config - -import ( - "encoding/json" - "reflect" - "testing" - - "github.com/GoogleCloudPlatform/kubernetes/pkg/api" -) - -const TomcatContainerEtcdKey = "/registry/services/tomcat/endpoints/tomcat-3bd5af34" -const TomcatService = "tomcat" -const TomcatContainerID = "tomcat-3bd5af34" - -func validateJSONParsing(t *testing.T, jsonString string, expectedEndpoints api.Endpoints, expectError bool) { - endpoints, err := parseEndpoints(jsonString) - if err == nil && expectError { - t.Errorf("validateJSONParsing did not get expected error when parsing %s", jsonString) - } - if err != nil && !expectError { - t.Errorf("validateJSONParsing got unexpected error %+v when parsing %s", err, jsonString) - } - if !reflect.DeepEqual(expectedEndpoints, endpoints) { - t.Errorf("Didn't get expected endpoints %+v got: %+v", expectedEndpoints, endpoints) - } -} - -func TestParseJsonEndpoints(t *testing.T) { - validateJSONParsing(t, "", api.Endpoints{}, true) - endpoints := api.Endpoints{ - Name: "foo", - Endpoints: []string{"foo", "bar", "baz"}, - } - data, err := json.Marshal(endpoints) - if err != nil { - t.Errorf("Unexpected error: %#v", err) - } - validateJSONParsing(t, string(data), endpoints, false) - // validateJSONParsing(t, "[{\"port\":8000,\"name\":\"mysql\",\"machine\":\"foo\"},{\"port\":9000,\"name\":\"mysql\",\"machine\":\"bar\"}]", []string{"foo:8000", "bar:9000"}, false) -} diff --git a/pkg/proxy/config/file.go b/pkg/proxy/config/file.go index 0fc91420f37..26465bc5f80 100644 --- a/pkg/proxy/config/file.go +++ b/pkg/proxy/config/file.go @@ -102,7 +102,7 @@ func (s ConfigSourceFile) Run() { newEndpoints := make([]api.Endpoints, len(config.Services)) for i, service := range config.Services { newServices[i] = api.Service{JSONBase: api.JSONBase{ID: service.Name}, Port: service.Port} - newEndpoints[i] = api.Endpoints{Name: service.Name, Endpoints: service.Endpoints} + newEndpoints[i] = api.Endpoints{JSONBase: api.JSONBase{ID: service.Name}, Endpoints: service.Endpoints} } if !reflect.DeepEqual(lastServices, newServices) { serviceUpdate := ServiceUpdate{Op: SET, Services: newServices} diff --git a/pkg/proxy/proxier_test.go b/pkg/proxy/proxier_test.go index 5893b381ca2..8489d8d358f 100644 --- a/pkg/proxy/proxier_test.go +++ b/pkg/proxy/proxier_test.go @@ -52,7 +52,8 @@ func TestProxy(t *testing.T) { } lb := NewLoadBalancerRR() - lb.OnUpdate([]api.Endpoints{{"echo", []string{net.JoinHostPort("127.0.0.1", port)}}}) + lb.OnUpdate([]api.Endpoints{ + {JSONBase: api.JSONBase{ID: "echo"}, Endpoints: []string{net.JoinHostPort("127.0.0.1", port)}}}) p := NewProxier(lb) diff --git a/pkg/proxy/roundrobbin.go b/pkg/proxy/roundrobbin.go index 1f4c32794a4..b459318d104 100644 --- a/pkg/proxy/roundrobbin.go +++ b/pkg/proxy/roundrobbin.go @@ -89,15 +89,15 @@ func (impl LoadBalancerRR) OnUpdate(endpoints []api.Endpoints) { defer impl.lock.Unlock() // First update / add all new endpoints for services. for _, value := range endpoints { - existingEndpoints, exists := impl.endpointsMap[value.Name] + existingEndpoints, exists := impl.endpointsMap[value.ID] validEndpoints := impl.filterValidEndpoints(value.Endpoints) if !exists || !reflect.DeepEqual(existingEndpoints, validEndpoints) { - glog.Infof("LoadBalancerRR: Setting endpoints for %s to %+v", value.Name, value.Endpoints) - impl.endpointsMap[value.Name] = validEndpoints + glog.Infof("LoadBalancerRR: Setting endpoints for %s to %+v", value.ID, value.Endpoints) + impl.endpointsMap[value.ID] = validEndpoints // Start RR from the beginning if added or updated. - impl.rrIndex[value.Name] = 0 + impl.rrIndex[value.ID] = 0 } - tmp[value.Name] = true + tmp[value.ID] = true } // Then remove any endpoints no longer relevant for key, value := range impl.endpointsMap { diff --git a/pkg/proxy/roundrobbin_test.go b/pkg/proxy/roundrobbin_test.go index 1112141de80..55f0760bcbb 100644 --- a/pkg/proxy/roundrobbin_test.go +++ b/pkg/proxy/roundrobbin_test.go @@ -87,7 +87,10 @@ func TestLoadBalanceWorksWithSingleEndpoint(t *testing.T) { t.Errorf("Didn't fail with non-existent service") } endpoints := make([]api.Endpoints, 1) - endpoints[0] = api.Endpoints{Name: "foo", Endpoints: []string{"endpoint1:40"}} + endpoints[0] = api.Endpoints{ + JSONBase: api.JSONBase{ID: "foo"}, + Endpoints: []string{"endpoint1:40"}, + } loadBalancer.OnUpdate(endpoints) expectEndpoint(t, loadBalancer, "foo", "endpoint1:40") expectEndpoint(t, loadBalancer, "foo", "endpoint1:40") @@ -102,7 +105,10 @@ func TestLoadBalanceWorksWithMultipleEndpoints(t *testing.T) { t.Errorf("Didn't fail with non-existent service") } endpoints := make([]api.Endpoints, 1) - endpoints[0] = api.Endpoints{Name: "foo", Endpoints: []string{"endpoint:1", "endpoint:2", "endpoint:3"}} + endpoints[0] = api.Endpoints{ + JSONBase: api.JSONBase{ID: "foo"}, + Endpoints: []string{"endpoint:1", "endpoint:2", "endpoint:3"}, + } loadBalancer.OnUpdate(endpoints) expectEndpoint(t, loadBalancer, "foo", "endpoint:1") expectEndpoint(t, loadBalancer, "foo", "endpoint:2") @@ -117,7 +123,10 @@ func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) { t.Errorf("Didn't fail with non-existent service") } endpoints := make([]api.Endpoints, 1) - endpoints[0] = api.Endpoints{Name: "foo", Endpoints: []string{"endpoint:1", "endpoint:2", "endpoint:3"}} + endpoints[0] = api.Endpoints{ + JSONBase: api.JSONBase{ID: "foo"}, + Endpoints: []string{"endpoint:1", "endpoint:2", "endpoint:3"}, + } loadBalancer.OnUpdate(endpoints) expectEndpoint(t, loadBalancer, "foo", "endpoint:1") expectEndpoint(t, loadBalancer, "foo", "endpoint:2") @@ -126,14 +135,16 @@ func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) { expectEndpoint(t, loadBalancer, "foo", "endpoint:2") // Then update the configuration with one fewer endpoints, make sure // we start in the beginning again - endpoints[0] = api.Endpoints{Name: "foo", Endpoints: []string{"endpoint:8", "endpoint:9"}} + endpoints[0] = api.Endpoints{JSONBase: api.JSONBase{ID: "foo"}, + Endpoints: []string{"endpoint:8", "endpoint:9"}, + } loadBalancer.OnUpdate(endpoints) expectEndpoint(t, loadBalancer, "foo", "endpoint:8") expectEndpoint(t, loadBalancer, "foo", "endpoint:9") expectEndpoint(t, loadBalancer, "foo", "endpoint:8") expectEndpoint(t, loadBalancer, "foo", "endpoint:9") // Clear endpoints - endpoints[0] = api.Endpoints{Name: "foo", Endpoints: []string{}} + endpoints[0] = api.Endpoints{JSONBase: api.JSONBase{ID: "foo"}, Endpoints: []string{}} loadBalancer.OnUpdate(endpoints) endpoint, err = loadBalancer.LoadBalance("foo", nil) @@ -149,8 +160,14 @@ func TestLoadBalanceWorksWithServiceRemoval(t *testing.T) { t.Errorf("Didn't fail with non-existent service") } endpoints := make([]api.Endpoints, 2) - endpoints[0] = api.Endpoints{Name: "foo", Endpoints: []string{"endpoint:1", "endpoint:2", "endpoint:3"}} - endpoints[1] = api.Endpoints{Name: "bar", Endpoints: []string{"endpoint:4", "endpoint:5"}} + endpoints[0] = api.Endpoints{ + JSONBase: api.JSONBase{ID: "foo"}, + Endpoints: []string{"endpoint:1", "endpoint:2", "endpoint:3"}, + } + endpoints[1] = api.Endpoints{ + JSONBase: api.JSONBase{ID: "bar"}, + Endpoints: []string{"endpoint:4", "endpoint:5"}, + } loadBalancer.OnUpdate(endpoints) expectEndpoint(t, loadBalancer, "foo", "endpoint:1") expectEndpoint(t, loadBalancer, "foo", "endpoint:2") diff --git a/pkg/registry/endpoints.go b/pkg/registry/endpoints.go index 3696685e74d..f7d34f89899 100644 --- a/pkg/registry/endpoints.go +++ b/pkg/registry/endpoints.go @@ -88,7 +88,7 @@ func (e *EndpointController) SyncServiceEndpoints() error { endpoints[ix] = net.JoinHostPort(pod.CurrentState.PodIP, strconv.Itoa(port)) } err = e.serviceRegistry.UpdateEndpoints(api.Endpoints{ - Name: service.ID, + JSONBase: api.JSONBase{ID: service.ID}, Endpoints: endpoints, }) if err != nil { diff --git a/pkg/registry/etcd_registry.go b/pkg/registry/etcd_registry.go index 626f9ba083e..868f8877e00 100644 --- a/pkg/registry/etcd_registry.go +++ b/pkg/registry/etcd_registry.go @@ -112,9 +112,10 @@ func (registry *EtcdRegistry) runPod(pod api.Pod, machine string) error { } contKey := makeContainerKey(machine) - err = registry.helper().AtomicUpdate(contKey, &[]api.ContainerManifest{}, func(in interface{}) (interface{}, error) { - manifests := *in.(*[]api.ContainerManifest) - return append(manifests, manifest), nil + err = registry.helper().AtomicUpdate(contKey, &api.ContainerManifestList{}, func(in interface{}) (interface{}, error) { + manifests := *in.(*api.ContainerManifestList) + manifests.Items = append(manifests.Items, manifest) + return manifests, nil }) if err != nil { // Don't strand stuff. @@ -153,11 +154,11 @@ func (registry *EtcdRegistry) deletePodFromMachine(machine, podID string) error // Next, remove the pod from the machine atomically. contKey := makeContainerKey(machine) - return registry.helper().AtomicUpdate(contKey, &[]api.ContainerManifest{}, func(in interface{}) (interface{}, error) { - manifests := *in.(*[]api.ContainerManifest) - newManifests := make([]api.ContainerManifest, 0, len(manifests)) + return registry.helper().AtomicUpdate(contKey, &api.ContainerManifestList{}, func(in interface{}) (interface{}, error) { + manifests := in.(*api.ContainerManifestList) + newManifests := make([]api.ContainerManifest, 0, len(manifests.Items)) found := false - for _, manifest := range manifests { + for _, manifest := range manifests.Items { if manifest.ID != podID { newManifests = append(newManifests, manifest) } else { @@ -170,7 +171,8 @@ func (registry *EtcdRegistry) deletePodFromMachine(machine, podID string) error // However it is "deleted" so log it and move on glog.Infof("Couldn't find: %s in %#v", podID, manifests) } - return newManifests, nil + manifests.Items = newManifests + return manifests, nil }) } @@ -297,5 +299,5 @@ func (registry *EtcdRegistry) UpdateService(svc api.Service) error { // UpdateEndpoints update Endpoints of a Service. func (registry *EtcdRegistry) UpdateEndpoints(e api.Endpoints) error { - return registry.helper().SetObj("/registry/services/endpoints/"+e.Name, e) + return registry.helper().SetObj("/registry/services/endpoints/"+e.ID, e) } diff --git a/pkg/registry/etcd_registry_test.go b/pkg/registry/etcd_registry_test.go index 5d0718a3754..5ac151c4677 100644 --- a/pkg/registry/etcd_registry_test.go +++ b/pkg/registry/etcd_registry_test.go @@ -17,7 +17,6 @@ limitations under the License. package registry import ( - "encoding/json" "reflect" "testing" @@ -70,7 +69,7 @@ func TestEtcdCreatePod(t *testing.T) { }, E: tools.EtcdErrorNotFound, } - fakeClient.Set("/registry/hosts/machine/kubelet", util.MakeJSONString([]api.ContainerManifest{}), 0) + fakeClient.Set("/registry/hosts/machine/kubelet", api.EncodeOrDie(&api.ContainerManifestList{}), 0) registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"}) err := registry.CreatePod("machine", api.Pod{ JSONBase: api.JSONBase{ @@ -88,18 +87,20 @@ func TestEtcdCreatePod(t *testing.T) { }) expectNoError(t, err) resp, err := fakeClient.Get("/registry/hosts/machine/pods/foo", false, false) - expectNoError(t, err) + if err != nil { + t.Fatalf("Unexpected error %v", err) + } var pod api.Pod - err = json.Unmarshal([]byte(resp.Node.Value), &pod) + err = api.DecodeInto([]byte(resp.Node.Value), &pod) expectNoError(t, err) if pod.ID != "foo" { t.Errorf("Unexpected pod: %#v %s", pod, resp.Node.Value) } - var manifests []api.ContainerManifest + var manifests api.ContainerManifestList resp, err = fakeClient.Get("/registry/hosts/machine/kubelet", false, false) expectNoError(t, err) - err = json.Unmarshal([]byte(resp.Node.Value), &manifests) - if len(manifests) != 1 || manifests[0].ID != "foo" { + err = api.DecodeInto([]byte(resp.Node.Value), &manifests) + if len(manifests.Items) != 1 || manifests.Items[0].ID != "foo" { t.Errorf("Unexpected manifest list: %#v", manifests) } } @@ -189,18 +190,20 @@ func TestEtcdCreatePodWithContainersNotFound(t *testing.T) { }) expectNoError(t, err) resp, err := fakeClient.Get("/registry/hosts/machine/pods/foo", false, false) - expectNoError(t, err) + if err != nil { + t.Fatalf("Unexpected error %v", err) + } var pod api.Pod - err = json.Unmarshal([]byte(resp.Node.Value), &pod) + err = api.DecodeInto([]byte(resp.Node.Value), &pod) expectNoError(t, err) if pod.ID != "foo" { t.Errorf("Unexpected pod: %#v %s", pod, resp.Node.Value) } - var manifests []api.ContainerManifest + var manifests api.ContainerManifestList resp, err = fakeClient.Get("/registry/hosts/machine/kubelet", false, false) expectNoError(t, err) - err = json.Unmarshal([]byte(resp.Node.Value), &manifests) - if len(manifests) != 1 || manifests[0].ID != "foo" { + err = api.DecodeInto([]byte(resp.Node.Value), &manifests) + if len(manifests.Items) != 1 || manifests.Items[0].ID != "foo" { t.Errorf("Unexpected manifest list: %#v", manifests) } } @@ -213,9 +216,9 @@ func TestEtcdCreatePodWithExistingContainers(t *testing.T) { }, E: tools.EtcdErrorNotFound, } - fakeClient.Set("/registry/hosts/machine/kubelet", util.MakeJSONString([]api.ContainerManifest{ - { - ID: "bar", + fakeClient.Set("/registry/hosts/machine/kubelet", api.EncodeOrDie(api.ContainerManifestList{ + Items: []api.ContainerManifest{ + {ID: "bar"}, }, }), 0) registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"}) @@ -236,18 +239,20 @@ func TestEtcdCreatePodWithExistingContainers(t *testing.T) { }) expectNoError(t, err) resp, err := fakeClient.Get("/registry/hosts/machine/pods/foo", false, false) - expectNoError(t, err) + if err != nil { + t.Fatalf("Unexpected error %v", err) + } var pod api.Pod - err = json.Unmarshal([]byte(resp.Node.Value), &pod) + err = api.DecodeInto([]byte(resp.Node.Value), &pod) expectNoError(t, err) if pod.ID != "foo" { t.Errorf("Unexpected pod: %#v %s", pod, resp.Node.Value) } - var manifests []api.ContainerManifest + var manifests api.ContainerManifestList resp, err = fakeClient.Get("/registry/hosts/machine/kubelet", false, false) expectNoError(t, err) - err = json.Unmarshal([]byte(resp.Node.Value), &manifests) - if len(manifests) != 2 || manifests[1].ID != "foo" { + err = api.DecodeInto([]byte(resp.Node.Value), &manifests) + if len(manifests.Items) != 2 || manifests.Items[1].ID != "foo" { t.Errorf("Unexpected manifest list: %#v", manifests) } } @@ -256,9 +261,9 @@ func TestEtcdDeletePod(t *testing.T) { fakeClient := tools.MakeFakeEtcdClient(t) key := "/registry/hosts/machine/pods/foo" fakeClient.Set(key, util.MakeJSONString(api.Pod{JSONBase: api.JSONBase{ID: "foo"}}), 0) - fakeClient.Set("/registry/hosts/machine/kubelet", util.MakeJSONString([]api.ContainerManifest{ - { - ID: "foo", + fakeClient.Set("/registry/hosts/machine/kubelet", api.EncodeOrDie(&api.ContainerManifestList{ + Items: []api.ContainerManifest{ + {ID: "foo"}, }, }), 0) registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"}) @@ -269,8 +274,13 @@ func TestEtcdDeletePod(t *testing.T) { } else if fakeClient.DeletedKeys[0] != key { t.Errorf("Unexpected key: %s, expected %s", fakeClient.DeletedKeys[0], key) } - response, _ := fakeClient.Get("/registry/hosts/machine/kubelet", false, false) - if response.Node.Value != "[]" { + response, err := fakeClient.Get("/registry/hosts/machine/kubelet", false, false) + if err != nil { + t.Fatalf("Unexpected error %v", err) + } + var manifests api.ContainerManifestList + api.DecodeInto([]byte(response.Node.Value), &manifests) + if len(manifests.Items) != 0 { t.Errorf("Unexpected container set: %s, expected empty", response.Node.Value) } } @@ -279,9 +289,11 @@ func TestEtcdDeletePodMultipleContainers(t *testing.T) { fakeClient := tools.MakeFakeEtcdClient(t) key := "/registry/hosts/machine/pods/foo" fakeClient.Set(key, util.MakeJSONString(api.Pod{JSONBase: api.JSONBase{ID: "foo"}}), 0) - fakeClient.Set("/registry/hosts/machine/kubelet", util.MakeJSONString([]api.ContainerManifest{ - {ID: "foo"}, - {ID: "bar"}, + fakeClient.Set("/registry/hosts/machine/kubelet", api.EncodeOrDie(&api.ContainerManifestList{ + Items: []api.ContainerManifest{ + {ID: "foo"}, + {ID: "bar"}, + }, }), 0) registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"}) err := registry.DeletePod("foo") @@ -292,13 +304,16 @@ func TestEtcdDeletePodMultipleContainers(t *testing.T) { if fakeClient.DeletedKeys[0] != key { t.Errorf("Unexpected key: %s, expected %s", fakeClient.DeletedKeys[0], key) } - response, _ := fakeClient.Get("/registry/hosts/machine/kubelet", false, false) - var manifests []api.ContainerManifest - json.Unmarshal([]byte(response.Node.Value), &manifests) - if len(manifests) != 1 { - t.Errorf("Unexpected manifest set: %#v, expected empty", manifests) + response, err := fakeClient.Get("/registry/hosts/machine/kubelet", false, false) + if err != nil { + t.Fatalf("Unexpected error %v", err) } - if manifests[0].ID != "bar" { + var manifests api.ContainerManifestList + api.DecodeInto([]byte(response.Node.Value), &manifests) + if len(manifests.Items) != 1 { + t.Fatalf("Unexpected manifest set: %#v, expected empty", manifests) + } + if manifests.Items[0].ID != "bar" { t.Errorf("Deleted wrong manifest: %#v", manifests) } } @@ -476,9 +491,11 @@ func TestEtcdCreateController(t *testing.T) { }) expectNoError(t, err) resp, err := fakeClient.Get("/registry/controllers/foo", false, false) - expectNoError(t, err) + if err != nil { + t.Fatalf("Unexpected error %v", err) + } var ctrl api.ReplicationController - err = json.Unmarshal([]byte(resp.Node.Value), &ctrl) + err = api.DecodeInto([]byte(resp.Node.Value), &ctrl) expectNoError(t, err) if ctrl.ID != "foo" { t.Errorf("Unexpected pod: %#v %s", ctrl, resp.Node.Value) @@ -544,7 +561,7 @@ func TestEtcdCreateService(t *testing.T) { resp, err := fakeClient.Get("/registry/services/specs/foo", false, false) expectNoError(t, err) var service api.Service - err = json.Unmarshal([]byte(resp.Node.Value), &service) + err = api.DecodeInto([]byte(resp.Node.Value), &service) expectNoError(t, err) if service.ID != "foo" { t.Errorf("Unexpected service: %#v %s", service, resp.Node.Value) @@ -621,15 +638,17 @@ func TestEtcdUpdateEndpoints(t *testing.T) { fakeClient := tools.MakeFakeEtcdClient(t) registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"}) endpoints := api.Endpoints{ - Name: "foo", + JSONBase: api.JSONBase{ID: "foo"}, Endpoints: []string{"baz", "bar"}, } err := registry.UpdateEndpoints(endpoints) expectNoError(t, err) response, err := fakeClient.Get("/registry/services/endpoints/foo", false, false) - expectNoError(t, err) + if err != nil { + t.Fatalf("Unexpected error %v", err) + } var endpointsOut api.Endpoints - err = json.Unmarshal([]byte(response.Node.Value), &endpointsOut) + err = api.DecodeInto([]byte(response.Node.Value), &endpointsOut) if !reflect.DeepEqual(endpoints, endpointsOut) { t.Errorf("Unexpected endpoints: %#v, expected %#v", endpointsOut, endpoints) }