Merge pull request #1326 from lavalamp/scheduler
Fix cache to use the "List then Watch" pattern.
This commit is contained in:
@@ -88,7 +88,7 @@ type SimpleRESTStorage struct {
|
|||||||
injectedFunction func(obj runtime.Object) (returnObj runtime.Object, err error)
|
injectedFunction func(obj runtime.Object) (returnObj runtime.Object, err error)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (storage *SimpleRESTStorage) List(labels.Selector) (runtime.Object, error) {
|
func (storage *SimpleRESTStorage) List(label, field labels.Selector) (runtime.Object, error) {
|
||||||
result := &SimpleList{
|
result := &SimpleList{
|
||||||
Items: storage.list,
|
Items: storage.list,
|
||||||
}
|
}
|
||||||
|
@@ -30,8 +30,7 @@ type RESTStorage interface {
|
|||||||
New() runtime.Object
|
New() runtime.Object
|
||||||
|
|
||||||
// List selects resources in the storage which match to the selector.
|
// List selects resources in the storage which match to the selector.
|
||||||
// TODO: add field selector in addition to label selector.
|
List(label, field labels.Selector) (runtime.Object, error)
|
||||||
List(labels.Selector) (runtime.Object, error)
|
|
||||||
|
|
||||||
// Get finds a resource in the storage by id and returns it.
|
// Get finds a resource in the storage by id and returns it.
|
||||||
// Although it can return an arbitrary error value, IsNotFound(err) is true for the
|
// Although it can return an arbitrary error value, IsNotFound(err) is true for the
|
||||||
|
@@ -70,12 +70,17 @@ func (h *RESTHandler) handleRESTStorage(parts []string, req *http.Request, w htt
|
|||||||
case "GET":
|
case "GET":
|
||||||
switch len(parts) {
|
switch len(parts) {
|
||||||
case 1:
|
case 1:
|
||||||
selector, err := labels.ParseSelector(req.URL.Query().Get("labels"))
|
label, err := labels.ParseSelector(req.URL.Query().Get("labels"))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
errorJSON(err, h.codec, w)
|
errorJSON(err, h.codec, w)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
list, err := storage.List(selector)
|
field, err := labels.ParseSelector(req.URL.Query().Get("fields"))
|
||||||
|
if err != nil {
|
||||||
|
errorJSON(err, h.codec, w)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
list, err := storage.List(label, field)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
errorJSON(err, h.codec, w)
|
errorJSON(err, h.codec, w)
|
||||||
return
|
return
|
||||||
|
17
pkg/client/cache/fifo.go
vendored
17
pkg/client/cache/fifo.go
vendored
@@ -116,6 +116,23 @@ func (f *FIFO) Pop() interface{} {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Replace will delete the contents of 'f', using instead the given map.
|
||||||
|
// 'f' takes ownersip of the map, you should not reference the map again
|
||||||
|
// after calling this function. f's queue is reset, too; upon return, it
|
||||||
|
// will contain the items in the map, in no particular order.
|
||||||
|
func (f *FIFO) Replace(idToObj map[string]interface{}) {
|
||||||
|
f.lock.Lock()
|
||||||
|
defer f.lock.Unlock()
|
||||||
|
f.items = idToObj
|
||||||
|
f.queue = f.queue[:0]
|
||||||
|
for id := range idToObj {
|
||||||
|
f.queue = append(f.queue, id)
|
||||||
|
}
|
||||||
|
if len(f.queue) > 0 {
|
||||||
|
f.cond.Broadcast()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// NewFIFO returns a Store which can be used to queue up items to
|
// NewFIFO returns a Store which can be used to queue up items to
|
||||||
// process.
|
// process.
|
||||||
func NewFIFO() *FIFO {
|
func NewFIFO() *FIFO {
|
||||||
|
26
pkg/client/cache/fifo_test.go
vendored
26
pkg/client/cache/fifo_test.go
vendored
@@ -81,3 +81,29 @@ func TestFIFO_addUpdate(t *testing.T) {
|
|||||||
t.Errorf("item did not get removed")
|
t.Errorf("item did not get removed")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestFIFO_addReplace(t *testing.T) {
|
||||||
|
f := NewFIFO()
|
||||||
|
f.Add("foo", 10)
|
||||||
|
f.Replace(map[string]interface{}{"foo": 15})
|
||||||
|
got := make(chan int, 2)
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
got <- f.Pop().(int)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
first := <-got
|
||||||
|
if e, a := 15, first; e != a {
|
||||||
|
t.Errorf("Didn't get updated value (%v), got %v", e, a)
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case unexpected := <-got:
|
||||||
|
t.Errorf("Got second value %v", unexpected)
|
||||||
|
case <-time.After(50 * time.Millisecond):
|
||||||
|
}
|
||||||
|
_, exists := f.Get("foo")
|
||||||
|
if exists {
|
||||||
|
t.Errorf("item did not get removed")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
95
pkg/client/cache/reflector.go
vendored
95
pkg/client/cache/reflector.go
vendored
@@ -17,6 +17,7 @@ limitations under the License.
|
|||||||
package cache
|
package cache
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
"reflect"
|
"reflect"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@@ -26,58 +27,106 @@ import (
|
|||||||
"github.com/golang/glog"
|
"github.com/golang/glog"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// ListerWatcher is any object that knows how to perform an initial list and start a watch on a resource.
|
||||||
|
type ListerWatcher interface {
|
||||||
|
// List should return a list type object; the Items field will be extracted, and the
|
||||||
|
// ResourceVersion field will be used to start the watch in the right place.
|
||||||
|
List() (runtime.Object, error)
|
||||||
|
// Watch should begin a watch at the specified version.
|
||||||
|
Watch(resourceVersion uint64) (watch.Interface, error)
|
||||||
|
}
|
||||||
|
|
||||||
// Reflector watches a specified resource and causes all changes to be reflected in the given store.
|
// Reflector watches a specified resource and causes all changes to be reflected in the given store.
|
||||||
type Reflector struct {
|
type Reflector struct {
|
||||||
// The type of object we expect to place in the store.
|
// The type of object we expect to place in the store.
|
||||||
expectedType reflect.Type
|
expectedType reflect.Type
|
||||||
// The destination to sync up with the watch source
|
// The destination to sync up with the watch source
|
||||||
store Store
|
store Store
|
||||||
// watchFactory is called to initiate watches.
|
// listerWatcher is used to perform lists and watches.
|
||||||
watchFactory WatchFactory
|
listerWatcher ListerWatcher
|
||||||
// period controls timing between one watch ending and
|
// period controls timing between one watch ending and
|
||||||
// the beginning of the next one.
|
// the beginning of the next one.
|
||||||
period time.Duration
|
period time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
// WatchFactory should begin a watch at the specified version.
|
|
||||||
type WatchFactory func(resourceVersion uint64) (watch.Interface, error)
|
|
||||||
|
|
||||||
// NewReflector creates a new Reflector object which will keep the given store up to
|
// NewReflector creates a new Reflector object which will keep the given store up to
|
||||||
// date with the server's contents for the given resource. Reflector promises to
|
// date with the server's contents for the given resource. Reflector promises to
|
||||||
// only put things in the store that have the type of expectedType.
|
// only put things in the store that have the type of expectedType.
|
||||||
func NewReflector(watchFactory WatchFactory, expectedType interface{}, store Store) *Reflector {
|
func NewReflector(lw ListerWatcher, expectedType interface{}, store Store) *Reflector {
|
||||||
gc := &Reflector{
|
r := &Reflector{
|
||||||
watchFactory: watchFactory,
|
listerWatcher: lw,
|
||||||
store: store,
|
store: store,
|
||||||
expectedType: reflect.TypeOf(expectedType),
|
expectedType: reflect.TypeOf(expectedType),
|
||||||
period: time.Second,
|
period: time.Second,
|
||||||
}
|
}
|
||||||
return gc
|
return r
|
||||||
}
|
}
|
||||||
|
|
||||||
// Run starts a watch and handles watch events. Will restart the watch if it is closed.
|
// Run starts a watch and handles watch events. Will restart the watch if it is closed.
|
||||||
// Run starts a goroutine and returns immediately.
|
// Run starts a goroutine and returns immediately.
|
||||||
func (gc *Reflector) Run() {
|
func (r *Reflector) Run() {
|
||||||
|
go util.Forever(func() { r.listAndWatch() }, r.period)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *Reflector) listAndWatch() {
|
||||||
var resourceVersion uint64
|
var resourceVersion uint64
|
||||||
go util.Forever(func() {
|
|
||||||
w, err := gc.watchFactory(resourceVersion)
|
list, err := r.listerWatcher.List()
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("Failed to list %v: %v", r.expectedType, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
jsonBase, err := runtime.FindJSONBase(list)
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("Unable to understand list result %#v", list)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
resourceVersion = jsonBase.ResourceVersion()
|
||||||
|
items, err := runtime.ExtractList(list)
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("Unable to understand list result %#v (%v)", list, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
err = r.syncWith(items)
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("Unable to sync list result: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
for {
|
||||||
|
w, err := r.listerWatcher.Watch(resourceVersion)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Errorf("failed to watch %v: %v", gc.expectedType, err)
|
glog.Errorf("failed to watch %v: %v", r.expectedType, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
gc.watchHandler(w, &resourceVersion)
|
r.watchHandler(w, &resourceVersion)
|
||||||
}, gc.period)
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// syncWith replaces the store's items with the given list.
|
||||||
|
func (r *Reflector) syncWith(items []runtime.Object) error {
|
||||||
|
found := map[string]interface{}{}
|
||||||
|
for _, item := range items {
|
||||||
|
jsonBase, err := runtime.FindJSONBase(item)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("unexpected item in list: %v", err)
|
||||||
|
}
|
||||||
|
found[jsonBase.ID()] = item
|
||||||
|
}
|
||||||
|
|
||||||
|
r.store.Replace(found)
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// watchHandler watches w and keeps *resourceVersion up to date.
|
// watchHandler watches w and keeps *resourceVersion up to date.
|
||||||
func (gc *Reflector) watchHandler(w watch.Interface, resourceVersion *uint64) {
|
func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *uint64) {
|
||||||
for {
|
for {
|
||||||
event, ok := <-w.ResultChan()
|
event, ok := <-w.ResultChan()
|
||||||
if !ok {
|
if !ok {
|
||||||
glog.Errorf("unexpected watch close")
|
glog.Errorf("unexpected watch close")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if e, a := gc.expectedType, reflect.TypeOf(event.Object); e != a {
|
if e, a := r.expectedType, reflect.TypeOf(event.Object); e != a {
|
||||||
glog.Errorf("expected type %v, but watch event object had type %v", e, a)
|
glog.Errorf("expected type %v, but watch event object had type %v", e, a)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
@@ -88,14 +137,14 @@ func (gc *Reflector) watchHandler(w watch.Interface, resourceVersion *uint64) {
|
|||||||
}
|
}
|
||||||
switch event.Type {
|
switch event.Type {
|
||||||
case watch.Added:
|
case watch.Added:
|
||||||
gc.store.Add(jsonBase.ID(), event.Object)
|
r.store.Add(jsonBase.ID(), event.Object)
|
||||||
case watch.Modified:
|
case watch.Modified:
|
||||||
gc.store.Update(jsonBase.ID(), event.Object)
|
r.store.Update(jsonBase.ID(), event.Object)
|
||||||
case watch.Deleted:
|
case watch.Deleted:
|
||||||
// TODO: Will any consumers need access to the "last known
|
// TODO: Will any consumers need access to the "last known
|
||||||
// state", which is passed in event.Object? If so, may need
|
// state", which is passed in event.Object? If so, may need
|
||||||
// to change this.
|
// to change this.
|
||||||
gc.store.Delete(jsonBase.ID())
|
r.store.Delete(jsonBase.ID())
|
||||||
default:
|
default:
|
||||||
glog.Errorf("unable to understand watch event %#v", event)
|
glog.Errorf("unable to understand watch event %#v", event)
|
||||||
}
|
}
|
||||||
|
147
pkg/client/cache/reflector_test.go
vendored
147
pkg/client/cache/reflector_test.go
vendored
@@ -17,15 +17,27 @@ limitations under the License.
|
|||||||
package cache
|
package cache
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||||
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type testLW struct {
|
||||||
|
ListFunc func() (runtime.Object, error)
|
||||||
|
WatchFunc func(resourceVersion uint64) (watch.Interface, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *testLW) List() (runtime.Object, error) { return t.ListFunc() }
|
||||||
|
func (t *testLW) Watch(resourceVersion uint64) (watch.Interface, error) {
|
||||||
|
return t.WatchFunc(resourceVersion)
|
||||||
|
}
|
||||||
|
|
||||||
func TestReflector_watchHandler(t *testing.T) {
|
func TestReflector_watchHandler(t *testing.T) {
|
||||||
s := NewStore()
|
s := NewStore()
|
||||||
g := NewReflector(nil, &api.Pod{}, s)
|
g := NewReflector(&testLW{}, &api.Pod{}, s)
|
||||||
fw := watch.NewFake()
|
fw := watch.NewFake()
|
||||||
s.Add("foo", &api.Pod{JSONBase: api.JSONBase{ID: "foo"}})
|
s.Add("foo", &api.Pod{JSONBase: api.JSONBase{ID: "foo"}})
|
||||||
s.Add("bar", &api.Pod{JSONBase: api.JSONBase{ID: "bar"}})
|
s.Add("bar", &api.Pod{JSONBase: api.JSONBase{ID: "bar"}})
|
||||||
@@ -68,27 +80,32 @@ func TestReflector_watchHandler(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestReflector_Run(t *testing.T) {
|
func TestReflector_listAndWatch(t *testing.T) {
|
||||||
createdFakes := make(chan *watch.FakeWatcher)
|
createdFakes := make(chan *watch.FakeWatcher)
|
||||||
|
|
||||||
// Expect our starter to get called at the beginning of the watch with 0, and again with 3 when we
|
// The ListFunc says that it's at revision 1. Therefore, we expect our WatchFunc
|
||||||
// inject an error at 2.
|
// to get called at the beginning of the watch with 1, and again with 4 when we
|
||||||
expectedRVs := []uint64{0, 3}
|
// inject an error at 3.
|
||||||
watchStarter := func(rv uint64) (watch.Interface, error) {
|
expectedRVs := []uint64{1, 4}
|
||||||
fw := watch.NewFake()
|
lw := &testLW{
|
||||||
if e, a := expectedRVs[0], rv; e != a {
|
WatchFunc: func(rv uint64) (watch.Interface, error) {
|
||||||
t.Errorf("Expected rv %v, but got %v", e, a)
|
fw := watch.NewFake()
|
||||||
}
|
if e, a := expectedRVs[0], rv; e != a {
|
||||||
expectedRVs = expectedRVs[1:]
|
t.Errorf("Expected rv %v, but got %v", e, a)
|
||||||
// channel is not buffered because the for loop below needs to block. But
|
}
|
||||||
// we don't want to block here, so report the new fake via a go routine.
|
expectedRVs = expectedRVs[1:]
|
||||||
go func() { createdFakes <- fw }()
|
// channel is not buffered because the for loop below needs to block. But
|
||||||
return fw, nil
|
// we don't want to block here, so report the new fake via a go routine.
|
||||||
|
go func() { createdFakes <- fw }()
|
||||||
|
return fw, nil
|
||||||
|
},
|
||||||
|
ListFunc: func() (runtime.Object, error) {
|
||||||
|
return &api.PodList{JSONBase: api.JSONBase{ResourceVersion: 1}}, nil
|
||||||
|
},
|
||||||
}
|
}
|
||||||
s := NewFIFO()
|
s := NewFIFO()
|
||||||
r := NewReflector(watchStarter, &api.Pod{}, s)
|
r := NewReflector(lw, &api.Pod{}, s)
|
||||||
r.period = 0
|
go r.listAndWatch()
|
||||||
r.Run()
|
|
||||||
|
|
||||||
ids := []string{"foo", "bar", "baz", "qux", "zoo"}
|
ids := []string{"foo", "bar", "baz", "qux", "zoo"}
|
||||||
var fw *watch.FakeWatcher
|
var fw *watch.FakeWatcher
|
||||||
@@ -96,9 +113,9 @@ func TestReflector_Run(t *testing.T) {
|
|||||||
if fw == nil {
|
if fw == nil {
|
||||||
fw = <-createdFakes
|
fw = <-createdFakes
|
||||||
}
|
}
|
||||||
sendingRV := uint64(i + 1)
|
sendingRV := uint64(i + 2)
|
||||||
fw.Add(&api.Pod{JSONBase: api.JSONBase{ID: id, ResourceVersion: sendingRV}})
|
fw.Add(&api.Pod{JSONBase: api.JSONBase{ID: id, ResourceVersion: sendingRV}})
|
||||||
if sendingRV == 2 {
|
if sendingRV == 3 {
|
||||||
// Inject a failure.
|
// Inject a failure.
|
||||||
fw.Stop()
|
fw.Stop()
|
||||||
fw = nil
|
fw = nil
|
||||||
@@ -111,7 +128,7 @@ func TestReflector_Run(t *testing.T) {
|
|||||||
if e, a := id, pod.ID; e != a {
|
if e, a := id, pod.ID; e != a {
|
||||||
t.Errorf("%v: Expected %v, got %v", i, e, a)
|
t.Errorf("%v: Expected %v, got %v", i, e, a)
|
||||||
}
|
}
|
||||||
if e, a := uint64(i+1), pod.ResourceVersion; e != a {
|
if e, a := uint64(i+2), pod.ResourceVersion; e != a {
|
||||||
t.Errorf("%v: Expected %v, got %v", i, e, a)
|
t.Errorf("%v: Expected %v, got %v", i, e, a)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -120,3 +137,91 @@ func TestReflector_Run(t *testing.T) {
|
|||||||
t.Error("called watchStarter an unexpected number of times")
|
t.Error("called watchStarter an unexpected number of times")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestReflector_listAndWatchWithErrors(t *testing.T) {
|
||||||
|
mkPod := func(id string, rv uint64) *api.Pod {
|
||||||
|
return &api.Pod{JSONBase: api.JSONBase{ID: id, ResourceVersion: rv}}
|
||||||
|
}
|
||||||
|
mkList := func(rv uint64, pods ...*api.Pod) *api.PodList {
|
||||||
|
list := &api.PodList{JSONBase: api.JSONBase{ResourceVersion: rv}}
|
||||||
|
for _, pod := range pods {
|
||||||
|
list.Items = append(list.Items, *pod)
|
||||||
|
}
|
||||||
|
return list
|
||||||
|
}
|
||||||
|
table := []struct {
|
||||||
|
list *api.PodList
|
||||||
|
listErr error
|
||||||
|
events []watch.Event
|
||||||
|
watchErr error
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
list: mkList(1),
|
||||||
|
events: []watch.Event{
|
||||||
|
{watch.Added, mkPod("foo", 2)},
|
||||||
|
{watch.Added, mkPod("bar", 3)},
|
||||||
|
},
|
||||||
|
}, {
|
||||||
|
list: mkList(3, mkPod("foo", 2), mkPod("bar", 3)),
|
||||||
|
events: []watch.Event{
|
||||||
|
{watch.Deleted, mkPod("foo", 4)},
|
||||||
|
{watch.Added, mkPod("qux", 5)},
|
||||||
|
},
|
||||||
|
}, {
|
||||||
|
listErr: fmt.Errorf("a list error"),
|
||||||
|
}, {
|
||||||
|
list: mkList(5, mkPod("bar", 3), mkPod("qux", 5)),
|
||||||
|
watchErr: fmt.Errorf("a watch error"),
|
||||||
|
}, {
|
||||||
|
list: mkList(5, mkPod("bar", 3), mkPod("qux", 5)),
|
||||||
|
events: []watch.Event{
|
||||||
|
{watch.Added, mkPod("baz", 6)},
|
||||||
|
},
|
||||||
|
}, {
|
||||||
|
list: mkList(6, mkPod("bar", 3), mkPod("qux", 5), mkPod("baz", 6)),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
s := NewFIFO()
|
||||||
|
for line, item := range table {
|
||||||
|
if item.list != nil {
|
||||||
|
// Test that the list is what currently exists in the store.
|
||||||
|
current := s.List()
|
||||||
|
checkMap := map[string]uint64{}
|
||||||
|
for _, item := range current {
|
||||||
|
pod := item.(*api.Pod)
|
||||||
|
checkMap[pod.ID] = pod.ResourceVersion
|
||||||
|
}
|
||||||
|
for _, pod := range item.list.Items {
|
||||||
|
if e, a := pod.ResourceVersion, checkMap[pod.ID]; e != a {
|
||||||
|
t.Errorf("%v: expected %v, got %v for pod %v", line, e, a, pod.ID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if e, a := len(item.list.Items), len(checkMap); e != a {
|
||||||
|
t.Errorf("%v: expected %v, got %v", line, e, a)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
watchRet, watchErr := item.events, item.watchErr
|
||||||
|
lw := &testLW{
|
||||||
|
WatchFunc: func(rv uint64) (watch.Interface, error) {
|
||||||
|
if watchErr != nil {
|
||||||
|
return nil, watchErr
|
||||||
|
}
|
||||||
|
watchErr = fmt.Errorf("second watch")
|
||||||
|
fw := watch.NewFake()
|
||||||
|
go func() {
|
||||||
|
for _, e := range watchRet {
|
||||||
|
fw.Action(e.Type, e.Object)
|
||||||
|
}
|
||||||
|
fw.Stop()
|
||||||
|
}()
|
||||||
|
return fw, nil
|
||||||
|
},
|
||||||
|
ListFunc: func() (runtime.Object, error) {
|
||||||
|
return item.list, item.listErr
|
||||||
|
},
|
||||||
|
}
|
||||||
|
r := NewReflector(lw, &api.Pod{}, s)
|
||||||
|
r.listAndWatch()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
14
pkg/client/cache/store.go
vendored
14
pkg/client/cache/store.go
vendored
@@ -33,6 +33,11 @@ type Store interface {
|
|||||||
List() []interface{}
|
List() []interface{}
|
||||||
Contains() util.StringSet
|
Contains() util.StringSet
|
||||||
Get(id string) (item interface{}, exists bool)
|
Get(id string) (item interface{}, exists bool)
|
||||||
|
|
||||||
|
// Replace will delete the contents of the store, using instead the
|
||||||
|
// given map. Store takes ownership of the map, you should not reference
|
||||||
|
// it after calling this function.
|
||||||
|
Replace(idToObj map[string]interface{})
|
||||||
}
|
}
|
||||||
|
|
||||||
type cache struct {
|
type cache struct {
|
||||||
@@ -95,6 +100,15 @@ func (c *cache) Get(id string) (item interface{}, exists bool) {
|
|||||||
return item, exists
|
return item, exists
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Replace will delete the contents of 'c', using instead the given map.
|
||||||
|
// 'c' takes ownership of the map, you should not reference the map again
|
||||||
|
// after calling this function.
|
||||||
|
func (c *cache) Replace(idToObj map[string]interface{}) {
|
||||||
|
c.lock.Lock()
|
||||||
|
defer c.lock.Unlock()
|
||||||
|
c.items = idToObj
|
||||||
|
}
|
||||||
|
|
||||||
// NewStore returns a Store implemented simply with a map and a lock.
|
// NewStore returns a Store implemented simply with a map and a lock.
|
||||||
func NewStore() Store {
|
func NewStore() Store {
|
||||||
return &cache{items: map[string]interface{}{}}
|
return &cache{items: map[string]interface{}{}}
|
||||||
|
64
pkg/client/cache/store_test.go
vendored
64
pkg/client/cache/store_test.go
vendored
@@ -45,28 +45,58 @@ func doTestStore(t *testing.T, store Store) {
|
|||||||
t.Errorf("found deleted item??")
|
t.Errorf("found deleted item??")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Test List
|
// Test List.
|
||||||
store.Add("a", "b")
|
store.Add("a", "b")
|
||||||
store.Add("c", "d")
|
store.Add("c", "d")
|
||||||
store.Add("e", "e")
|
store.Add("e", "e")
|
||||||
found := util.StringSet{}
|
{
|
||||||
for _, item := range store.List() {
|
found := util.StringSet{}
|
||||||
found.Insert(item.(string))
|
for _, item := range store.List() {
|
||||||
}
|
found.Insert(item.(string))
|
||||||
if !found.HasAll("b", "d", "e") {
|
}
|
||||||
t.Errorf("missing items")
|
if !found.HasAll("b", "d", "e") {
|
||||||
}
|
t.Errorf("missing items")
|
||||||
if len(found) != 3 {
|
}
|
||||||
t.Errorf("extra items")
|
if len(found) != 3 {
|
||||||
|
t.Errorf("extra items")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check that ID list is correct.
|
||||||
|
ids := store.Contains()
|
||||||
|
if !ids.HasAll("a", "c", "e") {
|
||||||
|
t.Errorf("missing items")
|
||||||
|
}
|
||||||
|
if len(ids) != 3 {
|
||||||
|
t.Errorf("extra items")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check that ID list is correct.
|
// Test Replace.
|
||||||
ids := store.Contains()
|
store.Replace(map[string]interface{}{
|
||||||
if !ids.HasAll("a", "c", "e") {
|
"foo": "foo",
|
||||||
t.Errorf("missing items")
|
"bar": "bar",
|
||||||
}
|
})
|
||||||
if len(ids) != 3 {
|
|
||||||
t.Errorf("extra items")
|
{
|
||||||
|
found := util.StringSet{}
|
||||||
|
for _, item := range store.List() {
|
||||||
|
found.Insert(item.(string))
|
||||||
|
}
|
||||||
|
if !found.HasAll("foo", "bar") {
|
||||||
|
t.Errorf("missing items")
|
||||||
|
}
|
||||||
|
if len(found) != 2 {
|
||||||
|
t.Errorf("extra items")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check that ID list is correct.
|
||||||
|
ids := store.Contains()
|
||||||
|
if !ids.HasAll("foo", "bar") {
|
||||||
|
t.Errorf("missing items")
|
||||||
|
}
|
||||||
|
if len(ids) != 2 {
|
||||||
|
t.Errorf("extra items")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -41,7 +41,7 @@ func NewREST(bindingRegistry Registry) *REST {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// List returns an error because bindings are write-only objects.
|
// List returns an error because bindings are write-only objects.
|
||||||
func (*REST) List(selector labels.Selector) (runtime.Object, error) {
|
func (*REST) List(label, field labels.Selector) (runtime.Object, error) {
|
||||||
return nil, errors.NewNotFound("binding", "list")
|
return nil, errors.NewNotFound("binding", "list")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -65,7 +65,7 @@ func TestRESTUnsupported(t *testing.T) {
|
|||||||
if _, err := b.Get("binding id"); err == nil {
|
if _, err := b.Get("binding id"); err == nil {
|
||||||
t.Errorf("unexpected non-error")
|
t.Errorf("unexpected non-error")
|
||||||
}
|
}
|
||||||
if _, err := b.List(labels.Set{"name": "foo"}.AsSelector()); err == nil {
|
if _, err := b.List(labels.Set{"name": "foo"}.AsSelector(), labels.Everything()); err == nil {
|
||||||
t.Errorf("unexpected non-error")
|
t.Errorf("unexpected non-error")
|
||||||
}
|
}
|
||||||
// Try sending wrong object just to get 100% coverage
|
// Try sending wrong object just to get 100% coverage
|
||||||
|
@@ -97,14 +97,17 @@ func (rs *REST) Get(id string) (runtime.Object, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// List obtains a list of ReplicationControllers that match selector.
|
// List obtains a list of ReplicationControllers that match selector.
|
||||||
func (rs *REST) List(selector labels.Selector) (runtime.Object, error) {
|
func (rs *REST) List(label, field labels.Selector) (runtime.Object, error) {
|
||||||
|
if !field.Empty() {
|
||||||
|
return nil, fmt.Errorf("field selector not supported yet")
|
||||||
|
}
|
||||||
controllers, err := rs.registry.ListControllers()
|
controllers, err := rs.registry.ListControllers()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
filtered := []api.ReplicationController{}
|
filtered := []api.ReplicationController{}
|
||||||
for _, controller := range controllers.Items {
|
for _, controller := range controllers.Items {
|
||||||
if selector.Matches(labels.Set(controller.Labels)) {
|
if label.Matches(labels.Set(controller.Labels)) {
|
||||||
rs.fillCurrentState(&controller)
|
rs.fillCurrentState(&controller)
|
||||||
filtered = append(filtered, controller)
|
filtered = append(filtered, controller)
|
||||||
}
|
}
|
||||||
|
@@ -39,7 +39,7 @@ func TestListControllersError(t *testing.T) {
|
|||||||
storage := REST{
|
storage := REST{
|
||||||
registry: &mockRegistry,
|
registry: &mockRegistry,
|
||||||
}
|
}
|
||||||
controllers, err := storage.List(nil)
|
controllers, err := storage.List(labels.Everything(), labels.Everything())
|
||||||
if err != mockRegistry.Err {
|
if err != mockRegistry.Err {
|
||||||
t.Errorf("Expected %#v, Got %#v", mockRegistry.Err, err)
|
t.Errorf("Expected %#v, Got %#v", mockRegistry.Err, err)
|
||||||
}
|
}
|
||||||
@@ -53,7 +53,7 @@ func TestListEmptyControllerList(t *testing.T) {
|
|||||||
storage := REST{
|
storage := REST{
|
||||||
registry: &mockRegistry,
|
registry: &mockRegistry,
|
||||||
}
|
}
|
||||||
controllers, err := storage.List(labels.Everything())
|
controllers, err := storage.List(labels.Everything(), labels.Everything())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("unexpected error: %v", err)
|
t.Errorf("unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
@@ -86,7 +86,7 @@ func TestListControllerList(t *testing.T) {
|
|||||||
storage := REST{
|
storage := REST{
|
||||||
registry: &mockRegistry,
|
registry: &mockRegistry,
|
||||||
}
|
}
|
||||||
controllersObj, err := storage.List(labels.Everything())
|
controllersObj, err := storage.List(labels.Everything(), labels.Everything())
|
||||||
controllers := controllersObj.(*api.ReplicationControllerList)
|
controllers := controllersObj.(*api.ReplicationControllerList)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("unexpected error: %v", err)
|
t.Errorf("unexpected error: %v", err)
|
||||||
|
@@ -43,9 +43,9 @@ func (rs *REST) Get(id string) (runtime.Object, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// List satisfies the RESTStorage interface.
|
// List satisfies the RESTStorage interface.
|
||||||
func (rs *REST) List(selector labels.Selector) (runtime.Object, error) {
|
func (rs *REST) List(label, field labels.Selector) (runtime.Object, error) {
|
||||||
if !selector.Empty() {
|
if !label.Empty() || !field.Empty() {
|
||||||
return nil, errors.New("label selectors are not supported on endpoints")
|
return nil, errors.New("label/field selectors are not supported on endpoints")
|
||||||
}
|
}
|
||||||
return rs.registry.ListEndpoints()
|
return rs.registry.ListEndpoints()
|
||||||
}
|
}
|
||||||
|
@@ -79,7 +79,7 @@ func TestEndpointsRegistryList(t *testing.T) {
|
|||||||
{JSONBase: api.JSONBase{ID: "bar"}},
|
{JSONBase: api.JSONBase{ID: "bar"}},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
s, _ := storage.List(labels.Everything())
|
s, _ := storage.List(labels.Everything(), labels.Everything())
|
||||||
sl := s.(*api.EndpointsList)
|
sl := s.(*api.EndpointsList)
|
||||||
if len(sl.Items) != 2 {
|
if len(sl.Items) != 2 {
|
||||||
t.Fatalf("Expected 2 endpoints, but got %v", len(sl.Items))
|
t.Fatalf("Expected 2 endpoints, but got %v", len(sl.Items))
|
||||||
|
@@ -60,8 +60,15 @@ func makePodKey(podID string) string {
|
|||||||
return "/registry/pods/" + podID
|
return "/registry/pods/" + podID
|
||||||
}
|
}
|
||||||
|
|
||||||
// ListPods obtains a list of pods that match selector.
|
// ListPods obtains a list of pods with labels that match selector.
|
||||||
func (r *Registry) ListPods(selector labels.Selector) (*api.PodList, error) {
|
func (r *Registry) ListPods(selector labels.Selector) (*api.PodList, error) {
|
||||||
|
return r.ListPodsPredicate(func(pod *api.Pod) bool {
|
||||||
|
return selector.Matches(labels.Set(pod.Labels))
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// ListPodsPredicate obtains a list of pods that match filter.
|
||||||
|
func (r *Registry) ListPodsPredicate(filter func(*api.Pod) bool) (*api.PodList, error) {
|
||||||
allPods := api.PodList{}
|
allPods := api.PodList{}
|
||||||
err := r.ExtractList("/registry/pods", &allPods.Items, &allPods.ResourceVersion)
|
err := r.ExtractList("/registry/pods", &allPods.Items, &allPods.ResourceVersion)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -69,7 +76,7 @@ func (r *Registry) ListPods(selector labels.Selector) (*api.PodList, error) {
|
|||||||
}
|
}
|
||||||
filtered := []api.Pod{}
|
filtered := []api.Pod{}
|
||||||
for _, pod := range allPods.Items {
|
for _, pod := range allPods.Items {
|
||||||
if selector.Matches(labels.Set(pod.Labels)) {
|
if filter(&pod) {
|
||||||
// TODO: Currently nothing sets CurrentState.Host. We need a feedback loop that sets
|
// TODO: Currently nothing sets CurrentState.Host. We need a feedback loop that sets
|
||||||
// the CurrentState.Host and Status fields. Here we pretend that reality perfectly
|
// the CurrentState.Host and Status fields. Here we pretend that reality perfectly
|
||||||
// matches our desires.
|
// matches our desires.
|
||||||
|
@@ -86,7 +86,7 @@ func (rs *REST) Get(id string) (runtime.Object, error) {
|
|||||||
return rs.toApiMinion(id), err
|
return rs.toApiMinion(id), err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rs *REST) List(selector labels.Selector) (runtime.Object, error) {
|
func (rs *REST) List(label, field labels.Selector) (runtime.Object, error) {
|
||||||
nameList, err := rs.registry.List()
|
nameList, err := rs.registry.List()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@@ -67,7 +67,7 @@ func TestMinionREST(t *testing.T) {
|
|||||||
t.Errorf("delete returned wrong error")
|
t.Errorf("delete returned wrong error")
|
||||||
}
|
}
|
||||||
|
|
||||||
list, err := ms.List(labels.Everything())
|
list, err := ms.List(labels.Everything(), labels.Everything())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("got error calling List")
|
t.Errorf("got error calling List")
|
||||||
}
|
}
|
||||||
|
@@ -24,8 +24,10 @@ import (
|
|||||||
|
|
||||||
// Registry is an interface implemented by things that know how to store Pod objects.
|
// Registry is an interface implemented by things that know how to store Pod objects.
|
||||||
type Registry interface {
|
type Registry interface {
|
||||||
// ListPods obtains a list of pods that match selector.
|
// ListPods obtains a list of pods having labels which match selector.
|
||||||
ListPods(selector labels.Selector) (*api.PodList, error)
|
ListPods(selector labels.Selector) (*api.PodList, error)
|
||||||
|
// ListPodsPredicate obtains a list of pods for which filter returns true.
|
||||||
|
ListPodsPredicate(filter func(*api.Pod) bool) (*api.PodList, error)
|
||||||
// Watch for new/changed/deleted pods
|
// Watch for new/changed/deleted pods
|
||||||
WatchPods(resourceVersion uint64, filter func(*api.Pod) bool) (watch.Interface, error)
|
WatchPods(resourceVersion uint64, filter func(*api.Pod) bool) (watch.Interface, error)
|
||||||
// Get a specific pod
|
// Get a specific pod
|
||||||
|
@@ -114,8 +114,25 @@ func (rs *REST) Get(id string) (runtime.Object, error) {
|
|||||||
return pod, err
|
return pod, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rs *REST) List(selector labels.Selector) (runtime.Object, error) {
|
func (rs *REST) podToSelectableFields(pod *api.Pod) labels.Set {
|
||||||
pods, err := rs.registry.ListPods(selector)
|
return labels.Set{
|
||||||
|
"ID": pod.ID,
|
||||||
|
"DesiredState.Status": string(pod.DesiredState.Status),
|
||||||
|
"DesiredState.Host": pod.DesiredState.Host,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// filterFunc returns a predicate based on label & field selectors that can be passed to registry's
|
||||||
|
// ListPods & WatchPods.
|
||||||
|
func (rs *REST) filterFunc(label, field labels.Selector) func(*api.Pod) bool {
|
||||||
|
return func(pod *api.Pod) bool {
|
||||||
|
fields := rs.podToSelectableFields(pod)
|
||||||
|
return label.Matches(labels.Set(pod.Labels)) && field.Matches(fields)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (rs *REST) List(label, field labels.Selector) (runtime.Object, error) {
|
||||||
|
pods, err := rs.registry.ListPodsPredicate(rs.filterFunc(label, field))
|
||||||
if err == nil {
|
if err == nil {
|
||||||
for i := range pods.Items {
|
for i := range pods.Items {
|
||||||
pod := &pods.Items[i]
|
pod := &pods.Items[i]
|
||||||
@@ -133,14 +150,7 @@ func (rs *REST) List(selector labels.Selector) (runtime.Object, error) {
|
|||||||
|
|
||||||
// Watch begins watching for new, changed, or deleted pods.
|
// Watch begins watching for new, changed, or deleted pods.
|
||||||
func (rs *REST) Watch(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
|
func (rs *REST) Watch(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
|
||||||
return rs.registry.WatchPods(resourceVersion, func(pod *api.Pod) bool {
|
return rs.registry.WatchPods(resourceVersion, rs.filterFunc(label, field))
|
||||||
fields := labels.Set{
|
|
||||||
"ID": pod.ID,
|
|
||||||
"DesiredState.Status": string(pod.DesiredState.Status),
|
|
||||||
"DesiredState.Host": pod.DesiredState.Host,
|
|
||||||
}
|
|
||||||
return label.Matches(labels.Set(pod.Labels)) && field.Matches(fields)
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (*REST) New() runtime.Object {
|
func (*REST) New() runtime.Object {
|
||||||
|
@@ -30,6 +30,7 @@ import (
|
|||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
|
||||||
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
||||||
|
|
||||||
"github.com/fsouza/go-dockerclient"
|
"github.com/fsouza/go-dockerclient"
|
||||||
)
|
)
|
||||||
@@ -130,7 +131,7 @@ func TestListPodsError(t *testing.T) {
|
|||||||
storage := REST{
|
storage := REST{
|
||||||
registry: podRegistry,
|
registry: podRegistry,
|
||||||
}
|
}
|
||||||
pods, err := storage.List(labels.Everything())
|
pods, err := storage.List(labels.Everything(), labels.Everything())
|
||||||
if err != podRegistry.Err {
|
if err != podRegistry.Err {
|
||||||
t.Errorf("Expected %#v, Got %#v", podRegistry.Err, err)
|
t.Errorf("Expected %#v, Got %#v", podRegistry.Err, err)
|
||||||
}
|
}
|
||||||
@@ -144,7 +145,7 @@ func TestListEmptyPodList(t *testing.T) {
|
|||||||
storage := REST{
|
storage := REST{
|
||||||
registry: podRegistry,
|
registry: podRegistry,
|
||||||
}
|
}
|
||||||
pods, err := storage.List(labels.Everything())
|
pods, err := storage.List(labels.Everything(), labels.Everything())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("unexpected error: %v", err)
|
t.Errorf("unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
@@ -176,7 +177,7 @@ func TestListPodList(t *testing.T) {
|
|||||||
storage := REST{
|
storage := REST{
|
||||||
registry: podRegistry,
|
registry: podRegistry,
|
||||||
}
|
}
|
||||||
podsObj, err := storage.List(labels.Everything())
|
podsObj, err := storage.List(labels.Everything(), labels.Everything())
|
||||||
pods := podsObj.(*api.PodList)
|
pods := podsObj.(*api.PodList)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("unexpected error: %v", err)
|
t.Errorf("unexpected error: %v", err)
|
||||||
@@ -193,6 +194,86 @@ func TestListPodList(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestListPodListSelection(t *testing.T) {
|
||||||
|
podRegistry := registrytest.NewPodRegistry(nil)
|
||||||
|
podRegistry.Pods = &api.PodList{
|
||||||
|
Items: []api.Pod{
|
||||||
|
{
|
||||||
|
JSONBase: api.JSONBase{ID: "foo"},
|
||||||
|
}, {
|
||||||
|
JSONBase: api.JSONBase{ID: "bar"},
|
||||||
|
DesiredState: api.PodState{Host: "barhost"},
|
||||||
|
}, {
|
||||||
|
JSONBase: api.JSONBase{ID: "baz"},
|
||||||
|
DesiredState: api.PodState{Status: "bazstatus"},
|
||||||
|
}, {
|
||||||
|
JSONBase: api.JSONBase{ID: "qux"},
|
||||||
|
Labels: map[string]string{"label": "qux"},
|
||||||
|
}, {
|
||||||
|
JSONBase: api.JSONBase{ID: "zot"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
storage := REST{
|
||||||
|
registry: podRegistry,
|
||||||
|
}
|
||||||
|
|
||||||
|
table := []struct {
|
||||||
|
label, field string
|
||||||
|
expectedIDs util.StringSet
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
expectedIDs: util.NewStringSet("foo", "bar", "baz", "qux", "zot"),
|
||||||
|
}, {
|
||||||
|
field: "ID=zot",
|
||||||
|
expectedIDs: util.NewStringSet("zot"),
|
||||||
|
}, {
|
||||||
|
label: "label=qux",
|
||||||
|
expectedIDs: util.NewStringSet("qux"),
|
||||||
|
}, {
|
||||||
|
field: "DesiredState.Status=bazstatus",
|
||||||
|
expectedIDs: util.NewStringSet("baz"),
|
||||||
|
}, {
|
||||||
|
field: "DesiredState.Host=barhost",
|
||||||
|
expectedIDs: util.NewStringSet("bar"),
|
||||||
|
}, {
|
||||||
|
field: "DesiredState.Host=",
|
||||||
|
expectedIDs: util.NewStringSet("foo", "baz", "qux", "zot"),
|
||||||
|
}, {
|
||||||
|
field: "DesiredState.Host!=",
|
||||||
|
expectedIDs: util.NewStringSet("bar"),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for index, item := range table {
|
||||||
|
label, err := labels.ParseSelector(item.label)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("unexpected error: %v", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
field, err := labels.ParseSelector(item.field)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("unexpected error: %v", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
podsObj, err := storage.List(label, field)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
pods := podsObj.(*api.PodList)
|
||||||
|
|
||||||
|
if e, a := len(item.expectedIDs), len(pods.Items); e != a {
|
||||||
|
t.Errorf("%v: Expected %v, got %v", index, e, a)
|
||||||
|
}
|
||||||
|
for _, pod := range pods.Items {
|
||||||
|
if !item.expectedIDs.Has(pod.ID) {
|
||||||
|
t.Errorf("%v: Unexpected pod %v", index, pod.ID)
|
||||||
|
}
|
||||||
|
t.Logf("%v: Got pod ID: %v", index, pod.ID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestPodDecode(t *testing.T) {
|
func TestPodDecode(t *testing.T) {
|
||||||
podRegistry := registrytest.NewPodRegistry(nil)
|
podRegistry := registrytest.NewPodRegistry(nil)
|
||||||
storage := REST{
|
storage := REST{
|
||||||
|
@@ -40,7 +40,7 @@ func NewPodRegistry(pods *api.PodList) *PodRegistry {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *PodRegistry) ListPods(selector labels.Selector) (*api.PodList, error) {
|
func (r *PodRegistry) ListPodsPredicate(filter func(*api.Pod) bool) (*api.PodList, error) {
|
||||||
r.Lock()
|
r.Lock()
|
||||||
defer r.Unlock()
|
defer r.Unlock()
|
||||||
if r.Err != nil {
|
if r.Err != nil {
|
||||||
@@ -48,7 +48,7 @@ func (r *PodRegistry) ListPods(selector labels.Selector) (*api.PodList, error) {
|
|||||||
}
|
}
|
||||||
var filtered []api.Pod
|
var filtered []api.Pod
|
||||||
for _, pod := range r.Pods.Items {
|
for _, pod := range r.Pods.Items {
|
||||||
if selector.Matches(labels.Set(pod.Labels)) {
|
if filter(&pod) {
|
||||||
filtered = append(filtered, pod)
|
filtered = append(filtered, pod)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -57,6 +57,12 @@ func (r *PodRegistry) ListPods(selector labels.Selector) (*api.PodList, error) {
|
|||||||
return &pods, nil
|
return &pods, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *PodRegistry) ListPods(selector labels.Selector) (*api.PodList, error) {
|
||||||
|
return r.ListPodsPredicate(func(pod *api.Pod) bool {
|
||||||
|
return selector.Matches(labels.Set(pod.Labels))
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
func (r *PodRegistry) WatchPods(resourceVersion uint64, filter func(*api.Pod) bool) (watch.Interface, error) {
|
func (r *PodRegistry) WatchPods(resourceVersion uint64, filter func(*api.Pod) bool) (watch.Interface, error) {
|
||||||
// TODO: wire filter down into the mux; it needs access to current and previous state :(
|
// TODO: wire filter down into the mux; it needs access to current and previous state :(
|
||||||
return r.mux.Watch(), nil
|
return r.mux.Watch(), nil
|
||||||
|
@@ -113,14 +113,15 @@ func (rs *REST) Get(id string) (runtime.Object, error) {
|
|||||||
return s, err
|
return s, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rs *REST) List(selector labels.Selector) (runtime.Object, error) {
|
// TODO: implement field selector?
|
||||||
|
func (rs *REST) List(label, field labels.Selector) (runtime.Object, error) {
|
||||||
list, err := rs.registry.ListServices()
|
list, err := rs.registry.ListServices()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
var filtered []api.Service
|
var filtered []api.Service
|
||||||
for _, service := range list.Items {
|
for _, service := range list.Items {
|
||||||
if selector.Matches(labels.Set(service.Labels)) {
|
if label.Matches(labels.Set(service.Labels)) {
|
||||||
filtered = append(filtered, service)
|
filtered = append(filtered, service)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@@ -319,7 +319,7 @@ func TestServiceRegistryList(t *testing.T) {
|
|||||||
Selector: map[string]string{"bar2": "baz2"},
|
Selector: map[string]string{"bar2": "baz2"},
|
||||||
})
|
})
|
||||||
registry.List.ResourceVersion = 1
|
registry.List.ResourceVersion = 1
|
||||||
s, _ := storage.List(labels.Everything())
|
s, _ := storage.List(labels.Everything(), labels.Everything())
|
||||||
sl := s.(*api.ServiceList)
|
sl := s.(*api.ServiceList)
|
||||||
if len(fakeCloud.Calls) != 0 {
|
if len(fakeCloud.Calls) != 0 {
|
||||||
t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls)
|
t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls)
|
||||||
|
@@ -26,6 +26,7 @@ import (
|
|||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
|
||||||
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
|
||||||
algorithm "github.com/GoogleCloudPlatform/kubernetes/pkg/scheduler"
|
algorithm "github.com/GoogleCloudPlatform/kubernetes/pkg/scheduler"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
|
||||||
@@ -43,19 +44,19 @@ type ConfigFactory struct {
|
|||||||
func (factory *ConfigFactory) Create() *scheduler.Config {
|
func (factory *ConfigFactory) Create() *scheduler.Config {
|
||||||
// Watch and queue pods that need scheduling.
|
// Watch and queue pods that need scheduling.
|
||||||
podQueue := cache.NewFIFO()
|
podQueue := cache.NewFIFO()
|
||||||
cache.NewReflector(factory.createUnassignedPodWatch, &api.Pod{}, podQueue).Run()
|
cache.NewReflector(factory.createUnassignedPodLW(), &api.Pod{}, podQueue).Run()
|
||||||
|
|
||||||
// Watch and cache all running pods. Scheduler needs to find all pods
|
// Watch and cache all running pods. Scheduler needs to find all pods
|
||||||
// so it knows where it's safe to place a pod. Cache this locally.
|
// so it knows where it's safe to place a pod. Cache this locally.
|
||||||
podCache := cache.NewStore()
|
podCache := cache.NewStore()
|
||||||
cache.NewReflector(factory.createAssignedPodWatch, &api.Pod{}, podCache).Run()
|
cache.NewReflector(factory.createAssignedPodLW(), &api.Pod{}, podCache).Run()
|
||||||
|
|
||||||
// Watch minions.
|
// Watch minions.
|
||||||
// Minions may be listed frequently, so provide a local up-to-date cache.
|
// Minions may be listed frequently, so provide a local up-to-date cache.
|
||||||
minionCache := cache.NewStore()
|
minionCache := cache.NewStore()
|
||||||
if false {
|
if false {
|
||||||
// Disable this code until minions support watches.
|
// Disable this code until minions support watches.
|
||||||
cache.NewReflector(factory.createMinionWatch, &api.Minion{}, minionCache).Run()
|
cache.NewReflector(factory.createMinionLW(), &api.Minion{}, minionCache).Run()
|
||||||
} else {
|
} else {
|
||||||
cache.NewPoller(factory.pollMinions, 10*time.Second, minionCache).Run()
|
cache.NewPoller(factory.pollMinions, 10*time.Second, minionCache).Run()
|
||||||
}
|
}
|
||||||
@@ -82,38 +83,66 @@ func (factory *ConfigFactory) Create() *scheduler.Config {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// createUnassignedPodWatch starts a watch that finds all pods that need to be
|
type listWatch struct {
|
||||||
|
client *client.Client
|
||||||
|
fieldSelector labels.Selector
|
||||||
|
resource string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (lw *listWatch) List() (runtime.Object, error) {
|
||||||
|
return lw.client.
|
||||||
|
Get().
|
||||||
|
Path(lw.resource).
|
||||||
|
SelectorParam("fields", lw.fieldSelector).
|
||||||
|
Do().
|
||||||
|
Get()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (lw *listWatch) Watch(resourceVersion uint64) (watch.Interface, error) {
|
||||||
|
return lw.client.
|
||||||
|
Get().
|
||||||
|
Path("watch").
|
||||||
|
Path(lw.resource).
|
||||||
|
SelectorParam("fields", lw.fieldSelector).
|
||||||
|
UintParam("resourceVersion", resourceVersion).
|
||||||
|
Watch()
|
||||||
|
}
|
||||||
|
|
||||||
|
// createUnassignedPodLW returns a listWatch that finds all pods that need to be
|
||||||
// scheduled.
|
// scheduled.
|
||||||
func (factory *ConfigFactory) createUnassignedPodWatch(resourceVersion uint64) (watch.Interface, error) {
|
func (factory *ConfigFactory) createUnassignedPodLW() *listWatch {
|
||||||
return factory.Client.
|
return &listWatch{
|
||||||
Get().
|
client: factory.Client,
|
||||||
Path("watch").
|
fieldSelector: labels.Set{"DesiredState.Host": ""}.AsSelector(),
|
||||||
Path("pods").
|
resource: "pods",
|
||||||
SelectorParam("fields", labels.Set{"DesiredState.Host": ""}.AsSelector()).
|
}
|
||||||
UintParam("resourceVersion", resourceVersion).
|
|
||||||
Watch()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// createUnassignedPodWatch starts a watch that finds all pods that are
|
func parseSelectorOrDie(s string) labels.Selector {
|
||||||
|
selector, err := labels.ParseSelector(s)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
return selector
|
||||||
|
}
|
||||||
|
|
||||||
|
// createUnassignedPodLW returns a listWatch that finds all pods that are
|
||||||
// already scheduled.
|
// already scheduled.
|
||||||
func (factory *ConfigFactory) createAssignedPodWatch(resourceVersion uint64) (watch.Interface, error) {
|
func (factory *ConfigFactory) createAssignedPodLW() *listWatch {
|
||||||
return factory.Client.
|
return &listWatch{
|
||||||
Get().
|
client: factory.Client,
|
||||||
Path("watch").
|
fieldSelector: parseSelectorOrDie("DesiredState.Host!="),
|
||||||
Path("pods").
|
resource: "pods",
|
||||||
ParseSelectorParam("fields", "DesiredState.Host!=").
|
}
|
||||||
UintParam("resourceVersion", resourceVersion).
|
|
||||||
Watch()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// createMinionWatch starts a watch that gets all changes to minions.
|
// createMinionLW returns a listWatch that gets all changes to minions.
|
||||||
func (factory *ConfigFactory) createMinionWatch(resourceVersion uint64) (watch.Interface, error) {
|
func (factory *ConfigFactory) createMinionLW() *listWatch {
|
||||||
return factory.Client.
|
return &listWatch{
|
||||||
Get().
|
client: factory.Client,
|
||||||
Path("watch").
|
fieldSelector: parseSelectorOrDie(""),
|
||||||
Path("minions").
|
resource: "minions",
|
||||||
UintParam("resourceVersion", resourceVersion).
|
}
|
||||||
Watch()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// pollMinions lists all minions and returns an enumerator for cache.Poller.
|
// pollMinions lists all minions and returns an enumerator for cache.Poller.
|
||||||
|
@@ -30,7 +30,6 @@ import (
|
|||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestCreate(t *testing.T) {
|
func TestCreate(t *testing.T) {
|
||||||
@@ -45,42 +44,26 @@ func TestCreate(t *testing.T) {
|
|||||||
factory.Create()
|
factory.Create()
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestCreateWatches(t *testing.T) {
|
func TestCreateLists(t *testing.T) {
|
||||||
factory := ConfigFactory{nil}
|
factory := ConfigFactory{nil}
|
||||||
table := []struct {
|
table := []struct {
|
||||||
rv uint64
|
location string
|
||||||
location string
|
factory func() *listWatch
|
||||||
watchFactory func(rv uint64) (watch.Interface, error)
|
|
||||||
}{
|
}{
|
||||||
// Minion watch
|
// Minion
|
||||||
{
|
{
|
||||||
rv: 0,
|
location: "/api/v1beta1/minions?fields=",
|
||||||
location: "/api/v1beta1/watch/minions?resourceVersion=0",
|
factory: factory.createMinionLW,
|
||||||
watchFactory: factory.createMinionWatch,
|
|
||||||
}, {
|
|
||||||
rv: 42,
|
|
||||||
location: "/api/v1beta1/watch/minions?resourceVersion=42",
|
|
||||||
watchFactory: factory.createMinionWatch,
|
|
||||||
},
|
},
|
||||||
// Assigned pod watches
|
// Assigned pod
|
||||||
{
|
{
|
||||||
rv: 0,
|
location: "/api/v1beta1/pods?fields=DesiredState.Host!%3D",
|
||||||
location: "/api/v1beta1/watch/pods?fields=DesiredState.Host!%3D&resourceVersion=0",
|
factory: factory.createAssignedPodLW,
|
||||||
watchFactory: factory.createAssignedPodWatch,
|
|
||||||
}, {
|
|
||||||
rv: 42,
|
|
||||||
location: "/api/v1beta1/watch/pods?fields=DesiredState.Host!%3D&resourceVersion=42",
|
|
||||||
watchFactory: factory.createAssignedPodWatch,
|
|
||||||
},
|
},
|
||||||
// Unassigned pod watches
|
// Unassigned pod
|
||||||
{
|
{
|
||||||
rv: 0,
|
location: "/api/v1beta1/pods?fields=DesiredState.Host%3D",
|
||||||
location: "/api/v1beta1/watch/pods?fields=DesiredState.Host%3D&resourceVersion=0",
|
factory: factory.createUnassignedPodLW,
|
||||||
watchFactory: factory.createUnassignedPodWatch,
|
|
||||||
}, {
|
|
||||||
rv: 42,
|
|
||||||
location: "/api/v1beta1/watch/pods?fields=DesiredState.Host%3D&resourceVersion=42",
|
|
||||||
watchFactory: factory.createUnassignedPodWatch,
|
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -93,7 +76,60 @@ func TestCreateWatches(t *testing.T) {
|
|||||||
server := httptest.NewServer(&handler)
|
server := httptest.NewServer(&handler)
|
||||||
factory.Client = client.NewOrDie(server.URL, nil)
|
factory.Client = client.NewOrDie(server.URL, nil)
|
||||||
// This test merely tests that the correct request is made.
|
// This test merely tests that the correct request is made.
|
||||||
item.watchFactory(item.rv)
|
item.factory().List()
|
||||||
|
handler.ValidateRequest(t, item.location, "GET", nil)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCreateWatches(t *testing.T) {
|
||||||
|
factory := ConfigFactory{nil}
|
||||||
|
table := []struct {
|
||||||
|
rv uint64
|
||||||
|
location string
|
||||||
|
factory func() *listWatch
|
||||||
|
}{
|
||||||
|
// Minion watch
|
||||||
|
{
|
||||||
|
rv: 0,
|
||||||
|
location: "/api/v1beta1/watch/minions?fields=&resourceVersion=0",
|
||||||
|
factory: factory.createMinionLW,
|
||||||
|
}, {
|
||||||
|
rv: 42,
|
||||||
|
location: "/api/v1beta1/watch/minions?fields=&resourceVersion=42",
|
||||||
|
factory: factory.createMinionLW,
|
||||||
|
},
|
||||||
|
// Assigned pod watches
|
||||||
|
{
|
||||||
|
rv: 0,
|
||||||
|
location: "/api/v1beta1/watch/pods?fields=DesiredState.Host!%3D&resourceVersion=0",
|
||||||
|
factory: factory.createAssignedPodLW,
|
||||||
|
}, {
|
||||||
|
rv: 42,
|
||||||
|
location: "/api/v1beta1/watch/pods?fields=DesiredState.Host!%3D&resourceVersion=42",
|
||||||
|
factory: factory.createAssignedPodLW,
|
||||||
|
},
|
||||||
|
// Unassigned pod watches
|
||||||
|
{
|
||||||
|
rv: 0,
|
||||||
|
location: "/api/v1beta1/watch/pods?fields=DesiredState.Host%3D&resourceVersion=0",
|
||||||
|
factory: factory.createUnassignedPodLW,
|
||||||
|
}, {
|
||||||
|
rv: 42,
|
||||||
|
location: "/api/v1beta1/watch/pods?fields=DesiredState.Host%3D&resourceVersion=42",
|
||||||
|
factory: factory.createUnassignedPodLW,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, item := range table {
|
||||||
|
handler := util.FakeHandler{
|
||||||
|
StatusCode: 500,
|
||||||
|
ResponseBody: "",
|
||||||
|
T: t,
|
||||||
|
}
|
||||||
|
server := httptest.NewServer(&handler)
|
||||||
|
factory.Client = client.NewOrDie(server.URL, nil)
|
||||||
|
// This test merely tests that the correct request is made.
|
||||||
|
item.factory().Watch(item.rv)
|
||||||
handler.ValidateRequest(t, item.location, "GET", nil)
|
handler.ValidateRequest(t, item.location, "GET", nil)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user