federation: use generated listers

This commit is contained in:
Andy Goldstein
2017-02-22 16:42:20 -05:00
parent b437787a2e
commit a3a2246f73
7 changed files with 90 additions and 77 deletions

View File

@@ -30,7 +30,7 @@ go_library(
"//pkg/api:go_default_library",
"//pkg/api/v1:go_default_library",
"//pkg/client/clientset_generated/clientset:go_default_library",
"//pkg/client/legacylisters:go_default_library",
"//pkg/client/listers/core/v1:go_default_library",
"//pkg/controller:go_default_library",
"//vendor:github.com/golang/glog",
"//vendor:k8s.io/apimachinery/pkg/api/errors",

View File

@@ -29,23 +29,23 @@ import (
v1beta1 "k8s.io/kubernetes/federation/apis/federation/v1beta1"
v1 "k8s.io/kubernetes/pkg/api/v1"
kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
"k8s.io/kubernetes/pkg/client/legacylisters"
"reflect"
"github.com/golang/glog"
"k8s.io/kubernetes/federation/pkg/federation-controller/util"
corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1"
)
type clusterCache struct {
clientset *kubeclientset.Clientset
cluster *v1beta1.Cluster
// A store of services, populated by the serviceController
serviceStore listers.StoreToServiceLister
serviceStore corelisters.ServiceLister
// Watches changes to all services
serviceController cache.Controller
// A store of endpoint, populated by the serviceController
endpointStore listers.StoreToEndpointsLister
endpointStore corelisters.EndpointsLister
// Watches changes to all endpoints
endpointController cache.Controller
// services that need to be synced
@@ -91,7 +91,8 @@ func (cc *clusterClientCache) startClusterLW(cluster *v1beta1.Cluster, clusterNa
serviceQueue: workqueue.New(),
endpointQueue: workqueue.New(),
}
cachedClusterClient.endpointStore.Store, cachedClusterClient.endpointController = cache.NewInformer(
var endpointIndexer cache.Indexer
endpointIndexer, cachedClusterClient.endpointController = cache.NewIndexerInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (pkgruntime.Object, error) {
return clientset.Core().Endpoints(metav1.NamespaceAll).List(options)
@@ -113,9 +114,12 @@ func (cc *clusterClientCache) startClusterLW(cluster *v1beta1.Cluster, clusterNa
cc.enqueueEndpoint(obj, clusterName)
},
},
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
cachedClusterClient.endpointStore = corelisters.NewEndpointsLister(endpointIndexer)
cachedClusterClient.serviceStore.Indexer, cachedClusterClient.serviceController = cache.NewIndexerInformer(
var serviceIndexer cache.Indexer
serviceIndexer, cachedClusterClient.serviceController = cache.NewIndexerInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (pkgruntime.Object, error) {
return clientset.Core().Services(metav1.NamespaceAll).List(options)
@@ -152,6 +156,7 @@ func (cc *clusterClientCache) startClusterLW(cluster *v1beta1.Cluster, clusterNa
},
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
cachedClusterClient.serviceStore = corelisters.NewServiceLister(serviceIndexer)
cc.clientMap[clusterName] = cachedClusterClient
go cachedClusterClient.serviceController.Run(wait.NeverStop)
go cachedClusterClient.endpointController.Run(wait.NeverStop)

View File

@@ -17,9 +17,9 @@ limitations under the License.
package service
import (
"fmt"
"time"
"k8s.io/apimachinery/pkg/api/errors"
cache "k8s.io/client-go/tools/cache"
fedclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset"
v1 "k8s.io/kubernetes/pkg/api/v1"
@@ -81,29 +81,25 @@ func (cc *clusterClientCache) syncEndpoint(key, clusterName string, clusterCache
// here we filtered all non-federation services
return nil
}
endpointInterface, exists, err := clusterCache.endpointStore.GetByKey(key)
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
glog.Errorf("Did not successfully get %v from store: %v, will retry later", key, err)
clusterCache.endpointQueue.Add(key)
return err
}
if exists {
endpoint, ok := endpointInterface.(*v1.Endpoints)
if ok {
glog.V(4).Infof("Found endpoint for federation service %s/%s from cluster %s", endpoint.Namespace, endpoint.Name, clusterName)
err = cc.processEndpointUpdate(cachedService, endpoint, clusterName, serviceController)
} else {
_, ok := endpointInterface.(cache.DeletedFinalStateUnknown)
if !ok {
return fmt.Errorf("Object contained wasn't a service or a deleted key: %+v", endpointInterface)
}
glog.Infof("Found tombstone for %v", key)
err = cc.processEndpointDeletion(cachedService, clusterName, serviceController)
}
} else {
endpoint, err := clusterCache.endpointStore.Endpoints(namespace).Get(name)
switch {
case errors.IsNotFound(err):
// service absence in store means watcher caught the deletion, ensure LB info is cleaned
glog.Infof("Can not get endpoint %v for cluster %s from endpointStore", key, clusterName)
err = cc.processEndpointDeletion(cachedService, clusterName, serviceController)
case err != nil:
glog.Errorf("Did not successfully get %v from store: %v, will retry later", key, err)
clusterCache.endpointQueue.Add(key)
return err
default:
glog.V(4).Infof("Found endpoint for federation service %s/%s from cluster %s", endpoint.Namespace, endpoint.Name, clusterName)
err = cc.processEndpointUpdate(cachedService, endpoint, clusterName, serviceController)
}
if err != nil {
glog.Errorf("Failed to sync service: %+v, put back to service queue", err)

View File

@@ -17,7 +17,6 @@ limitations under the License.
package service
import (
"fmt"
"time"
"k8s.io/apimachinery/pkg/api/errors"
@@ -85,31 +84,26 @@ func (cc *clusterClientCache) syncService(key, clusterName string, clusterCache
// if serviceCache does not exists, that means the service is not created by federation, we should skip it
return nil
}
serviceInterface, exists, err := clusterCache.serviceStore.Indexer.GetByKey(key)
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
glog.Errorf("Did not successfully get %v from store: %v, will retry later", key, err)
clusterCache.serviceQueue.Add(key)
return err
}
var needUpdate, isDeletion bool
if exists {
service, ok := serviceInterface.(*v1.Service)
if ok {
glog.V(4).Infof("Found service for federation service %s/%s from cluster %s", service.Namespace, service.Name, clusterName)
needUpdate = cc.processServiceUpdate(cachedService, service, clusterName)
} else {
_, ok := serviceInterface.(cache.DeletedFinalStateUnknown)
if !ok {
return fmt.Errorf("Object contained wasn't a service or a deleted key: %+v", serviceInterface)
}
glog.Infof("Found tombstone for %v", key)
needUpdate = cc.processServiceDeletion(cachedService, clusterName)
isDeletion = true
}
} else {
service, err := clusterCache.serviceStore.Services(namespace).Get(name)
switch {
case errors.IsNotFound(err):
glog.Infof("Can not get service %v for cluster %s from serviceStore", key, clusterName)
needUpdate = cc.processServiceDeletion(cachedService, clusterName)
isDeletion = true
case err != nil:
glog.Errorf("Did not successfully get %v from store: %v, will retry later", key, err)
clusterCache.serviceQueue.Add(key)
return err
default:
glog.V(4).Infof("Found service for federation service %s/%s from cluster %s", service.Namespace, service.Name, clusterName)
needUpdate = cc.processServiceUpdate(cachedService, service, clusterName)
}
if needUpdate {

View File

@@ -46,7 +46,7 @@ import (
"k8s.io/kubernetes/pkg/api"
v1 "k8s.io/kubernetes/pkg/api/v1"
kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
"k8s.io/kubernetes/pkg/client/legacylisters"
corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1"
"k8s.io/kubernetes/pkg/controller"
)
@@ -121,7 +121,7 @@ type ServiceController struct {
serviceCache *serviceCache
clusterCache *clusterClientCache
// A store of services, populated by the serviceController
serviceStore listers.StoreToServiceLister
serviceStore corelisters.ServiceLister
// Watches changes to all services
serviceController cache.Controller
federatedInformer fedutil.FederatedInformer
@@ -180,7 +180,8 @@ func New(federationClient fedclientset.Interface, dns dnsprovider.Interface,
knownClusterSet: make(sets.String),
}
s.clusterDeliverer = util.NewDelayingDeliverer()
s.serviceStore.Indexer, s.serviceController = cache.NewIndexerInformer(
var serviceIndexer cache.Indexer
serviceIndexer, s.serviceController = cache.NewIndexerInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (pkgruntime.Object, error) {
return s.federationClient.Core().Services(metav1.NamespaceAll).List(options)
@@ -203,6 +204,7 @@ func New(federationClient fedclientset.Interface, dns dnsprovider.Interface,
},
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
s.serviceStore = corelisters.NewServiceLister(serviceIndexer)
s.clusterStore.Store, s.clusterController = cache.NewInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (pkgruntime.Object, error) {
@@ -938,44 +940,40 @@ func (s *ServiceController) syncService(key string) error {
defer func() {
glog.V(4).Infof("Finished syncing service %q (%v)", key, time.Now().Sub(startTime))
}()
// obj holds the latest service info from apiserver
objFromStore, exists, err := s.serviceStore.Indexer.GetByKey(key)
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
glog.Errorf("Unable to retrieve service %v from store: %v", key, err)
s.queue.Add(key)
return err
}
if !exists {
service, err := s.serviceStore.Services(namespace).Get(name)
switch {
case errors.IsNotFound(err):
// service absence in store means watcher caught the deletion, ensure LB info is cleaned
glog.Infof("Service has been deleted %v", key)
err, retryDelay = s.processServiceDeletion(key)
}
// Create a copy before modifying the obj to prevent race condition with
// other readers of obj from store.
obj, err := conversion.NewCloner().DeepCopy(objFromStore)
if err != nil {
glog.Errorf("Error in deep copying service %v retrieved from store: %v", key, err)
case err != nil:
glog.Errorf("Unable to retrieve service %v from store: %v", key, err)
s.queue.Add(key)
return err
}
if exists {
service, ok := obj.(*v1.Service)
if ok {
cachedService = s.serviceCache.getOrCreate(key)
err, retryDelay = s.processServiceUpdate(cachedService, service, key)
} else {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
return fmt.Errorf("Object contained wasn't a service or a deleted key: %+v", obj)
}
glog.Infof("Found tombstone for %v", key)
err, retryDelay = s.processServiceDeletion(tombstone.Key)
default:
// Create a copy before modifying the obj to prevent race condition with
// other readers of obj from store.
copy, err := conversion.NewCloner().DeepCopy(service)
if err != nil {
glog.Errorf("Error in deep copying service %v retrieved from store: %v", key, err)
s.queue.Add(key)
return err
}
service := copy.(*v1.Service)
cachedService = s.serviceCache.getOrCreate(key)
err, retryDelay = s.processServiceUpdate(cachedService, service, key)
}
if retryDelay != 0 {
s.enqueueService(obj)
s.enqueueService(service)
} else if err != nil {
runtime.HandleError(fmt.Errorf("Failed to process service. Not retrying: %v", err))
}