Merge pull request #31705 from mwielgus/secrets-events
Automatic merge from submit-queue Events for federated secrets controller cc: @quinton-hoole @nikhiljindal @kubernetes/sig-cluster-federation
This commit is contained in:
		| @@ -24,10 +24,12 @@ import ( | ||||
| 	federation_api "k8s.io/kubernetes/federation/apis/federation/v1beta1" | ||||
| 	federation_release_1_4 "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_4" | ||||
| 	"k8s.io/kubernetes/federation/pkg/federation-controller/util" | ||||
| 	"k8s.io/kubernetes/federation/pkg/federation-controller/util/eventsink" | ||||
| 	"k8s.io/kubernetes/pkg/api" | ||||
| 	api_v1 "k8s.io/kubernetes/pkg/api/v1" | ||||
| 	"k8s.io/kubernetes/pkg/client/cache" | ||||
| 	kube_release_1_4 "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_4" | ||||
| 	"k8s.io/kubernetes/pkg/client/record" | ||||
| 	"k8s.io/kubernetes/pkg/controller" | ||||
| 	"k8s.io/kubernetes/pkg/controller/framework" | ||||
| 	pkg_runtime "k8s.io/kubernetes/pkg/runtime" | ||||
| @@ -66,6 +68,9 @@ type SecretController struct { | ||||
| 	// Backoff manager for secrets | ||||
| 	secretBackoff *flowcontrol.Backoff | ||||
|  | ||||
| 	// For events | ||||
| 	eventRecorder record.EventRecorder | ||||
|  | ||||
| 	secretReviewDelay     time.Duration | ||||
| 	clusterAvailableDelay time.Duration | ||||
| 	smallDelay            time.Duration | ||||
| @@ -74,6 +79,10 @@ type SecretController struct { | ||||
|  | ||||
| // NewSecretController returns a new secret controller | ||||
| func NewSecretController(client federation_release_1_4.Interface) *SecretController { | ||||
| 	broadcaster := record.NewBroadcaster() | ||||
| 	broadcaster.StartRecordingToSink(eventsink.NewFederatedEventSink(client)) | ||||
| 	recorder := broadcaster.NewRecorder(api.EventSource{Component: "federated-secrets-controller"}) | ||||
|  | ||||
| 	secretcontroller := &SecretController{ | ||||
| 		federatedApiClient:    client, | ||||
| 		secretReviewDelay:     time.Second * 10, | ||||
| @@ -81,6 +90,7 @@ func NewSecretController(client federation_release_1_4.Interface) *SecretControl | ||||
| 		smallDelay:            time.Second * 3, | ||||
| 		updateTimeout:         time.Second * 30, | ||||
| 		secretBackoff:         flowcontrol.NewBackOff(5*time.Second, time.Minute), | ||||
| 		eventRecorder:         recorder, | ||||
| 	} | ||||
|  | ||||
| 	// Build delivereres for triggering reconcilations. | ||||
| @@ -278,6 +288,9 @@ func (secretcontroller *SecretController) reconcileSecret(namespace string, secr | ||||
| 		} | ||||
|  | ||||
| 		if !found { | ||||
| 			secretcontroller.eventRecorder.Eventf(baseSecret, api.EventTypeNormal, "CreateInCluster", | ||||
| 				"Creating secret in cluster %s", cluster.Name) | ||||
|  | ||||
| 			operations = append(operations, util.FederatedOperation{ | ||||
| 				Type:        util.OperationTypeAdd, | ||||
| 				Obj:         desiredSecret, | ||||
| @@ -290,6 +303,9 @@ func (secretcontroller *SecretController) reconcileSecret(namespace string, secr | ||||
| 			if !util.ObjectMetaEquivalent(desiredSecret.ObjectMeta, clusterSecret.ObjectMeta) || | ||||
| 				!reflect.DeepEqual(desiredSecret.Data, clusterSecret.Data) || | ||||
| 				!reflect.DeepEqual(desiredSecret.Type, clusterSecret.Type) { | ||||
|  | ||||
| 				secretcontroller.eventRecorder.Eventf(baseSecret, api.EventTypeNormal, "UpdateInCluster", | ||||
| 					"Updating secret in cluster %s", cluster.Name) | ||||
| 				operations = append(operations, util.FederatedOperation{ | ||||
| 					Type:        util.OperationTypeUpdate, | ||||
| 					Obj:         desiredSecret, | ||||
| @@ -303,7 +319,12 @@ func (secretcontroller *SecretController) reconcileSecret(namespace string, secr | ||||
| 		// Everything is in order | ||||
| 		return | ||||
| 	} | ||||
| 	err = secretcontroller.federatedUpdater.Update(operations, secretcontroller.updateTimeout) | ||||
| 	err = secretcontroller.federatedUpdater.UpdateWithOnError(operations, secretcontroller.updateTimeout, | ||||
| 		func(op util.FederatedOperation, operror error) { | ||||
| 			secretcontroller.eventRecorder.Eventf(baseSecret, api.EventTypeNormal, "FailedUpdateInCluster", | ||||
| 				"Update secret in cluster %s failed: %v", op.ClusterName, operror) | ||||
| 		}) | ||||
|  | ||||
| 	if err != nil { | ||||
| 		glog.Errorf("Failed to execute updates for %s: %v", key, err) | ||||
| 		secretcontroller.deliverSecret(namespace, secretName, 0, true) | ||||
|   | ||||
| @@ -78,7 +78,8 @@ func TestSecretController(t *testing.T) { | ||||
| 	secret1 := api_v1.Secret{ | ||||
| 		ObjectMeta: api_v1.ObjectMeta{ | ||||
| 			Name:      "test-secret", | ||||
| 			Namespace: "mynamespace", | ||||
| 			Namespace: "ns", | ||||
| 			SelfLink:  "/api/v1/namespaces/ns/secrets/test-secret", | ||||
| 		}, | ||||
| 		Data: map[string][]byte{ | ||||
| 			"A": []byte("ala ma kota"), | ||||
| @@ -93,7 +94,7 @@ func TestSecretController(t *testing.T) { | ||||
| 	assert.NotNil(t, createdSecret) | ||||
| 	assert.Equal(t, secret1.Namespace, createdSecret.Namespace) | ||||
| 	assert.Equal(t, secret1.Name, createdSecret.Name) | ||||
| 	assert.True(t, reflect.DeepEqual(&secret1, createdSecret)) | ||||
| 	assert.True(t, secretsEqual(secret1, *createdSecret)) | ||||
|  | ||||
| 	// Test update federated secret. | ||||
| 	secret1.Annotations = map[string]string{ | ||||
| @@ -104,7 +105,7 @@ func TestSecretController(t *testing.T) { | ||||
| 	assert.NotNil(t, updatedSecret) | ||||
| 	assert.Equal(t, secret1.Name, updatedSecret.Name) | ||||
| 	assert.Equal(t, secret1.Namespace, updatedSecret.Namespace) | ||||
| 	assert.True(t, reflect.DeepEqual(&secret1, updatedSecret)) | ||||
| 	assert.True(t, secretsEqual(secret1, *updatedSecret)) | ||||
|  | ||||
| 	// Test update federated secret. | ||||
| 	secret1.Data = map[string][]byte{ | ||||
| @@ -115,7 +116,7 @@ func TestSecretController(t *testing.T) { | ||||
| 	assert.NotNil(t, updatedSecret) | ||||
| 	assert.Equal(t, secret1.Name, updatedSecret.Name) | ||||
| 	assert.Equal(t, secret1.Namespace, updatedSecret.Namespace) | ||||
| 	assert.True(t, reflect.DeepEqual(&secret1, updatedSecret2)) | ||||
| 	assert.True(t, secretsEqual(secret1, *updatedSecret2)) | ||||
|  | ||||
| 	// Test add cluster | ||||
| 	clusterWatch.Add(cluster2) | ||||
| @@ -123,11 +124,17 @@ func TestSecretController(t *testing.T) { | ||||
| 	assert.NotNil(t, createdSecret2) | ||||
| 	assert.Equal(t, secret1.Name, createdSecret2.Name) | ||||
| 	assert.Equal(t, secret1.Namespace, createdSecret2.Namespace) | ||||
| 	assert.True(t, reflect.DeepEqual(&secret1, createdSecret2)) | ||||
| 	assert.True(t, secretsEqual(secret1, *createdSecret2)) | ||||
|  | ||||
| 	close(stop) | ||||
| } | ||||
|  | ||||
| func secretsEqual(a, b api_v1.Secret) bool { | ||||
| 	a.SelfLink = "" | ||||
| 	b.SelfLink = "" | ||||
| 	return reflect.DeepEqual(a, b) | ||||
| } | ||||
|  | ||||
| func GetSecretFromChan(c chan runtime.Object) *api_v1.Secret { | ||||
| 	secret := GetObjectFromChan(c).(*api_v1.Secret) | ||||
| 	return secret | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 Kubernetes Submit Queue
					Kubernetes Submit Queue