Removal of fakeClient from etcd_watcher_test in leiu of NewEtcdTestClientServer
This commit is contained in:
		| @@ -17,19 +17,15 @@ limitations under the License. | |||||||
| package etcd | package etcd | ||||||
|  |  | ||||||
| import ( | import ( | ||||||
| 	"fmt" | 	rt "runtime" | ||||||
| 	"testing" | 	"testing" | ||||||
| 	"time" |  | ||||||
|  |  | ||||||
| 	"github.com/coreos/go-etcd/etcd" | 	"github.com/coreos/go-etcd/etcd" | ||||||
| 	"k8s.io/kubernetes/pkg/api" | 	"k8s.io/kubernetes/pkg/api" | ||||||
| 	"k8s.io/kubernetes/pkg/api/testapi" | 	"k8s.io/kubernetes/pkg/api/testapi" | ||||||
| 	"k8s.io/kubernetes/pkg/api/unversioned" |  | ||||||
| 	"k8s.io/kubernetes/pkg/runtime" | 	"k8s.io/kubernetes/pkg/runtime" | ||||||
| 	"k8s.io/kubernetes/pkg/storage" | 	"k8s.io/kubernetes/pkg/storage" | ||||||
| 	"k8s.io/kubernetes/pkg/tools" |  | ||||||
| 	"k8s.io/kubernetes/pkg/tools/etcdtest" | 	"k8s.io/kubernetes/pkg/tools/etcdtest" | ||||||
| 	"k8s.io/kubernetes/pkg/util" |  | ||||||
| 	"k8s.io/kubernetes/pkg/watch" | 	"k8s.io/kubernetes/pkg/watch" | ||||||
|  |  | ||||||
| 	"golang.org/x/net/context" | 	"golang.org/x/net/context" | ||||||
| @@ -218,62 +214,45 @@ func TestWatchInterpretation_ResponseBadData(t *testing.T) { | |||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
|  | /* TODO: So believe it or not... but this test is flakey with the go-etcd client library | ||||||
|  |  * which I'm surprised by.  Apprently you can close the client that is performing the watch | ||||||
|  |  * and the watch *never returns.*  I would like to still keep this test here and re-enable | ||||||
|  |  * with the new 2.2+ client library. | ||||||
| func TestWatchEtcdError(t *testing.T) { | func TestWatchEtcdError(t *testing.T) { | ||||||
| 	codec := testapi.Default.Codec() | 	codec := testapi.Default.Codec() | ||||||
| 	fakeClient := tools.NewFakeEtcdClient(t) | 	server := NewEtcdTestClientServer(t) | ||||||
| 	fakeClient.ExpectNotFoundGet("/some/key") | 	h := newEtcdHelper(server.client, codec, etcdtest.PathPrefix()) | ||||||
| 	fakeClient.WatchImmediateError = fmt.Errorf("immediate error") |  | ||||||
| 	h := newEtcdHelper(fakeClient, codec, etcdtest.PathPrefix()) |  | ||||||
|  |  | ||||||
| 	watching, err := h.Watch(context.TODO(), "/some/key", 4, storage.Everything) | 	watching, err := h.Watch(context.TODO(), "/some/key", 4, storage.Everything) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		t.Fatalf("Unexpected error: %v", err) | 		t.Fatalf("Unexpected error: %v", err) | ||||||
| 	} | 	} | ||||||
| 	defer watching.Stop() | 	server.Terminate(t) | ||||||
|  |  | ||||||
| 	got := <-watching.ResultChan() | 	got := <-watching.ResultChan() | ||||||
| 	if got.Type != watch.Error { | 	if got.Type != watch.Error { | ||||||
| 		t.Fatalf("Unexpected non-error") | 		t.Fatalf("Unexpected non-error") | ||||||
| 	} | 	} | ||||||
| 	status, ok := got.Object.(*unversioned.Status) | 	watching.Stop() | ||||||
| 	if !ok { | } */ | ||||||
| 		t.Fatalf("Unexpected non-error object type") |  | ||||||
| 	} |  | ||||||
| 	if status.Message != "immediate error" { |  | ||||||
| 		t.Errorf("Unexpected wrong error") |  | ||||||
| 	} |  | ||||||
| 	if status.Status != unversioned.StatusFailure { |  | ||||||
| 		t.Errorf("Unexpected wrong error status") |  | ||||||
| 	} |  | ||||||
| } |  | ||||||
|  |  | ||||||
| func TestWatch(t *testing.T) { | func TestWatch(t *testing.T) { | ||||||
| 	codec := testapi.Default.Codec() | 	codec := testapi.Default.Codec() | ||||||
| 	fakeClient := tools.NewFakeEtcdClient(t) | 	server := NewEtcdTestClientServer(t) | ||||||
|  | 	defer server.Terminate(t) | ||||||
| 	key := "/some/key" | 	key := "/some/key" | ||||||
| 	prefixedKey := etcdtest.AddPrefix(key) | 	h := newEtcdHelper(server.client, codec, etcdtest.PathPrefix()) | ||||||
| 	fakeClient.ExpectNotFoundGet(prefixedKey) |  | ||||||
| 	h := newEtcdHelper(fakeClient, codec, etcdtest.PathPrefix()) |  | ||||||
|  |  | ||||||
| 	watching, err := h.Watch(context.TODO(), key, 0, storage.Everything) | 	watching, err := h.Watch(context.TODO(), key, 0, storage.Everything) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		t.Fatalf("Unexpected error: %v", err) | 		t.Fatalf("Unexpected error: %v", err) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	fakeClient.WaitForWatchCompletion() |  | ||||||
| 	// when server returns not found, the watch index starts at the next value (1) |  | ||||||
| 	if fakeClient.WatchIndex != 1 { |  | ||||||
| 		t.Errorf("Expected client to be at index %d, got %#v", 1, fakeClient) |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	// Test normal case | 	// Test normal case | ||||||
| 	pod := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}} | 	pod := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}} | ||||||
| 	podBytes, _ := codec.Encode(pod) | 	returnObj := &api.Pod{} | ||||||
| 	fakeClient.WatchResponse <- &etcd.Response{ | 	err = h.Set(context.TODO(), key, pod, returnObj, 0) | ||||||
| 		Action: "set", | 	if err != nil { | ||||||
| 		Node: &etcd.Node{ | 		t.Fatalf("Unexpected error: %v", err) | ||||||
| 			Value: string(podBytes), |  | ||||||
| 		}, |  | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	event := <-watching.ResultChan() | 	event := <-watching.ResultChan() | ||||||
| @@ -284,24 +263,8 @@ func TestWatch(t *testing.T) { | |||||||
| 		t.Errorf("Expected %v, got %v", e, a) | 		t.Errorf("Expected %v, got %v", e, a) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	// Test error case | 	watching.Stop() | ||||||
| 	fakeClient.WatchInjectError <- fmt.Errorf("Injected error") |  | ||||||
|  |  | ||||||
| 	if errEvent, ok := <-watching.ResultChan(); !ok { |  | ||||||
| 		t.Errorf("no error result?") |  | ||||||
| 	} else { |  | ||||||
| 		if e, a := watch.Error, errEvent.Type; e != a { |  | ||||||
| 			t.Errorf("Expected %v, got %v", e, a) |  | ||||||
| 		} |  | ||||||
| 		if e, a := "Injected error", errEvent.Object.(*unversioned.Status).Message; e != a { |  | ||||||
| 			t.Errorf("Expected %v, got %v", e, a) |  | ||||||
| 		} |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	// Did everything shut down? |  | ||||||
| 	if _, open := <-fakeClient.WatchResponse; open { |  | ||||||
| 		t.Errorf("An injected error did not cause a graceful shutdown") |  | ||||||
| 	} |  | ||||||
| 	if _, open := <-watching.ResultChan(); open { | 	if _, open := <-watching.ResultChan(); open { | ||||||
| 		t.Errorf("An injected error did not cause a graceful shutdown") | 		t.Errorf("An injected error did not cause a graceful shutdown") | ||||||
| 	} | 	} | ||||||
| @@ -320,405 +283,171 @@ func makeSubsets(ip string, port int) []api.EndpointSubset { | |||||||
|  |  | ||||||
| func TestWatchEtcdState(t *testing.T) { | func TestWatchEtcdState(t *testing.T) { | ||||||
| 	codec := testapi.Default.Codec() | 	codec := testapi.Default.Codec() | ||||||
| 	baseKey := "/somekey/foo" | 	key := etcdtest.AddPrefix("/somekey/foo") | ||||||
| 	prefixedKey := etcdtest.AddPrefix(baseKey) | 	server := NewEtcdTestClientServer(t) | ||||||
| 	type T struct { | 	defer server.Terminate(t) | ||||||
| 		Type      watch.EventType |  | ||||||
| 		Endpoints []api.EndpointSubset |  | ||||||
| 	} |  | ||||||
| 	testCases := map[string]struct { |  | ||||||
| 		Initial   map[string]tools.EtcdResponseWithError |  | ||||||
| 		Responses []*etcd.Response |  | ||||||
| 		From      uint64 |  | ||||||
| 		Expected  []*T |  | ||||||
| 	}{ |  | ||||||
| 		"from not found": { |  | ||||||
| 			Initial: map[string]tools.EtcdResponseWithError{}, |  | ||||||
| 			Responses: []*etcd.Response{ |  | ||||||
| 				{ |  | ||||||
| 					Action: "create", |  | ||||||
| 					Node: &etcd.Node{ |  | ||||||
| 						Value: string(runtime.EncodeOrDie(codec, &api.Endpoints{ |  | ||||||
| 							ObjectMeta: api.ObjectMeta{Name: "foo"}, |  | ||||||
| 							Subsets:    emptySubsets(), |  | ||||||
| 						})), |  | ||||||
| 					}, |  | ||||||
| 				}, |  | ||||||
| 			}, |  | ||||||
| 			From: 1, |  | ||||||
| 			Expected: []*T{ |  | ||||||
| 				{watch.Added, nil}, |  | ||||||
| 			}, |  | ||||||
| 		}, |  | ||||||
| 		"from version 1": { |  | ||||||
| 			Responses: []*etcd.Response{ |  | ||||||
| 				{ |  | ||||||
| 					Action: "compareAndSwap", |  | ||||||
| 					Node: &etcd.Node{ |  | ||||||
| 						Value: string(runtime.EncodeOrDie(codec, &api.Endpoints{ |  | ||||||
| 							ObjectMeta: api.ObjectMeta{Name: "foo"}, |  | ||||||
| 							Subsets:    makeSubsets("127.0.0.1", 9000), |  | ||||||
| 						})), |  | ||||||
| 						CreatedIndex:  1, |  | ||||||
| 						ModifiedIndex: 2, |  | ||||||
| 					}, |  | ||||||
| 					PrevNode: &etcd.Node{ |  | ||||||
| 						Value: string(runtime.EncodeOrDie(codec, &api.Endpoints{ |  | ||||||
| 							ObjectMeta: api.ObjectMeta{Name: "foo"}, |  | ||||||
| 							Subsets:    emptySubsets(), |  | ||||||
| 						})), |  | ||||||
| 						CreatedIndex:  1, |  | ||||||
| 						ModifiedIndex: 1, |  | ||||||
| 					}, |  | ||||||
| 				}, |  | ||||||
| 			}, |  | ||||||
| 			From: 1, |  | ||||||
| 			Expected: []*T{ |  | ||||||
| 				{watch.Modified, makeSubsets("127.0.0.1", 9000)}, |  | ||||||
| 			}, |  | ||||||
| 		}, |  | ||||||
| 		"from initial state": { |  | ||||||
| 			Initial: map[string]tools.EtcdResponseWithError{ |  | ||||||
| 				prefixedKey: { |  | ||||||
| 					R: &etcd.Response{ |  | ||||||
| 						Action: "get", |  | ||||||
| 						Node: &etcd.Node{ |  | ||||||
| 							Value: string(runtime.EncodeOrDie(codec, &api.Endpoints{ |  | ||||||
| 								ObjectMeta: api.ObjectMeta{Name: "foo"}, |  | ||||||
| 								Subsets:    emptySubsets(), |  | ||||||
| 							})), |  | ||||||
| 							CreatedIndex:  1, |  | ||||||
| 							ModifiedIndex: 1, |  | ||||||
| 						}, |  | ||||||
| 						EtcdIndex: 1, |  | ||||||
| 					}, |  | ||||||
| 				}, |  | ||||||
| 			}, |  | ||||||
| 			Responses: []*etcd.Response{ |  | ||||||
| 				nil, |  | ||||||
| 				{ |  | ||||||
| 					Action: "compareAndSwap", |  | ||||||
| 					Node: &etcd.Node{ |  | ||||||
| 						Value: string(runtime.EncodeOrDie(codec, &api.Endpoints{ |  | ||||||
| 							ObjectMeta: api.ObjectMeta{Name: "foo"}, |  | ||||||
| 							Subsets:    makeSubsets("127.0.0.1", 9000), |  | ||||||
| 						})), |  | ||||||
| 						CreatedIndex:  1, |  | ||||||
| 						ModifiedIndex: 2, |  | ||||||
| 					}, |  | ||||||
| 					PrevNode: &etcd.Node{ |  | ||||||
| 						Value: string(runtime.EncodeOrDie(codec, &api.Endpoints{ |  | ||||||
| 							ObjectMeta: api.ObjectMeta{Name: "foo"}, |  | ||||||
| 							Subsets:    emptySubsets(), |  | ||||||
| 						})), |  | ||||||
| 						CreatedIndex:  1, |  | ||||||
| 						ModifiedIndex: 1, |  | ||||||
| 					}, |  | ||||||
| 				}, |  | ||||||
| 			}, |  | ||||||
| 			Expected: []*T{ |  | ||||||
| 				{watch.Added, nil}, |  | ||||||
| 				{watch.Modified, makeSubsets("127.0.0.1", 9000)}, |  | ||||||
| 			}, |  | ||||||
| 		}, |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	for k, testCase := range testCases { | 	h := newEtcdHelper(server.client, codec, etcdtest.PathPrefix()) | ||||||
| 		fakeClient := tools.NewFakeEtcdClient(t) | 	watching, err := h.Watch(context.TODO(), key, 0, storage.Everything) | ||||||
| 		for key, value := range testCase.Initial { |  | ||||||
| 			fakeClient.Data[key] = value |  | ||||||
| 		} |  | ||||||
|  |  | ||||||
| 		h := newEtcdHelper(fakeClient, codec, etcdtest.PathPrefix()) |  | ||||||
| 		watching, err := h.Watch(context.TODO(), baseKey, testCase.From, storage.Everything) |  | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		t.Fatalf("Unexpected error: %v", err) | 		t.Fatalf("Unexpected error: %v", err) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 		fakeClient.WaitForWatchCompletion() | 	endpoint := &api.Endpoints{ | ||||||
|  | 		ObjectMeta: api.ObjectMeta{Name: "foo"}, | ||||||
|  | 		Subsets:    emptySubsets(), | ||||||
|  | 	} | ||||||
|  |  | ||||||
| 		t.Logf("Testing %v", k) | 	err = h.Set(context.TODO(), key, endpoint, endpoint, 0) | ||||||
| 		for i := range testCase.Responses { | 	if err != nil { | ||||||
| 			if testCase.Responses[i] != nil { | 		t.Fatalf("Unexpected error: %v", err) | ||||||
| 				fakeClient.WatchResponse <- testCase.Responses[i] |  | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	event := <-watching.ResultChan() | 	event := <-watching.ResultChan() | ||||||
| 			if e, a := testCase.Expected[i].Type, event.Type; e != a { | 	if event.Type != watch.Added { | ||||||
| 				t.Errorf("%s: expected type %v, got %v", k, e, a) | 		t.Errorf("Unexpected event %#v", event) | ||||||
| 				break |  | ||||||
| 	} | 	} | ||||||
| 			if e, a := testCase.Expected[i].Endpoints, event.Object.(*api.Endpoints).Subsets; !api.Semantic.DeepDerivative(e, a) { |  | ||||||
| 				t.Errorf("%s: expected type %v, got %v", k, e, a) | 	subset := makeSubsets("127.0.0.1", 9000) | ||||||
| 				break | 	endpoint.Subsets = subset | ||||||
|  |  | ||||||
|  | 	// CAS the previous value | ||||||
|  | 	err = h.Set(context.TODO(), key, endpoint, endpoint, 0) | ||||||
|  | 	if err != nil { | ||||||
|  | 		t.Fatalf("Unexpected error: %v", err) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|  | 	event = <-watching.ResultChan() | ||||||
|  | 	if event.Type != watch.Modified { | ||||||
|  | 		t.Errorf("Unexpected event %#v", event) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|  | 	if e, a := endpoint, event.Object; !api.Semantic.DeepDerivative(e, a) { | ||||||
|  | 		t.Errorf("%s: expected %v, got %v", e, a) | ||||||
|  | 	} | ||||||
|  |  | ||||||
| 	watching.Stop() | 	watching.Stop() | ||||||
| 	} |  | ||||||
| } | } | ||||||
|  |  | ||||||
| func TestWatchFromZeroIndex(t *testing.T) { | func TestWatchFromZeroIndex(t *testing.T) { | ||||||
| 	codec := testapi.Default.Codec() | 	codec := testapi.Default.Codec() | ||||||
| 	pod := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}} | 	pod := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}} | ||||||
|  |  | ||||||
| 	testCases := map[string]struct { | 	key := etcdtest.AddPrefix("/somekey/foo") | ||||||
| 		Response        tools.EtcdResponseWithError | 	server := NewEtcdTestClientServer(t) | ||||||
| 		ExpectedVersion string | 	defer server.Terminate(t) | ||||||
| 		ExpectedType    watch.EventType |  | ||||||
| 	}{ | 	h := newEtcdHelper(server.client, codec, etcdtest.PathPrefix()) | ||||||
| 		"get value created": { |  | ||||||
| 			tools.EtcdResponseWithError{ | 	// set before the watch and verify events | ||||||
| 				R: &etcd.Response{ | 	err := h.Set(context.TODO(), key, pod, pod, 0) | ||||||
| 					Node: &etcd.Node{ | 	if err != nil { | ||||||
| 						Value:         runtime.EncodeOrDie(codec, pod), | 		t.Fatalf("Unexpected error: %v", err) | ||||||
| 						CreatedIndex:  1, |  | ||||||
| 						ModifiedIndex: 1, |  | ||||||
| 					}, |  | ||||||
| 					Action:    "get", |  | ||||||
| 					EtcdIndex: 2, |  | ||||||
| 				}, |  | ||||||
| 			}, |  | ||||||
| 			"1", |  | ||||||
| 			watch.Added, |  | ||||||
| 		}, |  | ||||||
| 		"get value modified": { |  | ||||||
| 			tools.EtcdResponseWithError{ |  | ||||||
| 				R: &etcd.Response{ |  | ||||||
| 					Node: &etcd.Node{ |  | ||||||
| 						Value:         runtime.EncodeOrDie(codec, pod), |  | ||||||
| 						CreatedIndex:  1, |  | ||||||
| 						ModifiedIndex: 2, |  | ||||||
| 					}, |  | ||||||
| 					Action:    "get", |  | ||||||
| 					EtcdIndex: 3, |  | ||||||
| 				}, |  | ||||||
| 			}, |  | ||||||
| 			"2", |  | ||||||
| 			watch.Modified, |  | ||||||
| 		}, |  | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	for k, testCase := range testCases { | 	// check for concatenation on watch event with CAS | ||||||
| 		fakeClient := tools.NewFakeEtcdClient(t) | 	pod.Name = "bar" | ||||||
| 		key := "/some/key" | 	err = h.Set(context.TODO(), key, pod, pod, 0) | ||||||
| 		prefixedKey := etcdtest.AddPrefix(key) | 	if err != nil { | ||||||
| 		fakeClient.Data[prefixedKey] = testCase.Response | 		t.Fatalf("Unexpected error: %v", err) | ||||||
| 		h := newEtcdHelper(fakeClient, codec, etcdtest.PathPrefix()) | 	} | ||||||
|  |  | ||||||
| 	watching, err := h.Watch(context.TODO(), key, 0, storage.Everything) | 	watching, err := h.Watch(context.TODO(), key, 0, storage.Everything) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		t.Fatalf("Unexpected error: %v", err) | 		t.Fatalf("Unexpected error: %v", err) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 		fakeClient.WaitForWatchCompletion() | 	// marked as modified b/c of concatenation | ||||||
| 		if e, a := testCase.Response.R.EtcdIndex+1, fakeClient.WatchIndex; e != a { | 	event := <-watching.ResultChan() | ||||||
| 			t.Errorf("%s: expected watch index to be %d, got %d", k, e, a) | 	if event.Type != watch.Modified { | ||||||
|  | 		t.Errorf("Unexpected event %#v", event) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 		// the existing node is detected and the index set | 	err = h.Set(context.TODO(), key, pod, pod, 0) | ||||||
| 		event := <-watching.ResultChan() | 	if err != nil { | ||||||
| 		if e, a := testCase.ExpectedType, event.Type; e != a { | 		t.Fatalf("Unexpected error: %v", err) | ||||||
| 			t.Errorf("%s: expected %v, got %v", k, e, a) |  | ||||||
| 	} | 	} | ||||||
| 		actualPod, ok := event.Object.(*api.Pod) |  | ||||||
| 		if !ok { | 	event = <-watching.ResultChan() | ||||||
| 			t.Fatalf("%s: expected a pod, got %#v", k, event.Object) | 	if event.Type != watch.Modified { | ||||||
|  | 		t.Errorf("Unexpected event %#v", event) | ||||||
| 	} | 	} | ||||||
| 		if actualPod.ResourceVersion != testCase.ExpectedVersion { |  | ||||||
| 			t.Errorf("%s: expected pod with resource version %v, Got %#v", k, testCase.ExpectedVersion, actualPod) |  | ||||||
| 		} |  | ||||||
| 		pod.ResourceVersion = testCase.ExpectedVersion |  | ||||||
| 	if e, a := pod, event.Object; !api.Semantic.DeepDerivative(e, a) { | 	if e, a := pod, event.Object; !api.Semantic.DeepDerivative(e, a) { | ||||||
| 			t.Errorf("%s: expected %v, got %v", k, e, a) | 		t.Errorf("%s: expected %v, got %v", e, a) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	watching.Stop() | 	watching.Stop() | ||||||
| 	} |  | ||||||
| } | } | ||||||
|  |  | ||||||
| func TestWatchListFromZeroIndex(t *testing.T) { | func TestWatchListFromZeroIndex(t *testing.T) { | ||||||
| 	codec := testapi.Default.Codec() | 	codec := testapi.Default.Codec() | ||||||
| 	pod := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}} | 	key := etcdtest.AddPrefix("/some/key") | ||||||
| 	key := "/some/key" | 	server := NewEtcdTestClientServer(t) | ||||||
| 	prefixedKey := etcdtest.AddPrefix(key) | 	defer server.Terminate(t) | ||||||
| 	fakeClient := tools.NewFakeEtcdClient(t) | 	h := newEtcdHelper(server.client, codec, key) | ||||||
| 	fakeClient.Data[prefixedKey] = tools.EtcdResponseWithError{ |  | ||||||
| 		R: &etcd.Response{ |  | ||||||
| 			Node: &etcd.Node{ |  | ||||||
| 				Dir: true, |  | ||||||
| 				Nodes: etcd.Nodes{ |  | ||||||
| 					&etcd.Node{ |  | ||||||
| 						Value:         runtime.EncodeOrDie(codec, pod), |  | ||||||
| 						CreatedIndex:  1, |  | ||||||
| 						ModifiedIndex: 1, |  | ||||||
| 						Nodes:         etcd.Nodes{}, |  | ||||||
| 					}, |  | ||||||
| 					&etcd.Node{ |  | ||||||
| 						Value:         runtime.EncodeOrDie(codec, pod), |  | ||||||
| 						CreatedIndex:  2, |  | ||||||
| 						ModifiedIndex: 2, |  | ||||||
| 						Nodes:         etcd.Nodes{}, |  | ||||||
| 					}, |  | ||||||
| 				}, |  | ||||||
| 			}, |  | ||||||
| 			Action:    "get", |  | ||||||
| 			EtcdIndex: 3, |  | ||||||
| 		}, |  | ||||||
| 	} |  | ||||||
| 	h := newEtcdHelper(fakeClient, codec, etcdtest.PathPrefix()) |  | ||||||
|  |  | ||||||
| 	watching, err := h.WatchList(context.TODO(), key, 0, storage.Everything) | 	watching, err := h.WatchList(context.TODO(), key, 0, storage.Everything) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		t.Fatalf("Unexpected error: %v", err) | 		t.Fatalf("Unexpected error: %v", err) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	// the existing node is detected and the index set | 	// creates key/foo which should trigger the WatchList for "key" | ||||||
| 	event, open := <-watching.ResultChan() | 	pod := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}} | ||||||
| 	if !open { | 	err = h.Create(context.TODO(), pod.Name, pod, pod, 0) | ||||||
| 		t.Fatalf("unexpected channel close") | 	if err != nil { | ||||||
| 	} | 		t.Fatalf("Unexpected error: %v", err) | ||||||
| 	for i := 0; i < 2; i++ { | 	} | ||||||
| 		if e, a := watch.Added, event.Type; e != a { |  | ||||||
| 			t.Errorf("Expected %v, got %v", e, a) | 	event, _ := <-watching.ResultChan() | ||||||
| 		} | 	if event.Type != watch.Added { | ||||||
| 		actualPod, ok := event.Object.(*api.Pod) | 		t.Errorf("Unexpected event %#v", event) | ||||||
| 		if !ok { | 	} | ||||||
| 			t.Fatalf("expected a pod, got %#v", event.Object) |  | ||||||
| 		} | 	if e, a := pod, event.Object; !api.Semantic.DeepDerivative(e, a) { | ||||||
| 		if actualPod.ResourceVersion != "1" { | 		t.Errorf("%s: expected %v, got %v", e, a) | ||||||
| 			t.Errorf("Expected pod with resource version %d, Got %#v", 1, actualPod) |  | ||||||
| 		} |  | ||||||
| 		pod.ResourceVersion = "1" |  | ||||||
| 		if e, a := pod, event.Object; !api.Semantic.DeepDerivative(e, a) { |  | ||||||
| 			t.Errorf("Expected %v, got %v", e, a) |  | ||||||
| 		} |  | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	fakeClient.WaitForWatchCompletion() |  | ||||||
| 	watching.Stop() | 	watching.Stop() | ||||||
| } | } | ||||||
|  |  | ||||||
| func TestWatchListIgnoresRootKey(t *testing.T) { | func TestWatchListIgnoresRootKey(t *testing.T) { | ||||||
| 	codec := testapi.Default.Codec() | 	codec := testapi.Default.Codec() | ||||||
| 	pod := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}} | 	pod := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}} | ||||||
| 	key := "/some/key" | 	key := etcdtest.AddPrefix("/some/key") | ||||||
| 	prefixedKey := etcdtest.AddPrefix(key) | 	server := NewEtcdTestClientServer(t) | ||||||
|  | 	defer server.Terminate(t) | ||||||
|  | 	h := newEtcdHelper(server.client, codec, key) | ||||||
|  |  | ||||||
| 	fakeClient := tools.NewFakeEtcdClient(t) | 	watching, err := h.WatchList(context.TODO(), key, 0, storage.Everything) | ||||||
| 	h := newEtcdHelper(fakeClient, codec, etcdtest.PathPrefix()) |  | ||||||
|  |  | ||||||
| 	watching, err := h.WatchList(context.TODO(), key, 1, storage.Everything) |  | ||||||
| 	if err != nil { |  | ||||||
| 		t.Fatalf("Unexpected error: %v", err) |  | ||||||
| 	} |  | ||||||
| 	fakeClient.WaitForWatchCompletion() |  | ||||||
|  |  | ||||||
| 	// This is the root directory of the watch, which happens to have a value encoded |  | ||||||
| 	fakeClient.WatchResponse <- &etcd.Response{ |  | ||||||
| 		Action: "delete", |  | ||||||
| 		PrevNode: &etcd.Node{ |  | ||||||
| 			Key:           prefixedKey, |  | ||||||
| 			Value:         runtime.EncodeOrDie(codec, pod), |  | ||||||
| 			CreatedIndex:  1, |  | ||||||
| 			ModifiedIndex: 1, |  | ||||||
| 		}, |  | ||||||
| 	} |  | ||||||
| 	// Delete of the parent directory of a key is an event that a list watch would receive, |  | ||||||
| 	// but will have no value so the decode will fail. |  | ||||||
| 	fakeClient.WatchResponse <- &etcd.Response{ |  | ||||||
| 		Action: "delete", |  | ||||||
| 		PrevNode: &etcd.Node{ |  | ||||||
| 			Key:           prefixedKey, |  | ||||||
| 			Value:         "", |  | ||||||
| 			CreatedIndex:  1, |  | ||||||
| 			ModifiedIndex: 1, |  | ||||||
| 		}, |  | ||||||
| 	} |  | ||||||
| 	close(fakeClient.WatchStop) |  | ||||||
|  |  | ||||||
| 	// the existing node is detected and the index set |  | ||||||
| 	_, open := <-watching.ResultChan() |  | ||||||
| 	if open { |  | ||||||
| 		t.Fatalf("unexpected channel open") |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	watching.Stop() |  | ||||||
| } |  | ||||||
|  |  | ||||||
| func TestWatchFromNotFound(t *testing.T) { |  | ||||||
| 	fakeClient := tools.NewFakeEtcdClient(t) |  | ||||||
| 	key := "/some/key" |  | ||||||
| 	prefixedKey := etcdtest.AddPrefix(key) |  | ||||||
| 	fakeClient.Data[prefixedKey] = tools.EtcdResponseWithError{ |  | ||||||
| 		R: &etcd.Response{ |  | ||||||
| 			Node: nil, |  | ||||||
| 		}, |  | ||||||
| 		E: &etcd.EtcdError{ |  | ||||||
| 			Index:     2, |  | ||||||
| 			ErrorCode: 100, |  | ||||||
| 		}, |  | ||||||
| 	} |  | ||||||
| 	h := newEtcdHelper(fakeClient, codec, etcdtest.PathPrefix()) |  | ||||||
|  |  | ||||||
| 	watching, err := h.Watch(context.TODO(), key, 0, storage.Everything) |  | ||||||
| 	if err != nil { |  | ||||||
| 		t.Fatalf("Unexpected error: %v", err) |  | ||||||
| 	} |  | ||||||
| 	fakeClient.WaitForWatchCompletion() |  | ||||||
| 	if fakeClient.WatchIndex != 3 { |  | ||||||
| 		t.Errorf("Expected client to wait for %d, got %#v", 3, fakeClient) |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	watching.Stop() |  | ||||||
| } |  | ||||||
|  |  | ||||||
| func TestWatchFromOtherError(t *testing.T) { |  | ||||||
| 	fakeClient := tools.NewFakeEtcdClient(t) |  | ||||||
| 	key := "/some/key" |  | ||||||
| 	prefixedKey := etcdtest.AddPrefix(key) |  | ||||||
| 	fakeClient.Data[prefixedKey] = tools.EtcdResponseWithError{ |  | ||||||
| 		R: &etcd.Response{ |  | ||||||
| 			Node: nil, |  | ||||||
| 		}, |  | ||||||
| 		E: &etcd.EtcdError{ |  | ||||||
| 			Index:     2, |  | ||||||
| 			ErrorCode: 101, |  | ||||||
| 		}, |  | ||||||
| 	} |  | ||||||
| 	h := newEtcdHelper(fakeClient, codec, etcdtest.PathPrefix()) |  | ||||||
|  |  | ||||||
| 	watching, err := h.Watch(context.TODO(), key, 0, storage.Everything) |  | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		t.Fatalf("Unexpected error: %v", err) | 		t.Fatalf("Unexpected error: %v", err) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	errEvent := <-watching.ResultChan() | 	// creates key/foo which should trigger the WatchList for "key" | ||||||
| 	if e, a := watch.Error, errEvent.Type; e != a { | 	err = h.Create(context.TODO(), key, pod, pod, 0) | ||||||
| 		t.Errorf("Expected %v, got %v", e, a) | 	if err != nil { | ||||||
| 	} | 		t.Fatalf("Unexpected error: %v", err) | ||||||
| 	if e, a := "101:  () [2]", errEvent.Object.(*unversioned.Status).Message; e != a { |  | ||||||
| 		t.Errorf("Expected %v, got %v", e, a) |  | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|  | 	// force context switch to ensure watches would catch and notify. | ||||||
|  | 	rt.Gosched() | ||||||
|  |  | ||||||
| 	select { | 	select { | ||||||
| 	case _, ok := <-watching.ResultChan(): | 	case event, _ := <-watching.ResultChan(): | ||||||
| 		if ok { | 		t.Fatalf("Unexpected event: %#v", event) | ||||||
| 			t.Fatalf("expected result channel to be closed") | 	default: | ||||||
| 		} | 		// fall through, expected behavior | ||||||
| 	case <-time.After(util.ForeverTestTimeout): |  | ||||||
| 		t.Fatalf("watch should have closed channel: %#v", watching) |  | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	if fakeClient.WatchResponse != nil || fakeClient.WatchIndex != 0 { | 	watching.Stop() | ||||||
| 		t.Fatalf("Watch should not have been invoked: %#v", fakeClient) |  | ||||||
| 	} |  | ||||||
| } | } | ||||||
|  |  | ||||||
| func TestWatchPurposefulShutdown(t *testing.T) { | func TestWatchPurposefulShutdown(t *testing.T) { | ||||||
| 	fakeClient := tools.NewFakeEtcdClient(t) | 	server := NewEtcdTestClientServer(t) | ||||||
|  | 	defer server.Terminate(t) | ||||||
| 	h := newEtcdHelper(fakeClient, codec, etcdtest.PathPrefix()) |  | ||||||
| 	key := "/some/key" | 	key := "/some/key" | ||||||
| 	prefixedKey := etcdtest.AddPrefix(key) | 	h := newEtcdHelper(server.client, codec, etcdtest.PathPrefix()) | ||||||
| 	fakeClient.ExpectNotFoundGet(prefixedKey) |  | ||||||
|  |  | ||||||
| 	// Test purposeful shutdown | 	// Test purposeful shutdown | ||||||
| 	watching, err := h.Watch(context.TODO(), key, 0, storage.Everything) | 	watching, err := h.Watch(context.TODO(), key, 0, storage.Everything) | ||||||
| @@ -726,14 +455,10 @@ func TestWatchPurposefulShutdown(t *testing.T) { | |||||||
| 		t.Fatalf("Unexpected error: %v", err) | 		t.Fatalf("Unexpected error: %v", err) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	fakeClient.WaitForWatchCompletion() |  | ||||||
| 	watching.Stop() | 	watching.Stop() | ||||||
|  | 	rt.Gosched() | ||||||
|  |  | ||||||
| 	// Did everything shut down? |  | ||||||
| 	if _, open := <-fakeClient.WatchResponse; open { |  | ||||||
| 		t.Errorf("A stop did not cause a graceful shutdown") |  | ||||||
| 	} |  | ||||||
| 	if _, open := <-watching.ResultChan(); open { | 	if _, open := <-watching.ResultChan(); open { | ||||||
| 		t.Errorf("An injected error did not cause a graceful shutdown") | 		t.Errorf("Channel should be closed") | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user
	 Timothy St. Clair
					Timothy St. Clair