diff --git a/cmd/controller-manager/controller-manager.go b/cmd/controller-manager/controller-manager.go index d32992b1204..28f6303c7b4 100644 --- a/cmd/controller-manager/controller-manager.go +++ b/cmd/controller-manager/controller-manager.go @@ -47,7 +47,7 @@ func main() { } controllerManager := controller.NewReplicationManager( - client.New("http://"+*master, nil)) + client.New(*master, nil)) controllerManager.Run(10 * time.Second) select {} diff --git a/cmd/proxy/proxy.go b/cmd/proxy/proxy.go index 63a82fcf4f8..af16a8c17bf 100644 --- a/cmd/proxy/proxy.go +++ b/cmd/proxy/proxy.go @@ -18,7 +18,9 @@ package main import ( "flag" + "time" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/proxy" "github.com/GoogleCloudPlatform/kubernetes/pkg/proxy/config" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" @@ -29,11 +31,12 @@ import ( var ( configFile = flag.String("configfile", "/tmp/proxy_config", "Configuration file for the proxy") + master = flag.String("master", "", "The address of the Kubernetes API server (optional)") etcdServerList util.StringList ) func init() { - flag.Var(&etcdServerList, "etcd_servers", "List of etcd servers to watch (http://ip:port), comma separated") + flag.Var(&etcdServerList, "etcd_servers", "List of etcd servers to watch (http://ip:port), comma separated (optional)") } func main() { @@ -43,24 +46,39 @@ func main() { verflag.PrintAndExitIfRequested() - // Set up logger for etcd client - etcd.SetLogger(util.NewLogger("etcd ")) - - glog.Infof("Using configuration file %s and etcd_servers %v", *configFile, etcdServerList) - serviceConfig := config.NewServiceConfig() endpointsConfig := config.NewEndpointsConfig() + // define api config source + if *master != "" { + glog.Infof("Using api calls to get config %v", *master) + //TODO: add auth info + client := client.New(*master, nil) + config.NewSourceAPI( + client, + 30*time.Second, + serviceConfig.Channel("api"), + endpointsConfig.Channel("api"), + ) + } + // Create a configuration source that handles configuration from etcd. - etcdClient := etcd.NewClient(etcdServerList) - config.NewConfigSourceEtcd(etcdClient, - serviceConfig.Channel("etcd"), - endpointsConfig.Channel("etcd")) + if len(etcdServerList) > 0 && *master == "" { + glog.Infof("Using etcd servers %v", etcdServerList) + + // Set up logger for etcd client + etcd.SetLogger(util.NewLogger("etcd ")) + etcdClient := etcd.NewClient(etcdServerList) + config.NewConfigSourceEtcd(etcdClient, + serviceConfig.Channel("etcd"), + endpointsConfig.Channel("etcd")) + } // And create a configuration source that reads from a local file config.NewConfigSourceFile(*configFile, serviceConfig.Channel("file"), endpointsConfig.Channel("file")) + glog.Infof("Using configuration file %s", *configFile) loadBalancer := proxy.NewLoadBalancerRR() proxier := proxy.NewProxier(loadBalancer) diff --git a/hack/local-up-cluster.sh b/hack/local-up-cluster.sh index b4a42741d0c..372263096fc 100755 --- a/hack/local-up-cluster.sh +++ b/hack/local-up-cluster.sh @@ -66,7 +66,7 @@ KUBELET_PID=$! PROXY_LOG=/tmp/kube-proxy.log ${GO_OUT}/proxy \ - --etcd_servers="http://127.0.0.1:4001" &> ${PROXY_LOG} & + --master="http://${API_HOST}:${API_PORT}" &> ${PROXY_LOG} & PROXY_PID=$! echo "Local Kubernetes cluster is running. Press Ctrl-C to shut it down." diff --git a/pkg/client/client.go b/pkg/client/client.go index d624442910a..7746cbcc4e7 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -23,6 +23,7 @@ import ( "io" "io/ioutil" "net/http" + "net/url" "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" @@ -67,6 +68,9 @@ type ServiceInterface interface { CreateService(api.Service) (api.Service, error) UpdateService(api.Service) (api.Service, error) DeleteService(string) error + WatchServices(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) + + WatchEndpoints(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) } // VersionInterface has a method to retrieve the server version @@ -183,7 +187,12 @@ func (c *RESTClient) doRequest(request *http.Request) ([]byte, error) { // requestBody is the body of the request. Can be nil. // target the interface to marshal the JSON response into. Can be nil. func (c *RESTClient) rawRequest(method, path string, requestBody io.Reader, target interface{}) ([]byte, error) { - request, err := http.NewRequest(method, c.makeURL(path), requestBody) + reqUrl, err := c.makeURL(path) + if err != nil { + return nil, err + } + + request, err := http.NewRequest(method, reqUrl, requestBody) if err != nil { return nil, err } @@ -201,8 +210,24 @@ func (c *RESTClient) rawRequest(method, path string, requestBody io.Reader, targ return body, err } -func (c *RESTClient) makeURL(path string) string { - return c.host + c.Prefix + path +func (c *RESTClient) makeURL(path string) (string, error) { + base := c.host + hostURL, err := url.Parse(base) + if err != nil { + return "", err + } + if hostURL.Scheme == "" { + hostURL, err = url.Parse("http://" + base) + if err != nil { + return "", err + } + if hostURL.Path != "" && hostURL.Path != "/" { + return "", fmt.Errorf("host must be a URL or a host:port pair: %s", base) + } + } + hostURL.Path += c.Prefix + path + + return hostURL.String(), nil } // ListPods takes a selector, and returns the list of pods that match that selector @@ -309,6 +334,28 @@ func (c *Client) DeleteService(name string) error { return c.Delete().Path("services").Path(name).Do().Error() } +// WatchService returns a watch.Interface that watches the requested services. +func (c *Client) WatchServices(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { + return c.Get(). + Path("watch"). + Path("services"). + UintParam("resourceVersion", resourceVersion). + SelectorParam("labels", label). + SelectorParam("fields", field). + Watch() +} + +// WatchEndpoints returns a watch.Interface that watches the requested endpoints for a service. +func (c *Client) WatchEndpoints(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { + return c.Get(). + Path("watch"). + Path("endpoints"). + UintParam("resourceVersion", resourceVersion). + SelectorParam("labels", label). + SelectorParam("fields", field). + Watch() +} + // ServerVersion retrieves and parses the server's version. func (c *Client) ServerVersion() (*version.Info, error) { body, err := c.Get().AbsPath("/version").Do().Raw() diff --git a/pkg/client/client_test.go b/pkg/client/client_test.go index 21c2d440d20..269d0c02371 100644 --- a/pkg/client/client_test.go +++ b/pkg/client/client_test.go @@ -37,6 +37,33 @@ func makeURL(suffix string) string { return apiPath + suffix } +func TestValidatesHostParameter(t *testing.T) { + testCases := map[string]struct { + Value string + Err bool + }{ + "foo.bar.com": {"http://foo.bar.com/api/v1beta1/", false}, + "http://host/server": {"http://host/server/api/v1beta1/", false}, + "host/server": {"", true}, + } + for k, expected := range testCases { + c := RESTClient{host: k, Prefix: "/api/v1beta1/"} + actual, err := c.makeURL("") + switch { + case err == nil && expected.Err: + t.Errorf("expected error but was nil") + continue + case err != nil && !expected.Err: + t.Errorf("unexpected error %v", err) + continue + } + if expected.Value != actual { + t.Errorf("%s: expected %s, got %s", k, expected.Value, actual) + continue + } + } +} + func TestListEmptyPods(t *testing.T) { c := &testClient{ Request: testRequest{Method: "GET", Path: "/pods"}, diff --git a/pkg/client/fake.go b/pkg/client/fake.go index b03f599a1c7..dbe49d26ac8 100644 --- a/pkg/client/fake.go +++ b/pkg/client/fake.go @@ -35,6 +35,7 @@ type Fake struct { Actions []FakeAction Pods api.PodList Ctrl api.ReplicationController + Watch watch.Interface } func (c *Fake) ListPods(selector labels.Selector) (api.PodList, error) { @@ -88,8 +89,8 @@ func (c *Fake) DeleteReplicationController(controller string) error { } func (c *Fake) WatchReplicationControllers(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { - c.Actions = append(c.Actions, FakeAction{Action: "watch-controllers"}) - return watch.NewFake(), nil + c.Actions = append(c.Actions, FakeAction{Action: "watch-controllers", Value: resourceVersion}) + return c.Watch, nil } func (c *Fake) GetService(name string) (api.Service, error) { @@ -112,6 +113,16 @@ func (c *Fake) DeleteService(service string) error { return nil } +func (c *Fake) WatchServices(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { + c.Actions = append(c.Actions, FakeAction{Action: "watch-services", Value: resourceVersion}) + return c.Watch, nil +} + +func (c *Fake) WatchEndpoints(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { + c.Actions = append(c.Actions, FakeAction{Action: "watch-endpoints", Value: resourceVersion}) + return c.Watch, nil +} + func (c *Fake) ServerVersion() (*version.Info, error) { c.Actions = append(c.Actions, FakeAction{Action: "get-version", Value: nil}) versionInfo := version.Get() diff --git a/pkg/labels/selector.go b/pkg/labels/selector.go index c35da49ac3a..79e7cac1a00 100644 --- a/pkg/labels/selector.go +++ b/pkg/labels/selector.go @@ -32,6 +32,12 @@ type Selector interface { // Empty returns true if this selector does not restrict the selection space. Empty() bool + // RequiresExactMatch allows a caller to introspect whether a given selector + // requires a single specific label to be set, and if so returns the value it + // requires. + // TODO: expand this to be more general + RequiresExactMatch(label string) (value string, found bool) + // String returns a human readable string that represents this selector. String() string } @@ -53,6 +59,13 @@ func (t *hasTerm) Empty() bool { return false } +func (t *hasTerm) RequiresExactMatch(label string) (value string, found bool) { + if t.label == label { + return t.value, true + } + return "", false +} + func (t *hasTerm) String() string { return fmt.Sprintf("%v=%v", t.label, t.value) } @@ -69,6 +82,10 @@ func (t *notHasTerm) Empty() bool { return false } +func (t *notHasTerm) RequiresExactMatch(label string) (value string, found bool) { + return "", false +} + func (t *notHasTerm) String() string { return fmt.Sprintf("%v!=%v", t.label, t.value) } @@ -99,6 +116,18 @@ func (t andTerm) Empty() bool { return true } +func (t andTerm) RequiresExactMatch(label string) (string, bool) { + if t == nil || len([]Selector(t)) == 0 { + return "", false + } + for i := range t { + if value, found := t[i].RequiresExactMatch(label); found { + return value, found + } + } + return "", false +} + func (t andTerm) String() string { var terms []string for _, q := range t { @@ -173,7 +202,7 @@ func SelectorFromSet(ls Set) Selector { return andTerm(items) } -// ParseSelector takes a string repsenting a selector and returns an +// ParseSelector takes a string representing a selector and returns an // object suitable for matching, or an error. func ParseSelector(selector string) (Selector, error) { parts := strings.Split(selector, ",") diff --git a/pkg/labels/selector_test.go b/pkg/labels/selector_test.go index 5539d71ea84..c151e400bf2 100644 --- a/pkg/labels/selector_test.go +++ b/pkg/labels/selector_test.go @@ -158,6 +158,9 @@ func TestSetIsEmpty(t *testing.T) { if (&hasTerm{}).Empty() { t.Errorf("hasTerm should not be empty") } + if (¬HasTerm{}).Empty() { + t.Errorf("notHasTerm should not be empty") + } if !(andTerm{andTerm{}}).Empty() { t.Errorf("Nested andTerm should be empty") } @@ -166,6 +169,36 @@ func TestSetIsEmpty(t *testing.T) { } } +func TestRequiresExactMatch(t *testing.T) { + testCases := map[string]struct { + S Selector + Label string + Value string + Found bool + }{ + "empty set": {Set{}.AsSelector(), "test", "", false}, + "nil andTerm": {andTerm(nil), "test", "", false}, + "empty hasTerm": {&hasTerm{}, "test", "", false}, + "skipped hasTerm": {&hasTerm{"a", "b"}, "test", "", false}, + "valid hasTerm": {&hasTerm{"test", "b"}, "test", "b", true}, + "valid hasTerm no value": {&hasTerm{"test", ""}, "test", "", true}, + "valid notHasTerm": {¬HasTerm{"test", "b"}, "test", "", false}, + "valid notHasTerm no value": {¬HasTerm{"test", ""}, "test", "", false}, + "nested andTerm": {andTerm{andTerm{}}, "test", "", false}, + "nested andTerm matches": {andTerm{&hasTerm{"test", "b"}}, "test", "b", true}, + "andTerm with non-match": {andTerm{&hasTerm{}, &hasTerm{"test", "b"}}, "test", "b", true}, + } + for k, v := range testCases { + value, found := v.S.RequiresExactMatch(v.Label) + if value != v.Value { + t.Errorf("%s: expected value %s, got %s", k, v.Value, value) + } + if found != v.Found { + t.Errorf("%s: expected found %s, got %s", k, v.Found, found) + } + } +} + func expectMatchRequirement(t *testing.T, req Requirement, ls Set) { if !req.Matches(ls) { t.Errorf("Wanted '%+v' to match '%s', but it did not.\n", req, ls) diff --git a/pkg/master/master.go b/pkg/master/master.go index 626291c6185..59e67c4867a 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -31,6 +31,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/minion" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/pod" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service" + servicecontroller "github.com/GoogleCloudPlatform/kubernetes/pkg/service" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" goetcd "github.com/coreos/go-etcd/etcd" @@ -54,6 +55,7 @@ type Master struct { podRegistry pod.Registry controllerRegistry controller.Registry serviceRegistry service.Registry + endpointRegistry endpoint.Registry minionRegistry minion.Registry bindingRegistry binding.Registry storage map[string]apiserver.RESTStorage @@ -68,6 +70,7 @@ func New(c *Config) *Master { podRegistry: etcd.NewRegistry(etcdClient, minionRegistry), controllerRegistry: etcd.NewRegistry(etcdClient, minionRegistry), serviceRegistry: etcd.NewRegistry(etcdClient, minionRegistry), + endpointRegistry: etcd.NewRegistry(etcdClient, minionRegistry), bindingRegistry: etcd.NewRegistry(etcdClient, minionRegistry), minionRegistry: minionRegistry, client: c.Client, @@ -106,7 +109,7 @@ func (m *Master) init(cloud cloudprovider.Interface, podInfoGetter client.PodInf podCache := NewPodCache(podInfoGetter, m.podRegistry) go util.Forever(func() { podCache.UpdateAllContainers() }, time.Second*30) - endpoints := endpoint.NewEndpointController(m.serviceRegistry, m.client) + endpoints := servicecontroller.NewEndpointController(m.serviceRegistry, m.client) go util.Forever(func() { endpoints.SyncServiceEndpoints() }, time.Second*10) m.storage = map[string]apiserver.RESTStorage{ @@ -118,6 +121,7 @@ func (m *Master) init(cloud cloudprovider.Interface, podInfoGetter client.PodInf }), "replicationControllers": controller.NewRegistryStorage(m.controllerRegistry, m.podRegistry), "services": service.NewRegistryStorage(m.serviceRegistry, cloud, m.minionRegistry), + "endpoints": endpoint.NewStorage(m.endpointRegistry), "minions": minion.NewRegistryStorage(m.minionRegistry), // TODO: should appear only in scheduler API group. diff --git a/pkg/proxy/config/api.go b/pkg/proxy/config/api.go new file mode 100644 index 00000000000..e9e5d625dcc --- /dev/null +++ b/pkg/proxy/config/api.go @@ -0,0 +1,145 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package config + +import ( + "time" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" + "github.com/golang/glog" +) + +// Watcher is the interface needed to receive changes to services and endpoints +type Watcher interface { + WatchServices(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) + WatchEndpoints(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) +} + +// SourceAPI implements a configuration source for services and endpoints that +// uses the client watch API to efficiently detect changes. +type SourceAPI struct { + client Watcher + services chan<- ServiceUpdate + endpoints chan<- EndpointsUpdate + + waitDuration time.Duration + reconnectDuration time.Duration +} + +// NewSourceAPI creates a config source that watches for changes to the services and endpoints +func NewSourceAPI(client Watcher, period time.Duration, services chan<- ServiceUpdate, endpoints chan<- EndpointsUpdate) *SourceAPI { + config := &SourceAPI{ + client: client, + services: services, + endpoints: endpoints, + + waitDuration: period, + // prevent hot loops if the server starts to misbehave + reconnectDuration: time.Second * 1, + } + serviceVersion := uint64(0) + go util.Forever(func() { + config.runServices(&serviceVersion) + time.Sleep(wait.Jitter(config.reconnectDuration, 0.0)) + }, period) + endpointVersion := uint64(0) + go util.Forever(func() { + config.runEndpoints(&endpointVersion) + time.Sleep(wait.Jitter(config.reconnectDuration, 0.0)) + }, period) + return config +} + +// runServices loops forever looking for changes to services +func (s *SourceAPI) runServices(resourceVersion *uint64) { + watcher, err := s.client.WatchServices(labels.Everything(), labels.Everything(), *resourceVersion) + if err != nil { + glog.Errorf("Unable to watch for services changes: %v", err) + time.Sleep(wait.Jitter(s.waitDuration, 0.0)) + return + } + defer watcher.Stop() + + ch := watcher.ResultChan() + handleServicesWatch(resourceVersion, ch, s.services) +} + +// handleServicesWatch loops over an event channel and delivers config changes to an update channel +func handleServicesWatch(resourceVersion *uint64, ch <-chan watch.Event, updates chan<- ServiceUpdate) { + for { + select { + case event, ok := <-ch: + if !ok { + glog.V(2).Infof("WatchServices channel closed") + return + } + + service := event.Object.(*api.Service) + *resourceVersion = service.ResourceVersion + 1 + + switch event.Type { + case watch.Added, watch.Modified: + updates <- ServiceUpdate{Op: SET, Services: []api.Service{*service}} + + case watch.Deleted: + updates <- ServiceUpdate{Op: SET} + } + } + } +} + +// runEndpoints loops forever looking for changes to endpoints +func (s *SourceAPI) runEndpoints(resourceVersion *uint64) { + watcher, err := s.client.WatchEndpoints(labels.Everything(), labels.Everything(), *resourceVersion) + if err != nil { + glog.Errorf("Unable to watch for endpoints changes: %v", err) + time.Sleep(wait.Jitter(s.waitDuration, 0.0)) + return + } + defer watcher.Stop() + + ch := watcher.ResultChan() + handleEndpointsWatch(resourceVersion, ch, s.endpoints) +} + +// handleEndpointsWatch loops over an event channel and delivers config changes to an update channel +func handleEndpointsWatch(resourceVersion *uint64, ch <-chan watch.Event, updates chan<- EndpointsUpdate) { + for { + select { + case event, ok := <-ch: + if !ok { + glog.V(2).Infof("WatchEndpoints channel closed") + return + } + + endpoints := event.Object.(*api.Endpoints) + *resourceVersion = endpoints.ResourceVersion + 1 + + switch event.Type { + case watch.Added, watch.Modified: + updates <- EndpointsUpdate{Op: SET, Endpoints: []api.Endpoints{*endpoints}} + + case watch.Deleted: + updates <- EndpointsUpdate{Op: SET} + } + } + } +} diff --git a/pkg/proxy/config/api_test.go b/pkg/proxy/config/api_test.go new file mode 100644 index 00000000000..f08c6f0d2c3 --- /dev/null +++ b/pkg/proxy/config/api_test.go @@ -0,0 +1,116 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package config + +import ( + "reflect" + "testing" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" +) + +func TestServices(t *testing.T) { + service := api.Service{JSONBase: api.JSONBase{ID: "bar", ResourceVersion: uint64(2)}} + + fakeWatch := watch.NewFake() + fakeClient := &client.Fake{Watch: fakeWatch} + services := make(chan ServiceUpdate) + source := SourceAPI{client: fakeClient, services: services} + resourceVersion := uint64(0) + go func() { + // called twice + source.runServices(&resourceVersion) + source.runServices(&resourceVersion) + }() + + // test adding a service to the watch + fakeWatch.Add(&service) + if !reflect.DeepEqual(fakeClient.Actions, []client.FakeAction{{"watch-services", uint64(0)}}) { + t.Errorf("expected call to watch-services, got %#v", fakeClient) + } + + actual := <-services + expected := ServiceUpdate{Op: SET, Services: []api.Service{service}} + if !reflect.DeepEqual(expected, actual) { + t.Errorf("expected %#v, got %#v", expected, actual) + } + + // verify that a delete results in a config change + fakeWatch.Delete(&service) + actual = <-services + expected = ServiceUpdate{Op: SET} + if !reflect.DeepEqual(expected, actual) { + t.Errorf("expected %#v, got %#v", expected, actual) + } + + // verify that closing the channel results in a new call to WatchServices with a higher resource version + newFakeWatch := watch.NewFake() + fakeClient.Watch = newFakeWatch + fakeWatch.Stop() + + newFakeWatch.Add(&service) + if !reflect.DeepEqual(fakeClient.Actions, []client.FakeAction{{"watch-services", uint64(0)}, {"watch-services", uint64(3)}}) { + t.Errorf("expected call to watch-endpoints, got %#v", fakeClient) + } +} + +func TestEndpoints(t *testing.T) { + endpoint := api.Endpoints{JSONBase: api.JSONBase{ID: "bar", ResourceVersion: uint64(2)}, Endpoints: []string{"127.0.0.1:9000"}} + + fakeWatch := watch.NewFake() + fakeClient := &client.Fake{Watch: fakeWatch} + endpoints := make(chan EndpointsUpdate) + source := SourceAPI{client: fakeClient, endpoints: endpoints} + resourceVersion := uint64(0) + go func() { + // called twice + source.runEndpoints(&resourceVersion) + source.runEndpoints(&resourceVersion) + }() + + // test adding an endpoint to the watch + fakeWatch.Add(&endpoint) + if !reflect.DeepEqual(fakeClient.Actions, []client.FakeAction{{"watch-endpoints", uint64(0)}}) { + t.Errorf("expected call to watch-endpoints, got %#v", fakeClient) + } + + actual := <-endpoints + expected := EndpointsUpdate{Op: SET, Endpoints: []api.Endpoints{endpoint}} + if !reflect.DeepEqual(expected, actual) { + t.Errorf("expected %#v, got %#v", expected, actual) + } + + // verify that a delete results in a config change + fakeWatch.Delete(&endpoint) + actual = <-endpoints + expected = EndpointsUpdate{Op: SET} + if !reflect.DeepEqual(expected, actual) { + t.Errorf("expected %#v, got %#v", expected, actual) + } + + // verify that closing the channel results in a new call to WatchEndpoints with a higher resource version + newFakeWatch := watch.NewFake() + fakeClient.Watch = newFakeWatch + fakeWatch.Stop() + + newFakeWatch.Add(&endpoint) + if !reflect.DeepEqual(fakeClient.Actions, []client.FakeAction{{"watch-endpoints", uint64(0)}, {"watch-endpoints", uint64(3)}}) { + t.Errorf("expected call to watch-endpoints, got %#v", fakeClient) + } +} diff --git a/pkg/registry/endpoint/registry.go b/pkg/registry/endpoint/registry.go new file mode 100644 index 00000000000..5384563a48f --- /dev/null +++ b/pkg/registry/endpoint/registry.go @@ -0,0 +1,30 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package endpoint + +import ( + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" +) + +// Registry is an interface for things that know how to store endpoints. +type Registry interface { + GetEndpoints(name string) (*api.Endpoints, error) + WatchEndpoints(labels, fields labels.Selector, resourceVersion uint64) (watch.Interface, error) + UpdateEndpoints(e api.Endpoints) error +} diff --git a/pkg/registry/endpoint/storage.go b/pkg/registry/endpoint/storage.go new file mode 100644 index 00000000000..eb8956056c0 --- /dev/null +++ b/pkg/registry/endpoint/storage.go @@ -0,0 +1,74 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package endpoint + +import ( + "errors" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" +) + +// Storage adapts endpoints into apiserver's RESTStorage model. +type Storage struct { + registry Registry +} + +// NewStorage returns a new Storage implementation for endpoints +func NewStorage(registry Registry) apiserver.RESTStorage { + return &Storage{ + registry: registry, + } +} + +// Get satisfies the RESTStorage interface but is unimplemented +func (rs *Storage) Get(id string) (interface{}, error) { + return rs.registry.GetEndpoints(id) +} + +// List satisfies the RESTStorage interface but is unimplemented +func (rs *Storage) List(selector labels.Selector) (interface{}, error) { + return nil, errors.New("unimplemented") +} + +// Watch returns Endpoint events via a watch.Interface. +// It implements apiserver.ResourceWatcher. +func (rs *Storage) Watch(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { + return rs.registry.WatchEndpoints(label, field, resourceVersion) +} + +// Create satisfies the RESTStorage interface but is unimplemented +func (rs *Storage) Create(obj interface{}) (<-chan interface{}, error) { + return nil, errors.New("unimplemented") +} + +// Update satisfies the RESTStorage interface but is unimplemented +func (rs *Storage) Update(obj interface{}) (<-chan interface{}, error) { + return nil, errors.New("unimplemented") +} + +// Delete satisfies the RESTStorage interface but is unimplemented +func (rs *Storage) Delete(id string) (<-chan interface{}, error) { + return nil, errors.New("unimplemented") +} + +// New implements the RESTStorage interface +func (rs Storage) New() interface{} { + return &api.Endpoints{} +} diff --git a/pkg/registry/endpoint/storage_test.go b/pkg/registry/endpoint/storage_test.go new file mode 100644 index 00000000000..105c26f3008 --- /dev/null +++ b/pkg/registry/endpoint/storage_test.go @@ -0,0 +1,69 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package endpoint + +import ( + "reflect" + "testing" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest" +) + +func TestGetEndpoints(t *testing.T) { + registry := ®istrytest.ServiceRegistry{ + Endpoints: api.Endpoints{ + JSONBase: api.JSONBase{ID: "foo"}, + Endpoints: []string{"127.0.0.1:9000"}, + }, + } + storage := NewStorage(registry) + obj, err := storage.Get("foo") + if err != nil { + t.Fatalf("unexpected error: %#v", err) + } + if !reflect.DeepEqual([]string{"127.0.0.1:9000"}, obj.(*api.Endpoints).Endpoints) { + t.Errorf("unexpected endpoints: %#v", obj) + } +} + +func TestGetEndpointsMissingService(t *testing.T) { + registry := ®istrytest.ServiceRegistry{ + Err: apiserver.NewNotFoundErr("service", "foo"), + } + storage := NewStorage(registry) + + // returns service not found + _, err := storage.Get("foo") + if !apiserver.IsNotFound(err) || !reflect.DeepEqual(err, apiserver.NewNotFoundErr("service", "foo")) { + t.Errorf("expected NotFound error, got %#v", err) + } + + // returns empty endpoints + registry.Err = nil + registry.Service = &api.Service{ + JSONBase: api.JSONBase{ID: "foo"}, + } + obj, err := storage.Get("foo") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if obj.(*api.Endpoints).Endpoints != nil { + t.Errorf("unexpected endpoints: %#v", obj) + } +} diff --git a/pkg/registry/etcd/etcd.go b/pkg/registry/etcd/etcd.go index 41407b65c3b..0c08c3b586b 100644 --- a/pkg/registry/etcd/etcd.go +++ b/pkg/registry/etcd/etcd.go @@ -340,10 +340,54 @@ func (r *Registry) UpdateService(svc api.Service) error { return r.SetObj(makeServiceKey(svc.ID), svc) } +// WatchServices begins watching for new, changed, or deleted service configurations. +func (r *Registry) WatchServices(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { + if !label.Empty() { + return nil, fmt.Errorf("label selectors are not supported on services") + } + if value, found := field.RequiresExactMatch("ID"); found { + return r.Watch(makeServiceKey(value), resourceVersion) + } + if field.Empty() { + return r.WatchList("/registry/services/specs", resourceVersion, tools.Everything) + } + return nil, fmt.Errorf("only the 'ID' and default (everything) field selectors are supported") +} + +// GetEndpoints obtains endpoints specified by a service name +func (r *Registry) GetEndpoints(name string) (*api.Endpoints, error) { + obj := &api.Endpoints{} + if err := r.ExtractObj(makeServiceEndpointsKey(name), obj, false); err != nil { + if tools.IsEtcdNotFound(err) { + if _, err := r.GetService(name); err != nil && apiserver.IsNotFound(err) { + return nil, apiserver.NewNotFoundErr("service", name) + } + return obj, nil + } + return nil, err + } + return obj, nil +} + // UpdateEndpoints update Endpoints of a Service. func (r *Registry) UpdateEndpoints(e api.Endpoints) error { return r.AtomicUpdate(makeServiceEndpointsKey(e.ID), &api.Endpoints{}, - func(interface{}) (interface{}, error) { + func(input interface{}) (interface{}, error) { + // TODO: racy - label query is returning different results for two simultaneous updaters return e, nil }) } + +// WatchEndpoints begins watching for new, changed, or deleted endpoint configurations. +func (r *Registry) WatchEndpoints(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { + if !label.Empty() { + return nil, fmt.Errorf("label selectors are not supported on endpoints") + } + if value, found := field.RequiresExactMatch("ID"); found { + return r.Watch(makeServiceEndpointsKey(value), resourceVersion) + } + if field.Empty() { + return r.WatchList("/registry/services/endpoints", resourceVersion, tools.Everything) + } + return nil, fmt.Errorf("only the 'ID' and default (everything) field selectors are supported") +} diff --git a/pkg/registry/etcd/etcd_test.go b/pkg/registry/etcd/etcd_test.go index 1bf44bd501e..e7d574962a1 100644 --- a/pkg/registry/etcd/etcd_test.go +++ b/pkg/registry/etcd/etcd_test.go @@ -730,7 +730,7 @@ func TestEtcdGetService(t *testing.T) { } if service.ID != "foo" { - t.Errorf("Unexpected pod: %#v", service) + t.Errorf("Unexpected service: %#v", service) } } @@ -803,6 +803,23 @@ func TestEtcdUpdateService(t *testing.T) { } } +func TestEtcdGetEndpoints(t *testing.T) { + fakeClient := tools.NewFakeEtcdClient(t) + fakeClient.Set("/registry/services/endpoints/foo", api.EncodeOrDie(api.Endpoints{ + JSONBase: api.JSONBase{ID: "foo"}, + Endpoints: []string{"127.0.0.1:34855"}, + }), 0) + registry := NewTestEtcdRegistry(fakeClient, []string{"machine"}) + endpoints, err := registry.GetEndpoints("foo") + if err != nil { + t.Errorf("unexpected error: %v", err) + } + + if endpoints.ID != "foo" || !reflect.DeepEqual(endpoints.Endpoints, []string{"127.0.0.1:34855"}) { + t.Errorf("Unexpected endpoints: %#v", endpoints) + } +} + func TestEtcdUpdateEndpoints(t *testing.T) { fakeClient := tools.NewFakeEtcdClient(t) fakeClient.TestIndex = true @@ -830,6 +847,104 @@ func TestEtcdUpdateEndpoints(t *testing.T) { } } +func TestEtcdWatchServices(t *testing.T) { + fakeClient := tools.NewFakeEtcdClient(t) + registry := NewTestEtcdRegistry(fakeClient, []string{"machine"}) + watching, err := registry.WatchServices( + labels.Everything(), + labels.SelectorFromSet(labels.Set{"ID": "foo"}), + 1, + ) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + fakeClient.WaitForWatchCompletion() + + select { + case _, ok := <-watching.ResultChan(): + if !ok { + t.Errorf("watching channel should be open") + } + default: + } + fakeClient.WatchInjectError <- nil + if _, ok := <-watching.ResultChan(); ok { + t.Errorf("watching channel should be closed") + } + watching.Stop() +} + +func TestEtcdWatchServicesBadSelector(t *testing.T) { + fakeClient := tools.NewFakeEtcdClient(t) + registry := NewTestEtcdRegistry(fakeClient, []string{"machine"}) + _, err := registry.WatchServices( + labels.Everything(), + labels.SelectorFromSet(labels.Set{"Field.Selector": "foo"}), + 0, + ) + if err == nil { + t.Errorf("unexpected non-error: %v", err) + } + + _, err = registry.WatchServices( + labels.SelectorFromSet(labels.Set{"Label.Selector": "foo"}), + labels.Everything(), + 0, + ) + if err == nil { + t.Errorf("unexpected non-error: %v", err) + } +} + +func TestEtcdWatchEndpoints(t *testing.T) { + fakeClient := tools.NewFakeEtcdClient(t) + registry := NewTestEtcdRegistry(fakeClient, []string{"machine"}) + watching, err := registry.WatchEndpoints( + labels.Everything(), + labels.SelectorFromSet(labels.Set{"ID": "foo"}), + 1, + ) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + fakeClient.WaitForWatchCompletion() + + select { + case _, ok := <-watching.ResultChan(): + if !ok { + t.Errorf("watching channel should be open") + } + default: + } + fakeClient.WatchInjectError <- nil + if _, ok := <-watching.ResultChan(); ok { + t.Errorf("watching channel should be closed") + } + watching.Stop() +} + +func TestEtcdWatchEndpointsBadSelector(t *testing.T) { + fakeClient := tools.NewFakeEtcdClient(t) + registry := NewTestEtcdRegistry(fakeClient, []string{"machine"}) + _, err := registry.WatchEndpoints( + labels.Everything(), + labels.SelectorFromSet(labels.Set{"Field.Selector": "foo"}), + 0, + ) + if err == nil { + t.Errorf("unexpected non-error: %v", err) + } + + _, err = registry.WatchEndpoints( + labels.SelectorFromSet(labels.Set{"Label.Selector": "foo"}), + labels.Everything(), + 0, + ) + if err == nil { + t.Errorf("unexpected non-error: %v", err) + } +} + // TODO We need a test for the compare and swap behavior. This basically requires two things: // 1) Add a per-operation synchronization channel to the fake etcd client, such that any operation waits on that // channel, this will enable us to orchestrate the flow of etcd requests in the test. diff --git a/pkg/registry/registrytest/service.go b/pkg/registry/registrytest/service.go index 83a3ec8923d..33535077285 100644 --- a/pkg/registry/registrytest/service.go +++ b/pkg/registry/registrytest/service.go @@ -18,6 +18,8 @@ package registrytest import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" ) func NewServiceRegistry() *ServiceRegistry { @@ -60,7 +62,19 @@ func (r *ServiceRegistry) UpdateService(svc api.Service) error { return r.Err } +func (r *ServiceRegistry) WatchServices(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { + return nil, r.Err +} + +func (r *ServiceRegistry) GetEndpoints(name string) (*api.Endpoints, error) { + return &r.Endpoints, r.Err +} + func (r *ServiceRegistry) UpdateEndpoints(e api.Endpoints) error { r.Endpoints = e return r.Err } + +func (r *ServiceRegistry) WatchEndpoints(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { + return nil, r.Err +} diff --git a/pkg/registry/service/registry.go b/pkg/registry/service/registry.go index 676ad46ab78..bcbfa461657 100644 --- a/pkg/registry/service/registry.go +++ b/pkg/registry/service/registry.go @@ -18,6 +18,9 @@ package service import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/endpoint" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" ) // Registry is an interface for things that know how to store services. @@ -27,5 +30,9 @@ type Registry interface { GetService(name string) (*api.Service, error) DeleteService(name string) error UpdateService(svc api.Service) error - UpdateEndpoints(e api.Endpoints) error + WatchServices(labels, fields labels.Selector, resourceVersion uint64) (watch.Interface, error) + + // TODO: endpoints and their implementation should be separated, setting endpoints should be + // supported via the API, and the endpoints-controller should use the API to update endpoints. + endpoint.Registry } diff --git a/pkg/registry/service/storage.go b/pkg/registry/service/storage.go index b73a17f7312..494e0abc2eb 100644 --- a/pkg/registry/service/storage.go +++ b/pkg/registry/service/storage.go @@ -27,6 +27,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/minion" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" ) // RegistryStorage adapts a service registry into apiserver's RESTStorage model. @@ -123,6 +124,12 @@ func (rs *RegistryStorage) List(selector labels.Selector) (interface{}, error) { return list, err } +// Watch returns Services events via a watch.Interface. +// It implements apiserver.ResourceWatcher. +func (rs *RegistryStorage) Watch(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { + return rs.registry.WatchServices(label, field, resourceVersion) +} + func (rs RegistryStorage) New() interface{} { return &api.Service{} } diff --git a/pkg/registry/endpoint/endpoints.go b/pkg/service/endpoints_controller.go similarity index 99% rename from pkg/registry/endpoint/endpoints.go rename to pkg/service/endpoints_controller.go index 1eaa5e32c37..5c0924555b0 100644 --- a/pkg/registry/endpoint/endpoints.go +++ b/pkg/service/endpoints_controller.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package endpoint +package service import ( "fmt" diff --git a/pkg/registry/endpoint/endpoints_test.go b/pkg/service/endpoints_controller_test.go similarity index 99% rename from pkg/registry/endpoint/endpoints_test.go rename to pkg/service/endpoints_controller_test.go index 58ba3913323..06f46b70573 100644 --- a/pkg/registry/endpoint/endpoints_test.go +++ b/pkg/service/endpoints_controller_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package endpoint +package service import ( "encoding/json" diff --git a/pkg/tools/etcd_tools_watch_test.go b/pkg/tools/etcd_tools_watch_test.go index b5639445bbd..0d0378cae4d 100644 --- a/pkg/tools/etcd_tools_watch_test.go +++ b/pkg/tools/etcd_tools_watch_test.go @@ -241,6 +241,122 @@ func TestWatch(t *testing.T) { } } +func TestWatchEtcdState(t *testing.T) { + type T struct { + Type watch.EventType + Endpoints []string + } + testCases := map[string]struct { + Initial map[string]EtcdResponseWithError + Responses []*etcd.Response + From uint64 + Expected []*T + }{ + "from not found": { + Initial: map[string]EtcdResponseWithError{}, + Responses: []*etcd.Response{ + { + Action: "create", + Node: &etcd.Node{ + Value: string(api.EncodeOrDie(&api.Endpoints{JSONBase: api.JSONBase{ID: "foo"}, Endpoints: []string{}})), + }, + }, + }, + From: 1, + Expected: []*T{ + {watch.Added, nil}, + }, + }, + "from version 1": { + Responses: []*etcd.Response{ + { + Action: "compareAndSwap", + Node: &etcd.Node{ + Value: string(api.EncodeOrDie(&api.Endpoints{JSONBase: api.JSONBase{ID: "foo"}, Endpoints: []string{"127.0.0.1:9000"}})), + CreatedIndex: 1, + ModifiedIndex: 2, + }, + PrevNode: &etcd.Node{ + Value: string(api.EncodeOrDie(&api.Endpoints{JSONBase: api.JSONBase{ID: "foo"}, Endpoints: []string{}})), + CreatedIndex: 1, + ModifiedIndex: 1, + }, + }, + }, + From: 1, + Expected: []*T{ + {watch.Modified, []string{"127.0.0.1:9000"}}, + }, + }, + "from initial state": { + Initial: map[string]EtcdResponseWithError{ + "/somekey/foo": { + R: &etcd.Response{ + Action: "get", + Node: &etcd.Node{ + Value: string(api.EncodeOrDie(&api.Endpoints{JSONBase: api.JSONBase{ID: "foo"}, Endpoints: []string{}})), + CreatedIndex: 1, + ModifiedIndex: 1, + }, + EtcdIndex: 1, + }, + }, + }, + Responses: []*etcd.Response{ + nil, + { + Action: "compareAndSwap", + Node: &etcd.Node{ + Value: string(api.EncodeOrDie(&api.Endpoints{JSONBase: api.JSONBase{ID: "foo"}, Endpoints: []string{"127.0.0.1:9000"}})), + CreatedIndex: 1, + ModifiedIndex: 2, + }, + PrevNode: &etcd.Node{ + Value: string(api.EncodeOrDie(&api.Endpoints{JSONBase: api.JSONBase{ID: "foo"}, Endpoints: []string{}})), + CreatedIndex: 1, + ModifiedIndex: 1, + }, + }, + }, + Expected: []*T{ + {watch.Added, nil}, + {watch.Modified, []string{"127.0.0.1:9000"}}, + }, + }, + } + + for k, testCase := range testCases { + fakeClient := NewFakeEtcdClient(t) + for key, value := range testCase.Initial { + fakeClient.Data[key] = value + } + h := EtcdHelper{fakeClient, codec, versioner} + watching, err := h.Watch("/somekey/foo", testCase.From) + if err != nil { + t.Errorf("%s: unexpected error: %v", k, err) + continue + } + fakeClient.WaitForWatchCompletion() + + t.Logf("Testing %v", k) + for i := range testCase.Responses { + if testCase.Responses[i] != nil { + fakeClient.WatchResponse <- testCase.Responses[i] + } + event := <-watching.ResultChan() + if e, a := testCase.Expected[i].Type, event.Type; e != a { + t.Errorf("%s: expected type %v, got %v", k, e, a) + break + } + if e, a := testCase.Expected[i].Endpoints, event.Object.(*api.Endpoints).Endpoints; !reflect.DeepEqual(e, a) { + t.Errorf("%s: expected type %v, got %v", k, e, a) + break + } + } + watching.Stop() + } +} + func TestWatchFromZeroIndex(t *testing.T) { pod := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}} diff --git a/pkg/util/wait/wait.go b/pkg/util/wait/wait.go index d4c47142375..e22c6461dec 100644 --- a/pkg/util/wait/wait.go +++ b/pkg/util/wait/wait.go @@ -18,9 +18,21 @@ package wait import ( "errors" + "math/rand" "time" ) +// Jitter returns a time.Duration between duration and duration + maxFactor * duration, +// to allow clients to avoid converging on periodic behavior. If maxFactor is 0.0, a +// suggested default value will be chosen. +func Jitter(duration time.Duration, maxFactor float64) time.Duration { + if maxFactor <= 0.0 { + maxFactor = 1.0 + } + wait := duration + time.Duration(rand.Float64()*maxFactor*float64(duration)) + return wait +} + // ErrWaitTimeout is returned when the condition exited without success var ErrWaitTimeout = errors.New("timed out waiting for the condition")