Merge pull request #120729 from pohly/events-context
k8s.io/client-go/tools/[events|record]: support context
This commit is contained in:
		| @@ -29,6 +29,8 @@ structured k8s.io/apiserver/pkg/server/options/encryptionconfig/.* | ||||
| # TODO next: contextual k8s.io/kubernetes/pkg/scheduler/.* | ||||
| # A few files involved in startup migrated already to contextual | ||||
| # We can't enable contextual logcheck until all are migrated | ||||
| contextual k8s.io/client-go/tools/events/.* | ||||
| contextual k8s.io/client-go/tools/record/.* | ||||
| contextual k8s.io/dynamic-resource-allocation/.* | ||||
| contextual k8s.io/kubernetes/cmd/kube-scheduler/.* | ||||
| contextual k8s.io/kubernetes/pkg/controller/.* | ||||
|   | ||||
| @@ -81,27 +81,27 @@ type EventSinkImpl struct { | ||||
| } | ||||
|  | ||||
| // Create takes the representation of a event and creates it. Returns the server's representation of the event, and an error, if there is any. | ||||
| func (e *EventSinkImpl) Create(event *eventsv1.Event) (*eventsv1.Event, error) { | ||||
| func (e *EventSinkImpl) Create(ctx context.Context, event *eventsv1.Event) (*eventsv1.Event, error) { | ||||
| 	if event.Namespace == "" { | ||||
| 		return nil, fmt.Errorf("can't create an event with empty namespace") | ||||
| 	} | ||||
| 	return e.Interface.Events(event.Namespace).Create(context.TODO(), event, metav1.CreateOptions{}) | ||||
| 	return e.Interface.Events(event.Namespace).Create(ctx, event, metav1.CreateOptions{}) | ||||
| } | ||||
|  | ||||
| // Update takes the representation of a event and updates it. Returns the server's representation of the event, and an error, if there is any. | ||||
| func (e *EventSinkImpl) Update(event *eventsv1.Event) (*eventsv1.Event, error) { | ||||
| func (e *EventSinkImpl) Update(ctx context.Context, event *eventsv1.Event) (*eventsv1.Event, error) { | ||||
| 	if event.Namespace == "" { | ||||
| 		return nil, fmt.Errorf("can't update an event with empty namespace") | ||||
| 	} | ||||
| 	return e.Interface.Events(event.Namespace).Update(context.TODO(), event, metav1.UpdateOptions{}) | ||||
| 	return e.Interface.Events(event.Namespace).Update(ctx, event, metav1.UpdateOptions{}) | ||||
| } | ||||
|  | ||||
| // Patch applies the patch and returns the patched event, and an error, if there is any. | ||||
| func (e *EventSinkImpl) Patch(event *eventsv1.Event, data []byte) (*eventsv1.Event, error) { | ||||
| func (e *EventSinkImpl) Patch(ctx context.Context, event *eventsv1.Event, data []byte) (*eventsv1.Event, error) { | ||||
| 	if event.Namespace == "" { | ||||
| 		return nil, fmt.Errorf("can't patch an event with empty namespace") | ||||
| 	} | ||||
| 	return e.Interface.Events(event.Namespace).Patch(context.TODO(), event.Name, types.StrategicMergePatchType, data, metav1.PatchOptions{}) | ||||
| 	return e.Interface.Events(event.Namespace).Patch(ctx, event.Name, types.StrategicMergePatchType, data, metav1.PatchOptions{}) | ||||
| } | ||||
|  | ||||
| // NewBroadcaster Creates a new event broadcaster. | ||||
| @@ -124,13 +124,13 @@ func (e *eventBroadcasterImpl) Shutdown() { | ||||
| } | ||||
|  | ||||
| // refreshExistingEventSeries refresh events TTL | ||||
| func (e *eventBroadcasterImpl) refreshExistingEventSeries() { | ||||
| func (e *eventBroadcasterImpl) refreshExistingEventSeries(ctx context.Context) { | ||||
| 	// TODO: Investigate whether lock contention won't be a problem | ||||
| 	e.mu.Lock() | ||||
| 	defer e.mu.Unlock() | ||||
| 	for isomorphicKey, event := range e.eventCache { | ||||
| 		if event.Series != nil { | ||||
| 			if recordedEvent, retry := recordEvent(e.sink, event); !retry { | ||||
| 			if recordedEvent, retry := recordEvent(ctx, e.sink, event); !retry { | ||||
| 				if recordedEvent != nil { | ||||
| 					e.eventCache[isomorphicKey] = recordedEvent | ||||
| 				} | ||||
| @@ -142,7 +142,7 @@ func (e *eventBroadcasterImpl) refreshExistingEventSeries() { | ||||
| // finishSeries checks if a series has ended and either: | ||||
| // - write final count to the apiserver | ||||
| // - delete a singleton event (i.e. series field is nil) from the cache | ||||
| func (e *eventBroadcasterImpl) finishSeries() { | ||||
| func (e *eventBroadcasterImpl) finishSeries(ctx context.Context) { | ||||
| 	// TODO: Investigate whether lock contention won't be a problem | ||||
| 	e.mu.Lock() | ||||
| 	defer e.mu.Unlock() | ||||
| @@ -150,7 +150,7 @@ func (e *eventBroadcasterImpl) finishSeries() { | ||||
| 		eventSerie := event.Series | ||||
| 		if eventSerie != nil { | ||||
| 			if eventSerie.LastObservedTime.Time.Before(time.Now().Add(-finishTime)) { | ||||
| 				if _, retry := recordEvent(e.sink, event); !retry { | ||||
| 				if _, retry := recordEvent(ctx, e.sink, event); !retry { | ||||
| 					delete(e.eventCache, isomorphicKey) | ||||
| 				} | ||||
| 			} | ||||
| @@ -161,13 +161,13 @@ func (e *eventBroadcasterImpl) finishSeries() { | ||||
| } | ||||
|  | ||||
| // NewRecorder returns an EventRecorder that records events with the given event source. | ||||
| func (e *eventBroadcasterImpl) NewRecorder(scheme *runtime.Scheme, reportingController string) EventRecorder { | ||||
| func (e *eventBroadcasterImpl) NewRecorder(scheme *runtime.Scheme, reportingController string) EventRecorderLogger { | ||||
| 	hostname, _ := os.Hostname() | ||||
| 	reportingInstance := reportingController + "-" + hostname | ||||
| 	return &recorderImpl{scheme, reportingController, reportingInstance, e.Broadcaster, clock.RealClock{}} | ||||
| 	return &recorderImplLogger{recorderImpl: &recorderImpl{scheme, reportingController, reportingInstance, e.Broadcaster, clock.RealClock{}}, logger: klog.Background()} | ||||
| } | ||||
|  | ||||
| func (e *eventBroadcasterImpl) recordToSink(event *eventsv1.Event, clock clock.Clock) { | ||||
| func (e *eventBroadcasterImpl) recordToSink(ctx context.Context, event *eventsv1.Event, clock clock.Clock) { | ||||
| 	// Make a copy before modification, because there could be multiple listeners. | ||||
| 	eventCopy := event.DeepCopy() | ||||
| 	go func() { | ||||
| @@ -197,7 +197,7 @@ func (e *eventBroadcasterImpl) recordToSink(event *eventsv1.Event, clock clock.C | ||||
| 		}() | ||||
| 		if evToRecord != nil { | ||||
| 			// TODO: Add a metric counting the number of recording attempts | ||||
| 			e.attemptRecording(evToRecord) | ||||
| 			e.attemptRecording(ctx, evToRecord) | ||||
| 			// We don't want the new recorded Event to be reflected in the | ||||
| 			// client's cache because server-side mutations could mess with the | ||||
| 			// aggregation mechanism used by the client. | ||||
| @@ -205,40 +205,45 @@ func (e *eventBroadcasterImpl) recordToSink(event *eventsv1.Event, clock clock.C | ||||
| 	}() | ||||
| } | ||||
|  | ||||
| func (e *eventBroadcasterImpl) attemptRecording(event *eventsv1.Event) *eventsv1.Event { | ||||
| func (e *eventBroadcasterImpl) attemptRecording(ctx context.Context, event *eventsv1.Event) { | ||||
| 	tries := 0 | ||||
| 	for { | ||||
| 		if recordedEvent, retry := recordEvent(e.sink, event); !retry { | ||||
| 			return recordedEvent | ||||
| 		if _, retry := recordEvent(ctx, e.sink, event); !retry { | ||||
| 			return | ||||
| 		} | ||||
| 		tries++ | ||||
| 		if tries >= maxTriesPerEvent { | ||||
| 			klog.Errorf("Unable to write event '%#v' (retry limit exceeded!)", event) | ||||
| 			return nil | ||||
| 			klog.FromContext(ctx).Error(nil, "Unable to write event (retry limit exceeded!)", "event", event) | ||||
| 			return | ||||
| 		} | ||||
| 		// Randomize sleep so that various clients won't all be | ||||
| 		// synced up if the master goes down. | ||||
| 		time.Sleep(wait.Jitter(e.sleepDuration, 0.25)) | ||||
| 		// synced up if the master goes down. Give up when | ||||
| 		// the context is canceled. | ||||
| 		select { | ||||
| 		case <-ctx.Done(): | ||||
| 			return | ||||
| 		case <-time.After(wait.Jitter(e.sleepDuration, 0.25)): | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func recordEvent(sink EventSink, event *eventsv1.Event) (*eventsv1.Event, bool) { | ||||
| func recordEvent(ctx context.Context, sink EventSink, event *eventsv1.Event) (*eventsv1.Event, bool) { | ||||
| 	var newEvent *eventsv1.Event | ||||
| 	var err error | ||||
| 	isEventSeries := event.Series != nil | ||||
| 	if isEventSeries { | ||||
| 		patch, patchBytesErr := createPatchBytesForSeries(event) | ||||
| 		if patchBytesErr != nil { | ||||
| 			klog.Errorf("Unable to calculate diff, no merge is possible: %v", patchBytesErr) | ||||
| 			klog.FromContext(ctx).Error(patchBytesErr, "Unable to calculate diff, no merge is possible") | ||||
| 			return nil, false | ||||
| 		} | ||||
| 		newEvent, err = sink.Patch(event, patch) | ||||
| 		newEvent, err = sink.Patch(ctx, event, patch) | ||||
| 	} | ||||
| 	// Update can fail because the event may have been removed and it no longer exists. | ||||
| 	if !isEventSeries || (isEventSeries && util.IsKeyNotFoundError(err)) { | ||||
| 		// Making sure that ResourceVersion is empty on creation | ||||
| 		event.ResourceVersion = "" | ||||
| 		newEvent, err = sink.Create(event) | ||||
| 		newEvent, err = sink.Create(ctx, event) | ||||
| 	} | ||||
| 	if err == nil { | ||||
| 		return newEvent, false | ||||
| @@ -248,7 +253,7 @@ func recordEvent(sink EventSink, event *eventsv1.Event) (*eventsv1.Event, bool) | ||||
| 	switch err.(type) { | ||||
| 	case *restclient.RequestConstructionError: | ||||
| 		// We will construct the request the same next time, so don't keep trying. | ||||
| 		klog.Errorf("Unable to construct event '%#v': '%v' (will not retry!)", event, err) | ||||
| 		klog.FromContext(ctx).Error(err, "Unable to construct event (will not retry!)", "event", event) | ||||
| 		return nil, false | ||||
| 	case *errors.StatusError: | ||||
| 		if errors.IsAlreadyExists(err) { | ||||
| @@ -260,9 +265,9 @@ func recordEvent(sink EventSink, event *eventsv1.Event) (*eventsv1.Event, bool) | ||||
| 			if isEventSeries { | ||||
| 				return nil, true | ||||
| 			} | ||||
| 			klog.V(5).Infof("Server rejected event '%#v': '%v' (will not retry!)", event, err) | ||||
| 			klog.FromContext(ctx).V(5).Info("Server rejected event (will not retry!)", "event", event, "err", err) | ||||
| 		} else { | ||||
| 			klog.Errorf("Server rejected event '%#v': '%v' (will not retry!)", event, err) | ||||
| 			klog.FromContext(ctx).Error(err, "Server rejected event (will not retry!)", "event", event) | ||||
| 		} | ||||
| 		return nil, false | ||||
| 	case *errors.UnexpectedObjectError: | ||||
| @@ -271,7 +276,7 @@ func recordEvent(sink EventSink, event *eventsv1.Event) (*eventsv1.Event, bool) | ||||
| 	default: | ||||
| 		// This case includes actual http transport errors. Go ahead and retry. | ||||
| 	} | ||||
| 	klog.Errorf("Unable to write event: '%v' (may retry after sleeping)", err) | ||||
| 	klog.FromContext(ctx).Error(err, "Unable to write event (may retry after sleeping)") | ||||
| 	return nil, true | ||||
| } | ||||
|  | ||||
| @@ -307,29 +312,38 @@ func getKey(event *eventsv1.Event) eventKey { | ||||
| // StartStructuredLogging starts sending events received from this EventBroadcaster to the structured logging function. | ||||
| // The return value can be ignored or used to stop recording, if desired. | ||||
| // TODO: this function should also return an error. | ||||
| // | ||||
| // Deprecated: use StartLogging instead. | ||||
| func (e *eventBroadcasterImpl) StartStructuredLogging(verbosity klog.Level) func() { | ||||
| 	stopWatcher, err := e.StartEventWatcher( | ||||
| 		func(obj runtime.Object) { | ||||
| 			event, ok := obj.(*eventsv1.Event) | ||||
| 			if !ok { | ||||
| 				klog.Errorf("unexpected type, expected eventsv1.Event") | ||||
| 				return | ||||
| 			} | ||||
| 			klog.V(verbosity).InfoS("Event occurred", "object", klog.KRef(event.Regarding.Namespace, event.Regarding.Name), "kind", event.Regarding.Kind, "apiVersion", event.Regarding.APIVersion, "type", event.Type, "reason", event.Reason, "action", event.Action, "note", event.Note) | ||||
| 		}) | ||||
| 	logger := klog.Background().V(int(verbosity)) | ||||
| 	stopWatcher, err := e.StartLogging(logger) | ||||
| 	if err != nil { | ||||
| 		klog.Errorf("failed to start event watcher: '%v'", err) | ||||
| 		logger.Error(err, "Failed to start event watcher") | ||||
| 		return func() {} | ||||
| 	} | ||||
| 	return stopWatcher | ||||
| } | ||||
|  | ||||
| // StartLogging starts sending events received from this EventBroadcaster to the structured logger. | ||||
| // To adjust verbosity, use the logger's V method (i.e. pass `logger.V(3)` instead of `logger`). | ||||
| // The returned function can be ignored or used to stop recording, if desired. | ||||
| func (e *eventBroadcasterImpl) StartLogging(logger klog.Logger) (func(), error) { | ||||
| 	return e.StartEventWatcher( | ||||
| 		func(obj runtime.Object) { | ||||
| 			event, ok := obj.(*eventsv1.Event) | ||||
| 			if !ok { | ||||
| 				logger.Error(nil, "unexpected type, expected eventsv1.Event") | ||||
| 				return | ||||
| 			} | ||||
| 			logger.Info("Event occurred", "object", klog.KRef(event.Regarding.Namespace, event.Regarding.Name), "kind", event.Regarding.Kind, "apiVersion", event.Regarding.APIVersion, "type", event.Type, "reason", event.Reason, "action", event.Action, "note", event.Note) | ||||
| 		}) | ||||
| } | ||||
|  | ||||
| // StartEventWatcher starts sending events received from this EventBroadcaster to the given event handler function. | ||||
| // The return value is used to stop recording | ||||
| func (e *eventBroadcasterImpl) StartEventWatcher(eventHandler func(event runtime.Object)) (func(), error) { | ||||
| 	watcher, err := e.Watch() | ||||
| 	if err != nil { | ||||
| 		klog.Errorf("Unable start event watcher: '%v' (will not retry!)", err) | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	go func() { | ||||
| @@ -345,37 +359,42 @@ func (e *eventBroadcasterImpl) StartEventWatcher(eventHandler func(event runtime | ||||
| 	return watcher.Stop, nil | ||||
| } | ||||
|  | ||||
| func (e *eventBroadcasterImpl) startRecordingEvents(stopCh <-chan struct{}) error { | ||||
| func (e *eventBroadcasterImpl) startRecordingEvents(ctx context.Context) error { | ||||
| 	eventHandler := func(obj runtime.Object) { | ||||
| 		event, ok := obj.(*eventsv1.Event) | ||||
| 		if !ok { | ||||
| 			klog.Errorf("unexpected type, expected eventsv1.Event") | ||||
| 			klog.FromContext(ctx).Error(nil, "unexpected type, expected eventsv1.Event") | ||||
| 			return | ||||
| 		} | ||||
| 		e.recordToSink(event, clock.RealClock{}) | ||||
| 		e.recordToSink(ctx, event, clock.RealClock{}) | ||||
| 	} | ||||
| 	stopWatcher, err := e.StartEventWatcher(eventHandler) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	go func() { | ||||
| 		<-stopCh | ||||
| 		<-ctx.Done() | ||||
| 		stopWatcher() | ||||
| 	}() | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // StartRecordingToSink starts sending events received from the specified eventBroadcaster to the given sink. | ||||
| // Deprecated: use StartRecordingToSinkWithContext instead. | ||||
| func (e *eventBroadcasterImpl) StartRecordingToSink(stopCh <-chan struct{}) { | ||||
| 	go wait.Until(e.refreshExistingEventSeries, refreshTime, stopCh) | ||||
| 	go wait.Until(e.finishSeries, finishTime, stopCh) | ||||
| 	err := e.startRecordingEvents(stopCh) | ||||
| 	err := e.StartRecordingToSinkWithContext(wait.ContextForChannel(stopCh)) | ||||
| 	if err != nil { | ||||
| 		klog.Errorf("unexpected type, expected eventsv1.Event") | ||||
| 		return | ||||
| 		klog.Background().Error(err, "Failed to start recording to sink") | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // StartRecordingToSinkWithContext starts sending events received from the specified eventBroadcaster to the given sink. | ||||
| func (e *eventBroadcasterImpl) StartRecordingToSinkWithContext(ctx context.Context) error { | ||||
| 	go wait.UntilWithContext(ctx, e.refreshExistingEventSeries, refreshTime) | ||||
| 	go wait.UntilWithContext(ctx, e.finishSeries, finishTime) | ||||
| 	return e.startRecordingEvents(ctx) | ||||
| } | ||||
|  | ||||
| type eventBroadcasterAdapterImpl struct { | ||||
| 	coreClient          typedv1core.EventsGetter | ||||
| 	coreBroadcaster     record.EventBroadcaster | ||||
| @@ -409,14 +428,14 @@ func (e *eventBroadcasterAdapterImpl) StartRecordingToSink(stopCh <-chan struct{ | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (e *eventBroadcasterAdapterImpl) NewRecorder(name string) EventRecorder { | ||||
| func (e *eventBroadcasterAdapterImpl) NewRecorder(name string) EventRecorderLogger { | ||||
| 	if e.eventsv1Broadcaster != nil && e.eventsv1Client != nil { | ||||
| 		return e.eventsv1Broadcaster.NewRecorder(scheme.Scheme, name) | ||||
| 	} | ||||
| 	return record.NewEventRecorderAdapter(e.DeprecatedNewLegacyRecorder(name)) | ||||
| } | ||||
|  | ||||
| func (e *eventBroadcasterAdapterImpl) DeprecatedNewLegacyRecorder(name string) record.EventRecorder { | ||||
| func (e *eventBroadcasterAdapterImpl) DeprecatedNewLegacyRecorder(name string) record.EventRecorderLogger { | ||||
| 	return e.coreBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: name}) | ||||
| } | ||||
|  | ||||
|   | ||||
| @@ -25,6 +25,7 @@ import ( | ||||
| 	eventsv1 "k8s.io/api/events/v1" | ||||
| 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||||
| 	"k8s.io/client-go/kubernetes/fake" | ||||
| 	"k8s.io/klog/v2/ktesting" | ||||
| ) | ||||
|  | ||||
| func TestRecordEventToSink(t *testing.T) { | ||||
| @@ -78,11 +79,12 @@ func TestRecordEventToSink(t *testing.T) { | ||||
|  | ||||
| 	for _, tc := range testCases { | ||||
| 		t.Run(tc.name, func(t *testing.T) { | ||||
| 			_, ctx := ktesting.NewTestContext(t) | ||||
| 			kubeClient := fake.NewSimpleClientset() | ||||
| 			eventSink := &EventSinkImpl{Interface: kubeClient.EventsV1()} | ||||
|  | ||||
| 			for _, ev := range tc.eventsToRecord { | ||||
| 				recordEvent(eventSink, &ev) | ||||
| 				recordEvent(ctx, eventSink, &ev) | ||||
| 			} | ||||
|  | ||||
| 			recordedEvents, err := kubeClient.EventsV1().Events(metav1.NamespaceDefault).List(context.TODO(), metav1.ListOptions{}) | ||||
|   | ||||
| @@ -40,12 +40,33 @@ type recorderImpl struct { | ||||
| 	clock clock.Clock | ||||
| } | ||||
|  | ||||
| var _ EventRecorder = &recorderImpl{} | ||||
|  | ||||
| func (recorder *recorderImpl) Eventf(regarding runtime.Object, related runtime.Object, eventtype, reason, action, note string, args ...interface{}) { | ||||
| 	recorder.eventf(klog.Background(), regarding, related, eventtype, reason, action, note, args...) | ||||
| } | ||||
|  | ||||
| type recorderImplLogger struct { | ||||
| 	*recorderImpl | ||||
| 	logger klog.Logger | ||||
| } | ||||
|  | ||||
| var _ EventRecorderLogger = &recorderImplLogger{} | ||||
|  | ||||
| func (recorder *recorderImplLogger) Eventf(regarding runtime.Object, related runtime.Object, eventtype, reason, action, note string, args ...interface{}) { | ||||
| 	recorder.eventf(recorder.logger, regarding, related, eventtype, reason, action, note, args...) | ||||
| } | ||||
|  | ||||
| func (recorder *recorderImplLogger) WithLogger(logger klog.Logger) EventRecorderLogger { | ||||
| 	return &recorderImplLogger{recorderImpl: recorder.recorderImpl, logger: logger} | ||||
| } | ||||
|  | ||||
| func (recorder *recorderImpl) eventf(logger klog.Logger, regarding runtime.Object, related runtime.Object, eventtype, reason, action, note string, args ...interface{}) { | ||||
| 	timestamp := metav1.MicroTime{Time: time.Now()} | ||||
| 	message := fmt.Sprintf(note, args...) | ||||
| 	refRegarding, err := reference.GetReference(recorder.scheme, regarding) | ||||
| 	if err != nil { | ||||
| 		klog.Errorf("Could not construct reference to: '%#v' due to: '%v'. Will not report event: '%v' '%v' '%v'", regarding, err, eventtype, reason, message) | ||||
| 		logger.Error(err, "Could not construct reference, will not report event", "object", regarding, "eventType", eventtype, "reason", reason, "message", message) | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| @@ -53,11 +74,11 @@ func (recorder *recorderImpl) Eventf(regarding runtime.Object, related runtime.O | ||||
| 	if related != nil { | ||||
| 		refRelated, err = reference.GetReference(recorder.scheme, related) | ||||
| 		if err != nil { | ||||
| 			klog.V(9).Infof("Could not construct reference to: '%#v' due to: '%v'.", related, err) | ||||
| 			logger.V(9).Info("Could not construct reference", "object", related, "err", err) | ||||
| 		} | ||||
| 	} | ||||
| 	if !util.ValidateEventType(eventtype) { | ||||
| 		klog.Errorf("Unsupported event type: '%v'", eventtype) | ||||
| 		logger.Error(nil, "Unsupported event type", "eventType", eventtype) | ||||
| 		return | ||||
| 	} | ||||
| 	event := recorder.makeEvent(refRegarding, refRelated, timestamp, eventtype, reason, message, recorder.reportingController, recorder.reportingInstance, action) | ||||
|   | ||||
| @@ -34,6 +34,7 @@ import ( | ||||
| 	"k8s.io/client-go/kubernetes/scheme" | ||||
| 	restclient "k8s.io/client-go/rest" | ||||
| 	ref "k8s.io/client-go/tools/reference" | ||||
| 	"k8s.io/klog/v2/ktesting" | ||||
| ) | ||||
|  | ||||
| type testEventSeriesSink struct { | ||||
| @@ -43,7 +44,7 @@ type testEventSeriesSink struct { | ||||
| } | ||||
|  | ||||
| // Create records the event for testing. | ||||
| func (t *testEventSeriesSink) Create(e *eventsv1.Event) (*eventsv1.Event, error) { | ||||
| func (t *testEventSeriesSink) Create(ctx context.Context, e *eventsv1.Event) (*eventsv1.Event, error) { | ||||
| 	if t.OnCreate != nil { | ||||
| 		return t.OnCreate(e) | ||||
| 	} | ||||
| @@ -51,7 +52,7 @@ func (t *testEventSeriesSink) Create(e *eventsv1.Event) (*eventsv1.Event, error) | ||||
| } | ||||
|  | ||||
| // Update records the event for testing. | ||||
| func (t *testEventSeriesSink) Update(e *eventsv1.Event) (*eventsv1.Event, error) { | ||||
| func (t *testEventSeriesSink) Update(ctx context.Context, e *eventsv1.Event) (*eventsv1.Event, error) { | ||||
| 	if t.OnUpdate != nil { | ||||
| 		return t.OnUpdate(e) | ||||
| 	} | ||||
| @@ -59,7 +60,7 @@ func (t *testEventSeriesSink) Update(e *eventsv1.Event) (*eventsv1.Event, error) | ||||
| } | ||||
|  | ||||
| // Patch records the event for testing. | ||||
| func (t *testEventSeriesSink) Patch(e *eventsv1.Event, p []byte) (*eventsv1.Event, error) { | ||||
| func (t *testEventSeriesSink) Patch(ctx context.Context, e *eventsv1.Event, p []byte) (*eventsv1.Event, error) { | ||||
| 	if t.OnPatch != nil { | ||||
| 		return t.OnPatch(e, p) | ||||
| 	} | ||||
| @@ -135,7 +136,9 @@ func TestEventSeriesf(t *testing.T) { | ||||
| 		}, | ||||
| 	} | ||||
|  | ||||
| 	stopCh := make(chan struct{}) | ||||
| 	_, ctx := ktesting.NewTestContext(t) | ||||
| 	ctx, cancel := context.WithCancel(ctx) | ||||
| 	defer cancel() | ||||
|  | ||||
| 	createEvent := make(chan *eventsv1.Event) | ||||
| 	updateEvent := make(chan *eventsv1.Event) | ||||
| @@ -163,7 +166,7 @@ func TestEventSeriesf(t *testing.T) { | ||||
| 	// Don't call StartRecordingToSink, as we don't need neither refreshing event | ||||
| 	// series nor finishing them in this tests and additional events updated would | ||||
| 	// race with our expected ones. | ||||
| 	err = broadcaster.startRecordingEvents(stopCh) | ||||
| 	err = broadcaster.startRecordingEvents(ctx) | ||||
| 	if err != nil { | ||||
| 		t.Fatal(err) | ||||
| 	} | ||||
| @@ -184,7 +187,6 @@ func TestEventSeriesf(t *testing.T) { | ||||
| 			validateEvent(strconv.Itoa(index), false, actualEvent, item.expect, t) | ||||
| 		} | ||||
| 	} | ||||
| 	close(stopCh) | ||||
| } | ||||
|  | ||||
| // TestEventSeriesWithEventSinkImplRace verifies that when Events are emitted to | ||||
| @@ -256,6 +258,7 @@ func validateEvent(messagePrefix string, expectedUpdate bool, actualEvent *event | ||||
| } | ||||
|  | ||||
| func TestFinishSeries(t *testing.T) { | ||||
| 	_, ctx := ktesting.NewTestContext(t) | ||||
| 	hostname, _ := os.Hostname() | ||||
| 	testPod := &v1.Pod{ | ||||
| 		ObjectMeta: metav1.ObjectMeta{ | ||||
| @@ -295,7 +298,7 @@ func TestFinishSeries(t *testing.T) { | ||||
| 	} | ||||
| 	cache := map[eventKey]*eventsv1.Event{} | ||||
| 	eventBroadcaster := newBroadcaster(&testEvents, 0, cache).(*eventBroadcasterImpl) | ||||
| 	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, "k8s.io/kube-foo").(*recorderImpl) | ||||
| 	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, "k8s.io/kube-foo").(*recorderImplLogger) | ||||
| 	cachedEvent := recorder.makeEvent(regarding, related, metav1.MicroTime{Time: time.Now()}, v1.EventTypeNormal, "test", "some verbose message: 1", "eventTest", "eventTest-"+hostname, "started") | ||||
| 	nonFinishedEvent := cachedEvent.DeepCopy() | ||||
| 	nonFinishedEvent.ReportingController = "nonFinished-controller" | ||||
| @@ -305,7 +308,7 @@ func TestFinishSeries(t *testing.T) { | ||||
| 	} | ||||
| 	cache[getKey(cachedEvent)] = cachedEvent | ||||
| 	cache[getKey(nonFinishedEvent)] = nonFinishedEvent | ||||
| 	eventBroadcaster.finishSeries() | ||||
| 	eventBroadcaster.finishSeries(ctx) | ||||
| 	select { | ||||
| 	case actualEvent := <-patchEvent: | ||||
| 		t.Logf("validating event affected by patch request") | ||||
| @@ -327,6 +330,7 @@ func TestFinishSeries(t *testing.T) { | ||||
| } | ||||
|  | ||||
| func TestRefreshExistingEventSeries(t *testing.T) { | ||||
| 	_, ctx := ktesting.NewTestContext(t) | ||||
| 	hostname, _ := os.Hostname() | ||||
| 	testPod := &v1.Pod{ | ||||
| 		ObjectMeta: metav1.ObjectMeta{ | ||||
| @@ -381,7 +385,7 @@ func TestRefreshExistingEventSeries(t *testing.T) { | ||||
| 		} | ||||
| 		cache := map[eventKey]*eventsv1.Event{} | ||||
| 		eventBroadcaster := newBroadcaster(&testEvents, 0, cache).(*eventBroadcasterImpl) | ||||
| 		recorder := eventBroadcaster.NewRecorder(scheme.Scheme, "k8s.io/kube-foo").(*recorderImpl) | ||||
| 		recorder := eventBroadcaster.NewRecorder(scheme.Scheme, "k8s.io/kube-foo").(*recorderImplLogger) | ||||
| 		cachedEvent := recorder.makeEvent(regarding, related, metav1.MicroTime{Time: time.Now()}, v1.EventTypeNormal, "test", "some verbose message: 1", "eventTest", "eventTest-"+hostname, "started") | ||||
| 		cachedEvent.Series = &eventsv1.EventSeries{ | ||||
| 			Count:            10, | ||||
| @@ -390,7 +394,7 @@ func TestRefreshExistingEventSeries(t *testing.T) { | ||||
| 		cacheKey := getKey(cachedEvent) | ||||
| 		cache[cacheKey] = cachedEvent | ||||
|  | ||||
| 		eventBroadcaster.refreshExistingEventSeries() | ||||
| 		eventBroadcaster.refreshExistingEventSeries(ctx) | ||||
| 		select { | ||||
| 		case <-patchEvent: | ||||
| 			t.Logf("validating event affected by patch request") | ||||
|   | ||||
| @@ -20,6 +20,7 @@ import ( | ||||
| 	"fmt" | ||||
|  | ||||
| 	"k8s.io/apimachinery/pkg/runtime" | ||||
| 	"k8s.io/klog/v2" | ||||
| ) | ||||
|  | ||||
| // FakeRecorder is used as a fake during tests. It is thread safe. It is usable | ||||
| @@ -29,6 +30,8 @@ type FakeRecorder struct { | ||||
| 	Events chan string | ||||
| } | ||||
|  | ||||
| var _ EventRecorderLogger = &FakeRecorder{} | ||||
|  | ||||
| // Eventf emits an event | ||||
| func (f *FakeRecorder) Eventf(regarding runtime.Object, related runtime.Object, eventtype, reason, action, note string, args ...interface{}) { | ||||
| 	if f.Events != nil { | ||||
| @@ -36,6 +39,10 @@ func (f *FakeRecorder) Eventf(regarding runtime.Object, related runtime.Object, | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (f *FakeRecorder) WithLogger(logger klog.Logger) EventRecorderLogger { | ||||
| 	return f | ||||
| } | ||||
|  | ||||
| // NewFakeRecorder creates new fake event recorder with event channel with | ||||
| // buffer of given size. | ||||
| func NewFakeRecorder(bufferSize int) *FakeRecorder { | ||||
|   | ||||
| @@ -17,39 +17,30 @@ limitations under the License. | ||||
| package events | ||||
|  | ||||
| import ( | ||||
| 	"context" | ||||
|  | ||||
| 	eventsv1 "k8s.io/api/events/v1" | ||||
| 	"k8s.io/apimachinery/pkg/runtime" | ||||
| 	internalevents "k8s.io/client-go/tools/internal/events" | ||||
| 	"k8s.io/client-go/tools/record" | ||||
| 	"k8s.io/klog/v2" | ||||
| ) | ||||
|  | ||||
| // EventRecorder knows how to record events on behalf of an EventSource. | ||||
| type EventRecorder interface { | ||||
| 	// Eventf constructs an event from the given information and puts it in the queue for sending. | ||||
| 	// 'regarding' is the object this event is about. Event will make a reference-- or you may also | ||||
| 	// pass a reference to the object directly. | ||||
| 	// 'related' is the secondary object for more complex actions. E.g. when regarding object triggers | ||||
| 	// a creation or deletion of related object. | ||||
| 	// 'type' of this event, and can be one of Normal, Warning. New types could be added in future | ||||
| 	// 'reason' is the reason this event is generated. 'reason' should be short and unique; it | ||||
| 	// should be in UpperCamelCase format (starting with a capital letter). "reason" will be used | ||||
| 	// to automate handling of events, so imagine people writing switch statements to handle them. | ||||
| 	// You want to make that easy. | ||||
| 	// 'action' explains what happened with regarding/what action did the ReportingController | ||||
| 	// (ReportingController is a type of a Controller reporting an Event, e.g. k8s.io/node-controller, k8s.io/kubelet.) | ||||
| 	// take in regarding's name; it should be in UpperCamelCase format (starting with a capital letter). | ||||
| 	// 'note' is intended to be human readable. | ||||
| 	Eventf(regarding runtime.Object, related runtime.Object, eventtype, reason, action, note string, args ...interface{}) | ||||
| } | ||||
| type EventRecorder = internalevents.EventRecorder | ||||
| type EventRecorderLogger = internalevents.EventRecorderLogger | ||||
|  | ||||
| // EventBroadcaster knows how to receive events and send them to any EventSink, watcher, or log. | ||||
| type EventBroadcaster interface { | ||||
| 	// StartRecordingToSink starts sending events received from the specified eventBroadcaster. | ||||
| 	// Deprecated: use StartRecordingToSinkWithContext instead. | ||||
| 	StartRecordingToSink(stopCh <-chan struct{}) | ||||
|  | ||||
| 	// StartRecordingToSink starts sending events received from the specified eventBroadcaster. | ||||
| 	StartRecordingToSinkWithContext(ctx context.Context) error | ||||
|  | ||||
| 	// NewRecorder returns an EventRecorder that can be used to send events to this EventBroadcaster | ||||
| 	// with the event source set to the given event source. | ||||
| 	NewRecorder(scheme *runtime.Scheme, reportingController string) EventRecorder | ||||
| 	NewRecorder(scheme *runtime.Scheme, reportingController string) EventRecorderLogger | ||||
|  | ||||
| 	// StartEventWatcher enables you to watch for emitted events without usage | ||||
| 	// of StartRecordingToSink. This lets you also process events in a custom way (e.g. in tests). | ||||
| @@ -59,8 +50,14 @@ type EventBroadcaster interface { | ||||
|  | ||||
| 	// StartStructuredLogging starts sending events received from this EventBroadcaster to the structured | ||||
| 	// logging function. The return value can be ignored or used to stop recording, if desired. | ||||
| 	// Deprecated: use StartLogging instead. | ||||
| 	StartStructuredLogging(verbosity klog.Level) func() | ||||
|  | ||||
| 	// StartLogging starts sending events received from this EventBroadcaster to the structured logger. | ||||
| 	// To adjust verbosity, use the logger's V method (i.e. pass `logger.V(3)` instead of `logger`). | ||||
| 	// The returned function can be ignored or used to stop recording, if desired. | ||||
| 	StartLogging(logger klog.Logger) (func(), error) | ||||
|  | ||||
| 	// Shutdown shuts down the broadcaster | ||||
| 	Shutdown() | ||||
| } | ||||
| @@ -70,9 +67,9 @@ type EventBroadcaster interface { | ||||
| // It is assumed that EventSink will return the same sorts of errors as | ||||
| // client-go's REST client. | ||||
| type EventSink interface { | ||||
| 	Create(event *eventsv1.Event) (*eventsv1.Event, error) | ||||
| 	Update(event *eventsv1.Event) (*eventsv1.Event, error) | ||||
| 	Patch(oldEvent *eventsv1.Event, data []byte) (*eventsv1.Event, error) | ||||
| 	Create(ctx context.Context, event *eventsv1.Event) (*eventsv1.Event, error) | ||||
| 	Update(ctx context.Context, event *eventsv1.Event) (*eventsv1.Event, error) | ||||
| 	Patch(ctx context.Context, oldEvent *eventsv1.Event, data []byte) (*eventsv1.Event, error) | ||||
| } | ||||
|  | ||||
| // EventBroadcasterAdapter is a auxiliary interface to simplify migration to | ||||
| @@ -85,10 +82,10 @@ type EventBroadcasterAdapter interface { | ||||
| 	StartRecordingToSink(stopCh <-chan struct{}) | ||||
|  | ||||
| 	// NewRecorder creates a new Event Recorder with specified name. | ||||
| 	NewRecorder(name string) EventRecorder | ||||
| 	NewRecorder(name string) EventRecorderLogger | ||||
|  | ||||
| 	// DeprecatedNewLegacyRecorder creates a legacy Event Recorder with specific name. | ||||
| 	DeprecatedNewLegacyRecorder(name string) record.EventRecorder | ||||
| 	DeprecatedNewLegacyRecorder(name string) record.EventRecorderLogger | ||||
|  | ||||
| 	// Shutdown shuts down the broadcaster. | ||||
| 	Shutdown() | ||||
|   | ||||
| @@ -0,0 +1,59 @@ | ||||
| /* | ||||
| Copyright 2019 The Kubernetes Authors. | ||||
|  | ||||
| Licensed under the Apache License, Version 2.0 (the "License"); | ||||
| you may not use this file except in compliance with the License. | ||||
| You may obtain a copy of the License at | ||||
|  | ||||
|     http://www.apache.org/licenses/LICENSE-2.0 | ||||
|  | ||||
| Unless required by applicable law or agreed to in writing, software | ||||
| distributed under the License is distributed on an "AS IS" BASIS, | ||||
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
| See the License for the specific language governing permissions and | ||||
| limitations under the License. | ||||
| */ | ||||
|  | ||||
| // Package internal is needed to break an import cycle: record.EventRecorderAdapter | ||||
| // needs this interface definition to implement it, but event.NewEventBroadcasterAdapter | ||||
| // needs record.NewBroadcaster. Therefore this interface cannot be in event/interfaces.go. | ||||
| package internal | ||||
|  | ||||
| import ( | ||||
| 	"k8s.io/apimachinery/pkg/runtime" | ||||
| 	"k8s.io/klog/v2" | ||||
| ) | ||||
|  | ||||
| // EventRecorder knows how to record events on behalf of an EventSource. | ||||
| type EventRecorder interface { | ||||
| 	// Eventf constructs an event from the given information and puts it in the queue for sending. | ||||
| 	// 'regarding' is the object this event is about. Event will make a reference-- or you may also | ||||
| 	// pass a reference to the object directly. | ||||
| 	// 'related' is the secondary object for more complex actions. E.g. when regarding object triggers | ||||
| 	// a creation or deletion of related object. | ||||
| 	// 'type' of this event, and can be one of Normal, Warning. New types could be added in future | ||||
| 	// 'reason' is the reason this event is generated. 'reason' should be short and unique; it | ||||
| 	// should be in UpperCamelCase format (starting with a capital letter). "reason" will be used | ||||
| 	// to automate handling of events, so imagine people writing switch statements to handle them. | ||||
| 	// You want to make that easy. | ||||
| 	// 'action' explains what happened with regarding/what action did the ReportingController | ||||
| 	// (ReportingController is a type of a Controller reporting an Event, e.g. k8s.io/node-controller, k8s.io/kubelet.) | ||||
| 	// take in regarding's name; it should be in UpperCamelCase format (starting with a capital letter). | ||||
| 	// 'note' is intended to be human readable. | ||||
| 	Eventf(regarding runtime.Object, related runtime.Object, eventtype, reason, action, note string, args ...interface{}) | ||||
| } | ||||
|  | ||||
| // EventRecorderLogger extends EventRecorder such that a logger can | ||||
| // be set for methods in EventRecorder. Normally, those methods | ||||
| // uses the global default logger to record errors and debug messages. | ||||
| // If that is not desired, use WithLogger to provide a logger instance. | ||||
| type EventRecorderLogger interface { | ||||
| 	EventRecorder | ||||
|  | ||||
| 	// WithLogger replaces the context used for logging. This is a cheap call | ||||
| 	// and meant to be used for contextual logging: | ||||
| 	//    recorder := ... | ||||
| 	//    logger := klog.FromContext(ctx) | ||||
| 	//    recorder.WithLogger(logger).Eventf(...) | ||||
| 	WithLogger(logger klog.Logger) EventRecorderLogger | ||||
| } | ||||
| @@ -29,6 +29,7 @@ import ( | ||||
| 	utilruntime "k8s.io/apimachinery/pkg/util/runtime" | ||||
| 	"k8s.io/apimachinery/pkg/watch" | ||||
| 	restclient "k8s.io/client-go/rest" | ||||
| 	internalevents "k8s.io/client-go/tools/internal/events" | ||||
| 	"k8s.io/client-go/tools/record/util" | ||||
| 	ref "k8s.io/client-go/tools/reference" | ||||
| 	"k8s.io/klog/v2" | ||||
| @@ -110,6 +111,21 @@ type EventRecorder interface { | ||||
| 	AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{}) | ||||
| } | ||||
|  | ||||
| // EventRecorderLogger extends EventRecorder such that a logger can | ||||
| // be set for methods in EventRecorder. Normally, those methods | ||||
| // uses the global default logger to record errors and debug messages. | ||||
| // If that is not desired, use WithLogger to provide a logger instance. | ||||
| type EventRecorderLogger interface { | ||||
| 	EventRecorder | ||||
|  | ||||
| 	// WithLogger replaces the context used for logging. This is a cheap call | ||||
| 	// and meant to be used for contextual logging: | ||||
| 	//    recorder := ... | ||||
| 	//    logger := klog.FromContext(ctx) | ||||
| 	//    recorder.WithLogger(logger).Eventf(...) | ||||
| 	WithLogger(logger klog.Logger) EventRecorderLogger | ||||
| } | ||||
|  | ||||
| // EventBroadcaster knows how to receive events and send them to any EventSink, watcher, or log. | ||||
| type EventBroadcaster interface { | ||||
| 	// StartEventWatcher starts sending events received from this EventBroadcaster to the given | ||||
| @@ -131,7 +147,7 @@ type EventBroadcaster interface { | ||||
|  | ||||
| 	// NewRecorder returns an EventRecorder that can be used to send events to this EventBroadcaster | ||||
| 	// with the event source set to the given event source. | ||||
| 	NewRecorder(scheme *runtime.Scheme, source v1.EventSource) EventRecorder | ||||
| 	NewRecorder(scheme *runtime.Scheme, source v1.EventSource) EventRecorderLogger | ||||
|  | ||||
| 	// Shutdown shuts down the broadcaster. Once the broadcaster is shut | ||||
| 	// down, it will only try to record an event in a sink once before | ||||
| @@ -142,12 +158,14 @@ type EventBroadcaster interface { | ||||
| // EventRecorderAdapter is a wrapper around a "k8s.io/client-go/tools/record".EventRecorder | ||||
| // implementing the new "k8s.io/client-go/tools/events".EventRecorder interface. | ||||
| type EventRecorderAdapter struct { | ||||
| 	recorder EventRecorder | ||||
| 	recorder EventRecorderLogger | ||||
| } | ||||
|  | ||||
| var _ internalevents.EventRecorder = &EventRecorderAdapter{} | ||||
|  | ||||
| // NewEventRecorderAdapter returns an adapter implementing the new | ||||
| // "k8s.io/client-go/tools/events".EventRecorder interface. | ||||
| func NewEventRecorderAdapter(recorder EventRecorder) *EventRecorderAdapter { | ||||
| func NewEventRecorderAdapter(recorder EventRecorderLogger) *EventRecorderAdapter { | ||||
| 	return &EventRecorderAdapter{ | ||||
| 		recorder: recorder, | ||||
| 	} | ||||
| @@ -158,28 +176,76 @@ func (a *EventRecorderAdapter) Eventf(regarding, _ runtime.Object, eventtype, re | ||||
| 	a.recorder.Eventf(regarding, eventtype, reason, note, args...) | ||||
| } | ||||
|  | ||||
| func (a *EventRecorderAdapter) WithLogger(logger klog.Logger) internalevents.EventRecorderLogger { | ||||
| 	return &EventRecorderAdapter{ | ||||
| 		recorder: a.recorder.WithLogger(logger), | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // Creates a new event broadcaster. | ||||
| func NewBroadcaster() EventBroadcaster { | ||||
| 	return newEventBroadcaster(watch.NewLongQueueBroadcaster(maxQueuedEvents, watch.DropIfChannelFull), defaultSleepDuration) | ||||
| func NewBroadcaster(opts ...BroadcasterOption) EventBroadcaster { | ||||
| 	c := config{ | ||||
| 		sleepDuration: defaultSleepDuration, | ||||
| 	} | ||||
| 	for _, opt := range opts { | ||||
| 		opt(&c) | ||||
| 	} | ||||
| 	eventBroadcaster := &eventBroadcasterImpl{ | ||||
| 		Broadcaster:   watch.NewLongQueueBroadcaster(maxQueuedEvents, watch.DropIfChannelFull), | ||||
| 		sleepDuration: c.sleepDuration, | ||||
| 		options:       c.CorrelatorOptions, | ||||
| 	} | ||||
| 	ctx := c.Context | ||||
| 	if ctx == nil { | ||||
| 		ctx = context.Background() | ||||
| 	} else { | ||||
| 		// Calling Shutdown is not required when a context was provided: | ||||
| 		// when the context is canceled, this goroutine will shut down | ||||
| 		// the broadcaster. | ||||
| 		go func() { | ||||
| 			<-ctx.Done() | ||||
| 			eventBroadcaster.Broadcaster.Shutdown() | ||||
| 		}() | ||||
| 	} | ||||
| 	eventBroadcaster.cancelationCtx, eventBroadcaster.cancel = context.WithCancel(ctx) | ||||
| 	return eventBroadcaster | ||||
| } | ||||
|  | ||||
| func NewBroadcasterForTests(sleepDuration time.Duration) EventBroadcaster { | ||||
| 	return newEventBroadcaster(watch.NewLongQueueBroadcaster(maxQueuedEvents, watch.DropIfChannelFull), sleepDuration) | ||||
| 	return NewBroadcaster(WithSleepDuration(sleepDuration)) | ||||
| } | ||||
|  | ||||
| func NewBroadcasterWithCorrelatorOptions(options CorrelatorOptions) EventBroadcaster { | ||||
| 	eventBroadcaster := newEventBroadcaster(watch.NewLongQueueBroadcaster(maxQueuedEvents, watch.DropIfChannelFull), defaultSleepDuration) | ||||
| 	eventBroadcaster.options = options | ||||
| 	return eventBroadcaster | ||||
| 	return NewBroadcaster(WithCorrelatorOptions(options)) | ||||
| } | ||||
|  | ||||
| func newEventBroadcaster(broadcaster *watch.Broadcaster, sleepDuration time.Duration) *eventBroadcasterImpl { | ||||
| 	eventBroadcaster := &eventBroadcasterImpl{ | ||||
| 		Broadcaster:   broadcaster, | ||||
| 		sleepDuration: sleepDuration, | ||||
| func WithCorrelatorOptions(options CorrelatorOptions) BroadcasterOption { | ||||
| 	return func(c *config) { | ||||
| 		c.CorrelatorOptions = options | ||||
| 	} | ||||
| 	eventBroadcaster.cancelationCtx, eventBroadcaster.cancel = context.WithCancel(context.Background()) | ||||
| 	return eventBroadcaster | ||||
| } | ||||
|  | ||||
| // WithContext sets a context for the broadcaster. Canceling the context will | ||||
| // shut down the broadcaster, Shutdown doesn't need to be called. The context | ||||
| // can also be used to provide a logger. | ||||
| func WithContext(ctx context.Context) BroadcasterOption { | ||||
| 	return func(c *config) { | ||||
| 		c.Context = ctx | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func WithSleepDuration(sleepDuration time.Duration) BroadcasterOption { | ||||
| 	return func(c *config) { | ||||
| 		c.sleepDuration = sleepDuration | ||||
| 	} | ||||
| } | ||||
|  | ||||
| type BroadcasterOption func(*config) | ||||
|  | ||||
| type config struct { | ||||
| 	CorrelatorOptions | ||||
| 	context.Context | ||||
| 	sleepDuration time.Duration | ||||
| } | ||||
|  | ||||
| type eventBroadcasterImpl struct { | ||||
| @@ -220,12 +286,12 @@ func (e *eventBroadcasterImpl) recordToSink(sink EventSink, event *v1.Event, eve | ||||
| 	} | ||||
| 	tries := 0 | ||||
| 	for { | ||||
| 		if recordEvent(sink, result.Event, result.Patch, result.Event.Count > 1, eventCorrelator) { | ||||
| 		if recordEvent(e.cancelationCtx, sink, result.Event, result.Patch, result.Event.Count > 1, eventCorrelator) { | ||||
| 			break | ||||
| 		} | ||||
| 		tries++ | ||||
| 		if tries >= maxTriesPerEvent { | ||||
| 			klog.Errorf("Unable to write event '%#v' (retry limit exceeded!)", event) | ||||
| 			klog.FromContext(e.cancelationCtx).Error(nil, "Unable to write event (retry limit exceeded!)", "event", event) | ||||
| 			break | ||||
| 		} | ||||
|  | ||||
| @@ -237,7 +303,7 @@ func (e *eventBroadcasterImpl) recordToSink(sink EventSink, event *v1.Event, eve | ||||
| 		} | ||||
| 		select { | ||||
| 		case <-e.cancelationCtx.Done(): | ||||
| 			klog.Errorf("Unable to write event '%#v' (broadcaster is shut down)", event) | ||||
| 			klog.FromContext(e.cancelationCtx).Error(nil, "Unable to write event (broadcaster is shut down)", "event", event) | ||||
| 			return | ||||
| 		case <-time.After(delay): | ||||
| 		} | ||||
| @@ -248,7 +314,7 @@ func (e *eventBroadcasterImpl) recordToSink(sink EventSink, event *v1.Event, eve | ||||
| // was successfully recorded or discarded, false if it should be retried. | ||||
| // If updateExistingEvent is false, it creates a new event, otherwise it updates | ||||
| // existing event. | ||||
| func recordEvent(sink EventSink, event *v1.Event, patch []byte, updateExistingEvent bool, eventCorrelator *EventCorrelator) bool { | ||||
| func recordEvent(ctx context.Context, sink EventSink, event *v1.Event, patch []byte, updateExistingEvent bool, eventCorrelator *EventCorrelator) bool { | ||||
| 	var newEvent *v1.Event | ||||
| 	var err error | ||||
| 	if updateExistingEvent { | ||||
| @@ -271,13 +337,13 @@ func recordEvent(sink EventSink, event *v1.Event, patch []byte, updateExistingEv | ||||
| 	switch err.(type) { | ||||
| 	case *restclient.RequestConstructionError: | ||||
| 		// We will construct the request the same next time, so don't keep trying. | ||||
| 		klog.Errorf("Unable to construct event '%#v': '%v' (will not retry!)", event, err) | ||||
| 		klog.FromContext(ctx).Error(err, "Unable to construct event (will not retry!)", "event", event) | ||||
| 		return true | ||||
| 	case *errors.StatusError: | ||||
| 		if errors.IsAlreadyExists(err) || errors.HasStatusCause(err, v1.NamespaceTerminatingCause) { | ||||
| 			klog.V(5).Infof("Server rejected event '%#v': '%v' (will not retry!)", event, err) | ||||
| 			klog.FromContext(ctx).V(5).Info("Server rejected event (will not retry!)", "event", event, "err", err) | ||||
| 		} else { | ||||
| 			klog.Errorf("Server rejected event '%#v': '%v' (will not retry!)", event, err) | ||||
| 			klog.FromContext(ctx).Error(err, "Server rejected event (will not retry!)", "event", event) | ||||
| 		} | ||||
| 		return true | ||||
| 	case *errors.UnexpectedObjectError: | ||||
| @@ -286,7 +352,7 @@ func recordEvent(sink EventSink, event *v1.Event, patch []byte, updateExistingEv | ||||
| 	default: | ||||
| 		// This case includes actual http transport errors. Go ahead and retry. | ||||
| 	} | ||||
| 	klog.Errorf("Unable to write event: '%#v': '%v'(may retry after sleeping)", event, err) | ||||
| 	klog.FromContext(ctx).Error(err, "Unable to write event (may retry after sleeping)", "event", event) | ||||
| 	return false | ||||
| } | ||||
|  | ||||
| @@ -299,12 +365,15 @@ func (e *eventBroadcasterImpl) StartLogging(logf func(format string, args ...int | ||||
| 		}) | ||||
| } | ||||
|  | ||||
| // StartStructuredLogging starts sending events received from this EventBroadcaster to the structured logging function. | ||||
| // StartStructuredLogging starts sending events received from this EventBroadcaster to a structured logger. | ||||
| // The logger is retrieved from a context if the broadcaster was constructed with a context, otherwise | ||||
| // the global default is used. | ||||
| // The return value can be ignored or used to stop recording, if desired. | ||||
| func (e *eventBroadcasterImpl) StartStructuredLogging(verbosity klog.Level) watch.Interface { | ||||
| 	loggerV := klog.FromContext(e.cancelationCtx).V(int(verbosity)) | ||||
| 	return e.StartEventWatcher( | ||||
| 		func(e *v1.Event) { | ||||
| 			klog.V(verbosity).InfoS("Event occurred", "object", klog.KRef(e.InvolvedObject.Namespace, e.InvolvedObject.Name), "fieldPath", e.InvolvedObject.FieldPath, "kind", e.InvolvedObject.Kind, "apiVersion", e.InvolvedObject.APIVersion, "type", e.Type, "reason", e.Reason, "message", e.Message) | ||||
| 			loggerV.Info("Event occurred", "object", klog.KRef(e.InvolvedObject.Namespace, e.InvolvedObject.Name), "fieldPath", e.InvolvedObject.FieldPath, "kind", e.InvolvedObject.Kind, "apiVersion", e.InvolvedObject.APIVersion, "type", e.Type, "reason", e.Reason, "message", e.Message) | ||||
| 		}) | ||||
| } | ||||
|  | ||||
| @@ -313,26 +382,32 @@ func (e *eventBroadcasterImpl) StartStructuredLogging(verbosity klog.Level) watc | ||||
| func (e *eventBroadcasterImpl) StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface { | ||||
| 	watcher, err := e.Watch() | ||||
| 	if err != nil { | ||||
| 		klog.Errorf("Unable start event watcher: '%v' (will not retry!)", err) | ||||
| 		klog.FromContext(e.cancelationCtx).Error(err, "Unable start event watcher (will not retry!)") | ||||
| 	} | ||||
| 	go func() { | ||||
| 		defer utilruntime.HandleCrash() | ||||
| 		for watchEvent := range watcher.ResultChan() { | ||||
| 			event, ok := watchEvent.Object.(*v1.Event) | ||||
| 			if !ok { | ||||
| 				// This is all local, so there's no reason this should | ||||
| 				// ever happen. | ||||
| 				continue | ||||
| 		for { | ||||
| 			select { | ||||
| 			case <-e.cancelationCtx.Done(): | ||||
| 				watcher.Stop() | ||||
| 				return | ||||
| 			case watchEvent := <-watcher.ResultChan(): | ||||
| 				event, ok := watchEvent.Object.(*v1.Event) | ||||
| 				if !ok { | ||||
| 					// This is all local, so there's no reason this should | ||||
| 					// ever happen. | ||||
| 					continue | ||||
| 				} | ||||
| 				eventHandler(event) | ||||
| 			} | ||||
| 			eventHandler(event) | ||||
| 		} | ||||
| 	}() | ||||
| 	return watcher | ||||
| } | ||||
|  | ||||
| // NewRecorder returns an EventRecorder that records events with the given event source. | ||||
| func (e *eventBroadcasterImpl) NewRecorder(scheme *runtime.Scheme, source v1.EventSource) EventRecorder { | ||||
| 	return &recorderImpl{scheme, source, e.Broadcaster, clock.RealClock{}} | ||||
| func (e *eventBroadcasterImpl) NewRecorder(scheme *runtime.Scheme, source v1.EventSource) EventRecorderLogger { | ||||
| 	return &recorderImplLogger{recorderImpl: &recorderImpl{scheme, source, e.Broadcaster, clock.RealClock{}}, logger: klog.Background()} | ||||
| } | ||||
|  | ||||
| type recorderImpl struct { | ||||
| @@ -342,15 +417,17 @@ type recorderImpl struct { | ||||
| 	clock clock.PassiveClock | ||||
| } | ||||
|  | ||||
| func (recorder *recorderImpl) generateEvent(object runtime.Object, annotations map[string]string, eventtype, reason, message string) { | ||||
| var _ EventRecorder = &recorderImpl{} | ||||
|  | ||||
| func (recorder *recorderImpl) generateEvent(logger klog.Logger, object runtime.Object, annotations map[string]string, eventtype, reason, message string) { | ||||
| 	ref, err := ref.GetReference(recorder.scheme, object) | ||||
| 	if err != nil { | ||||
| 		klog.Errorf("Could not construct reference to: '%#v' due to: '%v'. Will not report event: '%v' '%v' '%v'", object, err, eventtype, reason, message) | ||||
| 		logger.Error(err, "Could not construct reference, will not report event", "object", object, "eventType", eventtype, "reason", reason, "message", message) | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	if !util.ValidateEventType(eventtype) { | ||||
| 		klog.Errorf("Unsupported event type: '%v'", eventtype) | ||||
| 		logger.Error(nil, "Unsupported event type", "eventType", eventtype) | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| @@ -367,16 +444,16 @@ func (recorder *recorderImpl) generateEvent(object runtime.Object, annotations m | ||||
| 	// outgoing events anyway). | ||||
| 	sent, err := recorder.ActionOrDrop(watch.Added, event) | ||||
| 	if err != nil { | ||||
| 		klog.Errorf("unable to record event: %v (will not retry!)", err) | ||||
| 		logger.Error(err, "Unable to record event (will not retry!)") | ||||
| 		return | ||||
| 	} | ||||
| 	if !sent { | ||||
| 		klog.Errorf("unable to record event: too many queued events, dropped event %#v", event) | ||||
| 		logger.Error(nil, "Unable to record event: too many queued events, dropped event", "event", event) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (recorder *recorderImpl) Event(object runtime.Object, eventtype, reason, message string) { | ||||
| 	recorder.generateEvent(object, nil, eventtype, reason, message) | ||||
| 	recorder.generateEvent(klog.Background(), object, nil, eventtype, reason, message) | ||||
| } | ||||
|  | ||||
| func (recorder *recorderImpl) Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{}) { | ||||
| @@ -384,7 +461,7 @@ func (recorder *recorderImpl) Eventf(object runtime.Object, eventtype, reason, m | ||||
| } | ||||
|  | ||||
| func (recorder *recorderImpl) AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{}) { | ||||
| 	recorder.generateEvent(object, annotations, eventtype, reason, fmt.Sprintf(messageFmt, args...)) | ||||
| 	recorder.generateEvent(klog.Background(), object, annotations, eventtype, reason, fmt.Sprintf(messageFmt, args...)) | ||||
| } | ||||
|  | ||||
| func (recorder *recorderImpl) makeEvent(ref *v1.ObjectReference, annotations map[string]string, eventtype, reason, message string) *v1.Event { | ||||
| @@ -408,3 +485,26 @@ func (recorder *recorderImpl) makeEvent(ref *v1.ObjectReference, annotations map | ||||
| 		Type:           eventtype, | ||||
| 	} | ||||
| } | ||||
|  | ||||
| type recorderImplLogger struct { | ||||
| 	*recorderImpl | ||||
| 	logger klog.Logger | ||||
| } | ||||
|  | ||||
| var _ EventRecorderLogger = &recorderImplLogger{} | ||||
|  | ||||
| func (recorder recorderImplLogger) Event(object runtime.Object, eventtype, reason, message string) { | ||||
| 	recorder.recorderImpl.generateEvent(recorder.logger, object, nil, eventtype, reason, message) | ||||
| } | ||||
|  | ||||
| func (recorder recorderImplLogger) Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{}) { | ||||
| 	recorder.Event(object, eventtype, reason, fmt.Sprintf(messageFmt, args...)) | ||||
| } | ||||
|  | ||||
| func (recorder recorderImplLogger) AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{}) { | ||||
| 	recorder.generateEvent(recorder.logger, object, annotations, eventtype, reason, fmt.Sprintf(messageFmt, args...)) | ||||
| } | ||||
|  | ||||
| func (recorder recorderImplLogger) WithLogger(logger klog.Logger) EventRecorderLogger { | ||||
| 	return recorderImplLogger{recorderImpl: recorder.recorderImpl, logger: logger} | ||||
| } | ||||
|   | ||||
| @@ -112,7 +112,7 @@ func TestNonRacyShutdown(t *testing.T) { | ||||
|  | ||||
| 	caster := NewBroadcasterForTests(0) | ||||
| 	clock := testclocks.NewFakeClock(time.Now()) | ||||
| 	recorder := recorderWithFakeClock(v1.EventSource{Component: "eventTest"}, caster, clock) | ||||
| 	recorder := recorderWithFakeClock(t, v1.EventSource{Component: "eventTest"}, caster, clock) | ||||
|  | ||||
| 	var wg sync.WaitGroup | ||||
| 	wg.Add(100) | ||||
| @@ -381,7 +381,7 @@ func TestEventf(t *testing.T) { | ||||
| 	sinkWatcher := eventBroadcaster.StartRecordingToSink(&testEvents) | ||||
|  | ||||
| 	clock := testclocks.NewFakeClock(time.Now()) | ||||
| 	recorder := recorderWithFakeClock(v1.EventSource{Component: "eventTest"}, eventBroadcaster, clock) | ||||
| 	recorder := recorderWithFakeClock(t, v1.EventSource{Component: "eventTest"}, eventBroadcaster, clock) | ||||
| 	for index, item := range table { | ||||
| 		clock.Step(1 * time.Second) | ||||
| 		logWatcher := eventBroadcaster.StartLogging(func(formatter string, args ...interface{}) { | ||||
| @@ -407,7 +407,7 @@ func TestEventf(t *testing.T) { | ||||
| 	sinkWatcher.Stop() | ||||
| } | ||||
|  | ||||
| func recorderWithFakeClock(eventSource v1.EventSource, eventBroadcaster EventBroadcaster, clock clock.Clock) EventRecorder { | ||||
| func recorderWithFakeClock(t *testing.T, eventSource v1.EventSource, eventBroadcaster EventBroadcaster, clock clock.Clock) EventRecorder { | ||||
| 	return &recorderImpl{scheme.Scheme, eventSource, eventBroadcaster.(*eventBroadcasterImpl).Broadcaster, clock} | ||||
| } | ||||
|  | ||||
| @@ -662,7 +662,7 @@ func TestEventfNoNamespace(t *testing.T) { | ||||
| 	sinkWatcher := eventBroadcaster.StartRecordingToSink(&testEvents) | ||||
|  | ||||
| 	clock := testclocks.NewFakeClock(time.Now()) | ||||
| 	recorder := recorderWithFakeClock(v1.EventSource{Component: "eventTest"}, eventBroadcaster, clock) | ||||
| 	recorder := recorderWithFakeClock(t, v1.EventSource{Component: "eventTest"}, eventBroadcaster, clock) | ||||
|  | ||||
| 	for index, item := range table { | ||||
| 		clock.Step(1 * time.Second) | ||||
| @@ -955,7 +955,7 @@ func TestMultiSinkCache(t *testing.T) { | ||||
|  | ||||
| 	eventBroadcaster := NewBroadcasterForTests(0) | ||||
| 	clock := testclocks.NewFakeClock(time.Now()) | ||||
| 	recorder := recorderWithFakeClock(v1.EventSource{Component: "eventTest"}, eventBroadcaster, clock) | ||||
| 	recorder := recorderWithFakeClock(t, v1.EventSource{Component: "eventTest"}, eventBroadcaster, clock) | ||||
|  | ||||
| 	sinkWatcher := eventBroadcaster.StartRecordingToSink(&testEvents) | ||||
| 	for index, item := range table { | ||||
|   | ||||
| @@ -20,6 +20,7 @@ import ( | ||||
| 	"fmt" | ||||
|  | ||||
| 	"k8s.io/apimachinery/pkg/runtime" | ||||
| 	"k8s.io/klog/v2" | ||||
| ) | ||||
|  | ||||
| // FakeRecorder is used as a fake during tests. It is thread safe. It is usable | ||||
| @@ -31,6 +32,8 @@ type FakeRecorder struct { | ||||
| 	IncludeObject bool | ||||
| } | ||||
|  | ||||
| var _ EventRecorderLogger = &FakeRecorder{} | ||||
|  | ||||
| func objectString(object runtime.Object, includeObject bool) string { | ||||
| 	if !includeObject { | ||||
| 		return "" | ||||
| @@ -68,6 +71,10 @@ func (f *FakeRecorder) AnnotatedEventf(object runtime.Object, annotations map[st | ||||
| 	f.writeEvent(object, annotations, eventtype, reason, messageFmt, args...) | ||||
| } | ||||
|  | ||||
| func (f *FakeRecorder) WithLogger(logger klog.Logger) EventRecorderLogger { | ||||
| 	return f | ||||
| } | ||||
|  | ||||
| // NewFakeRecorder creates new fake event recorder with event channel with | ||||
| // buffer of given size. | ||||
| func NewFakeRecorder(bufferSize int) *FakeRecorder { | ||||
|   | ||||
							
								
								
									
										1
									
								
								vendor/modules.txt
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										1
									
								
								vendor/modules.txt
									
									
									
									
										vendored
									
									
								
							| @@ -1915,6 +1915,7 @@ k8s.io/client-go/tools/clientcmd/api | ||||
| k8s.io/client-go/tools/clientcmd/api/latest | ||||
| k8s.io/client-go/tools/clientcmd/api/v1 | ||||
| k8s.io/client-go/tools/events | ||||
| k8s.io/client-go/tools/internal/events | ||||
| k8s.io/client-go/tools/leaderelection | ||||
| k8s.io/client-go/tools/leaderelection/resourcelock | ||||
| k8s.io/client-go/tools/metrics | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 Kubernetes Prow Robot
					Kubernetes Prow Robot