Merge pull request #899 from smarterclayton/watch_services_and_endpoints

Allow Kube proxy to watch for service/endpoint changes
This commit is contained in:
Daniel Smith 2014-08-27 13:11:39 -07:00
commit e169da2abf
23 changed files with 942 additions and 24 deletions

View File

@ -47,7 +47,7 @@ func main() {
}
controllerManager := controller.NewReplicationManager(
client.New("http://"+*master, nil))
client.New(*master, nil))
controllerManager.Run(10 * time.Second)
select {}

View File

@ -18,7 +18,9 @@ package main
import (
"flag"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/proxy"
"github.com/GoogleCloudPlatform/kubernetes/pkg/proxy/config"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
@ -29,11 +31,12 @@ import (
var (
configFile = flag.String("configfile", "/tmp/proxy_config", "Configuration file for the proxy")
master = flag.String("master", "", "The address of the Kubernetes API server (optional)")
etcdServerList util.StringList
)
func init() {
flag.Var(&etcdServerList, "etcd_servers", "List of etcd servers to watch (http://ip:port), comma separated")
flag.Var(&etcdServerList, "etcd_servers", "List of etcd servers to watch (http://ip:port), comma separated (optional)")
}
func main() {
@ -43,24 +46,39 @@ func main() {
verflag.PrintAndExitIfRequested()
// Set up logger for etcd client
etcd.SetLogger(util.NewLogger("etcd "))
glog.Infof("Using configuration file %s and etcd_servers %v", *configFile, etcdServerList)
serviceConfig := config.NewServiceConfig()
endpointsConfig := config.NewEndpointsConfig()
// define api config source
if *master != "" {
glog.Infof("Using api calls to get config %v", *master)
//TODO: add auth info
client := client.New(*master, nil)
config.NewSourceAPI(
client,
30*time.Second,
serviceConfig.Channel("api"),
endpointsConfig.Channel("api"),
)
}
// Create a configuration source that handles configuration from etcd.
if len(etcdServerList) > 0 && *master == "" {
glog.Infof("Using etcd servers %v", etcdServerList)
// Set up logger for etcd client
etcd.SetLogger(util.NewLogger("etcd "))
etcdClient := etcd.NewClient(etcdServerList)
config.NewConfigSourceEtcd(etcdClient,
serviceConfig.Channel("etcd"),
endpointsConfig.Channel("etcd"))
}
// And create a configuration source that reads from a local file
config.NewConfigSourceFile(*configFile,
serviceConfig.Channel("file"),
endpointsConfig.Channel("file"))
glog.Infof("Using configuration file %s", *configFile)
loadBalancer := proxy.NewLoadBalancerRR()
proxier := proxy.NewProxier(loadBalancer)

View File

@ -66,7 +66,7 @@ KUBELET_PID=$!
PROXY_LOG=/tmp/kube-proxy.log
${GO_OUT}/proxy \
--etcd_servers="http://127.0.0.1:4001" &> ${PROXY_LOG} &
--master="http://${API_HOST}:${API_PORT}" &> ${PROXY_LOG} &
PROXY_PID=$!
echo "Local Kubernetes cluster is running. Press Ctrl-C to shut it down."

View File

@ -23,6 +23,7 @@ import (
"io"
"io/ioutil"
"net/http"
"net/url"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
@ -67,6 +68,9 @@ type ServiceInterface interface {
CreateService(api.Service) (api.Service, error)
UpdateService(api.Service) (api.Service, error)
DeleteService(string) error
WatchServices(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error)
WatchEndpoints(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error)
}
// VersionInterface has a method to retrieve the server version
@ -183,7 +187,12 @@ func (c *RESTClient) doRequest(request *http.Request) ([]byte, error) {
// requestBody is the body of the request. Can be nil.
// target the interface to marshal the JSON response into. Can be nil.
func (c *RESTClient) rawRequest(method, path string, requestBody io.Reader, target interface{}) ([]byte, error) {
request, err := http.NewRequest(method, c.makeURL(path), requestBody)
reqUrl, err := c.makeURL(path)
if err != nil {
return nil, err
}
request, err := http.NewRequest(method, reqUrl, requestBody)
if err != nil {
return nil, err
}
@ -201,8 +210,24 @@ func (c *RESTClient) rawRequest(method, path string, requestBody io.Reader, targ
return body, err
}
func (c *RESTClient) makeURL(path string) string {
return c.host + c.Prefix + path
func (c *RESTClient) makeURL(path string) (string, error) {
base := c.host
hostURL, err := url.Parse(base)
if err != nil {
return "", err
}
if hostURL.Scheme == "" {
hostURL, err = url.Parse("http://" + base)
if err != nil {
return "", err
}
if hostURL.Path != "" && hostURL.Path != "/" {
return "", fmt.Errorf("host must be a URL or a host:port pair: %s", base)
}
}
hostURL.Path += c.Prefix + path
return hostURL.String(), nil
}
// ListPods takes a selector, and returns the list of pods that match that selector
@ -309,6 +334,28 @@ func (c *Client) DeleteService(name string) error {
return c.Delete().Path("services").Path(name).Do().Error()
}
// WatchService returns a watch.Interface that watches the requested services.
func (c *Client) WatchServices(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
return c.Get().
Path("watch").
Path("services").
UintParam("resourceVersion", resourceVersion).
SelectorParam("labels", label).
SelectorParam("fields", field).
Watch()
}
// WatchEndpoints returns a watch.Interface that watches the requested endpoints for a service.
func (c *Client) WatchEndpoints(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
return c.Get().
Path("watch").
Path("endpoints").
UintParam("resourceVersion", resourceVersion).
SelectorParam("labels", label).
SelectorParam("fields", field).
Watch()
}
// ServerVersion retrieves and parses the server's version.
func (c *Client) ServerVersion() (*version.Info, error) {
body, err := c.Get().AbsPath("/version").Do().Raw()

View File

@ -37,6 +37,33 @@ func makeURL(suffix string) string {
return apiPath + suffix
}
func TestValidatesHostParameter(t *testing.T) {
testCases := map[string]struct {
Value string
Err bool
}{
"foo.bar.com": {"http://foo.bar.com/api/v1beta1/", false},
"http://host/server": {"http://host/server/api/v1beta1/", false},
"host/server": {"", true},
}
for k, expected := range testCases {
c := RESTClient{host: k, Prefix: "/api/v1beta1/"}
actual, err := c.makeURL("")
switch {
case err == nil && expected.Err:
t.Errorf("expected error but was nil")
continue
case err != nil && !expected.Err:
t.Errorf("unexpected error %v", err)
continue
}
if expected.Value != actual {
t.Errorf("%s: expected %s, got %s", k, expected.Value, actual)
continue
}
}
}
func TestListEmptyPods(t *testing.T) {
c := &testClient{
Request: testRequest{Method: "GET", Path: "/pods"},

View File

@ -35,6 +35,7 @@ type Fake struct {
Actions []FakeAction
Pods api.PodList
Ctrl api.ReplicationController
Watch watch.Interface
}
func (c *Fake) ListPods(selector labels.Selector) (api.PodList, error) {
@ -88,8 +89,8 @@ func (c *Fake) DeleteReplicationController(controller string) error {
}
func (c *Fake) WatchReplicationControllers(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
c.Actions = append(c.Actions, FakeAction{Action: "watch-controllers"})
return watch.NewFake(), nil
c.Actions = append(c.Actions, FakeAction{Action: "watch-controllers", Value: resourceVersion})
return c.Watch, nil
}
func (c *Fake) GetService(name string) (api.Service, error) {
@ -112,6 +113,16 @@ func (c *Fake) DeleteService(service string) error {
return nil
}
func (c *Fake) WatchServices(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
c.Actions = append(c.Actions, FakeAction{Action: "watch-services", Value: resourceVersion})
return c.Watch, nil
}
func (c *Fake) WatchEndpoints(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
c.Actions = append(c.Actions, FakeAction{Action: "watch-endpoints", Value: resourceVersion})
return c.Watch, nil
}
func (c *Fake) ServerVersion() (*version.Info, error) {
c.Actions = append(c.Actions, FakeAction{Action: "get-version", Value: nil})
versionInfo := version.Get()

View File

@ -32,6 +32,12 @@ type Selector interface {
// Empty returns true if this selector does not restrict the selection space.
Empty() bool
// RequiresExactMatch allows a caller to introspect whether a given selector
// requires a single specific label to be set, and if so returns the value it
// requires.
// TODO: expand this to be more general
RequiresExactMatch(label string) (value string, found bool)
// String returns a human readable string that represents this selector.
String() string
}
@ -53,6 +59,13 @@ func (t *hasTerm) Empty() bool {
return false
}
func (t *hasTerm) RequiresExactMatch(label string) (value string, found bool) {
if t.label == label {
return t.value, true
}
return "", false
}
func (t *hasTerm) String() string {
return fmt.Sprintf("%v=%v", t.label, t.value)
}
@ -69,6 +82,10 @@ func (t *notHasTerm) Empty() bool {
return false
}
func (t *notHasTerm) RequiresExactMatch(label string) (value string, found bool) {
return "", false
}
func (t *notHasTerm) String() string {
return fmt.Sprintf("%v!=%v", t.label, t.value)
}
@ -99,6 +116,18 @@ func (t andTerm) Empty() bool {
return true
}
func (t andTerm) RequiresExactMatch(label string) (string, bool) {
if t == nil || len([]Selector(t)) == 0 {
return "", false
}
for i := range t {
if value, found := t[i].RequiresExactMatch(label); found {
return value, found
}
}
return "", false
}
func (t andTerm) String() string {
var terms []string
for _, q := range t {
@ -173,7 +202,7 @@ func SelectorFromSet(ls Set) Selector {
return andTerm(items)
}
// ParseSelector takes a string repsenting a selector and returns an
// ParseSelector takes a string representing a selector and returns an
// object suitable for matching, or an error.
func ParseSelector(selector string) (Selector, error) {
parts := strings.Split(selector, ",")

View File

@ -158,6 +158,9 @@ func TestSetIsEmpty(t *testing.T) {
if (&hasTerm{}).Empty() {
t.Errorf("hasTerm should not be empty")
}
if (&notHasTerm{}).Empty() {
t.Errorf("notHasTerm should not be empty")
}
if !(andTerm{andTerm{}}).Empty() {
t.Errorf("Nested andTerm should be empty")
}
@ -166,6 +169,36 @@ func TestSetIsEmpty(t *testing.T) {
}
}
func TestRequiresExactMatch(t *testing.T) {
testCases := map[string]struct {
S Selector
Label string
Value string
Found bool
}{
"empty set": {Set{}.AsSelector(), "test", "", false},
"nil andTerm": {andTerm(nil), "test", "", false},
"empty hasTerm": {&hasTerm{}, "test", "", false},
"skipped hasTerm": {&hasTerm{"a", "b"}, "test", "", false},
"valid hasTerm": {&hasTerm{"test", "b"}, "test", "b", true},
"valid hasTerm no value": {&hasTerm{"test", ""}, "test", "", true},
"valid notHasTerm": {&notHasTerm{"test", "b"}, "test", "", false},
"valid notHasTerm no value": {&notHasTerm{"test", ""}, "test", "", false},
"nested andTerm": {andTerm{andTerm{}}, "test", "", false},
"nested andTerm matches": {andTerm{&hasTerm{"test", "b"}}, "test", "b", true},
"andTerm with non-match": {andTerm{&hasTerm{}, &hasTerm{"test", "b"}}, "test", "b", true},
}
for k, v := range testCases {
value, found := v.S.RequiresExactMatch(v.Label)
if value != v.Value {
t.Errorf("%s: expected value %s, got %s", k, v.Value, value)
}
if found != v.Found {
t.Errorf("%s: expected found %s, got %s", k, v.Found, found)
}
}
}
func expectMatchRequirement(t *testing.T, req Requirement, ls Set) {
if !req.Matches(ls) {
t.Errorf("Wanted '%+v' to match '%s', but it did not.\n", req, ls)

View File

@ -31,6 +31,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/minion"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/pod"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service"
servicecontroller "github.com/GoogleCloudPlatform/kubernetes/pkg/service"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
goetcd "github.com/coreos/go-etcd/etcd"
@ -54,6 +55,7 @@ type Master struct {
podRegistry pod.Registry
controllerRegistry controller.Registry
serviceRegistry service.Registry
endpointRegistry endpoint.Registry
minionRegistry minion.Registry
bindingRegistry binding.Registry
storage map[string]apiserver.RESTStorage
@ -68,6 +70,7 @@ func New(c *Config) *Master {
podRegistry: etcd.NewRegistry(etcdClient, minionRegistry),
controllerRegistry: etcd.NewRegistry(etcdClient, minionRegistry),
serviceRegistry: etcd.NewRegistry(etcdClient, minionRegistry),
endpointRegistry: etcd.NewRegistry(etcdClient, minionRegistry),
bindingRegistry: etcd.NewRegistry(etcdClient, minionRegistry),
minionRegistry: minionRegistry,
client: c.Client,
@ -106,7 +109,7 @@ func (m *Master) init(cloud cloudprovider.Interface, podInfoGetter client.PodInf
podCache := NewPodCache(podInfoGetter, m.podRegistry)
go util.Forever(func() { podCache.UpdateAllContainers() }, time.Second*30)
endpoints := endpoint.NewEndpointController(m.serviceRegistry, m.client)
endpoints := servicecontroller.NewEndpointController(m.serviceRegistry, m.client)
go util.Forever(func() { endpoints.SyncServiceEndpoints() }, time.Second*10)
m.storage = map[string]apiserver.RESTStorage{
@ -118,6 +121,7 @@ func (m *Master) init(cloud cloudprovider.Interface, podInfoGetter client.PodInf
}),
"replicationControllers": controller.NewRegistryStorage(m.controllerRegistry, m.podRegistry),
"services": service.NewRegistryStorage(m.serviceRegistry, cloud, m.minionRegistry),
"endpoints": endpoint.NewStorage(m.endpointRegistry),
"minions": minion.NewRegistryStorage(m.minionRegistry),
// TODO: should appear only in scheduler API group.

145
pkg/proxy/config/api.go Normal file
View File

@ -0,0 +1,145 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package config
import (
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
"github.com/golang/glog"
)
// Watcher is the interface needed to receive changes to services and endpoints
type Watcher interface {
WatchServices(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error)
WatchEndpoints(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error)
}
// SourceAPI implements a configuration source for services and endpoints that
// uses the client watch API to efficiently detect changes.
type SourceAPI struct {
client Watcher
services chan<- ServiceUpdate
endpoints chan<- EndpointsUpdate
waitDuration time.Duration
reconnectDuration time.Duration
}
// NewSourceAPI creates a config source that watches for changes to the services and endpoints
func NewSourceAPI(client Watcher, period time.Duration, services chan<- ServiceUpdate, endpoints chan<- EndpointsUpdate) *SourceAPI {
config := &SourceAPI{
client: client,
services: services,
endpoints: endpoints,
waitDuration: period,
// prevent hot loops if the server starts to misbehave
reconnectDuration: time.Second * 1,
}
serviceVersion := uint64(0)
go util.Forever(func() {
config.runServices(&serviceVersion)
time.Sleep(wait.Jitter(config.reconnectDuration, 0.0))
}, period)
endpointVersion := uint64(0)
go util.Forever(func() {
config.runEndpoints(&endpointVersion)
time.Sleep(wait.Jitter(config.reconnectDuration, 0.0))
}, period)
return config
}
// runServices loops forever looking for changes to services
func (s *SourceAPI) runServices(resourceVersion *uint64) {
watcher, err := s.client.WatchServices(labels.Everything(), labels.Everything(), *resourceVersion)
if err != nil {
glog.Errorf("Unable to watch for services changes: %v", err)
time.Sleep(wait.Jitter(s.waitDuration, 0.0))
return
}
defer watcher.Stop()
ch := watcher.ResultChan()
handleServicesWatch(resourceVersion, ch, s.services)
}
// handleServicesWatch loops over an event channel and delivers config changes to an update channel
func handleServicesWatch(resourceVersion *uint64, ch <-chan watch.Event, updates chan<- ServiceUpdate) {
for {
select {
case event, ok := <-ch:
if !ok {
glog.V(2).Infof("WatchServices channel closed")
return
}
service := event.Object.(*api.Service)
*resourceVersion = service.ResourceVersion + 1
switch event.Type {
case watch.Added, watch.Modified:
updates <- ServiceUpdate{Op: SET, Services: []api.Service{*service}}
case watch.Deleted:
updates <- ServiceUpdate{Op: SET}
}
}
}
}
// runEndpoints loops forever looking for changes to endpoints
func (s *SourceAPI) runEndpoints(resourceVersion *uint64) {
watcher, err := s.client.WatchEndpoints(labels.Everything(), labels.Everything(), *resourceVersion)
if err != nil {
glog.Errorf("Unable to watch for endpoints changes: %v", err)
time.Sleep(wait.Jitter(s.waitDuration, 0.0))
return
}
defer watcher.Stop()
ch := watcher.ResultChan()
handleEndpointsWatch(resourceVersion, ch, s.endpoints)
}
// handleEndpointsWatch loops over an event channel and delivers config changes to an update channel
func handleEndpointsWatch(resourceVersion *uint64, ch <-chan watch.Event, updates chan<- EndpointsUpdate) {
for {
select {
case event, ok := <-ch:
if !ok {
glog.V(2).Infof("WatchEndpoints channel closed")
return
}
endpoints := event.Object.(*api.Endpoints)
*resourceVersion = endpoints.ResourceVersion + 1
switch event.Type {
case watch.Added, watch.Modified:
updates <- EndpointsUpdate{Op: SET, Endpoints: []api.Endpoints{*endpoints}}
case watch.Deleted:
updates <- EndpointsUpdate{Op: SET}
}
}
}
}

View File

@ -0,0 +1,116 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package config
import (
"reflect"
"testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
)
func TestServices(t *testing.T) {
service := api.Service{JSONBase: api.JSONBase{ID: "bar", ResourceVersion: uint64(2)}}
fakeWatch := watch.NewFake()
fakeClient := &client.Fake{Watch: fakeWatch}
services := make(chan ServiceUpdate)
source := SourceAPI{client: fakeClient, services: services}
resourceVersion := uint64(0)
go func() {
// called twice
source.runServices(&resourceVersion)
source.runServices(&resourceVersion)
}()
// test adding a service to the watch
fakeWatch.Add(&service)
if !reflect.DeepEqual(fakeClient.Actions, []client.FakeAction{{"watch-services", uint64(0)}}) {
t.Errorf("expected call to watch-services, got %#v", fakeClient)
}
actual := <-services
expected := ServiceUpdate{Op: SET, Services: []api.Service{service}}
if !reflect.DeepEqual(expected, actual) {
t.Errorf("expected %#v, got %#v", expected, actual)
}
// verify that a delete results in a config change
fakeWatch.Delete(&service)
actual = <-services
expected = ServiceUpdate{Op: SET}
if !reflect.DeepEqual(expected, actual) {
t.Errorf("expected %#v, got %#v", expected, actual)
}
// verify that closing the channel results in a new call to WatchServices with a higher resource version
newFakeWatch := watch.NewFake()
fakeClient.Watch = newFakeWatch
fakeWatch.Stop()
newFakeWatch.Add(&service)
if !reflect.DeepEqual(fakeClient.Actions, []client.FakeAction{{"watch-services", uint64(0)}, {"watch-services", uint64(3)}}) {
t.Errorf("expected call to watch-endpoints, got %#v", fakeClient)
}
}
func TestEndpoints(t *testing.T) {
endpoint := api.Endpoints{JSONBase: api.JSONBase{ID: "bar", ResourceVersion: uint64(2)}, Endpoints: []string{"127.0.0.1:9000"}}
fakeWatch := watch.NewFake()
fakeClient := &client.Fake{Watch: fakeWatch}
endpoints := make(chan EndpointsUpdate)
source := SourceAPI{client: fakeClient, endpoints: endpoints}
resourceVersion := uint64(0)
go func() {
// called twice
source.runEndpoints(&resourceVersion)
source.runEndpoints(&resourceVersion)
}()
// test adding an endpoint to the watch
fakeWatch.Add(&endpoint)
if !reflect.DeepEqual(fakeClient.Actions, []client.FakeAction{{"watch-endpoints", uint64(0)}}) {
t.Errorf("expected call to watch-endpoints, got %#v", fakeClient)
}
actual := <-endpoints
expected := EndpointsUpdate{Op: SET, Endpoints: []api.Endpoints{endpoint}}
if !reflect.DeepEqual(expected, actual) {
t.Errorf("expected %#v, got %#v", expected, actual)
}
// verify that a delete results in a config change
fakeWatch.Delete(&endpoint)
actual = <-endpoints
expected = EndpointsUpdate{Op: SET}
if !reflect.DeepEqual(expected, actual) {
t.Errorf("expected %#v, got %#v", expected, actual)
}
// verify that closing the channel results in a new call to WatchEndpoints with a higher resource version
newFakeWatch := watch.NewFake()
fakeClient.Watch = newFakeWatch
fakeWatch.Stop()
newFakeWatch.Add(&endpoint)
if !reflect.DeepEqual(fakeClient.Actions, []client.FakeAction{{"watch-endpoints", uint64(0)}, {"watch-endpoints", uint64(3)}}) {
t.Errorf("expected call to watch-endpoints, got %#v", fakeClient)
}
}

View File

@ -0,0 +1,30 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package endpoint
import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
)
// Registry is an interface for things that know how to store endpoints.
type Registry interface {
GetEndpoints(name string) (*api.Endpoints, error)
WatchEndpoints(labels, fields labels.Selector, resourceVersion uint64) (watch.Interface, error)
UpdateEndpoints(e api.Endpoints) error
}

View File

@ -0,0 +1,74 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package endpoint
import (
"errors"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
)
// Storage adapts endpoints into apiserver's RESTStorage model.
type Storage struct {
registry Registry
}
// NewStorage returns a new Storage implementation for endpoints
func NewStorage(registry Registry) apiserver.RESTStorage {
return &Storage{
registry: registry,
}
}
// Get satisfies the RESTStorage interface but is unimplemented
func (rs *Storage) Get(id string) (interface{}, error) {
return rs.registry.GetEndpoints(id)
}
// List satisfies the RESTStorage interface but is unimplemented
func (rs *Storage) List(selector labels.Selector) (interface{}, error) {
return nil, errors.New("unimplemented")
}
// Watch returns Endpoint events via a watch.Interface.
// It implements apiserver.ResourceWatcher.
func (rs *Storage) Watch(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
return rs.registry.WatchEndpoints(label, field, resourceVersion)
}
// Create satisfies the RESTStorage interface but is unimplemented
func (rs *Storage) Create(obj interface{}) (<-chan interface{}, error) {
return nil, errors.New("unimplemented")
}
// Update satisfies the RESTStorage interface but is unimplemented
func (rs *Storage) Update(obj interface{}) (<-chan interface{}, error) {
return nil, errors.New("unimplemented")
}
// Delete satisfies the RESTStorage interface but is unimplemented
func (rs *Storage) Delete(id string) (<-chan interface{}, error) {
return nil, errors.New("unimplemented")
}
// New implements the RESTStorage interface
func (rs Storage) New() interface{} {
return &api.Endpoints{}
}

View File

@ -0,0 +1,69 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package endpoint
import (
"reflect"
"testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest"
)
func TestGetEndpoints(t *testing.T) {
registry := &registrytest.ServiceRegistry{
Endpoints: api.Endpoints{
JSONBase: api.JSONBase{ID: "foo"},
Endpoints: []string{"127.0.0.1:9000"},
},
}
storage := NewStorage(registry)
obj, err := storage.Get("foo")
if err != nil {
t.Fatalf("unexpected error: %#v", err)
}
if !reflect.DeepEqual([]string{"127.0.0.1:9000"}, obj.(*api.Endpoints).Endpoints) {
t.Errorf("unexpected endpoints: %#v", obj)
}
}
func TestGetEndpointsMissingService(t *testing.T) {
registry := &registrytest.ServiceRegistry{
Err: apiserver.NewNotFoundErr("service", "foo"),
}
storage := NewStorage(registry)
// returns service not found
_, err := storage.Get("foo")
if !apiserver.IsNotFound(err) || !reflect.DeepEqual(err, apiserver.NewNotFoundErr("service", "foo")) {
t.Errorf("expected NotFound error, got %#v", err)
}
// returns empty endpoints
registry.Err = nil
registry.Service = &api.Service{
JSONBase: api.JSONBase{ID: "foo"},
}
obj, err := storage.Get("foo")
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if obj.(*api.Endpoints).Endpoints != nil {
t.Errorf("unexpected endpoints: %#v", obj)
}
}

View File

@ -340,10 +340,54 @@ func (r *Registry) UpdateService(svc api.Service) error {
return r.SetObj(makeServiceKey(svc.ID), svc)
}
// WatchServices begins watching for new, changed, or deleted service configurations.
func (r *Registry) WatchServices(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
if !label.Empty() {
return nil, fmt.Errorf("label selectors are not supported on services")
}
if value, found := field.RequiresExactMatch("ID"); found {
return r.Watch(makeServiceKey(value), resourceVersion)
}
if field.Empty() {
return r.WatchList("/registry/services/specs", resourceVersion, tools.Everything)
}
return nil, fmt.Errorf("only the 'ID' and default (everything) field selectors are supported")
}
// GetEndpoints obtains endpoints specified by a service name
func (r *Registry) GetEndpoints(name string) (*api.Endpoints, error) {
obj := &api.Endpoints{}
if err := r.ExtractObj(makeServiceEndpointsKey(name), obj, false); err != nil {
if tools.IsEtcdNotFound(err) {
if _, err := r.GetService(name); err != nil && apiserver.IsNotFound(err) {
return nil, apiserver.NewNotFoundErr("service", name)
}
return obj, nil
}
return nil, err
}
return obj, nil
}
// UpdateEndpoints update Endpoints of a Service.
func (r *Registry) UpdateEndpoints(e api.Endpoints) error {
return r.AtomicUpdate(makeServiceEndpointsKey(e.ID), &api.Endpoints{},
func(interface{}) (interface{}, error) {
func(input interface{}) (interface{}, error) {
// TODO: racy - label query is returning different results for two simultaneous updaters
return e, nil
})
}
// WatchEndpoints begins watching for new, changed, or deleted endpoint configurations.
func (r *Registry) WatchEndpoints(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
if !label.Empty() {
return nil, fmt.Errorf("label selectors are not supported on endpoints")
}
if value, found := field.RequiresExactMatch("ID"); found {
return r.Watch(makeServiceEndpointsKey(value), resourceVersion)
}
if field.Empty() {
return r.WatchList("/registry/services/endpoints", resourceVersion, tools.Everything)
}
return nil, fmt.Errorf("only the 'ID' and default (everything) field selectors are supported")
}

View File

@ -730,7 +730,7 @@ func TestEtcdGetService(t *testing.T) {
}
if service.ID != "foo" {
t.Errorf("Unexpected pod: %#v", service)
t.Errorf("Unexpected service: %#v", service)
}
}
@ -803,6 +803,23 @@ func TestEtcdUpdateService(t *testing.T) {
}
}
func TestEtcdGetEndpoints(t *testing.T) {
fakeClient := tools.NewFakeEtcdClient(t)
fakeClient.Set("/registry/services/endpoints/foo", api.EncodeOrDie(api.Endpoints{
JSONBase: api.JSONBase{ID: "foo"},
Endpoints: []string{"127.0.0.1:34855"},
}), 0)
registry := NewTestEtcdRegistry(fakeClient, []string{"machine"})
endpoints, err := registry.GetEndpoints("foo")
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if endpoints.ID != "foo" || !reflect.DeepEqual(endpoints.Endpoints, []string{"127.0.0.1:34855"}) {
t.Errorf("Unexpected endpoints: %#v", endpoints)
}
}
func TestEtcdUpdateEndpoints(t *testing.T) {
fakeClient := tools.NewFakeEtcdClient(t)
fakeClient.TestIndex = true
@ -830,6 +847,104 @@ func TestEtcdUpdateEndpoints(t *testing.T) {
}
}
func TestEtcdWatchServices(t *testing.T) {
fakeClient := tools.NewFakeEtcdClient(t)
registry := NewTestEtcdRegistry(fakeClient, []string{"machine"})
watching, err := registry.WatchServices(
labels.Everything(),
labels.SelectorFromSet(labels.Set{"ID": "foo"}),
1,
)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
fakeClient.WaitForWatchCompletion()
select {
case _, ok := <-watching.ResultChan():
if !ok {
t.Errorf("watching channel should be open")
}
default:
}
fakeClient.WatchInjectError <- nil
if _, ok := <-watching.ResultChan(); ok {
t.Errorf("watching channel should be closed")
}
watching.Stop()
}
func TestEtcdWatchServicesBadSelector(t *testing.T) {
fakeClient := tools.NewFakeEtcdClient(t)
registry := NewTestEtcdRegistry(fakeClient, []string{"machine"})
_, err := registry.WatchServices(
labels.Everything(),
labels.SelectorFromSet(labels.Set{"Field.Selector": "foo"}),
0,
)
if err == nil {
t.Errorf("unexpected non-error: %v", err)
}
_, err = registry.WatchServices(
labels.SelectorFromSet(labels.Set{"Label.Selector": "foo"}),
labels.Everything(),
0,
)
if err == nil {
t.Errorf("unexpected non-error: %v", err)
}
}
func TestEtcdWatchEndpoints(t *testing.T) {
fakeClient := tools.NewFakeEtcdClient(t)
registry := NewTestEtcdRegistry(fakeClient, []string{"machine"})
watching, err := registry.WatchEndpoints(
labels.Everything(),
labels.SelectorFromSet(labels.Set{"ID": "foo"}),
1,
)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
fakeClient.WaitForWatchCompletion()
select {
case _, ok := <-watching.ResultChan():
if !ok {
t.Errorf("watching channel should be open")
}
default:
}
fakeClient.WatchInjectError <- nil
if _, ok := <-watching.ResultChan(); ok {
t.Errorf("watching channel should be closed")
}
watching.Stop()
}
func TestEtcdWatchEndpointsBadSelector(t *testing.T) {
fakeClient := tools.NewFakeEtcdClient(t)
registry := NewTestEtcdRegistry(fakeClient, []string{"machine"})
_, err := registry.WatchEndpoints(
labels.Everything(),
labels.SelectorFromSet(labels.Set{"Field.Selector": "foo"}),
0,
)
if err == nil {
t.Errorf("unexpected non-error: %v", err)
}
_, err = registry.WatchEndpoints(
labels.SelectorFromSet(labels.Set{"Label.Selector": "foo"}),
labels.Everything(),
0,
)
if err == nil {
t.Errorf("unexpected non-error: %v", err)
}
}
// TODO We need a test for the compare and swap behavior. This basically requires two things:
// 1) Add a per-operation synchronization channel to the fake etcd client, such that any operation waits on that
// channel, this will enable us to orchestrate the flow of etcd requests in the test.

View File

@ -18,6 +18,8 @@ package registrytest
import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
)
func NewServiceRegistry() *ServiceRegistry {
@ -60,7 +62,19 @@ func (r *ServiceRegistry) UpdateService(svc api.Service) error {
return r.Err
}
func (r *ServiceRegistry) WatchServices(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
return nil, r.Err
}
func (r *ServiceRegistry) GetEndpoints(name string) (*api.Endpoints, error) {
return &r.Endpoints, r.Err
}
func (r *ServiceRegistry) UpdateEndpoints(e api.Endpoints) error {
r.Endpoints = e
return r.Err
}
func (r *ServiceRegistry) WatchEndpoints(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
return nil, r.Err
}

View File

@ -18,6 +18,9 @@ package service
import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/endpoint"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
)
// Registry is an interface for things that know how to store services.
@ -27,5 +30,9 @@ type Registry interface {
GetService(name string) (*api.Service, error)
DeleteService(name string) error
UpdateService(svc api.Service) error
UpdateEndpoints(e api.Endpoints) error
WatchServices(labels, fields labels.Selector, resourceVersion uint64) (watch.Interface, error)
// TODO: endpoints and their implementation should be separated, setting endpoints should be
// supported via the API, and the endpoints-controller should use the API to update endpoints.
endpoint.Registry
}

View File

@ -27,6 +27,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/minion"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
)
// RegistryStorage adapts a service registry into apiserver's RESTStorage model.
@ -123,6 +124,12 @@ func (rs *RegistryStorage) List(selector labels.Selector) (interface{}, error) {
return list, err
}
// Watch returns Services events via a watch.Interface.
// It implements apiserver.ResourceWatcher.
func (rs *RegistryStorage) Watch(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
return rs.registry.WatchServices(label, field, resourceVersion)
}
func (rs RegistryStorage) New() interface{} {
return &api.Service{}
}

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package endpoint
package service
import (
"fmt"

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package endpoint
package service
import (
"encoding/json"

View File

@ -241,6 +241,122 @@ func TestWatch(t *testing.T) {
}
}
func TestWatchEtcdState(t *testing.T) {
type T struct {
Type watch.EventType
Endpoints []string
}
testCases := map[string]struct {
Initial map[string]EtcdResponseWithError
Responses []*etcd.Response
From uint64
Expected []*T
}{
"from not found": {
Initial: map[string]EtcdResponseWithError{},
Responses: []*etcd.Response{
{
Action: "create",
Node: &etcd.Node{
Value: string(api.EncodeOrDie(&api.Endpoints{JSONBase: api.JSONBase{ID: "foo"}, Endpoints: []string{}})),
},
},
},
From: 1,
Expected: []*T{
{watch.Added, nil},
},
},
"from version 1": {
Responses: []*etcd.Response{
{
Action: "compareAndSwap",
Node: &etcd.Node{
Value: string(api.EncodeOrDie(&api.Endpoints{JSONBase: api.JSONBase{ID: "foo"}, Endpoints: []string{"127.0.0.1:9000"}})),
CreatedIndex: 1,
ModifiedIndex: 2,
},
PrevNode: &etcd.Node{
Value: string(api.EncodeOrDie(&api.Endpoints{JSONBase: api.JSONBase{ID: "foo"}, Endpoints: []string{}})),
CreatedIndex: 1,
ModifiedIndex: 1,
},
},
},
From: 1,
Expected: []*T{
{watch.Modified, []string{"127.0.0.1:9000"}},
},
},
"from initial state": {
Initial: map[string]EtcdResponseWithError{
"/somekey/foo": {
R: &etcd.Response{
Action: "get",
Node: &etcd.Node{
Value: string(api.EncodeOrDie(&api.Endpoints{JSONBase: api.JSONBase{ID: "foo"}, Endpoints: []string{}})),
CreatedIndex: 1,
ModifiedIndex: 1,
},
EtcdIndex: 1,
},
},
},
Responses: []*etcd.Response{
nil,
{
Action: "compareAndSwap",
Node: &etcd.Node{
Value: string(api.EncodeOrDie(&api.Endpoints{JSONBase: api.JSONBase{ID: "foo"}, Endpoints: []string{"127.0.0.1:9000"}})),
CreatedIndex: 1,
ModifiedIndex: 2,
},
PrevNode: &etcd.Node{
Value: string(api.EncodeOrDie(&api.Endpoints{JSONBase: api.JSONBase{ID: "foo"}, Endpoints: []string{}})),
CreatedIndex: 1,
ModifiedIndex: 1,
},
},
},
Expected: []*T{
{watch.Added, nil},
{watch.Modified, []string{"127.0.0.1:9000"}},
},
},
}
for k, testCase := range testCases {
fakeClient := NewFakeEtcdClient(t)
for key, value := range testCase.Initial {
fakeClient.Data[key] = value
}
h := EtcdHelper{fakeClient, codec, versioner}
watching, err := h.Watch("/somekey/foo", testCase.From)
if err != nil {
t.Errorf("%s: unexpected error: %v", k, err)
continue
}
fakeClient.WaitForWatchCompletion()
t.Logf("Testing %v", k)
for i := range testCase.Responses {
if testCase.Responses[i] != nil {
fakeClient.WatchResponse <- testCase.Responses[i]
}
event := <-watching.ResultChan()
if e, a := testCase.Expected[i].Type, event.Type; e != a {
t.Errorf("%s: expected type %v, got %v", k, e, a)
break
}
if e, a := testCase.Expected[i].Endpoints, event.Object.(*api.Endpoints).Endpoints; !reflect.DeepEqual(e, a) {
t.Errorf("%s: expected type %v, got %v", k, e, a)
break
}
}
watching.Stop()
}
}
func TestWatchFromZeroIndex(t *testing.T) {
pod := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}}

View File

@ -18,9 +18,21 @@ package wait
import (
"errors"
"math/rand"
"time"
)
// Jitter returns a time.Duration between duration and duration + maxFactor * duration,
// to allow clients to avoid converging on periodic behavior. If maxFactor is 0.0, a
// suggested default value will be chosen.
func Jitter(duration time.Duration, maxFactor float64) time.Duration {
if maxFactor <= 0.0 {
maxFactor = 1.0
}
wait := duration + time.Duration(rand.Float64()*maxFactor*float64(duration))
return wait
}
// ErrWaitTimeout is returned when the condition exited without success
var ErrWaitTimeout = errors.New("timed out waiting for the condition")