Add HTTP 410 (Gone) status code checks to reflector and relist with RV=''
This commit is contained in:
		| @@ -201,6 +201,7 @@ func NewApplyConflict(causes []metav1.StatusCause, message string) *StatusError | ||||
| } | ||||
|  | ||||
| // NewGone returns an error indicating the item no longer available at the server and no forwarding address is known. | ||||
| // DEPRECATED: Please use NewResourceExpired instead. | ||||
| func NewGone(message string) *StatusError { | ||||
| 	return &StatusError{metav1.Status{ | ||||
| 		Status:  metav1.StatusFailure, | ||||
|   | ||||
| @@ -468,7 +468,7 @@ func (w *watchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) ([]*w | ||||
| 		return result, nil | ||||
| 	} | ||||
| 	if resourceVersion < oldest-1 { | ||||
| 		return nil, errors.NewGone(fmt.Sprintf("too old resource version: %d (%d)", resourceVersion, oldest-1)) | ||||
| 		return nil, errors.NewResourceExpired(fmt.Sprintf("too old resource version: %d (%d)", resourceVersion, oldest-1)) | ||||
| 	} | ||||
|  | ||||
| 	// Binary search the smallest index at which resourceVersion is greater than the given one. | ||||
|   | ||||
| @@ -74,6 +74,9 @@ type Reflector struct { | ||||
| 	// observed when doing a sync with the underlying store | ||||
| 	// it is thread safe, but not synchronized with the underlying store | ||||
| 	lastSyncResourceVersion string | ||||
| 	// isLastSyncResourceVersionGone is true if the previous list or watch request with lastSyncResourceVersion | ||||
| 	// failed with an HTTP 410 (Gone) status code. | ||||
| 	isLastSyncResourceVersionGone bool | ||||
| 	// lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion | ||||
| 	lastSyncResourceVersionMutex sync.RWMutex | ||||
| 	// WatchListPageSize is the requested chunk size of initial and resync watch lists. | ||||
| @@ -208,19 +211,16 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { | ||||
| 			if r.WatchListPageSize != 0 { | ||||
| 				pager.PageSize = r.WatchListPageSize | ||||
| 			} | ||||
| 			// Pager falls back to full list if paginated list calls fail due to an "Expired" error on the 2nd page or later, | ||||
| 			// but still my return an "Expired" error if the 1st page fails with "Expired" or the full list fails with "Expired". | ||||
| 			list, err = pager.List(context.Background(), options) | ||||
| 			if apierrs.IsResourceExpired(err) { | ||||
| 				// For Kubernetes 1.16 and earlier, if the watch cache is disabled for a resource, list requests | ||||
| 				// with LastSyncResourceVersion set to a non-zero ResourceVersion will fail if the exact ResourceVersion | ||||
| 				// requested is expired (e.g. an etcd compaction has remove it). | ||||
| 				// To prevent the reflector from getting stuck retrying a list for an expired resource version in this | ||||
| 				// case, we set ResourceVersion="" and list again to re-establish reflector to the latest available | ||||
| 				// ResourceVersion, using a consistent read from etcd. This is also safe to do if watch cache is enabled | ||||
| 				// and the list request returned a "Expired" error. | ||||
| 				options = metav1.ListOptions{ResourceVersion: ""} | ||||
|  | ||||
| 			list, err = pager.List(context.Background(), options) | ||||
| 			if isExpiredError(err) { | ||||
| 				r.setIsLastSyncResourceVersionExpired(true) | ||||
| 				// Retry immediately if the resource version used to list is expired. | ||||
| 				// The pager already falls back to full list if paginated list calls fail due to an "Expired" error on | ||||
| 				// continuation pages, but the pager might not be enabled, or the full list might fail because the | ||||
| 				// resource version it is listing at is expired, so we need to fallback to resourceVersion="" in all | ||||
| 				// to recover and ensure the reflector makes forward progress. | ||||
| 				list, err = pager.List(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()}) | ||||
| 			} | ||||
| 			close(listCh) | ||||
| 		}() | ||||
| @@ -234,6 +234,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { | ||||
| 		if err != nil { | ||||
| 			return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedTypeName, err) | ||||
| 		} | ||||
| 		r.setIsLastSyncResourceVersionExpired(false) // list was successful | ||||
| 		initTrace.Step("Objects listed") | ||||
| 		listMetaInterface, err := meta.ListAccessor(list) | ||||
| 		if err != nil { | ||||
| @@ -307,10 +308,13 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { | ||||
|  | ||||
| 		w, err := r.listerWatcher.Watch(options) | ||||
| 		if err != nil { | ||||
| 			switch err { | ||||
| 			case io.EOF: | ||||
| 			switch { | ||||
| 			case isExpiredError(err): | ||||
| 				r.setIsLastSyncResourceVersionExpired(true) | ||||
| 				klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err) | ||||
| 			case err == io.EOF: | ||||
| 				// watch closed normally | ||||
| 			case io.ErrUnexpectedEOF: | ||||
| 			case err == io.ErrUnexpectedEOF: | ||||
| 				klog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.expectedTypeName, err) | ||||
| 			default: | ||||
| 				utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedTypeName, err)) | ||||
| @@ -329,7 +333,8 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { | ||||
| 		if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil { | ||||
| 			if err != errorStopRequested { | ||||
| 				switch { | ||||
| 				case apierrs.IsResourceExpired(err): | ||||
| 				case isExpiredError(err): | ||||
| 					r.setIsLastSyncResourceVersionExpired(true) | ||||
| 					klog.V(4).Infof("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err) | ||||
| 				default: | ||||
| 					klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err) | ||||
| @@ -442,16 +447,41 @@ func (r *Reflector) setLastSyncResourceVersion(v string) { | ||||
| 	r.lastSyncResourceVersion = v | ||||
| } | ||||
|  | ||||
| // relistResourceVersion is the resource version the reflector should list or relist from. | ||||
| // relistResourceVersion determines the resource version the reflector should list or relist from. | ||||
| // Returns either the lastSyncResourceVersion so that this reflector will relist with a resource | ||||
| // versions no older than has already been observed in relist results or watch events, or, if the last relist resulted | ||||
| // in an HTTP 410 (Gone) status code, returns "" so that the relist will use the latest resource version available in | ||||
| // etcd via a quorum read. | ||||
| func (r *Reflector) relistResourceVersion() string { | ||||
| 	lastSyncRV := r.LastSyncResourceVersion() | ||||
| 	if lastSyncRV == "" { | ||||
| 		// Explicitly set resource version to have it list from cache for | ||||
| 		// performance reasons. | ||||
| 		// It's fine for the returned state to be stale (we will catch up via Watch() | ||||
| 		// eventually), but we need to be at least as new as the last resource version we | ||||
| 		// synced to avoid going back in time. | ||||
| 	r.lastSyncResourceVersionMutex.RLock() | ||||
| 	defer r.lastSyncResourceVersionMutex.RUnlock() | ||||
|  | ||||
| 	if r.isLastSyncResourceVersionGone { | ||||
| 		// Since this reflector makes paginated list requests, and all paginated list requests skip the watch cache | ||||
| 		// if the lastSyncResourceVersion is expired, we set ResourceVersion="" and list again to re-establish reflector | ||||
| 		// to the latest available ResourceVersion, using a consistent read from etcd. | ||||
| 		return "" | ||||
| 	} | ||||
| 	if r.lastSyncResourceVersion == "" { | ||||
| 		// For performance reasons, initial list performed by reflector uses "0" as resource version to allow it to | ||||
| 		// be served from the watch cache if it is enabled. | ||||
| 		return "0" | ||||
| 	} | ||||
| 	return lastSyncRV | ||||
| 	return r.lastSyncResourceVersion | ||||
| } | ||||
|  | ||||
| // setIsLastSyncResourceVersionExpired sets if the last list or watch request with lastSyncResourceVersion returned a | ||||
| // expired error: HTTP 410 (Gone) Status Code. | ||||
| func (r *Reflector) setIsLastSyncResourceVersionExpired(isExpired bool) { | ||||
| 	r.lastSyncResourceVersionMutex.Lock() | ||||
| 	defer r.lastSyncResourceVersionMutex.Unlock() | ||||
| 	r.isLastSyncResourceVersionGone = isExpired | ||||
| } | ||||
|  | ||||
| func isExpiredError(err error) bool { | ||||
| 	// In Kubernetes 1.17 and earlier, the api server returns both apierrs.StatusReasonExpired and | ||||
| 	// apierrs.StatusReasonGone for HTTP 410 (Gone) status code responses. In 1.18 the kube server is more consistent | ||||
| 	// and always returns apierrs.StatusReasonExpired. For backward compatibility we can only remove the apierrs.IsGone | ||||
| 	// check when we fully drop support for Kubernetes 1.17 servers from reflectors. | ||||
| 	return apierrs.IsResourceExpired(err) || apierrs.IsGone(err) | ||||
| } | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 Joe Betz
					Joe Betz