allow watching old resources with kubectl
This commit is contained in:
		@@ -182,9 +182,20 @@ func RunGet(f *cmdutil.Factory, out io.Writer, cmd *cobra.Command, args []string
 | 
				
			|||||||
		if err != nil {
 | 
							if err != nil {
 | 
				
			||||||
			return err
 | 
								return err
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		rv, err := mapping.MetadataAccessor.ResourceVersion(obj)
 | 
					
 | 
				
			||||||
		if err != nil {
 | 
							// watching from resourceVersion 0, starts the watch at ~now and
 | 
				
			||||||
			return err
 | 
							// will return an initial watch event.  Starting form ~now, rather
 | 
				
			||||||
 | 
							// the rv of the object will insure that we start the watch from
 | 
				
			||||||
 | 
							// inside the watch window, which the rv of the object might not be.
 | 
				
			||||||
 | 
							rv := "0"
 | 
				
			||||||
 | 
							isList := meta.IsListType(obj)
 | 
				
			||||||
 | 
							if isList {
 | 
				
			||||||
 | 
								// the resourceVersion of list objects is ~now but won't return
 | 
				
			||||||
 | 
								// an initial watch event
 | 
				
			||||||
 | 
								rv, err = mapping.MetadataAccessor.ResourceVersion(obj)
 | 
				
			||||||
 | 
								if err != nil {
 | 
				
			||||||
 | 
									return err
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		// print the current object
 | 
							// print the current object
 | 
				
			||||||
@@ -200,7 +211,13 @@ func RunGet(f *cmdutil.Factory, out io.Writer, cmd *cobra.Command, args []string
 | 
				
			|||||||
			return err
 | 
								return err
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							first := true
 | 
				
			||||||
		kubectl.WatchLoop(w, func(e watch.Event) error {
 | 
							kubectl.WatchLoop(w, func(e watch.Event) error {
 | 
				
			||||||
 | 
								if !isList && first {
 | 
				
			||||||
 | 
									// drop the initial watch event in the single resource case
 | 
				
			||||||
 | 
									first = false
 | 
				
			||||||
 | 
									return nil
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
			return printer.PrintObj(e.Object, out)
 | 
								return printer.PrintObj(e.Object, out)
 | 
				
			||||||
		})
 | 
							})
 | 
				
			||||||
		return nil
 | 
							return nil
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -672,6 +672,14 @@ func TestGetByNameForcesFlag(t *testing.T) {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
func watchTestData() ([]api.Pod, []watch.Event) {
 | 
					func watchTestData() ([]api.Pod, []watch.Event) {
 | 
				
			||||||
	pods := []api.Pod{
 | 
						pods := []api.Pod{
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								ObjectMeta: api.ObjectMeta{
 | 
				
			||||||
 | 
									Name:            "bar",
 | 
				
			||||||
 | 
									Namespace:       "test",
 | 
				
			||||||
 | 
									ResourceVersion: "9",
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
								Spec: apitesting.DeepEqualSafePodSpec(),
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
		{
 | 
							{
 | 
				
			||||||
			ObjectMeta: api.ObjectMeta{
 | 
								ObjectMeta: api.ObjectMeta{
 | 
				
			||||||
				Name:            "foo",
 | 
									Name:            "foo",
 | 
				
			||||||
@@ -682,6 +690,30 @@ func watchTestData() ([]api.Pod, []watch.Event) {
 | 
				
			|||||||
		},
 | 
							},
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	events := []watch.Event{
 | 
						events := []watch.Event{
 | 
				
			||||||
 | 
							// current state events
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								Type: watch.Added,
 | 
				
			||||||
 | 
								Object: &api.Pod{
 | 
				
			||||||
 | 
									ObjectMeta: api.ObjectMeta{
 | 
				
			||||||
 | 
										Name:            "bar",
 | 
				
			||||||
 | 
										Namespace:       "test",
 | 
				
			||||||
 | 
										ResourceVersion: "9",
 | 
				
			||||||
 | 
									},
 | 
				
			||||||
 | 
									Spec: apitesting.DeepEqualSafePodSpec(),
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								Type: watch.Added,
 | 
				
			||||||
 | 
								Object: &api.Pod{
 | 
				
			||||||
 | 
									ObjectMeta: api.ObjectMeta{
 | 
				
			||||||
 | 
										Name:            "foo",
 | 
				
			||||||
 | 
										Namespace:       "test",
 | 
				
			||||||
 | 
										ResourceVersion: "10",
 | 
				
			||||||
 | 
									},
 | 
				
			||||||
 | 
									Spec: apitesting.DeepEqualSafePodSpec(),
 | 
				
			||||||
 | 
								},
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							// resource events
 | 
				
			||||||
		{
 | 
							{
 | 
				
			||||||
			Type: watch.Modified,
 | 
								Type: watch.Modified,
 | 
				
			||||||
			Object: &api.Pod{
 | 
								Object: &api.Pod{
 | 
				
			||||||
@@ -713,6 +745,12 @@ func TestWatchSelector(t *testing.T) {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	f, tf, codec, ns := NewAPIFactory()
 | 
						f, tf, codec, ns := NewAPIFactory()
 | 
				
			||||||
	tf.Printer = &testPrinter{}
 | 
						tf.Printer = &testPrinter{}
 | 
				
			||||||
 | 
						podList := &api.PodList{
 | 
				
			||||||
 | 
							Items: pods,
 | 
				
			||||||
 | 
							ListMeta: unversioned.ListMeta{
 | 
				
			||||||
 | 
								ResourceVersion: "10",
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
	tf.Client = &fake.RESTClient{
 | 
						tf.Client = &fake.RESTClient{
 | 
				
			||||||
		NegotiatedSerializer: ns,
 | 
							NegotiatedSerializer: ns,
 | 
				
			||||||
		Client: fake.CreateHTTPClient(func(req *http.Request) (*http.Response, error) {
 | 
							Client: fake.CreateHTTPClient(func(req *http.Request) (*http.Response, error) {
 | 
				
			||||||
@@ -721,9 +759,9 @@ func TestWatchSelector(t *testing.T) {
 | 
				
			|||||||
			}
 | 
								}
 | 
				
			||||||
			switch req.URL.Path {
 | 
								switch req.URL.Path {
 | 
				
			||||||
			case "/namespaces/test/pods":
 | 
								case "/namespaces/test/pods":
 | 
				
			||||||
				return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(codec, &api.PodList{Items: pods})}, nil
 | 
									return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(codec, podList)}, nil
 | 
				
			||||||
			case "/watch/namespaces/test/pods":
 | 
								case "/watch/namespaces/test/pods":
 | 
				
			||||||
				return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: watchBody(codec, events)}, nil
 | 
									return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: watchBody(codec, events[2:])}, nil
 | 
				
			||||||
			default:
 | 
								default:
 | 
				
			||||||
				t.Fatalf("unexpected request: %#v\n%#v", req.URL, req)
 | 
									t.Fatalf("unexpected request: %#v\n%#v", req.URL, req)
 | 
				
			||||||
				return nil, nil
 | 
									return nil, nil
 | 
				
			||||||
@@ -740,10 +778,10 @@ func TestWatchSelector(t *testing.T) {
 | 
				
			|||||||
	cmd.Flags().Set("selector", "a=b")
 | 
						cmd.Flags().Set("selector", "a=b")
 | 
				
			||||||
	cmd.Run(cmd, []string{"pods"})
 | 
						cmd.Run(cmd, []string{"pods"})
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	expected := []runtime.Object{&api.PodList{Items: pods}, events[0].Object, events[1].Object}
 | 
						expected := []runtime.Object{podList, events[2].Object, events[3].Object}
 | 
				
			||||||
	actual := tf.Printer.(*testPrinter).Objects
 | 
						actual := tf.Printer.(*testPrinter).Objects
 | 
				
			||||||
	if !reflect.DeepEqual(expected, actual) {
 | 
						if !reflect.DeepEqual(expected, actual) {
 | 
				
			||||||
		t.Errorf("unexpected object:\nExpected: %#v\n\nGot: %#v\n\n", expected[0], actual[0])
 | 
							t.Errorf("unexpected object:\nExpected: %#v\n\nGot: %#v\n\n", expected, actual)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	if len(buf.String()) == 0 {
 | 
						if len(buf.String()) == 0 {
 | 
				
			||||||
		t.Errorf("unexpected empty output")
 | 
							t.Errorf("unexpected empty output")
 | 
				
			||||||
@@ -760,9 +798,9 @@ func TestWatchResource(t *testing.T) {
 | 
				
			|||||||
		Client: fake.CreateHTTPClient(func(req *http.Request) (*http.Response, error) {
 | 
							Client: fake.CreateHTTPClient(func(req *http.Request) (*http.Response, error) {
 | 
				
			||||||
			switch req.URL.Path {
 | 
								switch req.URL.Path {
 | 
				
			||||||
			case "/namespaces/test/pods/foo":
 | 
								case "/namespaces/test/pods/foo":
 | 
				
			||||||
				return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(codec, &pods[0])}, nil
 | 
									return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(codec, &pods[1])}, nil
 | 
				
			||||||
			case "/watch/namespaces/test/pods/foo":
 | 
								case "/watch/namespaces/test/pods/foo":
 | 
				
			||||||
				return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: watchBody(codec, events)}, nil
 | 
									return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: watchBody(codec, events[1:])}, nil
 | 
				
			||||||
			default:
 | 
								default:
 | 
				
			||||||
				t.Fatalf("unexpected request: %#v\n%#v", req.URL, req)
 | 
									t.Fatalf("unexpected request: %#v\n%#v", req.URL, req)
 | 
				
			||||||
				return nil, nil
 | 
									return nil, nil
 | 
				
			||||||
@@ -778,7 +816,7 @@ func TestWatchResource(t *testing.T) {
 | 
				
			|||||||
	cmd.Flags().Set("watch", "true")
 | 
						cmd.Flags().Set("watch", "true")
 | 
				
			||||||
	cmd.Run(cmd, []string{"pods", "foo"})
 | 
						cmd.Run(cmd, []string{"pods", "foo"})
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	expected := []runtime.Object{&pods[0], events[0].Object, events[1].Object}
 | 
						expected := []runtime.Object{&pods[1], events[2].Object, events[3].Object}
 | 
				
			||||||
	actual := tf.Printer.(*testPrinter).Objects
 | 
						actual := tf.Printer.(*testPrinter).Objects
 | 
				
			||||||
	if !reflect.DeepEqual(expected, actual) {
 | 
						if !reflect.DeepEqual(expected, actual) {
 | 
				
			||||||
		t.Errorf("unexpected object:\nExpected: %#v\n\nGot: %#v\n\n", expected, actual)
 | 
							t.Errorf("unexpected object:\nExpected: %#v\n\nGot: %#v\n\n", expected, actual)
 | 
				
			||||||
@@ -798,9 +836,9 @@ func TestWatchResourceIdentifiedByFile(t *testing.T) {
 | 
				
			|||||||
		Client: fake.CreateHTTPClient(func(req *http.Request) (*http.Response, error) {
 | 
							Client: fake.CreateHTTPClient(func(req *http.Request) (*http.Response, error) {
 | 
				
			||||||
			switch req.URL.Path {
 | 
								switch req.URL.Path {
 | 
				
			||||||
			case "/namespaces/test/replicationcontrollers/cassandra":
 | 
								case "/namespaces/test/replicationcontrollers/cassandra":
 | 
				
			||||||
				return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(codec, &pods[0])}, nil
 | 
									return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(codec, &pods[1])}, nil
 | 
				
			||||||
			case "/watch/namespaces/test/replicationcontrollers/cassandra":
 | 
								case "/watch/namespaces/test/replicationcontrollers/cassandra":
 | 
				
			||||||
				return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: watchBody(codec, events)}, nil
 | 
									return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: watchBody(codec, events[1:])}, nil
 | 
				
			||||||
			default:
 | 
								default:
 | 
				
			||||||
				t.Fatalf("unexpected request: %#v\n%#v", req.URL, req)
 | 
									t.Fatalf("unexpected request: %#v\n%#v", req.URL, req)
 | 
				
			||||||
				return nil, nil
 | 
									return nil, nil
 | 
				
			||||||
@@ -816,7 +854,7 @@ func TestWatchResourceIdentifiedByFile(t *testing.T) {
 | 
				
			|||||||
	cmd.Flags().Set("filename", "../../../examples/storage/cassandra/cassandra-controller.yaml")
 | 
						cmd.Flags().Set("filename", "../../../examples/storage/cassandra/cassandra-controller.yaml")
 | 
				
			||||||
	cmd.Run(cmd, []string{})
 | 
						cmd.Run(cmd, []string{})
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	expected := []runtime.Object{&pods[0], events[0].Object, events[1].Object}
 | 
						expected := []runtime.Object{&pods[1], events[2].Object, events[3].Object}
 | 
				
			||||||
	actual := tf.Printer.(*testPrinter).Objects
 | 
						actual := tf.Printer.(*testPrinter).Objects
 | 
				
			||||||
	if !reflect.DeepEqual(expected, actual) {
 | 
						if !reflect.DeepEqual(expected, actual) {
 | 
				
			||||||
		t.Errorf("expected object: %#v unexpected object: %#v", expected, actual)
 | 
							t.Errorf("expected object: %#v unexpected object: %#v", expected, actual)
 | 
				
			||||||
@@ -837,9 +875,9 @@ func TestWatchOnlyResource(t *testing.T) {
 | 
				
			|||||||
		Client: fake.CreateHTTPClient(func(req *http.Request) (*http.Response, error) {
 | 
							Client: fake.CreateHTTPClient(func(req *http.Request) (*http.Response, error) {
 | 
				
			||||||
			switch req.URL.Path {
 | 
								switch req.URL.Path {
 | 
				
			||||||
			case "/namespaces/test/pods/foo":
 | 
								case "/namespaces/test/pods/foo":
 | 
				
			||||||
				return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(codec, &pods[0])}, nil
 | 
									return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(codec, &pods[1])}, nil
 | 
				
			||||||
			case "/watch/namespaces/test/pods/foo":
 | 
								case "/watch/namespaces/test/pods/foo":
 | 
				
			||||||
				return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: watchBody(codec, events)}, nil
 | 
									return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: watchBody(codec, events[1:])}, nil
 | 
				
			||||||
			default:
 | 
								default:
 | 
				
			||||||
				t.Fatalf("unexpected request: %#v\n%#v", req.URL, req)
 | 
									t.Fatalf("unexpected request: %#v\n%#v", req.URL, req)
 | 
				
			||||||
				return nil, nil
 | 
									return nil, nil
 | 
				
			||||||
@@ -855,7 +893,51 @@ func TestWatchOnlyResource(t *testing.T) {
 | 
				
			|||||||
	cmd.Flags().Set("watch-only", "true")
 | 
						cmd.Flags().Set("watch-only", "true")
 | 
				
			||||||
	cmd.Run(cmd, []string{"pods", "foo"})
 | 
						cmd.Run(cmd, []string{"pods", "foo"})
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	expected := []runtime.Object{events[0].Object, events[1].Object}
 | 
						expected := []runtime.Object{events[2].Object, events[3].Object}
 | 
				
			||||||
 | 
						actual := tf.Printer.(*testPrinter).Objects
 | 
				
			||||||
 | 
						if !reflect.DeepEqual(expected, actual) {
 | 
				
			||||||
 | 
							t.Errorf("unexpected object: %#v", actual)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						if len(buf.String()) == 0 {
 | 
				
			||||||
 | 
							t.Errorf("unexpected empty output")
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func TestWatchOnlyList(t *testing.T) {
 | 
				
			||||||
 | 
						pods, events := watchTestData()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						f, tf, codec, ns := NewAPIFactory()
 | 
				
			||||||
 | 
						tf.Printer = &testPrinter{}
 | 
				
			||||||
 | 
						podList := &api.PodList{
 | 
				
			||||||
 | 
							Items: pods,
 | 
				
			||||||
 | 
							ListMeta: unversioned.ListMeta{
 | 
				
			||||||
 | 
								ResourceVersion: "10",
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						tf.Client = &fake.RESTClient{
 | 
				
			||||||
 | 
							NegotiatedSerializer: ns,
 | 
				
			||||||
 | 
							Client: fake.CreateHTTPClient(func(req *http.Request) (*http.Response, error) {
 | 
				
			||||||
 | 
								switch req.URL.Path {
 | 
				
			||||||
 | 
								case "/namespaces/test/pods":
 | 
				
			||||||
 | 
									return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(codec, podList)}, nil
 | 
				
			||||||
 | 
								case "/watch/namespaces/test/pods":
 | 
				
			||||||
 | 
									return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: watchBody(codec, events[2:])}, nil
 | 
				
			||||||
 | 
								default:
 | 
				
			||||||
 | 
									t.Fatalf("unexpected request: %#v\n%#v", req.URL, req)
 | 
				
			||||||
 | 
									return nil, nil
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
							}),
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						tf.Namespace = "test"
 | 
				
			||||||
 | 
						buf := bytes.NewBuffer([]byte{})
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						cmd := NewCmdGet(f, buf)
 | 
				
			||||||
 | 
						cmd.SetOutput(buf)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						cmd.Flags().Set("watch-only", "true")
 | 
				
			||||||
 | 
						cmd.Run(cmd, []string{"pods"})
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						expected := []runtime.Object{events[2].Object, events[3].Object}
 | 
				
			||||||
	actual := tf.Printer.(*testPrinter).Objects
 | 
						actual := tf.Printer.(*testPrinter).Objects
 | 
				
			||||||
	if !reflect.DeepEqual(expected, actual) {
 | 
						if !reflect.DeepEqual(expected, actual) {
 | 
				
			||||||
		t.Errorf("unexpected object: %#v", actual)
 | 
							t.Errorf("unexpected object: %#v", actual)
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user