Revert "Revert "Allow webhook authenticator to use TokenReviewsInterface""
This reverts commit 0a02c8275d.
			
			
This commit is contained in:
		| @@ -410,6 +410,11 @@ func IsServerTimeout(err error) bool { | ||||
| 	return reasonForError(err) == unversioned.StatusReasonServerTimeout | ||||
| } | ||||
|  | ||||
| // IsInternalError determines if err is an error which indicates an internal server error. | ||||
| func IsInternalError(err error) bool { | ||||
| 	return reasonForError(err) == unversioned.StatusReasonInternalError | ||||
| } | ||||
|  | ||||
| // IsUnexpectedServerError returns true if the server response was not in the expected API format, | ||||
| // and may be the result of another HTTP actor. | ||||
| func IsUnexpectedServerError(err error) bool { | ||||
|   | ||||
| @@ -18,18 +18,17 @@ limitations under the License. | ||||
| package webhook | ||||
|  | ||||
| import ( | ||||
| 	"fmt" | ||||
| 	"time" | ||||
|  | ||||
| 	"k8s.io/kubernetes/pkg/api/unversioned" | ||||
| 	"k8s.io/kubernetes/pkg/apis/authentication" | ||||
| 	_ "k8s.io/kubernetes/pkg/apis/authentication/install" | ||||
| 	"k8s.io/kubernetes/pkg/apis/authentication/v1beta1" | ||||
| 	"k8s.io/kubernetes/pkg/auth/authenticator" | ||||
| 	"k8s.io/kubernetes/pkg/auth/user" | ||||
| 	"k8s.io/kubernetes/pkg/client/restclient" | ||||
| 	authenticationclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/authentication/unversioned" | ||||
| 	"k8s.io/kubernetes/pkg/util/cache" | ||||
| 	"k8s.io/kubernetes/plugin/pkg/webhook" | ||||
|  | ||||
| 	_ "k8s.io/kubernetes/pkg/apis/authentication/install" | ||||
| ) | ||||
|  | ||||
| var ( | ||||
| @@ -42,55 +41,90 @@ const retryBackoff = 500 * time.Millisecond | ||||
| var _ authenticator.Token = (*WebhookTokenAuthenticator)(nil) | ||||
|  | ||||
| type WebhookTokenAuthenticator struct { | ||||
| 	*webhook.GenericWebhook | ||||
| 	tokenReview    authenticationclient.TokenReviewInterface | ||||
| 	responseCache  *cache.LRUExpireCache | ||||
| 	ttl            time.Duration | ||||
| 	initialBackoff time.Duration | ||||
| } | ||||
|  | ||||
| // NewFromInterface creates a webhook authenticator using the given tokenReview client | ||||
| func NewFromInterface(tokenReview authenticationclient.TokenReviewInterface, ttl time.Duration) (*WebhookTokenAuthenticator, error) { | ||||
| 	return newWithBackoff(tokenReview, ttl, retryBackoff) | ||||
| } | ||||
|  | ||||
| // New creates a new WebhookTokenAuthenticator from the provided kubeconfig file. | ||||
| func New(kubeConfigFile string, ttl time.Duration) (*WebhookTokenAuthenticator, error) { | ||||
| 	return newWithBackoff(kubeConfigFile, ttl, retryBackoff) | ||||
| } | ||||
|  | ||||
| // newWithBackoff allows tests to skip the sleep. | ||||
| func newWithBackoff(kubeConfigFile string, ttl, initialBackoff time.Duration) (*WebhookTokenAuthenticator, error) { | ||||
| 	gw, err := webhook.NewGenericWebhook(kubeConfigFile, groupVersions, initialBackoff) | ||||
| 	tokenReview, err := tokenReviewInterfaceFromKubeconfig(kubeConfigFile) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	return &WebhookTokenAuthenticator{gw, cache.NewLRUExpireCache(1024), ttl}, nil | ||||
| 	return newWithBackoff(tokenReview, ttl, retryBackoff) | ||||
| } | ||||
|  | ||||
| // newWithBackoff allows tests to skip the sleep. | ||||
| func newWithBackoff(tokenReview authenticationclient.TokenReviewInterface, ttl, initialBackoff time.Duration) (*WebhookTokenAuthenticator, error) { | ||||
| 	return &WebhookTokenAuthenticator{tokenReview, cache.NewLRUExpireCache(1024), ttl, initialBackoff}, nil | ||||
| } | ||||
|  | ||||
| // AuthenticateToken implements the authenticator.Token interface. | ||||
| func (w *WebhookTokenAuthenticator) AuthenticateToken(token string) (user.Info, bool, error) { | ||||
| 	r := &v1beta1.TokenReview{ | ||||
| 		Spec: v1beta1.TokenReviewSpec{Token: token}, | ||||
| 	r := &authentication.TokenReview{ | ||||
| 		Spec: authentication.TokenReviewSpec{Token: token}, | ||||
| 	} | ||||
| 	if entry, ok := w.responseCache.Get(r.Spec); ok { | ||||
| 		r.Status = entry.(v1beta1.TokenReviewStatus) | ||||
| 		r.Status = entry.(authentication.TokenReviewStatus) | ||||
| 	} else { | ||||
| 		result := w.WithExponentialBackoff(func() restclient.Result { | ||||
| 			return w.RestClient.Post().Body(r).Do() | ||||
| 		var ( | ||||
| 			result *authentication.TokenReview | ||||
| 			err    error | ||||
| 		) | ||||
| 		webhook.WithExponentialBackoff(w.initialBackoff, func() error { | ||||
| 			result, err = w.tokenReview.Create(r) | ||||
| 			return err | ||||
| 		}) | ||||
| 		if err := result.Error(); err != nil { | ||||
| 		if err != nil { | ||||
| 			return nil, false, err | ||||
| 		} | ||||
| 		var statusCode int | ||||
| 		if result.StatusCode(&statusCode); statusCode < 200 || statusCode >= 300 { | ||||
| 			return nil, false, fmt.Errorf("Error contacting webhook: %d", statusCode) | ||||
| 		} | ||||
| 		spec := r.Spec | ||||
| 		if err := result.Into(r); err != nil { | ||||
| 			return nil, false, err | ||||
| 		} | ||||
| 		w.responseCache.Add(spec, r.Status, w.ttl) | ||||
| 		r.Status = result.Status | ||||
| 		w.responseCache.Add(r.Spec, result.Status, w.ttl) | ||||
| 	} | ||||
| 	if !r.Status.Authenticated { | ||||
| 		return nil, false, nil | ||||
| 	} | ||||
|  | ||||
| 	var extra map[string][]string | ||||
| 	if r.Status.User.Extra != nil { | ||||
| 		extra = map[string][]string{} | ||||
| 		for k, v := range r.Status.User.Extra { | ||||
| 			extra[k] = v | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	return &user.DefaultInfo{ | ||||
| 		Name:   r.Status.User.Username, | ||||
| 		UID:    r.Status.User.UID, | ||||
| 		Groups: r.Status.User.Groups, | ||||
| 		Extra:  extra, | ||||
| 	}, true, nil | ||||
| } | ||||
|  | ||||
| // tokenReviewInterfaceFromKubeconfig builds a client from the specified kubeconfig file, | ||||
| // and returns a TokenReviewInterface that uses that client. Note that the client submits TokenReview | ||||
| // requests to the exact path specified in the kubeconfig file, so arbitrary non-API servers can be targeted. | ||||
| func tokenReviewInterfaceFromKubeconfig(kubeConfigFile string) (authenticationclient.TokenReviewInterface, error) { | ||||
| 	gw, err := webhook.NewGenericWebhook(kubeConfigFile, groupVersions, 0) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	return &tokenReviewClient{gw}, nil | ||||
| } | ||||
|  | ||||
| type tokenReviewClient struct { | ||||
| 	w *webhook.GenericWebhook | ||||
| } | ||||
|  | ||||
| func (t *tokenReviewClient) Create(tokenReview *authentication.TokenReview) (*authentication.TokenReview, error) { | ||||
| 	result := &authentication.TokenReview{} | ||||
| 	err := t.w.RestClient.Post().Body(tokenReview).Do().Into(result) | ||||
| 	return result, err | ||||
| } | ||||
|   | ||||
| @@ -24,6 +24,7 @@ import ( | ||||
| 	"io/ioutil" | ||||
| 	"net/http" | ||||
| 	"net/http/httptest" | ||||
| 	"net/url" | ||||
| 	"os" | ||||
| 	"reflect" | ||||
| 	"testing" | ||||
| @@ -45,6 +46,7 @@ type Service interface { | ||||
|  | ||||
| // NewTestServer wraps a Service as an httptest.Server. | ||||
| func NewTestServer(s Service, cert, key, caCert []byte) (*httptest.Server, error) { | ||||
| 	const webhookPath = "/testserver" | ||||
| 	var tlsConfig *tls.Config | ||||
| 	if cert != nil { | ||||
| 		cert, err := tls.X509KeyPair(cert, key) | ||||
| @@ -65,29 +67,57 @@ func NewTestServer(s Service, cert, key, caCert []byte) (*httptest.Server, error | ||||
| 	} | ||||
|  | ||||
| 	serveHTTP := func(w http.ResponseWriter, r *http.Request) { | ||||
| 		if r.Method != "POST" { | ||||
| 			http.Error(w, fmt.Sprintf("unexpected method: %v", r.Method), http.StatusMethodNotAllowed) | ||||
| 			return | ||||
| 		} | ||||
| 		if r.URL.Path != webhookPath { | ||||
| 			http.Error(w, fmt.Sprintf("unexpected path: %v", r.URL.Path), http.StatusNotFound) | ||||
| 			return | ||||
| 		} | ||||
|  | ||||
| 		var review v1beta1.TokenReview | ||||
| 		if err := json.NewDecoder(r.Body).Decode(&review); err != nil { | ||||
| 		bodyData, _ := ioutil.ReadAll(r.Body) | ||||
| 		if err := json.Unmarshal(bodyData, &review); err != nil { | ||||
| 			http.Error(w, fmt.Sprintf("failed to decode body: %v", err), http.StatusBadRequest) | ||||
| 			return | ||||
| 		} | ||||
| 		// ensure we received the serialized tokenreview as expected | ||||
| 		if review.APIVersion != "authentication.k8s.io/v1beta1" { | ||||
| 			http.Error(w, fmt.Sprintf("wrong api version: %s", string(bodyData)), http.StatusBadRequest) | ||||
| 			return | ||||
| 		} | ||||
| 		// once we have a successful request, always call the review to record that we were called | ||||
| 		s.Review(&review) | ||||
| 		if s.HTTPStatusCode() < 200 || s.HTTPStatusCode() >= 300 { | ||||
| 			http.Error(w, "HTTP Error", s.HTTPStatusCode()) | ||||
| 			return | ||||
| 		} | ||||
| 		s.Review(&review) | ||||
| 		type userInfo struct { | ||||
| 			Username string              `json:"username"` | ||||
| 			UID      string              `json:"uid"` | ||||
| 			Groups   []string            `json:"groups"` | ||||
| 			Extra    map[string][]string `json:"extra"` | ||||
| 		} | ||||
| 		type status struct { | ||||
| 			Authenticated bool     `json:"authenticated"` | ||||
| 			User          userInfo `json:"user"` | ||||
| 		} | ||||
|  | ||||
| 		var extra map[string][]string | ||||
| 		if review.Status.User.Extra != nil { | ||||
| 			extra = map[string][]string{} | ||||
| 			for k, v := range review.Status.User.Extra { | ||||
| 				extra[k] = v | ||||
| 			} | ||||
| 		} | ||||
|  | ||||
| 		resp := struct { | ||||
| 			Kind       string `json:"kind"` | ||||
| 			APIVersion string `json:"apiVersion"` | ||||
| 			Status     status `json:"status"` | ||||
| 		}{ | ||||
| 			Kind:       "TokenReview", | ||||
| 			APIVersion: v1beta1.SchemeGroupVersion.String(), | ||||
| 			Status: status{ | ||||
| 				review.Status.Authenticated, | ||||
| @@ -95,6 +125,7 @@ func NewTestServer(s Service, cert, key, caCert []byte) (*httptest.Server, error | ||||
| 					Username: review.Status.User.Username, | ||||
| 					UID:      review.Status.User.UID, | ||||
| 					Groups:   review.Status.User.Groups, | ||||
| 					Extra:    extra, | ||||
| 				}, | ||||
| 			}, | ||||
| 		} | ||||
| @@ -105,6 +136,12 @@ func NewTestServer(s Service, cert, key, caCert []byte) (*httptest.Server, error | ||||
| 	server := httptest.NewUnstartedServer(http.HandlerFunc(serveHTTP)) | ||||
| 	server.TLS = tlsConfig | ||||
| 	server.StartTLS() | ||||
|  | ||||
| 	// Adjust the path to point to our custom path | ||||
| 	serverURL, _ := url.Parse(server.URL) | ||||
| 	serverURL.Path = webhookPath | ||||
| 	server.URL = serverURL.String() | ||||
|  | ||||
| 	return server, nil | ||||
| } | ||||
|  | ||||
| @@ -112,9 +149,11 @@ func NewTestServer(s Service, cert, key, caCert []byte) (*httptest.Server, error | ||||
| type mockService struct { | ||||
| 	allow      bool | ||||
| 	statusCode int | ||||
| 	called     int | ||||
| } | ||||
|  | ||||
| func (m *mockService) Review(r *v1beta1.TokenReview) { | ||||
| 	m.called++ | ||||
| 	r.Status.Authenticated = m.allow | ||||
| 	if m.allow { | ||||
| 		r.Status.User.Username = "realHooman@email.com" | ||||
| @@ -148,7 +187,13 @@ func newTokenAuthenticator(serverURL string, clientCert, clientKey, ca []byte, c | ||||
| 	if err := json.NewEncoder(tempfile).Encode(config); err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	return newWithBackoff(p, cacheTime, 0) | ||||
|  | ||||
| 	c, err := tokenReviewInterfaceFromKubeconfig(p) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | ||||
| 	return newWithBackoff(c, cacheTime, 0) | ||||
| } | ||||
|  | ||||
| func TestTLSConfig(t *testing.T) { | ||||
| @@ -294,6 +339,7 @@ func TestWebhookTokenAuthenticator(t *testing.T) { | ||||
| 					Username: "person@place.com", | ||||
| 					UID:      "abcd-1234", | ||||
| 					Groups:   []string{"stuff-dev", "main-eng"}, | ||||
| 					Extra:    map[string]v1beta1.ExtraValue{"foo": {"bar", "baz"}}, | ||||
| 				}, | ||||
| 			}, | ||||
| 			expectedAuthenticated: true, | ||||
| @@ -301,6 +347,7 @@ func TestWebhookTokenAuthenticator(t *testing.T) { | ||||
| 				Name:   "person@place.com", | ||||
| 				UID:    "abcd-1234", | ||||
| 				Groups: []string{"stuff-dev", "main-eng"}, | ||||
| 				Extra:  map[string][]string{"foo": {"bar", "baz"}}, | ||||
| 			}, | ||||
| 		}, | ||||
| 		// Unauthenticated shouldn't even include extra provided info. | ||||
| @@ -345,7 +392,7 @@ func TestWebhookTokenAuthenticator(t *testing.T) { | ||||
| 				i, authenticated, tt.expectedAuthenticated) | ||||
| 		} | ||||
| 		if user != nil && tt.expectedUser != nil && !reflect.DeepEqual(user, tt.expectedUser) { | ||||
| 			t.Errorf("case %d: Plugin returned incorrect user. Got %v, expected %v", | ||||
| 			t.Errorf("case %d: Plugin returned incorrect user. Got %#v, expected %#v", | ||||
| 				i, user, tt.expectedUser) | ||||
| 		} | ||||
| 	} | ||||
| @@ -374,8 +421,9 @@ func (a *authenticationUserInfo) GetExtra() map[string][]string { | ||||
| var _ user.Info = (*authenticationUserInfo)(nil) | ||||
|  | ||||
| // TestWebhookCache verifies that error responses from the server are not | ||||
| // cached, but successful responses are. | ||||
| func TestWebhookCache(t *testing.T) { | ||||
| // cached, but successful responses are. It also ensures that the webhook | ||||
| // call is retried on 429 and 500+ errors | ||||
| func TestWebhookCacheAndRetry(t *testing.T) { | ||||
| 	serv := new(mockService) | ||||
| 	s, err := NewTestServer(serv, serverCert, serverKey, caCert) | ||||
| 	if err != nil { | ||||
| @@ -388,36 +436,129 @@ func TestWebhookCache(t *testing.T) { | ||||
| 	if err != nil { | ||||
| 		t.Fatal(err) | ||||
| 	} | ||||
| 	token := "t0k3n" | ||||
| 	serv.allow = true | ||||
| 	serv.statusCode = 500 | ||||
| 	if _, _, err := wh.AuthenticateToken(token); err == nil { | ||||
| 		t.Errorf("Webhook returned HTTP 500, but authorizer reported success.") | ||||
|  | ||||
| 	testcases := []struct { | ||||
| 		description string | ||||
|  | ||||
| 		token string | ||||
| 		allow bool | ||||
| 		code  int | ||||
|  | ||||
| 		expectError bool | ||||
| 		expectOk    bool | ||||
| 		expectCalls int | ||||
| 	}{ | ||||
| 		{ | ||||
| 			description: "t0k3n, 500 error, retries and fails", | ||||
|  | ||||
| 			token: "t0k3n", | ||||
| 			allow: false, | ||||
| 			code:  500, | ||||
|  | ||||
| 			expectError: true, | ||||
| 			expectOk:    false, | ||||
| 			expectCalls: 5, | ||||
| 		}, | ||||
| 		{ | ||||
| 			description: "t0k3n, 404 error, fails (but no retry)", | ||||
|  | ||||
| 			token: "t0k3n", | ||||
| 			allow: false, | ||||
| 			code:  404, | ||||
|  | ||||
| 			expectError: true, | ||||
| 			expectOk:    false, | ||||
| 			expectCalls: 1, | ||||
| 		}, | ||||
| 		{ | ||||
| 			description: "t0k3n, 200 response, allowed, succeeds with a single call", | ||||
|  | ||||
| 			token: "t0k3n", | ||||
| 			allow: true, | ||||
| 			code:  200, | ||||
|  | ||||
| 			expectError: false, | ||||
| 			expectOk:    true, | ||||
| 			expectCalls: 1, | ||||
| 		}, | ||||
| 		{ | ||||
| 			description: "t0k3n, 500 response, disallowed, but never called because previous 200 response was cached", | ||||
|  | ||||
| 			token: "t0k3n", | ||||
| 			allow: false, | ||||
| 			code:  500, | ||||
|  | ||||
| 			expectError: false, | ||||
| 			expectOk:    true, | ||||
| 			expectCalls: 0, | ||||
| 		}, | ||||
|  | ||||
| 		{ | ||||
| 			description: "an0th3r_t0k3n, 500 response, disallowed, should be called again with retries", | ||||
|  | ||||
| 			token: "an0th3r_t0k3n", | ||||
| 			allow: false, | ||||
| 			code:  500, | ||||
|  | ||||
| 			expectError: true, | ||||
| 			expectOk:    false, | ||||
| 			expectCalls: 5, | ||||
| 		}, | ||||
| 		{ | ||||
| 			description: "an0th3r_t0k3n, 429 response, disallowed, should be called again with retries", | ||||
|  | ||||
| 			token: "an0th3r_t0k3n", | ||||
| 			allow: false, | ||||
| 			code:  429, | ||||
|  | ||||
| 			expectError: true, | ||||
| 			expectOk:    false, | ||||
| 			expectCalls: 5, | ||||
| 		}, | ||||
| 		{ | ||||
| 			description: "an0th3r_t0k3n, 200 response, allowed, succeeds with a single call", | ||||
|  | ||||
| 			token: "an0th3r_t0k3n", | ||||
| 			allow: true, | ||||
| 			code:  200, | ||||
|  | ||||
| 			expectError: false, | ||||
| 			expectOk:    true, | ||||
| 			expectCalls: 1, | ||||
| 		}, | ||||
| 		{ | ||||
| 			description: "an0th3r_t0k3n, 500 response, disallowed, but never called because previous 200 response was cached", | ||||
|  | ||||
| 			token: "an0th3r_t0k3n", | ||||
| 			allow: false, | ||||
| 			code:  500, | ||||
|  | ||||
| 			expectError: false, | ||||
| 			expectOk:    true, | ||||
| 			expectCalls: 0, | ||||
| 		}, | ||||
| 	} | ||||
| 	serv.statusCode = 404 | ||||
| 	if _, _, err := wh.AuthenticateToken(token); err == nil { | ||||
| 		t.Errorf("Webhook returned HTTP 404, but authorizer reported success.") | ||||
|  | ||||
| 	for _, testcase := range testcases { | ||||
| 		func() { | ||||
| 			serv.allow = testcase.allow | ||||
| 			serv.statusCode = testcase.code | ||||
| 			serv.called = 0 | ||||
|  | ||||
| 			_, ok, err := wh.AuthenticateToken(testcase.token) | ||||
| 			hasError := err != nil | ||||
| 			if hasError != testcase.expectError { | ||||
| 				t.Log(testcase.description) | ||||
| 				t.Errorf("Webhook returned HTTP %d, expected error=%v, but got error %v", testcase.code, testcase.expectError, err) | ||||
| 			} | ||||
| 	serv.statusCode = 200 | ||||
| 	if _, _, err := wh.AuthenticateToken(token); err != nil { | ||||
| 		t.Errorf("Webhook returned HTTP 200, but authorizer reported unauthorized.") | ||||
| 			if serv.called != testcase.expectCalls { | ||||
| 				t.Log(testcase.description) | ||||
| 				t.Errorf("Expected %d calls, got %d", testcase.expectCalls, serv.called) | ||||
| 			} | ||||
| 	serv.statusCode = 500 | ||||
| 	if _, _, err := wh.AuthenticateToken(token); err != nil { | ||||
| 		t.Errorf("Webhook should have successful response cached, but authorizer reported unauthorized.") | ||||
| 			if ok != testcase.expectOk { | ||||
| 				t.Log(testcase.description) | ||||
| 				t.Errorf("Expected ok=%v, got %v", testcase.expectOk, ok) | ||||
| 			} | ||||
| 	// For a different request, webhook should be called again. | ||||
| 	token = "an0th3r_t0k3n" | ||||
| 	serv.statusCode = 500 | ||||
| 	if _, _, err := wh.AuthenticateToken(token); err == nil { | ||||
| 		t.Errorf("Webhook returned HTTP 500, but authorizer reported success.") | ||||
| 	} | ||||
| 	serv.statusCode = 200 | ||||
| 	if _, _, err := wh.AuthenticateToken(token); err != nil { | ||||
| 		t.Errorf("Webhook returned HTTP 200, but authorizer reported unauthorized.") | ||||
| 	} | ||||
| 	serv.statusCode = 500 | ||||
| 	if _, _, err := wh.AuthenticateToken(token); err != nil { | ||||
| 		t.Errorf("Webhook should have successful response cached, but authorizer reported unauthorized.") | ||||
| 		}() | ||||
| 	} | ||||
| } | ||||
|   | ||||
| @@ -22,6 +22,7 @@ import ( | ||||
| 	"time" | ||||
|  | ||||
| 	"k8s.io/kubernetes/pkg/api" | ||||
| 	apierrors "k8s.io/kubernetes/pkg/api/errors" | ||||
| 	"k8s.io/kubernetes/pkg/api/unversioned" | ||||
| 	"k8s.io/kubernetes/pkg/apimachinery/registered" | ||||
| 	"k8s.io/kubernetes/pkg/client/restclient" | ||||
| @@ -70,28 +71,40 @@ func NewGenericWebhook(kubeConfigFile string, groupVersions []unversioned.GroupV | ||||
| 	return &GenericWebhook{restClient, initialBackoff}, nil | ||||
| } | ||||
|  | ||||
| // WithExponentialBackoff will retry webhookFn 5 times w/ exponentially | ||||
| // increasing backoff when a 429 or a 5xx response code is returned. | ||||
| // WithExponentialBackoff will retry webhookFn() up to 5 times with exponentially increasing backoff when | ||||
| // it returns an error for which apierrors.SuggestsClientDelay() or apierrors.IsInternalError() returns true. | ||||
| func (g *GenericWebhook) WithExponentialBackoff(webhookFn func() restclient.Result) restclient.Result { | ||||
| 	var result restclient.Result | ||||
| 	WithExponentialBackoff(g.initialBackoff, func() error { | ||||
| 		result = webhookFn() | ||||
| 		return result.Error() | ||||
| 	}) | ||||
| 	return result | ||||
| } | ||||
|  | ||||
| // WithExponentialBackoff will retry webhookFn() up to 5 times with exponentially increasing backoff when | ||||
| // it returns an error for which apierrors.SuggestsClientDelay() or apierrors.IsInternalError() returns true. | ||||
| func WithExponentialBackoff(initialBackoff time.Duration, webhookFn func() error) error { | ||||
| 	backoff := wait.Backoff{ | ||||
| 		Duration: g.initialBackoff, | ||||
| 		Duration: initialBackoff, | ||||
| 		Factor:   1.5, | ||||
| 		Jitter:   0.2, | ||||
| 		Steps:    5, | ||||
| 	} | ||||
| 	var result restclient.Result | ||||
|  | ||||
| 	var err error | ||||
| 	wait.ExponentialBackoff(backoff, func() (bool, error) { | ||||
| 		result = webhookFn() | ||||
| 		// Return from Request.Do() errors immediately. | ||||
| 		if err := result.Error(); err != nil { | ||||
| 			return false, err | ||||
| 		} | ||||
| 		// Retry 429s, and 5xxs. | ||||
| 		var statusCode int | ||||
| 		if result.StatusCode(&statusCode); statusCode == 429 || statusCode >= 500 { | ||||
| 		err = webhookFn() | ||||
| 		if _, shouldRetry := apierrors.SuggestsClientDelay(err); shouldRetry { | ||||
| 			return false, nil | ||||
| 		} | ||||
| 		if apierrors.IsInternalError(err) { | ||||
| 			return false, nil | ||||
| 		} | ||||
| 		if err != nil { | ||||
| 			return false, err | ||||
| 		} | ||||
| 		return true, nil | ||||
| 	}) | ||||
| 	return result | ||||
| 	return err | ||||
| } | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 Jordan Liggitt
					Jordan Liggitt