Merge pull request #50685 from sttts/sttts-deepcopy-calls-federation
Automatic merge from submit-queue. If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>. federation: simplify deepcopy calls We have static DeepCopy now without the possibility of errors. Makes the code much simpler.
This commit is contained in:
		@@ -65,7 +65,7 @@ func (a *DaemonSetAdapter) Copy(obj pkgruntime.Object) pkgruntime.Object {
 | 
			
		||||
	daemonset := obj.(*extensionsv1.DaemonSet)
 | 
			
		||||
	return &extensionsv1.DaemonSet{
 | 
			
		||||
		ObjectMeta: util.DeepCopyRelevantObjectMeta(daemonset.ObjectMeta),
 | 
			
		||||
		Spec:       *(util.DeepCopyApiTypeOrPanic(&daemonset.Spec).(*extensionsv1.DaemonSetSpec)),
 | 
			
		||||
		Spec:       *daemonset.Spec.DeepCopy(),
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -92,7 +92,7 @@ func (a *HpaAdapter) Copy(obj pkgruntime.Object) pkgruntime.Object {
 | 
			
		||||
	hpa := obj.(*autoscalingv1.HorizontalPodAutoscaler)
 | 
			
		||||
	return &autoscalingv1.HorizontalPodAutoscaler{
 | 
			
		||||
		ObjectMeta: fedutil.DeepCopyRelevantObjectMeta(hpa.ObjectMeta),
 | 
			
		||||
		Spec:       *fedutil.DeepCopyApiTypeOrPanic(&hpa.Spec).(*autoscalingv1.HorizontalPodAutoscalerSpec),
 | 
			
		||||
		Spec:       *hpa.Spec.DeepCopy(),
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -80,7 +80,7 @@ func (a *NamespaceAdapter) Copy(obj pkgruntime.Object) pkgruntime.Object {
 | 
			
		||||
	namespace := obj.(*apiv1.Namespace)
 | 
			
		||||
	return &apiv1.Namespace{
 | 
			
		||||
		ObjectMeta: util.DeepCopyRelevantObjectMeta(namespace.ObjectMeta),
 | 
			
		||||
		Spec:       *(util.DeepCopyApiTypeOrPanic(&namespace.Spec).(*apiv1.NamespaceSpec)),
 | 
			
		||||
		Spec:       *namespace.Spec.DeepCopy(),
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -84,7 +84,7 @@ func (a *ReplicaSetAdapter) Copy(obj pkgruntime.Object) pkgruntime.Object {
 | 
			
		||||
	rs := obj.(*extensionsv1.ReplicaSet)
 | 
			
		||||
	return &extensionsv1.ReplicaSet{
 | 
			
		||||
		ObjectMeta: fedutil.DeepCopyRelevantObjectMeta(rs.ObjectMeta),
 | 
			
		||||
		Spec:       *fedutil.DeepCopyApiTypeOrPanic(&rs.Spec).(*extensionsv1.ReplicaSetSpec),
 | 
			
		||||
		Spec:       *rs.Spec.DeepCopy(),
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -604,13 +604,7 @@ func (ic *IngressController) getMasterCluster() (master *federationapi.Cluster,
 | 
			
		||||
*/
 | 
			
		||||
func (ic *IngressController) updateClusterIngressUIDToMasters(cluster *federationapi.Cluster, fallbackUID string) (string, error) {
 | 
			
		||||
	masterCluster, masterUID, err := ic.getMasterCluster()
 | 
			
		||||
	clusterObj, clusterErr := api.Scheme.DeepCopy(cluster) // Make a clone so that we don't clobber our input param
 | 
			
		||||
	cluster, ok := clusterObj.(*federationapi.Cluster)
 | 
			
		||||
	if clusterErr != nil || !ok {
 | 
			
		||||
		glog.Errorf("Internal error: Failed clone cluster resource while attempting to add master ingress UID annotation (%q = %q) from master cluster %q to cluster %q, will try again later: %v", uidAnnotationKey, masterUID, masterCluster.Name, cluster.Name, clusterErr)
 | 
			
		||||
		return "", clusterErr
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	cluster = cluster.DeepCopy() // Make a clone so that we don't clobber our input param
 | 
			
		||||
	if err == nil {
 | 
			
		||||
		if masterCluster.Name != cluster.Name { // We're not the master, need to get in sync
 | 
			
		||||
			if cluster.ObjectMeta.Annotations == nil {
 | 
			
		||||
@@ -694,13 +688,8 @@ func (ic *IngressController) reconcileIngress(ingress types.NamespacedName) {
 | 
			
		||||
		glog.V(4).Infof("Ingress %q is not federated.  Ignoring.", ingress)
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	baseIngressObj, err := api.Scheme.DeepCopy(baseIngressObjFromStore)
 | 
			
		||||
	baseIngress, ok := baseIngressObj.(*extensionsv1beta1.Ingress)
 | 
			
		||||
	if err != nil || !ok {
 | 
			
		||||
		glog.Errorf("Internal Error %v : Object retrieved from ingressInformerStore with key %q is not of correct type *extensionsv1beta1.Ingress: %v", err, key, baseIngressObj)
 | 
			
		||||
	} else {
 | 
			
		||||
	baseIngress := baseIngressObjFromStore.(*extensionsv1beta1.Ingress).DeepCopy()
 | 
			
		||||
	glog.V(4).Infof("Base (federated) ingress: %v", baseIngress)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if baseIngress.DeletionTimestamp != nil {
 | 
			
		||||
		if err := ic.delete(baseIngress); err != nil {
 | 
			
		||||
@@ -747,24 +736,9 @@ func (ic *IngressController) reconcileIngress(ingress types.NamespacedName) {
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
		desiredIngress := &extensionsv1beta1.Ingress{}
 | 
			
		||||
		objMeta, err := api.Scheme.DeepCopy(&baseIngress.ObjectMeta)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			glog.Errorf("Error deep copying ObjectMeta: %v", err)
 | 
			
		||||
		}
 | 
			
		||||
		objSpec, err := api.Scheme.DeepCopy(&baseIngress.Spec)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			glog.Errorf("Error deep copying Spec: %v", err)
 | 
			
		||||
		}
 | 
			
		||||
		objMetaCopy, ok := objMeta.(*metav1.ObjectMeta)
 | 
			
		||||
		if !ok {
 | 
			
		||||
			glog.Errorf("Internal error: Failed to cast to *metav1.ObjectMeta: %v", objMeta)
 | 
			
		||||
		}
 | 
			
		||||
		desiredIngress.ObjectMeta = *objMetaCopy
 | 
			
		||||
		objSpecCopy, ok := objSpec.(*extensionsv1beta1.IngressSpec)
 | 
			
		||||
		if !ok {
 | 
			
		||||
			glog.Errorf("Internal error: Failed to cast to extensionsv1beta1.Ingressespec: %v", objSpec)
 | 
			
		||||
		}
 | 
			
		||||
		desiredIngress.Spec = *objSpecCopy
 | 
			
		||||
		desiredIngress.ObjectMeta = *baseIngress.ObjectMeta.DeepCopy()
 | 
			
		||||
		desiredIngress.Spec = *desiredIngress.Spec.DeepCopy()
 | 
			
		||||
 | 
			
		||||
		glog.V(4).Infof("Desired Ingress: %v", desiredIngress)
 | 
			
		||||
 | 
			
		||||
		send, err := clusterselector.SendToCluster(cluster.Labels, desiredIngress.ObjectMeta.Annotations)
 | 
			
		||||
@@ -830,14 +804,7 @@ func (ic *IngressController) reconcileIngress(ingress types.NamespacedName) {
 | 
			
		||||
					return
 | 
			
		||||
				}
 | 
			
		||||
				if !baseLBStatusExists && clusterLBStatusExists {
 | 
			
		||||
					lbstatusObj, lbErr := api.Scheme.DeepCopy(&clusterIngress.Status.LoadBalancer)
 | 
			
		||||
					lbstatus, ok := lbstatusObj.(*v1.LoadBalancerStatus)
 | 
			
		||||
					if lbErr != nil || !ok {
 | 
			
		||||
						glog.Errorf("Internal error: Failed to clone LoadBalancerStatus of %q in cluster %q while attempting to update master loadbalancer ingress status, will try again later. error: %v, Object to be cloned: %v", ingress, cluster.Name, lbErr, lbstatusObj)
 | 
			
		||||
						ic.deliverIngress(ingress, ic.ingressReviewDelay, true)
 | 
			
		||||
						return
 | 
			
		||||
					}
 | 
			
		||||
					baseIngress.Status.LoadBalancer = *lbstatus
 | 
			
		||||
					baseIngress.Status.LoadBalancer = *clusterIngress.Status.LoadBalancer.DeepCopy()
 | 
			
		||||
					glog.V(4).Infof("Attempting to update base federated ingress status: %v", baseIngress)
 | 
			
		||||
					if updatedFedIngress, err := ic.federatedApiClient.Extensions().Ingresses(baseIngress.Namespace).UpdateStatus(baseIngress); err != nil {
 | 
			
		||||
						glog.Errorf("Failed to update federated ingress status of %q (loadbalancer status), will try again later: %v", ingress, err)
 | 
			
		||||
@@ -857,17 +824,7 @@ func (ic *IngressController) reconcileIngress(ingress types.NamespacedName) {
 | 
			
		||||
				glog.V(4).Infof("Ingress %q in cluster %q does not need an update: cluster ingress is equivalent to federated ingress", ingress, cluster.Name)
 | 
			
		||||
			} else {
 | 
			
		||||
				glog.V(4).Infof("Ingress %s in cluster %s needs an update: cluster ingress %v is not equivalent to federated ingress %v", ingress, cluster.Name, clusterIngress, desiredIngress)
 | 
			
		||||
				objMeta, err := api.Scheme.DeepCopy(&clusterIngress.ObjectMeta)
 | 
			
		||||
				if err != nil {
 | 
			
		||||
					glog.Errorf("Error deep copying ObjectMeta: %v", err)
 | 
			
		||||
					ic.deliverIngress(ingress, ic.ingressReviewDelay, true)
 | 
			
		||||
				}
 | 
			
		||||
				objMetaCopy, ok := objMeta.(*metav1.ObjectMeta)
 | 
			
		||||
				if !ok {
 | 
			
		||||
					glog.Errorf("Internal error: Failed to cast to metav1.ObjectMeta: %v", objMeta)
 | 
			
		||||
					ic.deliverIngress(ingress, ic.ingressReviewDelay, true)
 | 
			
		||||
				}
 | 
			
		||||
				desiredIngress.ObjectMeta = *objMetaCopy
 | 
			
		||||
				clusterIngress.ObjectMeta.DeepCopyInto(&desiredIngress.ObjectMeta)
 | 
			
		||||
				// Merge any annotations and labels on the federated ingress onto the underlying cluster ingress,
 | 
			
		||||
				// overwriting duplicates.
 | 
			
		||||
				if desiredIngress.ObjectMeta.Annotations == nil {
 | 
			
		||||
 
 | 
			
		||||
@@ -434,7 +434,7 @@ func (fjc *FederationJobController) reconcileJob(key string) (reconciliationStat
 | 
			
		||||
		}
 | 
			
		||||
		ljob := &batchv1.Job{
 | 
			
		||||
			ObjectMeta: fedutil.DeepCopyRelevantObjectMeta(fjob.ObjectMeta),
 | 
			
		||||
			Spec:       *fedutil.DeepCopyApiTypeOrPanic(&fjob.Spec).(*batchv1.JobSpec),
 | 
			
		||||
			Spec:       *fjob.Spec.DeepCopy(),
 | 
			
		||||
		}
 | 
			
		||||
		// use selector generated at federation level, or user specified value
 | 
			
		||||
		manualSelector := true
 | 
			
		||||
 
 | 
			
		||||
@@ -424,16 +424,7 @@ func (s *ServiceController) reconcileService(key string) reconciliationStatus {
 | 
			
		||||
	glog.V(3).Infof("Reconciling federated service: %s", key)
 | 
			
		||||
 | 
			
		||||
	// Create a copy before modifying the service to prevent race condition with other readers of service from store
 | 
			
		||||
	fedServiceObj, err := api.Scheme.DeepCopy(service)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		runtime.HandleError(fmt.Errorf("Error in copying obj: %s, %v", key, err))
 | 
			
		||||
		return statusNonRecoverableError
 | 
			
		||||
	}
 | 
			
		||||
	fedService, ok := fedServiceObj.(*v1.Service)
 | 
			
		||||
	if err != nil || !ok {
 | 
			
		||||
		runtime.HandleError(fmt.Errorf("Unknown obj received from store: %#v, %v", fedServiceObj, err))
 | 
			
		||||
		return statusNonRecoverableError
 | 
			
		||||
	}
 | 
			
		||||
	fedService := service.DeepCopy()
 | 
			
		||||
 | 
			
		||||
	// Handle deletion of federated service
 | 
			
		||||
	if fedService.DeletionTimestamp != nil {
 | 
			
		||||
@@ -546,7 +537,7 @@ func getOperationsToPerformOnCluster(informer fedutil.FederatedInformer, cluster
 | 
			
		||||
 | 
			
		||||
	desiredService := &v1.Service{
 | 
			
		||||
		ObjectMeta: fedutil.DeepCopyRelevantObjectMeta(fedService.ObjectMeta),
 | 
			
		||||
		Spec:       *(fedutil.DeepCopyApiTypeOrPanic(&fedService.Spec).(*v1.ServiceSpec)),
 | 
			
		||||
		Spec:       *fedService.Spec.DeepCopy(),
 | 
			
		||||
	}
 | 
			
		||||
	switch {
 | 
			
		||||
	case found && send:
 | 
			
		||||
 
 | 
			
		||||
@@ -397,20 +397,7 @@ func (s *FederationSyncController) objFromCache(kind, key string) (pkgruntime.Ob
 | 
			
		||||
	if !exist {
 | 
			
		||||
		return nil, nil
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Create a copy before modifying the resource to prevent racing with other readers.
 | 
			
		||||
	copiedObj, err := api.Scheme.DeepCopy(cachedObj)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		wrappedErr := fmt.Errorf("Error in retrieving %s %q from store: %v", kind, key, err)
 | 
			
		||||
		runtime.HandleError(wrappedErr)
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
	if !s.adapter.IsExpectedType(copiedObj) {
 | 
			
		||||
		err = fmt.Errorf("Object is not the expected type: %v", copiedObj)
 | 
			
		||||
		runtime.HandleError(err)
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
	return copiedObj.(pkgruntime.Object), nil
 | 
			
		||||
	return cachedObj.(pkgruntime.Object).DeepCopyObject(), nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// delete deletes the given resource or returns error if the deletion was not complete.
 | 
			
		||||
 
 | 
			
		||||
@@ -70,6 +70,6 @@ func DeepCopyDeploymentObjectMeta(meta metav1.ObjectMeta) metav1.ObjectMeta {
 | 
			
		||||
func DeepCopyDeployment(a *extensions_v1.Deployment) *extensions_v1.Deployment {
 | 
			
		||||
	return &extensions_v1.Deployment{
 | 
			
		||||
		ObjectMeta: DeepCopyDeploymentObjectMeta(a.ObjectMeta),
 | 
			
		||||
		Spec:       *(DeepCopyApiTypeOrPanic(&a.Spec).(*extensions_v1.DeploymentSpec)),
 | 
			
		||||
		Spec:       *a.Spec.DeepCopy(),
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -21,7 +21,6 @@ import (
 | 
			
		||||
 | 
			
		||||
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/runtime"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/api"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// Copies cluster-independent, user provided data from the given ObjectMeta struct. If in
 | 
			
		||||
@@ -84,11 +83,3 @@ func ObjectMetaAndSpecEquivalent(a, b runtime.Object) bool {
 | 
			
		||||
	specB := reflect.ValueOf(b).Elem().FieldByName("Spec").Interface()
 | 
			
		||||
	return ObjectMetaEquivalent(objectMetaA, objectMetaB) && reflect.DeepEqual(specA, specB)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func DeepCopyApiTypeOrPanic(item interface{}) interface{} {
 | 
			
		||||
	result, err := api.Scheme.DeepCopy(item)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		panic(err)
 | 
			
		||||
	}
 | 
			
		||||
	return result
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -12,7 +12,6 @@ go_library(
 | 
			
		||||
        "//federation/apis/federation/v1beta1:go_default_library",
 | 
			
		||||
        "//federation/pkg/federation-controller/util:go_default_library",
 | 
			
		||||
        "//federation/pkg/federation-controller/util/finalizers:go_default_library",
 | 
			
		||||
        "//pkg/api:go_default_library",
 | 
			
		||||
        "//vendor/github.com/golang/glog:go_default_library",
 | 
			
		||||
        "//vendor/github.com/stretchr/testify/assert:go_default_library",
 | 
			
		||||
        "//vendor/github.com/stretchr/testify/require:go_default_library",
 | 
			
		||||
 
 | 
			
		||||
@@ -34,7 +34,6 @@ import (
 | 
			
		||||
	federationapi "k8s.io/kubernetes/federation/apis/federation/v1beta1"
 | 
			
		||||
	"k8s.io/kubernetes/federation/pkg/federation-controller/util"
 | 
			
		||||
	finalizersutil "k8s.io/kubernetes/federation/pkg/federation-controller/util/finalizers"
 | 
			
		||||
	"k8s.io/kubernetes/pkg/api"
 | 
			
		||||
 | 
			
		||||
	"github.com/golang/glog"
 | 
			
		||||
	"github.com/stretchr/testify/assert"
 | 
			
		||||
@@ -73,22 +72,14 @@ func (wd *WatcherDispatcher) Stop() {
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func copy(obj runtime.Object) runtime.Object {
 | 
			
		||||
	objCopy, err := api.Scheme.DeepCopy(obj)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		panic(err)
 | 
			
		||||
	}
 | 
			
		||||
	return objCopy.(runtime.Object)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Add sends an add event.
 | 
			
		||||
func (wd *WatcherDispatcher) Add(obj runtime.Object) {
 | 
			
		||||
	wd.Lock()
 | 
			
		||||
	defer wd.Unlock()
 | 
			
		||||
	wd.eventsSoFar = append(wd.eventsSoFar, &watch.Event{Type: watch.Added, Object: copy(obj)})
 | 
			
		||||
	wd.eventsSoFar = append(wd.eventsSoFar, &watch.Event{Type: watch.Added, Object: obj.DeepCopyObject()})
 | 
			
		||||
	for _, watcher := range wd.watchers {
 | 
			
		||||
		if !watcher.IsStopped() {
 | 
			
		||||
			watcher.Add(copy(obj))
 | 
			
		||||
			watcher.Add(obj.DeepCopyObject())
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
@@ -98,11 +89,11 @@ func (wd *WatcherDispatcher) Modify(obj runtime.Object) {
 | 
			
		||||
	wd.Lock()
 | 
			
		||||
	defer wd.Unlock()
 | 
			
		||||
	glog.V(4).Infof("->WatcherDispatcher.Modify(%v)", obj)
 | 
			
		||||
	wd.eventsSoFar = append(wd.eventsSoFar, &watch.Event{Type: watch.Modified, Object: copy(obj)})
 | 
			
		||||
	wd.eventsSoFar = append(wd.eventsSoFar, &watch.Event{Type: watch.Modified, Object: obj.DeepCopyObject()})
 | 
			
		||||
	for i, watcher := range wd.watchers {
 | 
			
		||||
		if !watcher.IsStopped() {
 | 
			
		||||
			glog.V(4).Infof("->Watcher(%d).Modify(%v)", i, obj)
 | 
			
		||||
			watcher.Modify(copy(obj))
 | 
			
		||||
			watcher.Modify(obj.DeepCopyObject())
 | 
			
		||||
		} else {
 | 
			
		||||
			glog.V(4).Infof("->Watcher(%d) is stopped.  Not calling Modify(%v)", i, obj)
 | 
			
		||||
		}
 | 
			
		||||
@@ -113,10 +104,10 @@ func (wd *WatcherDispatcher) Modify(obj runtime.Object) {
 | 
			
		||||
func (wd *WatcherDispatcher) Delete(lastValue runtime.Object) {
 | 
			
		||||
	wd.Lock()
 | 
			
		||||
	defer wd.Unlock()
 | 
			
		||||
	wd.eventsSoFar = append(wd.eventsSoFar, &watch.Event{Type: watch.Deleted, Object: copy(lastValue)})
 | 
			
		||||
	wd.eventsSoFar = append(wd.eventsSoFar, &watch.Event{Type: watch.Deleted, Object: lastValue.DeepCopyObject()})
 | 
			
		||||
	for _, watcher := range wd.watchers {
 | 
			
		||||
		if !watcher.IsStopped() {
 | 
			
		||||
			watcher.Delete(copy(lastValue))
 | 
			
		||||
			watcher.Delete(lastValue.DeepCopyObject())
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
@@ -125,10 +116,10 @@ func (wd *WatcherDispatcher) Delete(lastValue runtime.Object) {
 | 
			
		||||
func (wd *WatcherDispatcher) Error(errValue runtime.Object) {
 | 
			
		||||
	wd.Lock()
 | 
			
		||||
	defer wd.Unlock()
 | 
			
		||||
	wd.eventsSoFar = append(wd.eventsSoFar, &watch.Event{Type: watch.Error, Object: copy(errValue)})
 | 
			
		||||
	wd.eventsSoFar = append(wd.eventsSoFar, &watch.Event{Type: watch.Error, Object: errValue.DeepCopyObject()})
 | 
			
		||||
	for _, watcher := range wd.watchers {
 | 
			
		||||
		if !watcher.IsStopped() {
 | 
			
		||||
			watcher.Error(copy(errValue))
 | 
			
		||||
			watcher.Error(errValue.DeepCopyObject())
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
@@ -137,10 +128,10 @@ func (wd *WatcherDispatcher) Error(errValue runtime.Object) {
 | 
			
		||||
func (wd *WatcherDispatcher) Action(action watch.EventType, obj runtime.Object) {
 | 
			
		||||
	wd.Lock()
 | 
			
		||||
	defer wd.Unlock()
 | 
			
		||||
	wd.eventsSoFar = append(wd.eventsSoFar, &watch.Event{Type: action, Object: copy(obj)})
 | 
			
		||||
	wd.eventsSoFar = append(wd.eventsSoFar, &watch.Event{Type: action, Object: obj.DeepCopyObject()})
 | 
			
		||||
	for _, watcher := range wd.watchers {
 | 
			
		||||
		if !watcher.IsStopped() {
 | 
			
		||||
			watcher.Action(action, copy(obj))
 | 
			
		||||
			watcher.Action(action, obj.DeepCopyObject())
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
@@ -204,7 +195,7 @@ func RegisterFakeOnCreate(resource string, client *core.Fake, watcher *WatcherDi
 | 
			
		||||
		createAction := action.(core.CreateAction)
 | 
			
		||||
		originalObj := createAction.GetObject()
 | 
			
		||||
		// Create a copy of the object here to prevent data races while reading the object in go routine.
 | 
			
		||||
		obj := copy(originalObj)
 | 
			
		||||
		obj := originalObj.DeepCopyObject()
 | 
			
		||||
		watcher.orderExecution <- func() {
 | 
			
		||||
			glog.V(4).Infof("Object created: %v", obj)
 | 
			
		||||
			watcher.Add(obj)
 | 
			
		||||
@@ -222,7 +213,7 @@ func RegisterFakeCopyOnCreate(resource string, client *core.Fake, watcher *Watch
 | 
			
		||||
		createAction := action.(core.CreateAction)
 | 
			
		||||
		originalObj := createAction.GetObject()
 | 
			
		||||
		// Create a copy of the object here to prevent data races while reading the object in go routine.
 | 
			
		||||
		obj := copy(originalObj)
 | 
			
		||||
		obj := originalObj.DeepCopyObject()
 | 
			
		||||
		watcher.orderExecution <- func() {
 | 
			
		||||
			glog.V(4).Infof("Object created. Writing to channel: %v", obj)
 | 
			
		||||
			watcher.Add(obj)
 | 
			
		||||
@@ -242,7 +233,7 @@ func RegisterFakeOnUpdate(resource string, client *core.Fake, watcher *WatcherDi
 | 
			
		||||
		glog.V(7).Infof("Updating %s: %v", resource, updateAction.GetObject())
 | 
			
		||||
 | 
			
		||||
		// Create a copy of the object here to prevent data races while reading the object in go routine.
 | 
			
		||||
		obj := copy(originalObj)
 | 
			
		||||
		obj := originalObj.DeepCopyObject()
 | 
			
		||||
		operation := func() {
 | 
			
		||||
			glog.V(4).Infof("Object updated %v", obj)
 | 
			
		||||
			watcher.Modify(obj)
 | 
			
		||||
@@ -270,7 +261,7 @@ func RegisterFakeCopyOnUpdate(resource string, client *core.Fake, watcher *Watch
 | 
			
		||||
		glog.V(7).Infof("Updating %s: %v", resource, updateAction.GetObject())
 | 
			
		||||
 | 
			
		||||
		// Create a copy of the object here to prevent data races while reading the object in go routine.
 | 
			
		||||
		obj := copy(originalObj)
 | 
			
		||||
		obj := originalObj.DeepCopyObject()
 | 
			
		||||
		operation := func() {
 | 
			
		||||
			glog.V(4).Infof("Object updated. Writing to channel: %v", obj)
 | 
			
		||||
			watcher.Modify(obj)
 | 
			
		||||
 
 | 
			
		||||
@@ -49,7 +49,6 @@ go_library(
 | 
			
		||||
        "//vendor/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
 | 
			
		||||
        "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
 | 
			
		||||
        "//vendor/k8s.io/client-go/kubernetes:go_default_library",
 | 
			
		||||
        "//vendor/k8s.io/client-go/kubernetes/scheme:go_default_library",
 | 
			
		||||
        "//vendor/k8s.io/client-go/tools/clientcmd:go_default_library",
 | 
			
		||||
    ],
 | 
			
		||||
)
 | 
			
		||||
 
 | 
			
		||||
@@ -28,7 +28,6 @@ import (
 | 
			
		||||
	"k8s.io/apimachinery/pkg/api/errors"
 | 
			
		||||
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/util/wait"
 | 
			
		||||
	"k8s.io/client-go/kubernetes/scheme"
 | 
			
		||||
	fedclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset"
 | 
			
		||||
	"k8s.io/kubernetes/test/e2e/framework"
 | 
			
		||||
	fedframework "k8s.io/kubernetes/test/e2e_federation/framework"
 | 
			
		||||
@@ -184,33 +183,7 @@ var _ = framework.KubeDescribe("Federated Services [Feature:Federation]", func()
 | 
			
		||||
				backendPods = createBackendPodsOrFail(clusters, nsName, FederatedServicePodName)
 | 
			
		||||
 | 
			
		||||
				service = createLBServiceOrFail(f.FederationClientset, nsName, FederatedServiceName, clusters)
 | 
			
		||||
				obj, err := scheme.Scheme.DeepCopy(service)
 | 
			
		||||
				// Cloning shouldn't fail. On the off-chance it does, we
 | 
			
		||||
				// should shallow copy service to serviceShard before
 | 
			
		||||
				// failing. If we don't do this we will never really
 | 
			
		||||
				// get a chance to clean up the underlying services
 | 
			
		||||
				// when the cloner fails for reasons not in our
 | 
			
		||||
				// control. For example, cloner bug. That will cause
 | 
			
		||||
				// the resources to leak, which in turn causes the
 | 
			
		||||
				// test project to run out of quota and the entire
 | 
			
		||||
				// suite starts failing. So we must try as hard as
 | 
			
		||||
				// possible to cleanup the underlying services. So
 | 
			
		||||
				// if DeepCopy fails, we are going to try with shallow
 | 
			
		||||
				// copy as a last resort.
 | 
			
		||||
				if err != nil {
 | 
			
		||||
					serviceCopy := *service
 | 
			
		||||
					serviceShard = &serviceCopy
 | 
			
		||||
					framework.ExpectNoError(err, fmt.Sprintf("Error in deep copying service %q", service.Name))
 | 
			
		||||
				}
 | 
			
		||||
				var ok bool
 | 
			
		||||
				serviceShard, ok = obj.(*v1.Service)
 | 
			
		||||
				// Same argument as above about using shallow copy
 | 
			
		||||
				// as a last resort.
 | 
			
		||||
				if !ok {
 | 
			
		||||
					serviceCopy := *service
 | 
			
		||||
					serviceShard = &serviceCopy
 | 
			
		||||
					framework.ExpectNoError(err, fmt.Sprintf("Unexpected service object copied %T", obj))
 | 
			
		||||
				}
 | 
			
		||||
				serviceShard := service.DeepCopy()
 | 
			
		||||
 | 
			
		||||
				waitForServiceShardsOrFail(nsName, serviceShard, clusters)
 | 
			
		||||
			})
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user