Brush up the sample controller
Signed-off-by: Mike Spreitzer <mspreitz@us.ibm.com>
This commit is contained in:
		@@ -61,6 +61,8 @@ const (
 | 
				
			|||||||
	// MessageResourceSynced is the message used for an Event fired when a Foo
 | 
						// MessageResourceSynced is the message used for an Event fired when a Foo
 | 
				
			||||||
	// is synced successfully
 | 
						// is synced successfully
 | 
				
			||||||
	MessageResourceSynced = "Foo synced successfully"
 | 
						MessageResourceSynced = "Foo synced successfully"
 | 
				
			||||||
 | 
						// FieldManager distinguishes this controller from other things writing to API objects
 | 
				
			||||||
 | 
						FieldManager = controllerAgentName
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// Controller is the controller implementation for Foo resources
 | 
					// Controller is the controller implementation for Foo resources
 | 
				
			||||||
@@ -80,7 +82,7 @@ type Controller struct {
 | 
				
			|||||||
	// means we can ensure we only process a fixed amount of resources at a
 | 
						// means we can ensure we only process a fixed amount of resources at a
 | 
				
			||||||
	// time, and makes it easy to ensure we are never processing the same item
 | 
						// time, and makes it easy to ensure we are never processing the same item
 | 
				
			||||||
	// simultaneously in two different workers.
 | 
						// simultaneously in two different workers.
 | 
				
			||||||
	workqueue workqueue.TypedRateLimitingInterface[string]
 | 
						workqueue workqueue.TypedRateLimitingInterface[cache.ObjectName]
 | 
				
			||||||
	// recorder is an event recorder for recording Event resources to the
 | 
						// recorder is an event recorder for recording Event resources to the
 | 
				
			||||||
	// Kubernetes API.
 | 
						// Kubernetes API.
 | 
				
			||||||
	recorder record.EventRecorder
 | 
						recorder record.EventRecorder
 | 
				
			||||||
@@ -106,8 +108,8 @@ func NewController(
 | 
				
			|||||||
	eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")})
 | 
						eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")})
 | 
				
			||||||
	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})
 | 
						recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})
 | 
				
			||||||
	ratelimiter := workqueue.NewTypedMaxOfRateLimiter(
 | 
						ratelimiter := workqueue.NewTypedMaxOfRateLimiter(
 | 
				
			||||||
		workqueue.NewTypedItemExponentialFailureRateLimiter[string](5*time.Millisecond, 1000*time.Second),
 | 
							workqueue.NewTypedItemExponentialFailureRateLimiter[cache.ObjectName](5*time.Millisecond, 1000*time.Second),
 | 
				
			||||||
		&workqueue.TypedBucketRateLimiter[string]{Limiter: rate.NewLimiter(rate.Limit(50), 300)},
 | 
							&workqueue.TypedBucketRateLimiter[cache.ObjectName]{Limiter: rate.NewLimiter(rate.Limit(50), 300)},
 | 
				
			||||||
	)
 | 
						)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	controller := &Controller{
 | 
						controller := &Controller{
 | 
				
			||||||
@@ -196,64 +198,56 @@ func (c *Controller) runWorker(ctx context.Context) {
 | 
				
			|||||||
// processNextWorkItem will read a single work item off the workqueue and
 | 
					// processNextWorkItem will read a single work item off the workqueue and
 | 
				
			||||||
// attempt to process it, by calling the syncHandler.
 | 
					// attempt to process it, by calling the syncHandler.
 | 
				
			||||||
func (c *Controller) processNextWorkItem(ctx context.Context) bool {
 | 
					func (c *Controller) processNextWorkItem(ctx context.Context) bool {
 | 
				
			||||||
	obj, shutdown := c.workqueue.Get()
 | 
						objRef, shutdown := c.workqueue.Get()
 | 
				
			||||||
	logger := klog.FromContext(ctx)
 | 
						logger := klog.FromContext(ctx)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if shutdown {
 | 
						if shutdown {
 | 
				
			||||||
		return false
 | 
							return false
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// We wrap this block in a func so we can defer c.workqueue.Done.
 | 
						// We call Done at the end of this func so the workqueue knows we have
 | 
				
			||||||
	err := func() error {
 | 
						// finished processing this item. We also must remember to call Forget
 | 
				
			||||||
		// We call Done here so the workqueue knows we have finished
 | 
						// if we do not want this work item being re-queued. For example, we do
 | 
				
			||||||
		// processing this item. We also must remember to call Forget if we
 | 
					 | 
				
			||||||
		// do not want this work item being re-queued. For example, we do
 | 
					 | 
				
			||||||
	// not call Forget if a transient error occurs, instead the item is
 | 
						// not call Forget if a transient error occurs, instead the item is
 | 
				
			||||||
	// put back on the workqueue and attempted again after a back-off
 | 
						// put back on the workqueue and attempted again after a back-off
 | 
				
			||||||
	// period.
 | 
						// period.
 | 
				
			||||||
		defer c.workqueue.Done(obj)
 | 
						defer c.workqueue.Done(objRef)
 | 
				
			||||||
		// Run the syncHandler, passing it the namespace/name string of the
 | 
					 | 
				
			||||||
		// Foo resource to be synced.
 | 
					 | 
				
			||||||
		if err := c.syncHandler(ctx, obj); err != nil {
 | 
					 | 
				
			||||||
			// Put the item back on the workqueue to handle any transient errors.
 | 
					 | 
				
			||||||
			c.workqueue.AddRateLimited(obj)
 | 
					 | 
				
			||||||
			return fmt.Errorf("error syncing '%s': %s, requeuing", obj, err.Error())
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		// Finally, if no error occurs we Forget this item so it does not
 | 
					 | 
				
			||||||
		// get queued again until another change happens.
 | 
					 | 
				
			||||||
		c.workqueue.Forget(obj)
 | 
					 | 
				
			||||||
		logger.Info("Successfully synced", "resourceName", obj)
 | 
					 | 
				
			||||||
		return nil
 | 
					 | 
				
			||||||
	}()
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if err != nil {
 | 
						// Run the syncHandler, passing it the structured reference to the object to be synced.
 | 
				
			||||||
		utilruntime.HandleError(err)
 | 
						err := c.syncHandler(ctx, objRef)
 | 
				
			||||||
 | 
						if err == nil {
 | 
				
			||||||
 | 
							// If no error occurs then we Forget this item so it does not
 | 
				
			||||||
 | 
							// get queued again until another change happens.
 | 
				
			||||||
 | 
							c.workqueue.Forget(objRef)
 | 
				
			||||||
 | 
							logger.Info("Successfully synced", "objectName", objRef)
 | 
				
			||||||
		return true
 | 
							return true
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
						// there was a failure so be sure to report it.  This method allows for
 | 
				
			||||||
 | 
						// pluggable error handling which can be used for things like
 | 
				
			||||||
 | 
						// cluster-monitoring.
 | 
				
			||||||
 | 
						utilruntime.HandleErrorWithContext(ctx, err, "error syncing; requeuing", "objectReference", objRef)
 | 
				
			||||||
 | 
						// since we failed, we should requeue the item to work on later.  This
 | 
				
			||||||
 | 
						// method will add a backoff to avoid hotlooping on particular items
 | 
				
			||||||
 | 
						// (they're probably still not going to work right away) and overall
 | 
				
			||||||
 | 
						// controller protection (everything I've done is broken, this controller
 | 
				
			||||||
 | 
						// needs to calm down or it can starve other useful work) cases.
 | 
				
			||||||
 | 
						c.workqueue.AddRateLimited(objRef)
 | 
				
			||||||
	return true
 | 
						return true
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// syncHandler compares the actual state with the desired, and attempts to
 | 
					// syncHandler compares the actual state with the desired, and attempts to
 | 
				
			||||||
// converge the two. It then updates the Status block of the Foo resource
 | 
					// converge the two. It then updates the Status block of the Foo resource
 | 
				
			||||||
// with the current status of the resource.
 | 
					// with the current status of the resource.
 | 
				
			||||||
func (c *Controller) syncHandler(ctx context.Context, key string) error {
 | 
					func (c *Controller) syncHandler(ctx context.Context, objectRef cache.ObjectName) error {
 | 
				
			||||||
	// Convert the namespace/name string into a distinct namespace and name
 | 
						logger := klog.LoggerWithValues(klog.FromContext(ctx), "objectRef", objectRef)
 | 
				
			||||||
	logger := klog.LoggerWithValues(klog.FromContext(ctx), "resourceName", key)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
 | 
					 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key))
 | 
					 | 
				
			||||||
		return nil
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// Get the Foo resource with this namespace/name
 | 
						// Get the Foo resource with this namespace/name
 | 
				
			||||||
	foo, err := c.foosLister.Foos(namespace).Get(name)
 | 
						foo, err := c.foosLister.Foos(objectRef.Namespace).Get(objectRef.Name)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		// The Foo resource may no longer exist, in which case we stop
 | 
							// The Foo resource may no longer exist, in which case we stop
 | 
				
			||||||
		// processing.
 | 
							// processing.
 | 
				
			||||||
		if errors.IsNotFound(err) {
 | 
							if errors.IsNotFound(err) {
 | 
				
			||||||
			utilruntime.HandleError(fmt.Errorf("foo '%s' in work queue no longer exists", key))
 | 
								utilruntime.HandleError(fmt.Errorf("foo '%#v' in work queue no longer exists", objectRef))
 | 
				
			||||||
			return nil
 | 
								return nil
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -265,7 +259,7 @@ func (c *Controller) syncHandler(ctx context.Context, key string) error {
 | 
				
			|||||||
		// We choose to absorb the error here as the worker would requeue the
 | 
							// We choose to absorb the error here as the worker would requeue the
 | 
				
			||||||
		// resource otherwise. Instead, the next time the resource is updated
 | 
							// resource otherwise. Instead, the next time the resource is updated
 | 
				
			||||||
		// the resource will be queued again.
 | 
							// the resource will be queued again.
 | 
				
			||||||
		utilruntime.HandleError(fmt.Errorf("%s: deployment name must be specified", key))
 | 
							utilruntime.HandleError(fmt.Errorf("%#v: deployment name must be specified", objectRef))
 | 
				
			||||||
		return nil
 | 
							return nil
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -273,7 +267,7 @@ func (c *Controller) syncHandler(ctx context.Context, key string) error {
 | 
				
			|||||||
	deployment, err := c.deploymentsLister.Deployments(foo.Namespace).Get(deploymentName)
 | 
						deployment, err := c.deploymentsLister.Deployments(foo.Namespace).Get(deploymentName)
 | 
				
			||||||
	// If the resource doesn't exist, we'll create it
 | 
						// If the resource doesn't exist, we'll create it
 | 
				
			||||||
	if errors.IsNotFound(err) {
 | 
						if errors.IsNotFound(err) {
 | 
				
			||||||
		deployment, err = c.kubeclientset.AppsV1().Deployments(foo.Namespace).Create(context.TODO(), newDeployment(foo), metav1.CreateOptions{})
 | 
							deployment, err = c.kubeclientset.AppsV1().Deployments(foo.Namespace).Create(context.TODO(), newDeployment(foo), metav1.CreateOptions{FieldManager: FieldManager})
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// If an error occurs during Get/Create, we'll requeue the item so we can
 | 
						// If an error occurs during Get/Create, we'll requeue the item so we can
 | 
				
			||||||
@@ -296,7 +290,7 @@ func (c *Controller) syncHandler(ctx context.Context, key string) error {
 | 
				
			|||||||
	// should update the Deployment resource.
 | 
						// should update the Deployment resource.
 | 
				
			||||||
	if foo.Spec.Replicas != nil && *foo.Spec.Replicas != *deployment.Spec.Replicas {
 | 
						if foo.Spec.Replicas != nil && *foo.Spec.Replicas != *deployment.Spec.Replicas {
 | 
				
			||||||
		logger.V(4).Info("Update deployment resource", "currentReplicas", *foo.Spec.Replicas, "desiredReplicas", *deployment.Spec.Replicas)
 | 
							logger.V(4).Info("Update deployment resource", "currentReplicas", *foo.Spec.Replicas, "desiredReplicas", *deployment.Spec.Replicas)
 | 
				
			||||||
		deployment, err = c.kubeclientset.AppsV1().Deployments(foo.Namespace).Update(context.TODO(), newDeployment(foo), metav1.UpdateOptions{})
 | 
							deployment, err = c.kubeclientset.AppsV1().Deployments(foo.Namespace).Update(context.TODO(), newDeployment(foo), metav1.UpdateOptions{FieldManager: FieldManager})
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// If an error occurs during Update, we'll requeue the item so we can
 | 
						// If an error occurs during Update, we'll requeue the item so we can
 | 
				
			||||||
@@ -327,7 +321,7 @@ func (c *Controller) updateFooStatus(foo *samplev1alpha1.Foo, deployment *appsv1
 | 
				
			|||||||
	// we must use Update instead of UpdateStatus to update the Status block of the Foo resource.
 | 
						// we must use Update instead of UpdateStatus to update the Status block of the Foo resource.
 | 
				
			||||||
	// UpdateStatus will not allow changes to the Spec of the resource,
 | 
						// UpdateStatus will not allow changes to the Spec of the resource,
 | 
				
			||||||
	// which is ideal for ensuring nothing other than resource status has been updated.
 | 
						// which is ideal for ensuring nothing other than resource status has been updated.
 | 
				
			||||||
	_, err := c.sampleclientset.SamplecontrollerV1alpha1().Foos(foo.Namespace).UpdateStatus(context.TODO(), fooCopy, metav1.UpdateOptions{})
 | 
						_, err := c.sampleclientset.SamplecontrollerV1alpha1().Foos(foo.Namespace).UpdateStatus(context.TODO(), fooCopy, metav1.UpdateOptions{FieldManager: FieldManager})
 | 
				
			||||||
	return err
 | 
						return err
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -335,13 +329,12 @@ func (c *Controller) updateFooStatus(foo *samplev1alpha1.Foo, deployment *appsv1
 | 
				
			|||||||
// string which is then put onto the work queue. This method should *not* be
 | 
					// string which is then put onto the work queue. This method should *not* be
 | 
				
			||||||
// passed resources of any type other than Foo.
 | 
					// passed resources of any type other than Foo.
 | 
				
			||||||
func (c *Controller) enqueueFoo(obj interface{}) {
 | 
					func (c *Controller) enqueueFoo(obj interface{}) {
 | 
				
			||||||
	var key string
 | 
						if objectRef, err := cache.ObjectToName(obj); err != nil {
 | 
				
			||||||
	var err error
 | 
					 | 
				
			||||||
	if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
 | 
					 | 
				
			||||||
		utilruntime.HandleError(err)
 | 
							utilruntime.HandleError(err)
 | 
				
			||||||
		return
 | 
							return
 | 
				
			||||||
 | 
						} else {
 | 
				
			||||||
 | 
							c.workqueue.Add(objectRef)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	c.workqueue.Add(key)
 | 
					 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// handleObject will take any resource implementing metav1.Object and attempt
 | 
					// handleObject will take any resource implementing metav1.Object and attempt
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -108,22 +108,22 @@ func (f *fixture) newController(ctx context.Context) (*Controller, informers.Sha
 | 
				
			|||||||
	return c, i, k8sI
 | 
						return c, i, k8sI
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (f *fixture) run(ctx context.Context, fooName string) {
 | 
					func (f *fixture) run(ctx context.Context, fooRef cache.ObjectName) {
 | 
				
			||||||
	f.runController(ctx, fooName, true, false)
 | 
						f.runController(ctx, fooRef, true, false)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (f *fixture) runExpectError(ctx context.Context, fooName string) {
 | 
					func (f *fixture) runExpectError(ctx context.Context, fooRef cache.ObjectName) {
 | 
				
			||||||
	f.runController(ctx, fooName, true, true)
 | 
						f.runController(ctx, fooRef, true, true)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (f *fixture) runController(ctx context.Context, fooName string, startInformers bool, expectError bool) {
 | 
					func (f *fixture) runController(ctx context.Context, fooRef cache.ObjectName, startInformers bool, expectError bool) {
 | 
				
			||||||
	c, i, k8sI := f.newController(ctx)
 | 
						c, i, k8sI := f.newController(ctx)
 | 
				
			||||||
	if startInformers {
 | 
						if startInformers {
 | 
				
			||||||
		i.Start(ctx.Done())
 | 
							i.Start(ctx.Done())
 | 
				
			||||||
		k8sI.Start(ctx.Done())
 | 
							k8sI.Start(ctx.Done())
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	err := c.syncHandler(ctx, fooName)
 | 
						err := c.syncHandler(ctx, fooRef)
 | 
				
			||||||
	if !expectError && err != nil {
 | 
						if !expectError && err != nil {
 | 
				
			||||||
		f.t.Errorf("error syncing foo: %v", err)
 | 
							f.t.Errorf("error syncing foo: %v", err)
 | 
				
			||||||
	} else if expectError && err == nil {
 | 
						} else if expectError && err == nil {
 | 
				
			||||||
@@ -240,13 +240,9 @@ func (f *fixture) expectUpdateFooStatusAction(foo *samplecontroller.Foo) {
 | 
				
			|||||||
	f.actions = append(f.actions, action)
 | 
						f.actions = append(f.actions, action)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func getKey(foo *samplecontroller.Foo, t *testing.T) string {
 | 
					func getRef(foo *samplecontroller.Foo, t *testing.T) cache.ObjectName {
 | 
				
			||||||
	key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(foo)
 | 
						ref := cache.MetaObjectToName(foo)
 | 
				
			||||||
	if err != nil {
 | 
						return ref
 | 
				
			||||||
		t.Errorf("Unexpected error getting key for foo %v: %v", foo.Name, err)
 | 
					 | 
				
			||||||
		return ""
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	return key
 | 
					 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func TestCreatesDeployment(t *testing.T) {
 | 
					func TestCreatesDeployment(t *testing.T) {
 | 
				
			||||||
@@ -261,7 +257,7 @@ func TestCreatesDeployment(t *testing.T) {
 | 
				
			|||||||
	f.expectCreateDeploymentAction(expDeployment)
 | 
						f.expectCreateDeploymentAction(expDeployment)
 | 
				
			||||||
	f.expectUpdateFooStatusAction(foo)
 | 
						f.expectUpdateFooStatusAction(foo)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	f.run(ctx, getKey(foo, t))
 | 
						f.run(ctx, getRef(foo, t))
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func TestDoNothing(t *testing.T) {
 | 
					func TestDoNothing(t *testing.T) {
 | 
				
			||||||
@@ -277,7 +273,7 @@ func TestDoNothing(t *testing.T) {
 | 
				
			|||||||
	f.kubeobjects = append(f.kubeobjects, d)
 | 
						f.kubeobjects = append(f.kubeobjects, d)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	f.expectUpdateFooStatusAction(foo)
 | 
						f.expectUpdateFooStatusAction(foo)
 | 
				
			||||||
	f.run(ctx, getKey(foo, t))
 | 
						f.run(ctx, getRef(foo, t))
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func TestUpdateDeployment(t *testing.T) {
 | 
					func TestUpdateDeployment(t *testing.T) {
 | 
				
			||||||
@@ -298,7 +294,7 @@ func TestUpdateDeployment(t *testing.T) {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	f.expectUpdateFooStatusAction(foo)
 | 
						f.expectUpdateFooStatusAction(foo)
 | 
				
			||||||
	f.expectUpdateDeploymentAction(expDeployment)
 | 
						f.expectUpdateDeploymentAction(expDeployment)
 | 
				
			||||||
	f.run(ctx, getKey(foo, t))
 | 
						f.run(ctx, getRef(foo, t))
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func TestNotControlledByUs(t *testing.T) {
 | 
					func TestNotControlledByUs(t *testing.T) {
 | 
				
			||||||
@@ -315,7 +311,7 @@ func TestNotControlledByUs(t *testing.T) {
 | 
				
			|||||||
	f.deploymentLister = append(f.deploymentLister, d)
 | 
						f.deploymentLister = append(f.deploymentLister, d)
 | 
				
			||||||
	f.kubeobjects = append(f.kubeobjects, d)
 | 
						f.kubeobjects = append(f.kubeobjects, d)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	f.runExpectError(ctx, getKey(foo, t))
 | 
						f.runExpectError(ctx, getRef(foo, t))
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func int32Ptr(i int32) *int32 { return &i }
 | 
					func int32Ptr(i int32) *int32 { return &i }
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user