client-go/reflector: warns when the bookmark event for initial events hasn't been received
This commit is contained in:
		@@ -732,6 +732,8 @@ func watchHandler(start time.Time,
 | 
			
		||||
	stopCh <-chan struct{},
 | 
			
		||||
) error {
 | 
			
		||||
	eventCount := 0
 | 
			
		||||
	initialEventsEndBookmarkWarningTicker := newInitialEventsEndBookmarkTicker(name, clock, start, exitOnInitialEventsEndBookmark != nil)
 | 
			
		||||
	defer initialEventsEndBookmarkWarningTicker.Stop()
 | 
			
		||||
	if exitOnInitialEventsEndBookmark != nil {
 | 
			
		||||
		// set it to false just in case somebody
 | 
			
		||||
		// made it positive
 | 
			
		||||
@@ -809,6 +811,9 @@ loop:
 | 
			
		||||
				klog.V(4).Infof("exiting %v Watch because received the bookmark that marks the end of initial events stream, total %v items received in %v", name, eventCount, watchDuration)
 | 
			
		||||
				return nil
 | 
			
		||||
			}
 | 
			
		||||
			initialEventsEndBookmarkWarningTicker.observeLastEventTimeStamp(clock.Now())
 | 
			
		||||
		case <-initialEventsEndBookmarkWarningTicker.C():
 | 
			
		||||
			initialEventsEndBookmarkWarningTicker.warnIfExpired()
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
@@ -929,3 +934,88 @@ func isWatchErrorRetriable(err error) bool {
 | 
			
		||||
	}
 | 
			
		||||
	return false
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// initialEventsEndBookmarkTicker a ticker that produces a warning if the bookmark event
 | 
			
		||||
// which marks the end of the watch stream, has not been received within the defined tick interval.
 | 
			
		||||
//
 | 
			
		||||
// Note:
 | 
			
		||||
// The methods exposed by this type are not thread-safe.
 | 
			
		||||
type initialEventsEndBookmarkTicker struct {
 | 
			
		||||
	clock.Ticker
 | 
			
		||||
	clock clock.Clock
 | 
			
		||||
	name  string
 | 
			
		||||
 | 
			
		||||
	watchStart           time.Time
 | 
			
		||||
	tickInterval         time.Duration
 | 
			
		||||
	lastEventObserveTime time.Time
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// newInitialEventsEndBookmarkTicker returns a noop ticker if exitOnInitialEventsEndBookmarkRequested is false.
 | 
			
		||||
// Otherwise, it returns a ticker that exposes a method producing a warning if the bookmark event,
 | 
			
		||||
// which marks the end of the watch stream, has not been received within the defined tick interval.
 | 
			
		||||
//
 | 
			
		||||
// Note that the caller controls whether to call t.C() and t.Stop().
 | 
			
		||||
//
 | 
			
		||||
// In practice, the reflector exits the watchHandler as soon as the bookmark event is received and calls the t.C() method.
 | 
			
		||||
func newInitialEventsEndBookmarkTicker(name string, c clock.Clock, watchStart time.Time, exitOnInitialEventsEndBookmarkRequested bool) *initialEventsEndBookmarkTicker {
 | 
			
		||||
	return newInitialEventsEndBookmarkTickerInternal(name, c, watchStart, 10*time.Second, exitOnInitialEventsEndBookmarkRequested)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func newInitialEventsEndBookmarkTickerInternal(name string, c clock.Clock, watchStart time.Time, tickInterval time.Duration, exitOnInitialEventsEndBookmarkRequested bool) *initialEventsEndBookmarkTicker {
 | 
			
		||||
	clockWithTicker, ok := c.(clock.WithTicker)
 | 
			
		||||
	if !ok || !exitOnInitialEventsEndBookmarkRequested {
 | 
			
		||||
		if exitOnInitialEventsEndBookmarkRequested {
 | 
			
		||||
			klog.Warningf("clock does not support WithTicker interface but exitOnInitialEventsEndBookmark was requested")
 | 
			
		||||
		}
 | 
			
		||||
		return &initialEventsEndBookmarkTicker{
 | 
			
		||||
			Ticker: &noopTicker{},
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return &initialEventsEndBookmarkTicker{
 | 
			
		||||
		Ticker:       clockWithTicker.NewTicker(tickInterval),
 | 
			
		||||
		clock:        c,
 | 
			
		||||
		name:         name,
 | 
			
		||||
		watchStart:   watchStart,
 | 
			
		||||
		tickInterval: tickInterval,
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (t *initialEventsEndBookmarkTicker) observeLastEventTimeStamp(lastEventObserveTime time.Time) {
 | 
			
		||||
	t.lastEventObserveTime = lastEventObserveTime
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (t *initialEventsEndBookmarkTicker) warnIfExpired() {
 | 
			
		||||
	if err := t.produceWarningIfExpired(); err != nil {
 | 
			
		||||
		klog.Warning(err)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// produceWarningIfExpired returns an error that represents a warning when
 | 
			
		||||
// the time elapsed since the last received event exceeds the tickInterval.
 | 
			
		||||
//
 | 
			
		||||
// Note that this method should be called when t.C() yields a value.
 | 
			
		||||
func (t *initialEventsEndBookmarkTicker) produceWarningIfExpired() error {
 | 
			
		||||
	if _, ok := t.Ticker.(*noopTicker); ok {
 | 
			
		||||
		return nil /*noop ticker*/
 | 
			
		||||
	}
 | 
			
		||||
	if t.lastEventObserveTime.IsZero() {
 | 
			
		||||
		return fmt.Errorf("%s: awaiting required bookmark event for initial events stream, no events received for %v", t.name, t.clock.Since(t.watchStart))
 | 
			
		||||
	}
 | 
			
		||||
	elapsedTime := t.clock.Now().Sub(t.lastEventObserveTime)
 | 
			
		||||
	hasBookmarkTimerExpired := elapsedTime >= t.tickInterval
 | 
			
		||||
 | 
			
		||||
	if !hasBookmarkTimerExpired {
 | 
			
		||||
		return nil
 | 
			
		||||
	}
 | 
			
		||||
	return fmt.Errorf("%s: hasn't received required bookmark event marking the end of initial events stream, received last event %v ago", t.name, elapsedTime)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
var _ clock.Ticker = &noopTicker{}
 | 
			
		||||
 | 
			
		||||
// TODO(#115478): move to k8s/utils repo
 | 
			
		||||
type noopTicker struct{}
 | 
			
		||||
 | 
			
		||||
func (t *noopTicker) C() <-chan time.Time { return nil }
 | 
			
		||||
 | 
			
		||||
func (t *noopTicker) Stop() {}
 | 
			
		||||
 
 | 
			
		||||
@@ -17,13 +17,16 @@ limitations under the License.
 | 
			
		||||
package cache
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"errors"
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"sort"
 | 
			
		||||
	"sync"
 | 
			
		||||
	"testing"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"github.com/google/go-cmp/cmp"
 | 
			
		||||
	"github.com/google/go-cmp/cmp/cmpopts"
 | 
			
		||||
	"github.com/stretchr/testify/require"
 | 
			
		||||
 | 
			
		||||
	v1 "k8s.io/api/core/v1"
 | 
			
		||||
	apierrors "k8s.io/apimachinery/pkg/api/errors"
 | 
			
		||||
@@ -32,10 +35,78 @@ import (
 | 
			
		||||
	"k8s.io/apimachinery/pkg/runtime/schema"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/types"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/watch"
 | 
			
		||||
	testingclock "k8s.io/utils/clock/testing"
 | 
			
		||||
	"k8s.io/utils/pointer"
 | 
			
		||||
	"k8s.io/utils/ptr"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func TestInitialEventsEndBookmarkTicker(t *testing.T) {
 | 
			
		||||
	assertNoEvents := func(t *testing.T, c <-chan time.Time) {
 | 
			
		||||
		select {
 | 
			
		||||
		case e := <-c:
 | 
			
		||||
			t.Errorf("Unexpected: %#v event received, expected no events", e)
 | 
			
		||||
		default:
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	t.Run("testing NoopInitialEventsEndBookmarkTicker", func(t *testing.T) {
 | 
			
		||||
		clock := testingclock.NewFakeClock(time.Now())
 | 
			
		||||
		target := newInitialEventsEndBookmarkTickerInternal("testName", clock, clock.Now(), time.Second, false)
 | 
			
		||||
 | 
			
		||||
		clock.Step(30 * time.Second)
 | 
			
		||||
		assertNoEvents(t, target.C())
 | 
			
		||||
		actualWarning := target.produceWarningIfExpired()
 | 
			
		||||
 | 
			
		||||
		require.Empty(t, actualWarning, "didn't expect any warning")
 | 
			
		||||
		// validate if the other methods don't produce panic
 | 
			
		||||
		target.warnIfExpired()
 | 
			
		||||
		target.observeLastEventTimeStamp(clock.Now())
 | 
			
		||||
 | 
			
		||||
		// make sure that after calling the other methods
 | 
			
		||||
		// nothing hasn't changed
 | 
			
		||||
		actualWarning = target.produceWarningIfExpired()
 | 
			
		||||
		require.Empty(t, actualWarning, "didn't expect any warning")
 | 
			
		||||
		assertNoEvents(t, target.C())
 | 
			
		||||
 | 
			
		||||
		target.Stop()
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
	t.Run("testing InitialEventsEndBookmarkTicker backed by a fake clock", func(t *testing.T) {
 | 
			
		||||
		clock := testingclock.NewFakeClock(time.Now())
 | 
			
		||||
		target := newInitialEventsEndBookmarkTickerInternal("testName", clock, clock.Now(), time.Second, true)
 | 
			
		||||
		clock.Step(500 * time.Millisecond)
 | 
			
		||||
		assertNoEvents(t, target.C())
 | 
			
		||||
 | 
			
		||||
		clock.Step(500 * time.Millisecond)
 | 
			
		||||
		<-target.C()
 | 
			
		||||
		actualWarning := target.produceWarningIfExpired()
 | 
			
		||||
		require.Equal(t, errors.New("testName: awaiting required bookmark event for initial events stream, no events received for 1s"), actualWarning)
 | 
			
		||||
 | 
			
		||||
		clock.Step(time.Second)
 | 
			
		||||
		<-target.C()
 | 
			
		||||
		actualWarning = target.produceWarningIfExpired()
 | 
			
		||||
		require.Equal(t, errors.New("testName: awaiting required bookmark event for initial events stream, no events received for 2s"), actualWarning)
 | 
			
		||||
 | 
			
		||||
		target.observeLastEventTimeStamp(clock.Now())
 | 
			
		||||
		clock.Step(500 * time.Millisecond)
 | 
			
		||||
		assertNoEvents(t, target.C())
 | 
			
		||||
 | 
			
		||||
		clock.Step(500 * time.Millisecond)
 | 
			
		||||
		<-target.C()
 | 
			
		||||
		actualWarning = target.produceWarningIfExpired()
 | 
			
		||||
		require.Equal(t, errors.New("testName: hasn't received required bookmark event marking the end of initial events stream, received last event 1s ago"), actualWarning)
 | 
			
		||||
 | 
			
		||||
		clock.Step(time.Second)
 | 
			
		||||
		<-target.C()
 | 
			
		||||
		actualWarning = target.produceWarningIfExpired()
 | 
			
		||||
		require.Equal(t, errors.New("testName: hasn't received required bookmark event marking the end of initial events stream, received last event 2s ago"), actualWarning)
 | 
			
		||||
 | 
			
		||||
		target.Stop()
 | 
			
		||||
		assertNoEvents(t, target.C())
 | 
			
		||||
	})
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestWatchList(t *testing.T) {
 | 
			
		||||
	scenarios := []struct {
 | 
			
		||||
		name                string
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user