|
|
|
@@ -924,53 +924,57 @@ func TestTransformUnstructuredError(t *testing.T) {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
type errorReader struct {
|
|
|
|
|
err error
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (r errorReader) Read(data []byte) (int, error) { return 0, r.err }
|
|
|
|
|
func (r errorReader) Close() error { return nil }
|
|
|
|
|
|
|
|
|
|
func TestRequestWatch(t *testing.T) {
|
|
|
|
|
testCases := []struct {
|
|
|
|
|
Request *Request
|
|
|
|
|
Expect []watch.Event
|
|
|
|
|
Err bool
|
|
|
|
|
ErrFn func(error) bool
|
|
|
|
|
Empty bool
|
|
|
|
|
name string
|
|
|
|
|
Request *Request
|
|
|
|
|
maxRetries int
|
|
|
|
|
serverReturns []responseErr
|
|
|
|
|
Expect []watch.Event
|
|
|
|
|
attemptsExpected int
|
|
|
|
|
Err bool
|
|
|
|
|
ErrFn func(error) bool
|
|
|
|
|
Empty bool
|
|
|
|
|
}{
|
|
|
|
|
{
|
|
|
|
|
Request: &Request{err: errors.New("bail")},
|
|
|
|
|
Err: true,
|
|
|
|
|
name: "Request has error",
|
|
|
|
|
Request: &Request{err: errors.New("bail")},
|
|
|
|
|
attemptsExpected: 0,
|
|
|
|
|
Err: true,
|
|
|
|
|
},
|
|
|
|
|
{
|
|
|
|
|
name: "Client is nil, should use http.DefaultClient",
|
|
|
|
|
Request: &Request{c: &RESTClient{base: &url.URL{}}, pathPrefix: "%"},
|
|
|
|
|
Err: true,
|
|
|
|
|
},
|
|
|
|
|
{
|
|
|
|
|
name: "error is not retryable",
|
|
|
|
|
Request: &Request{
|
|
|
|
|
c: &RESTClient{
|
|
|
|
|
Client: clientForFunc(func(req *http.Request) (*http.Response, error) {
|
|
|
|
|
return nil, errors.New("err")
|
|
|
|
|
}),
|
|
|
|
|
base: &url.URL{},
|
|
|
|
|
},
|
|
|
|
|
},
|
|
|
|
|
Err: true,
|
|
|
|
|
serverReturns: []responseErr{
|
|
|
|
|
{response: nil, err: errors.New("err")},
|
|
|
|
|
},
|
|
|
|
|
attemptsExpected: 1,
|
|
|
|
|
Err: true,
|
|
|
|
|
},
|
|
|
|
|
{
|
|
|
|
|
name: "server returns forbidden",
|
|
|
|
|
Request: &Request{
|
|
|
|
|
c: &RESTClient{
|
|
|
|
|
content: defaultContentConfig(),
|
|
|
|
|
Client: clientForFunc(func(req *http.Request) (*http.Response, error) {
|
|
|
|
|
return &http.Response{
|
|
|
|
|
StatusCode: http.StatusForbidden,
|
|
|
|
|
Body: ioutil.NopCloser(bytes.NewReader([]byte{})),
|
|
|
|
|
}, nil
|
|
|
|
|
}),
|
|
|
|
|
base: &url.URL{},
|
|
|
|
|
base: &url.URL{},
|
|
|
|
|
},
|
|
|
|
|
},
|
|
|
|
|
serverReturns: []responseErr{
|
|
|
|
|
{response: &http.Response{
|
|
|
|
|
StatusCode: http.StatusForbidden,
|
|
|
|
|
Body: ioutil.NopCloser(bytes.NewReader([]byte{})),
|
|
|
|
|
}, err: nil},
|
|
|
|
|
},
|
|
|
|
|
attemptsExpected: 1,
|
|
|
|
|
Expect: []watch.Event{
|
|
|
|
|
{
|
|
|
|
|
Type: watch.Error,
|
|
|
|
@@ -1000,101 +1004,205 @@ func TestRequestWatch(t *testing.T) {
|
|
|
|
|
},
|
|
|
|
|
},
|
|
|
|
|
{
|
|
|
|
|
name: "server returns forbidden",
|
|
|
|
|
Request: &Request{
|
|
|
|
|
c: &RESTClient{
|
|
|
|
|
content: defaultContentConfig(),
|
|
|
|
|
Client: clientForFunc(func(req *http.Request) (*http.Response, error) {
|
|
|
|
|
return &http.Response{
|
|
|
|
|
StatusCode: http.StatusForbidden,
|
|
|
|
|
Body: ioutil.NopCloser(bytes.NewReader([]byte{})),
|
|
|
|
|
}, nil
|
|
|
|
|
}),
|
|
|
|
|
base: &url.URL{},
|
|
|
|
|
base: &url.URL{},
|
|
|
|
|
},
|
|
|
|
|
},
|
|
|
|
|
Err: true,
|
|
|
|
|
serverReturns: []responseErr{
|
|
|
|
|
{response: &http.Response{
|
|
|
|
|
StatusCode: http.StatusForbidden,
|
|
|
|
|
Body: ioutil.NopCloser(bytes.NewReader([]byte{})),
|
|
|
|
|
}, err: nil},
|
|
|
|
|
},
|
|
|
|
|
attemptsExpected: 1,
|
|
|
|
|
Err: true,
|
|
|
|
|
ErrFn: func(err error) bool {
|
|
|
|
|
return apierrors.IsForbidden(err)
|
|
|
|
|
},
|
|
|
|
|
},
|
|
|
|
|
{
|
|
|
|
|
name: "server returns unauthorized",
|
|
|
|
|
Request: &Request{
|
|
|
|
|
c: &RESTClient{
|
|
|
|
|
content: defaultContentConfig(),
|
|
|
|
|
Client: clientForFunc(func(req *http.Request) (*http.Response, error) {
|
|
|
|
|
return &http.Response{
|
|
|
|
|
StatusCode: http.StatusUnauthorized,
|
|
|
|
|
Body: ioutil.NopCloser(bytes.NewReader([]byte{})),
|
|
|
|
|
}, nil
|
|
|
|
|
}),
|
|
|
|
|
base: &url.URL{},
|
|
|
|
|
base: &url.URL{},
|
|
|
|
|
},
|
|
|
|
|
},
|
|
|
|
|
Err: true,
|
|
|
|
|
serverReturns: []responseErr{
|
|
|
|
|
{response: &http.Response{
|
|
|
|
|
StatusCode: http.StatusUnauthorized,
|
|
|
|
|
Body: ioutil.NopCloser(bytes.NewReader([]byte{})),
|
|
|
|
|
}, err: nil},
|
|
|
|
|
},
|
|
|
|
|
attemptsExpected: 1,
|
|
|
|
|
Err: true,
|
|
|
|
|
ErrFn: func(err error) bool {
|
|
|
|
|
return apierrors.IsUnauthorized(err)
|
|
|
|
|
},
|
|
|
|
|
},
|
|
|
|
|
{
|
|
|
|
|
name: "server returns unauthorized",
|
|
|
|
|
Request: &Request{
|
|
|
|
|
c: &RESTClient{
|
|
|
|
|
content: defaultContentConfig(),
|
|
|
|
|
Client: clientForFunc(func(req *http.Request) (*http.Response, error) {
|
|
|
|
|
return &http.Response{
|
|
|
|
|
StatusCode: http.StatusUnauthorized,
|
|
|
|
|
Body: ioutil.NopCloser(bytes.NewReader([]byte(runtime.EncodeOrDie(scheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &metav1.Status{
|
|
|
|
|
Status: metav1.StatusFailure,
|
|
|
|
|
Reason: metav1.StatusReasonUnauthorized,
|
|
|
|
|
})))),
|
|
|
|
|
}, nil
|
|
|
|
|
}),
|
|
|
|
|
base: &url.URL{},
|
|
|
|
|
base: &url.URL{},
|
|
|
|
|
},
|
|
|
|
|
},
|
|
|
|
|
Err: true,
|
|
|
|
|
serverReturns: []responseErr{
|
|
|
|
|
{response: &http.Response{
|
|
|
|
|
StatusCode: http.StatusUnauthorized,
|
|
|
|
|
Body: ioutil.NopCloser(bytes.NewReader([]byte(runtime.EncodeOrDie(scheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &metav1.Status{
|
|
|
|
|
Status: metav1.StatusFailure,
|
|
|
|
|
Reason: metav1.StatusReasonUnauthorized,
|
|
|
|
|
})))),
|
|
|
|
|
}, err: nil},
|
|
|
|
|
},
|
|
|
|
|
attemptsExpected: 1,
|
|
|
|
|
Err: true,
|
|
|
|
|
ErrFn: func(err error) bool {
|
|
|
|
|
return apierrors.IsUnauthorized(err)
|
|
|
|
|
},
|
|
|
|
|
},
|
|
|
|
|
{
|
|
|
|
|
name: "server returns EOF error",
|
|
|
|
|
Request: &Request{
|
|
|
|
|
c: &RESTClient{
|
|
|
|
|
Client: clientForFunc(func(req *http.Request) (*http.Response, error) {
|
|
|
|
|
return nil, io.EOF
|
|
|
|
|
}),
|
|
|
|
|
base: &url.URL{},
|
|
|
|
|
},
|
|
|
|
|
},
|
|
|
|
|
serverReturns: []responseErr{
|
|
|
|
|
{response: nil, err: io.EOF},
|
|
|
|
|
},
|
|
|
|
|
attemptsExpected: 1,
|
|
|
|
|
Empty: true,
|
|
|
|
|
},
|
|
|
|
|
{
|
|
|
|
|
name: "server returns can't write HTTP request on broken connection error",
|
|
|
|
|
Request: &Request{
|
|
|
|
|
c: &RESTClient{
|
|
|
|
|
base: &url.URL{},
|
|
|
|
|
},
|
|
|
|
|
},
|
|
|
|
|
serverReturns: []responseErr{
|
|
|
|
|
{response: nil, err: errors.New("http: can't write HTTP request on broken connection")},
|
|
|
|
|
},
|
|
|
|
|
attemptsExpected: 1,
|
|
|
|
|
Empty: true,
|
|
|
|
|
},
|
|
|
|
|
{
|
|
|
|
|
name: "server returns connection reset by peer",
|
|
|
|
|
Request: &Request{
|
|
|
|
|
c: &RESTClient{
|
|
|
|
|
base: &url.URL{},
|
|
|
|
|
},
|
|
|
|
|
},
|
|
|
|
|
serverReturns: []responseErr{
|
|
|
|
|
{response: nil, err: errors.New("foo: connection reset by peer")},
|
|
|
|
|
},
|
|
|
|
|
attemptsExpected: 1,
|
|
|
|
|
Empty: true,
|
|
|
|
|
},
|
|
|
|
|
{
|
|
|
|
|
name: "max retries 2, server always returns EOF error",
|
|
|
|
|
Request: &Request{
|
|
|
|
|
c: &RESTClient{
|
|
|
|
|
base: &url.URL{},
|
|
|
|
|
},
|
|
|
|
|
},
|
|
|
|
|
maxRetries: 2,
|
|
|
|
|
attemptsExpected: 3,
|
|
|
|
|
serverReturns: []responseErr{
|
|
|
|
|
{response: nil, err: io.EOF},
|
|
|
|
|
{response: nil, err: io.EOF},
|
|
|
|
|
{response: nil, err: io.EOF},
|
|
|
|
|
},
|
|
|
|
|
Empty: true,
|
|
|
|
|
},
|
|
|
|
|
{
|
|
|
|
|
name: "max retries 1, server returns a retry-after response, request body seek error",
|
|
|
|
|
Request: &Request{
|
|
|
|
|
body: &readSeeker{err: io.EOF},
|
|
|
|
|
c: &RESTClient{
|
|
|
|
|
Client: clientForFunc(func(req *http.Request) (*http.Response, error) {
|
|
|
|
|
return nil, errors.New("http: can't write HTTP request on broken connection")
|
|
|
|
|
}),
|
|
|
|
|
base: &url.URL{},
|
|
|
|
|
},
|
|
|
|
|
},
|
|
|
|
|
maxRetries: 1,
|
|
|
|
|
attemptsExpected: 1,
|
|
|
|
|
serverReturns: []responseErr{
|
|
|
|
|
{response: retryAfterResponse(), err: nil},
|
|
|
|
|
},
|
|
|
|
|
Err: true,
|
|
|
|
|
ErrFn: func(err error) bool {
|
|
|
|
|
return apierrors.IsInternalError(err)
|
|
|
|
|
},
|
|
|
|
|
},
|
|
|
|
|
{
|
|
|
|
|
name: "max retries 1, server returns a retryable error, request body seek error",
|
|
|
|
|
Request: &Request{
|
|
|
|
|
body: &readSeeker{err: io.EOF},
|
|
|
|
|
c: &RESTClient{
|
|
|
|
|
base: &url.URL{},
|
|
|
|
|
},
|
|
|
|
|
},
|
|
|
|
|
maxRetries: 1,
|
|
|
|
|
attemptsExpected: 1,
|
|
|
|
|
serverReturns: []responseErr{
|
|
|
|
|
{response: nil, err: io.EOF},
|
|
|
|
|
},
|
|
|
|
|
Empty: true,
|
|
|
|
|
},
|
|
|
|
|
{
|
|
|
|
|
name: "max retries 2, server always returns a response with Retry-After header",
|
|
|
|
|
Request: &Request{
|
|
|
|
|
c: &RESTClient{
|
|
|
|
|
Client: clientForFunc(func(req *http.Request) (*http.Response, error) {
|
|
|
|
|
return nil, errors.New("foo: connection reset by peer")
|
|
|
|
|
}),
|
|
|
|
|
base: &url.URL{},
|
|
|
|
|
},
|
|
|
|
|
},
|
|
|
|
|
Empty: true,
|
|
|
|
|
maxRetries: 2,
|
|
|
|
|
attemptsExpected: 3,
|
|
|
|
|
serverReturns: []responseErr{
|
|
|
|
|
{response: retryAfterResponse(), err: nil},
|
|
|
|
|
{response: retryAfterResponse(), err: nil},
|
|
|
|
|
{response: retryAfterResponse(), err: nil},
|
|
|
|
|
},
|
|
|
|
|
Err: true,
|
|
|
|
|
ErrFn: func(err error) bool {
|
|
|
|
|
return apierrors.IsInternalError(err)
|
|
|
|
|
},
|
|
|
|
|
},
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for _, testCase := range testCases {
|
|
|
|
|
t.Run("", func(t *testing.T) {
|
|
|
|
|
testCase.Request.backoff = &NoBackoff{}
|
|
|
|
|
testCase.Request.retry = &withRetry{}
|
|
|
|
|
t.Run(testCase.name, func(t *testing.T) {
|
|
|
|
|
var attemptsGot int
|
|
|
|
|
client := clientForFunc(func(req *http.Request) (*http.Response, error) {
|
|
|
|
|
defer func() {
|
|
|
|
|
attemptsGot++
|
|
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
if attemptsGot >= len(testCase.serverReturns) {
|
|
|
|
|
t.Fatalf("Wrong test setup, the server does not know what to return")
|
|
|
|
|
}
|
|
|
|
|
re := testCase.serverReturns[attemptsGot]
|
|
|
|
|
return re.response, re.err
|
|
|
|
|
})
|
|
|
|
|
if c := testCase.Request.c; c != nil && len(testCase.serverReturns) > 0 {
|
|
|
|
|
c.Client = client
|
|
|
|
|
}
|
|
|
|
|
testCase.Request.backoff = &noSleepBackOff{}
|
|
|
|
|
testCase.Request.retry = &withRetry{maxRetries: testCase.maxRetries}
|
|
|
|
|
|
|
|
|
|
watch, err := testCase.Request.Watch(context.Background())
|
|
|
|
|
|
|
|
|
|
if watch == nil && err == nil {
|
|
|
|
|
t.Fatal("Both watch.Interface and err returned by Watch are nil")
|
|
|
|
|
}
|
|
|
|
|
if testCase.attemptsExpected != attemptsGot {
|
|
|
|
|
t.Errorf("Expected RoundTrip to be invoked %d times, but got: %d", testCase.attemptsExpected, attemptsGot)
|
|
|
|
|
}
|
|
|
|
|
hasErr := err != nil
|
|
|
|
|
if hasErr != testCase.Err {
|
|
|
|
|
t.Fatalf("expected %t, got %t: %v", testCase.Err, hasErr, err)
|
|
|
|
@@ -1132,61 +1240,72 @@ func TestRequestWatch(t *testing.T) {
|
|
|
|
|
|
|
|
|
|
func TestRequestStream(t *testing.T) {
|
|
|
|
|
testCases := []struct {
|
|
|
|
|
Request *Request
|
|
|
|
|
Err bool
|
|
|
|
|
ErrFn func(error) bool
|
|
|
|
|
name string
|
|
|
|
|
Request *Request
|
|
|
|
|
maxRetries int
|
|
|
|
|
serverReturns []responseErr
|
|
|
|
|
attemptsExpected int
|
|
|
|
|
Err bool
|
|
|
|
|
ErrFn func(error) bool
|
|
|
|
|
}{
|
|
|
|
|
{
|
|
|
|
|
Request: &Request{err: errors.New("bail")},
|
|
|
|
|
Err: true,
|
|
|
|
|
name: "request has error",
|
|
|
|
|
Request: &Request{err: errors.New("bail")},
|
|
|
|
|
attemptsExpected: 0,
|
|
|
|
|
Err: true,
|
|
|
|
|
},
|
|
|
|
|
{
|
|
|
|
|
name: "Client is nil, should use http.DefaultClient",
|
|
|
|
|
Request: &Request{c: &RESTClient{base: &url.URL{}}, pathPrefix: "%"},
|
|
|
|
|
Err: true,
|
|
|
|
|
},
|
|
|
|
|
{
|
|
|
|
|
name: "server returns an error",
|
|
|
|
|
Request: &Request{
|
|
|
|
|
c: &RESTClient{
|
|
|
|
|
Client: clientForFunc(func(req *http.Request) (*http.Response, error) {
|
|
|
|
|
return nil, errors.New("err")
|
|
|
|
|
}),
|
|
|
|
|
base: &url.URL{},
|
|
|
|
|
},
|
|
|
|
|
},
|
|
|
|
|
Err: true,
|
|
|
|
|
serverReturns: []responseErr{
|
|
|
|
|
{response: nil, err: errors.New("err")},
|
|
|
|
|
},
|
|
|
|
|
attemptsExpected: 1,
|
|
|
|
|
Err: true,
|
|
|
|
|
},
|
|
|
|
|
{
|
|
|
|
|
Request: &Request{
|
|
|
|
|
c: &RESTClient{
|
|
|
|
|
Client: clientForFunc(func(req *http.Request) (*http.Response, error) {
|
|
|
|
|
return &http.Response{
|
|
|
|
|
StatusCode: http.StatusUnauthorized,
|
|
|
|
|
Body: ioutil.NopCloser(bytes.NewReader([]byte(runtime.EncodeOrDie(scheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &metav1.Status{
|
|
|
|
|
Status: metav1.StatusFailure,
|
|
|
|
|
Reason: metav1.StatusReasonUnauthorized,
|
|
|
|
|
})))),
|
|
|
|
|
}, nil
|
|
|
|
|
}),
|
|
|
|
|
content: defaultContentConfig(),
|
|
|
|
|
base: &url.URL{},
|
|
|
|
|
},
|
|
|
|
|
},
|
|
|
|
|
Err: true,
|
|
|
|
|
serverReturns: []responseErr{
|
|
|
|
|
{response: &http.Response{
|
|
|
|
|
StatusCode: http.StatusUnauthorized,
|
|
|
|
|
Body: ioutil.NopCloser(bytes.NewReader([]byte(runtime.EncodeOrDie(scheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &metav1.Status{
|
|
|
|
|
Status: metav1.StatusFailure,
|
|
|
|
|
Reason: metav1.StatusReasonUnauthorized,
|
|
|
|
|
})))),
|
|
|
|
|
}, err: nil},
|
|
|
|
|
},
|
|
|
|
|
attemptsExpected: 1,
|
|
|
|
|
Err: true,
|
|
|
|
|
},
|
|
|
|
|
{
|
|
|
|
|
Request: &Request{
|
|
|
|
|
c: &RESTClient{
|
|
|
|
|
Client: clientForFunc(func(req *http.Request) (*http.Response, error) {
|
|
|
|
|
return &http.Response{
|
|
|
|
|
StatusCode: http.StatusBadRequest,
|
|
|
|
|
Body: ioutil.NopCloser(bytes.NewReader([]byte(`{"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"a container name must be specified for pod kube-dns-v20-mz5cv, choose one of: [kubedns dnsmasq healthz]","reason":"BadRequest","code":400}`))),
|
|
|
|
|
}, nil
|
|
|
|
|
}),
|
|
|
|
|
content: defaultContentConfig(),
|
|
|
|
|
base: &url.URL{},
|
|
|
|
|
},
|
|
|
|
|
},
|
|
|
|
|
Err: true,
|
|
|
|
|
serverReturns: []responseErr{
|
|
|
|
|
{response: &http.Response{
|
|
|
|
|
StatusCode: http.StatusBadRequest,
|
|
|
|
|
Body: ioutil.NopCloser(bytes.NewReader([]byte(`{"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"a container name must be specified for pod kube-dns-v20-mz5cv, choose one of: [kubedns dnsmasq healthz]","reason":"BadRequest","code":400}`))),
|
|
|
|
|
}, err: nil},
|
|
|
|
|
},
|
|
|
|
|
attemptsExpected: 1,
|
|
|
|
|
Err: true,
|
|
|
|
|
ErrFn: func(err error) bool {
|
|
|
|
|
if err.Error() == "a container name must be specified for pod kube-dns-v20-mz5cv, choose one of: [kubedns dnsmasq healthz]" {
|
|
|
|
|
return true
|
|
|
|
@@ -1194,25 +1313,124 @@ func TestRequestStream(t *testing.T) {
|
|
|
|
|
return false
|
|
|
|
|
},
|
|
|
|
|
},
|
|
|
|
|
{
|
|
|
|
|
name: "max retries 1, server returns a retry-after response, request body seek error",
|
|
|
|
|
Request: &Request{
|
|
|
|
|
body: &readSeeker{err: io.EOF},
|
|
|
|
|
c: &RESTClient{
|
|
|
|
|
base: &url.URL{},
|
|
|
|
|
},
|
|
|
|
|
},
|
|
|
|
|
maxRetries: 1,
|
|
|
|
|
attemptsExpected: 1,
|
|
|
|
|
serverReturns: []responseErr{
|
|
|
|
|
{response: retryAfterResponse(), err: nil},
|
|
|
|
|
},
|
|
|
|
|
Err: true,
|
|
|
|
|
ErrFn: func(err error) bool {
|
|
|
|
|
return apierrors.IsInternalError(err)
|
|
|
|
|
},
|
|
|
|
|
},
|
|
|
|
|
{
|
|
|
|
|
name: "max retries 2, server always returns a response with Retry-After header",
|
|
|
|
|
Request: &Request{
|
|
|
|
|
c: &RESTClient{
|
|
|
|
|
base: &url.URL{},
|
|
|
|
|
},
|
|
|
|
|
},
|
|
|
|
|
maxRetries: 2,
|
|
|
|
|
attemptsExpected: 3,
|
|
|
|
|
serverReturns: []responseErr{
|
|
|
|
|
{response: retryAfterResponse(), err: nil},
|
|
|
|
|
{response: retryAfterResponse(), err: nil},
|
|
|
|
|
{response: retryAfterResponse(), err: nil},
|
|
|
|
|
},
|
|
|
|
|
Err: true,
|
|
|
|
|
ErrFn: func(err error) bool {
|
|
|
|
|
return apierrors.IsInternalError(err)
|
|
|
|
|
},
|
|
|
|
|
},
|
|
|
|
|
{
|
|
|
|
|
name: "server returns EOF after attempt 1, retry aborted",
|
|
|
|
|
Request: &Request{
|
|
|
|
|
c: &RESTClient{
|
|
|
|
|
base: &url.URL{},
|
|
|
|
|
},
|
|
|
|
|
},
|
|
|
|
|
maxRetries: 2,
|
|
|
|
|
attemptsExpected: 2,
|
|
|
|
|
serverReturns: []responseErr{
|
|
|
|
|
{response: retryAfterResponse(), err: nil},
|
|
|
|
|
{response: nil, err: io.EOF},
|
|
|
|
|
},
|
|
|
|
|
Err: true,
|
|
|
|
|
ErrFn: func(err error) bool {
|
|
|
|
|
return unWrap(err) == io.EOF
|
|
|
|
|
},
|
|
|
|
|
},
|
|
|
|
|
{
|
|
|
|
|
name: "max retries 2, server returns success on the final attempt",
|
|
|
|
|
Request: &Request{
|
|
|
|
|
c: &RESTClient{
|
|
|
|
|
base: &url.URL{},
|
|
|
|
|
},
|
|
|
|
|
},
|
|
|
|
|
maxRetries: 2,
|
|
|
|
|
attemptsExpected: 3,
|
|
|
|
|
serverReturns: []responseErr{
|
|
|
|
|
{response: retryAfterResponse(), err: nil},
|
|
|
|
|
{response: retryAfterResponse(), err: nil},
|
|
|
|
|
{response: &http.Response{
|
|
|
|
|
StatusCode: http.StatusOK,
|
|
|
|
|
Body: ioutil.NopCloser(bytes.NewReader([]byte{})),
|
|
|
|
|
}, err: nil},
|
|
|
|
|
},
|
|
|
|
|
},
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for i, testCase := range testCases {
|
|
|
|
|
testCase.Request.backoff = &NoBackoff{}
|
|
|
|
|
testCase.Request.retry = &withRetry{maxRetries: 0}
|
|
|
|
|
body, err := testCase.Request.Stream(context.Background())
|
|
|
|
|
hasErr := err != nil
|
|
|
|
|
if hasErr != testCase.Err {
|
|
|
|
|
t.Errorf("%d: expected %t, got %t: %v", i, testCase.Err, hasErr, err)
|
|
|
|
|
}
|
|
|
|
|
if hasErr && body != nil {
|
|
|
|
|
t.Errorf("%d: body should be nil when error is returned", i)
|
|
|
|
|
}
|
|
|
|
|
for _, testCase := range testCases {
|
|
|
|
|
t.Run(testCase.name, func(t *testing.T) {
|
|
|
|
|
var attemptsGot int
|
|
|
|
|
client := clientForFunc(func(req *http.Request) (*http.Response, error) {
|
|
|
|
|
defer func() {
|
|
|
|
|
attemptsGot++
|
|
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
if hasErr {
|
|
|
|
|
if testCase.ErrFn != nil && !testCase.ErrFn(err) {
|
|
|
|
|
t.Errorf("unexpected error: %v", err)
|
|
|
|
|
if attemptsGot >= len(testCase.serverReturns) {
|
|
|
|
|
t.Fatalf("Wrong test setup, the server does not know what to return")
|
|
|
|
|
}
|
|
|
|
|
re := testCase.serverReturns[attemptsGot]
|
|
|
|
|
return re.response, re.err
|
|
|
|
|
})
|
|
|
|
|
if c := testCase.Request.c; c != nil && len(testCase.serverReturns) > 0 {
|
|
|
|
|
c.Client = client
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
testCase.Request.backoff = &noSleepBackOff{}
|
|
|
|
|
testCase.Request.retry = &withRetry{maxRetries: testCase.maxRetries}
|
|
|
|
|
|
|
|
|
|
body, err := testCase.Request.Stream(context.Background())
|
|
|
|
|
|
|
|
|
|
if body == nil && err == nil {
|
|
|
|
|
t.Fatal("Both body and err returned by Stream are nil")
|
|
|
|
|
}
|
|
|
|
|
if testCase.attemptsExpected != attemptsGot {
|
|
|
|
|
t.Errorf("Expected RoundTrip to be invoked %d times, but got: %d", testCase.attemptsExpected, attemptsGot)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
hasErr := err != nil
|
|
|
|
|
if hasErr != testCase.Err {
|
|
|
|
|
t.Errorf("expected %t, got %t: %v", testCase.Err, hasErr, err)
|
|
|
|
|
}
|
|
|
|
|
if hasErr && body != nil {
|
|
|
|
|
t.Error("body should be nil when error is returned")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if hasErr {
|
|
|
|
|
if testCase.ErrFn != nil && !testCase.ErrFn(err) {
|
|
|
|
|
t.Errorf("unexpected error: %#v", err)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@@ -1840,57 +2058,87 @@ func TestBody(t *testing.T) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func TestWatch(t *testing.T) {
|
|
|
|
|
var table = []struct {
|
|
|
|
|
t watch.EventType
|
|
|
|
|
obj runtime.Object
|
|
|
|
|
tests := []struct {
|
|
|
|
|
name string
|
|
|
|
|
maxRetries int
|
|
|
|
|
}{
|
|
|
|
|
{watch.Added, &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "first"}}},
|
|
|
|
|
{watch.Modified, &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "second"}}},
|
|
|
|
|
{watch.Deleted, &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "last"}}},
|
|
|
|
|
{
|
|
|
|
|
name: "no retry",
|
|
|
|
|
maxRetries: 0,
|
|
|
|
|
},
|
|
|
|
|
{
|
|
|
|
|
name: "with retries",
|
|
|
|
|
maxRetries: 3,
|
|
|
|
|
},
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
|
|
|
flusher, ok := w.(http.Flusher)
|
|
|
|
|
if !ok {
|
|
|
|
|
panic("need flusher!")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
w.Header().Set("Transfer-Encoding", "chunked")
|
|
|
|
|
w.WriteHeader(http.StatusOK)
|
|
|
|
|
flusher.Flush()
|
|
|
|
|
|
|
|
|
|
encoder := restclientwatch.NewEncoder(streaming.NewEncoder(w, scheme.Codecs.LegacyCodec(v1.SchemeGroupVersion)), scheme.Codecs.LegacyCodec(v1.SchemeGroupVersion))
|
|
|
|
|
for _, item := range table {
|
|
|
|
|
if err := encoder.Encode(&watch.Event{Type: item.t, Object: item.obj}); err != nil {
|
|
|
|
|
panic(err)
|
|
|
|
|
for _, test := range tests {
|
|
|
|
|
t.Run(test.name, func(t *testing.T) {
|
|
|
|
|
var table = []struct {
|
|
|
|
|
t watch.EventType
|
|
|
|
|
obj runtime.Object
|
|
|
|
|
}{
|
|
|
|
|
{watch.Added, &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "first"}}},
|
|
|
|
|
{watch.Modified, &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "second"}}},
|
|
|
|
|
{watch.Deleted, &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "last"}}},
|
|
|
|
|
}
|
|
|
|
|
flusher.Flush()
|
|
|
|
|
}
|
|
|
|
|
}))
|
|
|
|
|
defer testServer.Close()
|
|
|
|
|
|
|
|
|
|
s := testRESTClient(t, testServer)
|
|
|
|
|
watching, err := s.Get().Prefix("path/to/watch/thing").Watch(context.Background())
|
|
|
|
|
if err != nil {
|
|
|
|
|
t.Fatalf("Unexpected error: %v", err)
|
|
|
|
|
}
|
|
|
|
|
var attempts int
|
|
|
|
|
testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
|
|
|
defer func() {
|
|
|
|
|
attempts++
|
|
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
for _, item := range table {
|
|
|
|
|
got, ok := <-watching.ResultChan()
|
|
|
|
|
if !ok {
|
|
|
|
|
t.Fatalf("Unexpected early close")
|
|
|
|
|
}
|
|
|
|
|
if e, a := item.t, got.Type; e != a {
|
|
|
|
|
t.Errorf("Expected %v, got %v", e, a)
|
|
|
|
|
}
|
|
|
|
|
if e, a := item.obj, got.Object; !apiequality.Semantic.DeepDerivative(e, a) {
|
|
|
|
|
t.Errorf("Expected %v, got %v", e, a)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
flusher, ok := w.(http.Flusher)
|
|
|
|
|
if !ok {
|
|
|
|
|
panic("need flusher!")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
_, ok := <-watching.ResultChan()
|
|
|
|
|
if ok {
|
|
|
|
|
t.Fatal("Unexpected non-close")
|
|
|
|
|
if attempts < test.maxRetries {
|
|
|
|
|
w.Header().Set("Retry-After", "1")
|
|
|
|
|
w.WriteHeader(http.StatusTooManyRequests)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
w.Header().Set("Transfer-Encoding", "chunked")
|
|
|
|
|
w.WriteHeader(http.StatusOK)
|
|
|
|
|
flusher.Flush()
|
|
|
|
|
|
|
|
|
|
encoder := restclientwatch.NewEncoder(streaming.NewEncoder(w, scheme.Codecs.LegacyCodec(v1.SchemeGroupVersion)), scheme.Codecs.LegacyCodec(v1.SchemeGroupVersion))
|
|
|
|
|
for _, item := range table {
|
|
|
|
|
if err := encoder.Encode(&watch.Event{Type: item.t, Object: item.obj}); err != nil {
|
|
|
|
|
panic(err)
|
|
|
|
|
}
|
|
|
|
|
flusher.Flush()
|
|
|
|
|
}
|
|
|
|
|
}))
|
|
|
|
|
defer testServer.Close()
|
|
|
|
|
|
|
|
|
|
s := testRESTClient(t, testServer)
|
|
|
|
|
watching, err := s.Get().Prefix("path/to/watch/thing").
|
|
|
|
|
MaxRetries(test.maxRetries).Watch(context.Background())
|
|
|
|
|
if err != nil {
|
|
|
|
|
t.Fatalf("Unexpected error: %v", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for _, item := range table {
|
|
|
|
|
got, ok := <-watching.ResultChan()
|
|
|
|
|
if !ok {
|
|
|
|
|
t.Fatalf("Unexpected early close")
|
|
|
|
|
}
|
|
|
|
|
if e, a := item.t, got.Type; e != a {
|
|
|
|
|
t.Errorf("Expected %v, got %v", e, a)
|
|
|
|
|
}
|
|
|
|
|
if e, a := item.obj, got.Object; !apiequality.Semantic.DeepDerivative(e, a) {
|
|
|
|
|
t.Errorf("Expected %v, got %v", e, a)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
_, ok := <-watching.ResultChan()
|
|
|
|
|
if ok {
|
|
|
|
|
t.Fatal("Unexpected non-close")
|
|
|
|
|
}
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@@ -2333,14 +2581,27 @@ type seek struct {
|
|
|
|
|
type count struct {
|
|
|
|
|
// keeps track of the number of Seek(offset, whence) calls.
|
|
|
|
|
seeks []seek
|
|
|
|
|
|
|
|
|
|
// how many times {Request|Response}.Body.Close() has been invoked
|
|
|
|
|
lock sync.Mutex
|
|
|
|
|
closes int
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (c *count) close() {
|
|
|
|
|
c.lock.Lock()
|
|
|
|
|
defer c.lock.Unlock()
|
|
|
|
|
c.closes++
|
|
|
|
|
}
|
|
|
|
|
func (c *count) getCloseCount() int {
|
|
|
|
|
c.lock.Lock()
|
|
|
|
|
defer c.lock.Unlock()
|
|
|
|
|
return c.closes
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// used to track {Request|Response}.Body
|
|
|
|
|
type readTracker struct {
|
|
|
|
|
count *count
|
|
|
|
|
delegated io.Reader
|
|
|
|
|
count *count
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (r *readTracker) Seek(offset int64, whence int) (int64, error) {
|
|
|
|
@@ -2357,7 +2618,7 @@ func (r *readTracker) Read(p []byte) (n int, err error) {
|
|
|
|
|
|
|
|
|
|
func (r *readTracker) Close() error {
|
|
|
|
|
if closer, ok := r.delegated.(io.Closer); ok {
|
|
|
|
|
r.count.closes++
|
|
|
|
|
r.count.close()
|
|
|
|
|
return closer.Close()
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
@@ -2492,26 +2753,46 @@ func TestRequestWithRetry(t *testing.T) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func TestRequestDoWithRetry(t *testing.T) {
|
|
|
|
|
testRequestWithRetry(t, func(ctx context.Context, r *Request) {
|
|
|
|
|
testRequestWithRetry(t, "Do", func(ctx context.Context, r *Request) {
|
|
|
|
|
r.Do(ctx)
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func TestRequestDORawWithRetry(t *testing.T) {
|
|
|
|
|
testRequestWithRetry(t, func(ctx context.Context, r *Request) {
|
|
|
|
|
func TestRequestDoRawWithRetry(t *testing.T) {
|
|
|
|
|
// both request.Do and request.DoRaw have the same behavior and expectations
|
|
|
|
|
testRequestWithRetry(t, "Do", func(ctx context.Context, r *Request) {
|
|
|
|
|
r.DoRaw(ctx)
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func testRequestWithRetry(t *testing.T, doFunc func(ctx context.Context, r *Request)) {
|
|
|
|
|
func TestRequestStreamWithRetry(t *testing.T) {
|
|
|
|
|
testRequestWithRetry(t, "Stream", func(ctx context.Context, r *Request) {
|
|
|
|
|
r.Stream(ctx)
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func TestRequestWatchWithRetry(t *testing.T) {
|
|
|
|
|
testRequestWithRetry(t, "Watch", func(ctx context.Context, r *Request) {
|
|
|
|
|
r.Watch(ctx)
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func testRequestWithRetry(t *testing.T, key string, doFunc func(ctx context.Context, r *Request)) {
|
|
|
|
|
type expected struct {
|
|
|
|
|
attempts int
|
|
|
|
|
reqCount *count
|
|
|
|
|
respCount *count
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
tests := []struct {
|
|
|
|
|
name string
|
|
|
|
|
verb string
|
|
|
|
|
body func() io.Reader
|
|
|
|
|
maxRetries int
|
|
|
|
|
serverReturns []responseErr
|
|
|
|
|
reqCountExpected *count
|
|
|
|
|
respCountExpected *count
|
|
|
|
|
name string
|
|
|
|
|
verb string
|
|
|
|
|
body func() io.Reader
|
|
|
|
|
maxRetries int
|
|
|
|
|
serverReturns []responseErr
|
|
|
|
|
|
|
|
|
|
// expectations differ based on whether it is 'Watch', 'Stream' or 'Do'
|
|
|
|
|
expectations map[string]expected
|
|
|
|
|
}{
|
|
|
|
|
{
|
|
|
|
|
name: "server always returns retry-after response",
|
|
|
|
@@ -2523,8 +2804,23 @@ func testRequestWithRetry(t *testing.T, doFunc func(ctx context.Context, r *Requ
|
|
|
|
|
{response: retryAfterResponse(), err: nil},
|
|
|
|
|
{response: retryAfterResponse(), err: nil},
|
|
|
|
|
},
|
|
|
|
|
reqCountExpected: &count{closes: 0, seeks: make([]seek, 2)},
|
|
|
|
|
respCountExpected: &count{closes: 3, seeks: []seek{}},
|
|
|
|
|
expectations: map[string]expected{
|
|
|
|
|
"Do": {
|
|
|
|
|
attempts: 3,
|
|
|
|
|
reqCount: &count{closes: 0, seeks: make([]seek, 2)},
|
|
|
|
|
respCount: &count{closes: 3, seeks: []seek{}},
|
|
|
|
|
},
|
|
|
|
|
"Watch": {
|
|
|
|
|
attempts: 3,
|
|
|
|
|
reqCount: &count{closes: 0, seeks: make([]seek, 2)},
|
|
|
|
|
respCount: &count{closes: 3, seeks: []seek{}},
|
|
|
|
|
},
|
|
|
|
|
"Stream": {
|
|
|
|
|
attempts: 3,
|
|
|
|
|
reqCount: &count{closes: 0, seeks: make([]seek, 2)},
|
|
|
|
|
respCount: &count{closes: 3, seeks: []seek{}},
|
|
|
|
|
},
|
|
|
|
|
},
|
|
|
|
|
},
|
|
|
|
|
{
|
|
|
|
|
name: "server always returns retryable error",
|
|
|
|
@@ -2536,8 +2832,24 @@ func testRequestWithRetry(t *testing.T, doFunc func(ctx context.Context, r *Requ
|
|
|
|
|
{response: nil, err: io.EOF},
|
|
|
|
|
{response: nil, err: io.EOF},
|
|
|
|
|
},
|
|
|
|
|
reqCountExpected: &count{closes: 0, seeks: make([]seek, 2)},
|
|
|
|
|
respCountExpected: &count{closes: 0, seeks: []seek{}},
|
|
|
|
|
expectations: map[string]expected{
|
|
|
|
|
"Do": {
|
|
|
|
|
attempts: 3,
|
|
|
|
|
reqCount: &count{closes: 0, seeks: make([]seek, 2)},
|
|
|
|
|
respCount: &count{closes: 0, seeks: []seek{}},
|
|
|
|
|
},
|
|
|
|
|
"Watch": {
|
|
|
|
|
attempts: 3,
|
|
|
|
|
reqCount: &count{closes: 0, seeks: make([]seek, 2)},
|
|
|
|
|
respCount: &count{closes: 0, seeks: []seek{}},
|
|
|
|
|
},
|
|
|
|
|
// for Stream, we never retry on any error
|
|
|
|
|
"Stream": {
|
|
|
|
|
attempts: 1, // only the first attempt is expected
|
|
|
|
|
reqCount: &count{closes: 0, seeks: []seek{}},
|
|
|
|
|
respCount: &count{closes: 0, seeks: []seek{}},
|
|
|
|
|
},
|
|
|
|
|
},
|
|
|
|
|
},
|
|
|
|
|
{
|
|
|
|
|
name: "server returns success on the final retry",
|
|
|
|
@@ -2549,8 +2861,24 @@ func testRequestWithRetry(t *testing.T, doFunc func(ctx context.Context, r *Requ
|
|
|
|
|
{response: nil, err: io.EOF},
|
|
|
|
|
{response: &http.Response{StatusCode: http.StatusOK}, err: nil},
|
|
|
|
|
},
|
|
|
|
|
reqCountExpected: &count{closes: 0, seeks: make([]seek, 2)},
|
|
|
|
|
respCountExpected: &count{closes: 2, seeks: []seek{}},
|
|
|
|
|
expectations: map[string]expected{
|
|
|
|
|
"Do": {
|
|
|
|
|
attempts: 3,
|
|
|
|
|
reqCount: &count{closes: 0, seeks: make([]seek, 2)},
|
|
|
|
|
respCount: &count{closes: 2, seeks: []seek{}},
|
|
|
|
|
},
|
|
|
|
|
"Watch": {
|
|
|
|
|
attempts: 3,
|
|
|
|
|
reqCount: &count{closes: 0, seeks: make([]seek, 2)},
|
|
|
|
|
// we don't close the the Body of the final successful response
|
|
|
|
|
respCount: &count{closes: 1, seeks: []seek{}},
|
|
|
|
|
},
|
|
|
|
|
"Stream": {
|
|
|
|
|
attempts: 2,
|
|
|
|
|
reqCount: &count{closes: 0, seeks: make([]seek, 1)},
|
|
|
|
|
respCount: &count{closes: 1, seeks: []seek{}},
|
|
|
|
|
},
|
|
|
|
|
},
|
|
|
|
|
},
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@@ -2580,7 +2908,8 @@ func testRequestWithRetry(t *testing.T, doFunc func(ctx context.Context, r *Requ
|
|
|
|
|
verb: test.verb,
|
|
|
|
|
body: reqRecorder,
|
|
|
|
|
c: &RESTClient{
|
|
|
|
|
Client: client,
|
|
|
|
|
content: defaultContentConfig(),
|
|
|
|
|
Client: client,
|
|
|
|
|
},
|
|
|
|
|
backoff: &noSleepBackOff{},
|
|
|
|
|
retry: &withRetry{maxRetries: test.maxRetries},
|
|
|
|
@@ -2588,15 +2917,19 @@ func testRequestWithRetry(t *testing.T, doFunc func(ctx context.Context, r *Requ
|
|
|
|
|
|
|
|
|
|
doFunc(context.Background(), req)
|
|
|
|
|
|
|
|
|
|
attemptsExpected := test.maxRetries + 1
|
|
|
|
|
if attemptsExpected != attempts {
|
|
|
|
|
t.Errorf("Expected retries: %d, but got: %d", attemptsExpected, attempts)
|
|
|
|
|
expected, ok := test.expectations[key]
|
|
|
|
|
if !ok {
|
|
|
|
|
t.Fatalf("Wrong test setup - did not find expected for: %s", key)
|
|
|
|
|
}
|
|
|
|
|
if !reflect.DeepEqual(test.reqCountExpected.seeks, reqCountGot.seeks) {
|
|
|
|
|
t.Errorf("Expected request body to have seek invocation: %v, but got: %v", test.reqCountExpected.seeks, reqCountGot.seeks)
|
|
|
|
|
if expected.attempts != attempts {
|
|
|
|
|
t.Errorf("Expected retries: %d, but got: %d", expected.attempts, attempts)
|
|
|
|
|
}
|
|
|
|
|
if test.respCountExpected.closes != respCountGot.closes {
|
|
|
|
|
t.Errorf("Expected response body Close to be invoked %d times, but got: %d", test.respCountExpected.closes, respCountGot.closes)
|
|
|
|
|
|
|
|
|
|
if !reflect.DeepEqual(expected.reqCount.seeks, reqCountGot.seeks) {
|
|
|
|
|
t.Errorf("Expected request body to have seek invocation: %v, but got: %v", expected.reqCount.seeks, reqCountGot.seeks)
|
|
|
|
|
}
|
|
|
|
|
if expected.respCount.closes != respCountGot.getCloseCount() {
|
|
|
|
|
t.Errorf("Expected response body Close to be invoked %d times, but got: %d", expected.respCount.closes, respCountGot.getCloseCount())
|
|
|
|
|
}
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|