Merge pull request #40385 from ncdc/shared-informers-02-swap-existing

Automatic merge from submit-queue

Replace hand-written informers with generated ones

Replace existing uses of hand-written informers with generated ones.
Follow-up commits will switch one-off informers to shared
informers.

This is a precursor to #40097. That PR will switch one-off informers to shared informers for the majority of the code base (but not quite all of it...).
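
For reviewers less familiar with the generated informers, here is a minimal sketch of the pattern this series moves toward: one shared factory handing out typed informers and listers. It is written against the client-go style factory API with illustrative names (`nodeLister`), not the exact internal `informers_generated` packages touched in this diff.

```go
// Minimal sketch, not code from this PR: the generated shared-informer
// pattern, using client-go style package paths for illustration.
package informerexample

import (
	"time"

	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	corelisters "k8s.io/client-go/listers/core/v1"
)

// nodeLister wires a typed NodeLister from a single shared factory,
// replacing a hand-written StoreToNodeLister-style cache.
func nodeLister(client kubernetes.Interface, stopCh <-chan struct{}) corelisters.NodeLister {
	factory := informers.NewSharedInformerFactory(client, 30*time.Second)

	nodeInformer := factory.Core().V1().Nodes() // generated typed informer
	lister := nodeInformer.Lister()             // typed lister backed by the shared cache

	factory.Start(stopCh) // starts every informer requested from the factory
	return lister
}
```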

NOTE: this does create a second set of shared informers in the kube-controller-manager. This will be resolved back down to a single factory once #40097 is reviewed and merged.

There are a couple of places where I expanded the number of caches we wait for in the calls to `WaitForCacheSync`; please pay attention to those. I also added a commented-out wait in the attach/detach controller. If @kubernetes/sig-storage-pr-reviews is ok with enabling the waiting, I'll do it (I'll just need to tweak an integration test slightly).
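
For context on those `WaitForCacheSync` changes, this is a hedged sketch of the gating pattern the controllers follow: record each informer's `HasSynced` at construction time, then block `Run` on all of them. The `controller` type and field names are illustrative, not code from this PR.

```go
// Hedged sketch of the cache-sync gate; types and field names are illustrative.
package cachesyncexample

import (
	"fmt"

	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/client-go/tools/cache"
)

// controller is a hypothetical stand-in for controllers touched here
// (e.g. the node controller): each InformerSynced is captured at
// construction time from informer.Informer().HasSynced.
type controller struct {
	podsSynced       cache.InformerSynced
	nodesSynced      cache.InformerSynced
	daemonSetsSynced cache.InformerSynced
}

func (c *controller) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()

	// Block until every cache this controller reads from has completed its
	// initial List/Watch; bail out if the stop channel closes first.
	if !cache.WaitForCacheSync(stopCh, c.podsSynced, c.nodesSynced, c.daemonSetsSynced) {
		utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
		return
	}

	// ...start workers / periodic loops here...
}
```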

@deads2k @sttts @smarterclayton @liggitt @soltysh @timothysc @lavalamp @wojtek-t @gmarek @sjenning @derekwaynecarr @kubernetes/sig-scalability-pr-reviews
Kubernetes Submit Queue authored 2017-02-06 16:25:42 -08:00, committed by GitHub
55 changed files with 936 additions and 823 deletions

View File

@@ -24,9 +24,11 @@ go_library(
"//pkg/api:go_default_library",
"//pkg/api/v1:go_default_library",
"//pkg/client/clientset_generated/clientset:go_default_library",
"//pkg/client/legacylisters:go_default_library",
"//pkg/client/informers/informers_generated/core/v1:go_default_library",
"//pkg/client/informers/informers_generated/extensions/v1beta1:go_default_library",
"//pkg/client/listers/core/v1:go_default_library",
"//pkg/client/listers/extensions/v1beta1:go_default_library",
"//pkg/cloudprovider:go_default_library",
"//pkg/controller/informers:go_default_library",
"//pkg/kubelet/util/format:go_default_library",
"//pkg/util/metrics:go_default_library",
"//pkg/util/node:go_default_library",
@@ -67,10 +69,12 @@ go_test(
"//pkg/apis/extensions/v1beta1:go_default_library",
"//pkg/client/clientset_generated/clientset:go_default_library",
"//pkg/client/clientset_generated/clientset/fake:go_default_library",
"//pkg/client/informers/informers_generated:go_default_library",
"//pkg/client/informers/informers_generated/core/v1:go_default_library",
"//pkg/client/informers/informers_generated/extensions/v1beta1:go_default_library",
"//pkg/cloudprovider:go_default_library",
"//pkg/cloudprovider/providers/fake:go_default_library",
"//pkg/controller:go_default_library",
"//pkg/controller/informers:go_default_library",
"//pkg/controller/node/testutil:go_default_library",
"//pkg/util/node:go_default_library",
"//vendor:github.com/golang/glog",
@@ -82,7 +86,6 @@ go_test(
"//vendor:k8s.io/apimachinery/pkg/util/sets",
"//vendor:k8s.io/apimachinery/pkg/util/wait",
"//vendor:k8s.io/client-go/testing",
"//vendor:k8s.io/client-go/tools/cache",
"//vendor:k8s.io/client-go/util/flowcontrol",
],
)

View File

@@ -31,7 +31,7 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
"k8s.io/kubernetes/pkg/client/legacylisters"
extensionslisters "k8s.io/kubernetes/pkg/client/listers/extensions/v1beta1"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/kubelet/util/format"
"k8s.io/kubernetes/pkg/util/node"
@@ -42,7 +42,7 @@ import (
// deletePods will delete all pods from master running on given node, and return true
// if any pods were deleted, or were found pending deletion.
func deletePods(kubeClient clientset.Interface, recorder record.EventRecorder, nodeName, nodeUID string, daemonStore listers.StoreToDaemonSetLister) (bool, error) {
func deletePods(kubeClient clientset.Interface, recorder record.EventRecorder, nodeName, nodeUID string, daemonStore extensionslisters.DaemonSetLister) (bool, error) {
remaining := false
selector := fields.OneTermEqualSelector(api.PodHostField, nodeName).String()
options := metav1.ListOptions{FieldSelector: selector}
@@ -155,7 +155,11 @@ func (nc *NodeController) maybeDeleteTerminatingPod(obj interface{}) {
return
}
nodeObj, found, err := nc.nodeStore.Store.GetByKey(pod.Spec.NodeName)
node, err := nc.nodeLister.Get(pod.Spec.NodeName)
// if there is no such node, do nothing and let the podGC clean it up.
if errors.IsNotFound(err) {
return
}
if err != nil {
// this can only happen if the Store.KeyFunc has a problem creating
// a key for the pod. If it happens once, it will happen again so
@@ -164,17 +168,11 @@ func (nc *NodeController) maybeDeleteTerminatingPod(obj interface{}) {
return
}
// if there is no such node, do nothing and let the podGC clean it up.
if !found {
return
}
// delete terminating pods that have been scheduled on
// nodes that do not support graceful termination
// TODO(mikedanese): this can be removed when we no longer
// guarantee backwards compatibility of master API to kubelets with
// versions less than 1.1.0
node := nodeObj.(*v1.Node)
v, err := utilversion.ParseSemantic(node.Status.NodeInfo.KubeletVersion)
if err != nil {
glog.V(0).Infof("Couldn't parse version %q of node: %v", node.Status.NodeInfo.KubeletVersion, err)

View File

@@ -25,6 +25,7 @@ import (
"github.com/golang/glog"
apiequality "k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
@@ -39,9 +40,11 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
"k8s.io/kubernetes/pkg/client/legacylisters"
coreinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/core/v1"
extensionsinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/extensions/v1beta1"
corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1"
extensionslisters "k8s.io/kubernetes/pkg/client/listers/extensions/v1beta1"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/controller/informers"
"k8s.io/kubernetes/pkg/util/metrics"
utilnode "k8s.io/kubernetes/pkg/util/node"
"k8s.io/kubernetes/pkg/util/system"
@@ -134,13 +137,15 @@ type NodeController struct {
// The maximum duration before a pod evicted from a node can be forcefully terminated.
maximumGracePeriod time.Duration
recorder record.EventRecorder
podInformer informers.PodInformer
nodeInformer informers.NodeInformer
daemonSetInformer informers.DaemonSetInformer
podStore listers.StoreToPodLister
nodeStore listers.StoreToNodeLister
daemonSetStore listers.StoreToDaemonSetLister
nodeLister corelisters.NodeLister
nodeInformerSynced cache.InformerSynced
daemonSetStore extensionslisters.DaemonSetLister
daemonSetInformerSynced cache.InformerSynced
podInformerSynced cache.InformerSynced
// allocate/recycle CIDRs for node if allocateNodeCIDRs == true
cidrAllocator CIDRAllocator
@@ -155,13 +160,6 @@ type NodeController struct {
secondaryEvictionLimiterQPS float32
largeClusterThreshold int32
unhealthyZoneThreshold float32
// internalPodInformer is used to hold a personal informer. If we're using
// a normal shared informer, then the informer will be started for us. If
// we have a personal informer, we must start it ourselves. If you start
// the controller using NewDaemonSetsController(passing SharedInformer), this
// will be null
internalPodInformer cache.SharedIndexInformer
}
// NewNodeController returns a new node controller to sync instances from cloudprovider.
@@ -169,9 +167,9 @@ type NodeController struct {
// podCIDRs it has already allocated to nodes. Since we don't allow podCIDR changes
// currently, this should be handled as a fatal error.
func NewNodeController(
podInformer informers.PodInformer,
nodeInformer informers.NodeInformer,
daemonSetInformer informers.DaemonSetInformer,
podInformer coreinformers.PodInformer,
nodeInformer coreinformers.NodeInformer,
daemonSetInformer extensionsinformers.DaemonSetInformer,
cloud cloudprovider.Interface,
kubeClient clientset.Interface,
podEvictionTimeout time.Duration,
@@ -234,9 +232,6 @@ func NewNodeController(
largeClusterThreshold: largeClusterThreshold,
unhealthyZoneThreshold: unhealthyZoneThreshold,
zoneStates: make(map[string]zoneState),
podInformer: podInformer,
nodeInformer: nodeInformer,
daemonSetInformer: daemonSetInformer,
}
nc.enterPartialDisruptionFunc = nc.ReducedQPSFunc
nc.enterFullDisruptionFunc = nc.HealthyQPSFunc
@@ -246,7 +241,7 @@ func NewNodeController(
AddFunc: nc.maybeDeleteTerminatingPod,
UpdateFunc: func(_, obj interface{}) { nc.maybeDeleteTerminatingPod(obj) },
})
nc.podStore = *podInformer.Lister()
nc.podInformerSynced = podInformer.Informer().HasSynced
nodeEventHandlerFuncs := cache.ResourceEventHandlerFuncs{}
if nc.allocateNodeCIDRs {
@@ -347,9 +342,11 @@ func NewNodeController(
}
nodeInformer.Informer().AddEventHandler(nodeEventHandlerFuncs)
nc.nodeStore = *nodeInformer.Lister()
nc.nodeLister = nodeInformer.Lister()
nc.nodeInformerSynced = nodeInformer.Informer().HasSynced
nc.daemonSetStore = *daemonSetInformer.Lister()
nc.daemonSetStore = daemonSetInformer.Lister()
nc.daemonSetInformerSynced = daemonSetInformer.Informer().HasSynced
return nc, nil
}
@@ -359,8 +356,8 @@ func (nc *NodeController) Run() {
go func() {
defer utilruntime.HandleCrash()
if !cache.WaitForCacheSync(wait.NeverStop, nc.nodeInformer.Informer().HasSynced, nc.podInformer.Informer().HasSynced, nc.daemonSetInformer.Informer().HasSynced) {
utilruntime.HandleError(errors.New("NodeController timed out while waiting for informers to sync..."))
if !cache.WaitForCacheSync(wait.NeverStop, nc.nodeInformerSynced, nc.podInformerSynced, nc.daemonSetInformerSynced) {
utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
return
}
@@ -379,13 +376,13 @@ func (nc *NodeController) Run() {
defer nc.evictorLock.Unlock()
for k := range nc.zonePodEvictor {
nc.zonePodEvictor[k].Try(func(value TimedValue) (bool, time.Duration) {
obj, exists, err := nc.nodeStore.GetByKey(value.Value)
node, err := nc.nodeLister.Get(value.Value)
if apierrors.IsNotFound(err) {
glog.Warningf("Node %v no longer present in nodeLister!", value.Value)
}
if err != nil {
glog.Warningf("Failed to get Node %v from the nodeStore: %v", value.Value, err)
} else if !exists {
glog.Warningf("Node %v no longer present in nodeStore!", value.Value)
glog.Warningf("Failed to get Node %v from the nodeLister: %v", value.Value, err)
} else {
node, _ := obj.(*v1.Node)
zone := utilnode.GetZoneKey(node)
EvictionsNumber.WithLabelValues(zone).Inc()
}
@@ -413,11 +410,11 @@ func (nc *NodeController) Run() {
func (nc *NodeController) monitorNodeStatus() error {
// We are listing nodes from local cache as we can tolerate some small delays
// comparing to state from etcd and there is eventual consistency anyway.
nodes, err := nc.nodeStore.List()
nodes, err := nc.nodeLister.List(labels.Everything())
if err != nil {
return err
}
added, deleted := nc.checkForNodeAddedDeleted(&nodes)
added, deleted := nc.checkForNodeAddedDeleted(nodes)
for i := range added {
glog.V(1).Infof("NodeController observed a new Node: %#v", added[i].Name)
recordNodeEvent(nc.recorder, added[i].Name, string(added[i].UID), v1.EventTypeNormal, "RegisteredNode", fmt.Sprintf("Registered Node %v in NodeController", added[i].Name))
@@ -442,11 +439,11 @@ func (nc *NodeController) monitorNodeStatus() error {
}
zoneToNodeConditions := map[string][]*v1.NodeCondition{}
for i := range nodes.Items {
for i := range nodes {
var gracePeriod time.Duration
var observedReadyCondition v1.NodeCondition
var currentReadyCondition *v1.NodeCondition
nodeCopy, err := api.Scheme.DeepCopy(&nodes.Items[i])
nodeCopy, err := api.Scheme.DeepCopy(nodes[i])
if err != nil {
utilruntime.HandleError(err)
continue
@@ -528,12 +525,12 @@ func (nc *NodeController) monitorNodeStatus() error {
}
}
}
nc.handleDisruption(zoneToNodeConditions, &nodes)
nc.handleDisruption(zoneToNodeConditions, nodes)
return nil
}
func (nc *NodeController) handleDisruption(zoneToNodeConditions map[string][]*v1.NodeCondition, nodes *v1.NodeList) {
func (nc *NodeController) handleDisruption(zoneToNodeConditions map[string][]*v1.NodeCondition, nodes []*v1.Node) {
newZoneStates := map[string]zoneState{}
allAreFullyDisrupted := true
for k, v := range zoneToNodeConditions {
@@ -574,8 +571,8 @@ func (nc *NodeController) handleDisruption(zoneToNodeConditions map[string][]*v1
// We're switching to full disruption mode
if allAreFullyDisrupted {
glog.V(0).Info("NodeController detected that all Nodes are not-Ready. Entering master disruption mode.")
for i := range nodes.Items {
nc.cancelPodEviction(&nodes.Items[i])
for i := range nodes {
nc.cancelPodEviction(nodes[i])
}
// We stop all evictions.
for k := range nc.zonePodEvictor {
@@ -592,11 +589,11 @@ func (nc *NodeController) handleDisruption(zoneToNodeConditions map[string][]*v1
glog.V(0).Info("NodeController detected that some Nodes are Ready. Exiting master disruption mode.")
// When exiting disruption mode update probe timestamps on all Nodes.
now := nc.now()
for i := range nodes.Items {
v := nc.nodeStatusMap[nodes.Items[i].Name]
for i := range nodes {
v := nc.nodeStatusMap[nodes[i].Name]
v.probeTimestamp = now
v.readyTransitionTimestamp = now
nc.nodeStatusMap[nodes.Items[i].Name] = v
nc.nodeStatusMap[nodes[i].Name] = v
}
// We reset all rate limiters to settings appropriate for the given state.
for k := range nc.zonePodEvictor {
@@ -797,21 +794,21 @@ func (nc *NodeController) tryUpdateNodeStatus(node *v1.Node) (time.Duration, v1.
return gracePeriod, observedReadyCondition, currentReadyCondition, err
}
func (nc *NodeController) checkForNodeAddedDeleted(nodes *v1.NodeList) (added, deleted []*v1.Node) {
for i := range nodes.Items {
if _, has := nc.knownNodeSet[nodes.Items[i].Name]; !has {
added = append(added, &nodes.Items[i])
func (nc *NodeController) checkForNodeAddedDeleted(nodes []*v1.Node) (added, deleted []*v1.Node) {
for i := range nodes {
if _, has := nc.knownNodeSet[nodes[i].Name]; !has {
added = append(added, nodes[i])
}
}
// If there's a difference between lengths of known Nodes and observed nodes
// we must have removed some Node.
if len(nc.knownNodeSet)+len(added) != len(nodes.Items) {
if len(nc.knownNodeSet)+len(added) != len(nodes) {
knowSetCopy := map[string]*v1.Node{}
for k, v := range nc.knownNodeSet {
knowSetCopy[k] = v
}
for i := range nodes.Items {
delete(knowSetCopy, nodes.Items[i].Name)
for i := range nodes {
delete(knowSetCopy, nodes[i].Name)
}
for i := range knowSetCopy {
deleted = append(deleted, knowSetCopy[i])

View File

@@ -29,15 +29,16 @@ import (
"k8s.io/apimachinery/pkg/util/diff"
"k8s.io/apimachinery/pkg/util/wait"
testcore "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"
"k8s.io/kubernetes/pkg/api/v1"
extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake"
informers "k8s.io/kubernetes/pkg/client/informers/informers_generated"
coreinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/core/v1"
extensionsinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/extensions/v1beta1"
"k8s.io/kubernetes/pkg/cloudprovider"
fakecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/fake"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/informers"
"k8s.io/kubernetes/pkg/controller/node/testutil"
"k8s.io/kubernetes/pkg/util/node"
)
@@ -51,6 +52,14 @@ const (
testUnhealtyThreshold = float32(0.55)
)
func alwaysReady() bool { return true }
type nodeController struct {
*NodeController
nodeInformer coreinformers.NodeInformer
daemonSetInformer extensionsinformers.DaemonSetInformer
}
func NewNodeControllerFromClient(
cloud cloudprovider.Interface,
kubeClient clientset.Interface,
@@ -65,21 +74,44 @@ func NewNodeControllerFromClient(
clusterCIDR *net.IPNet,
serviceCIDR *net.IPNet,
nodeCIDRMaskSize int,
allocateNodeCIDRs bool) (*NodeController, error) {
allocateNodeCIDRs bool) (*nodeController, error) {
factory := informers.NewSharedInformerFactory(kubeClient, nil, controller.NoResyncPeriodFunc())
factory := informers.NewSharedInformerFactory(nil, kubeClient, controller.NoResyncPeriodFunc())
nc, err := NewNodeController(factory.Pods(), factory.Nodes(), factory.DaemonSets(), cloud, kubeClient, podEvictionTimeout, evictionLimiterQPS, secondaryEvictionLimiterQPS,
largeClusterThreshold, unhealthyZoneThreshold, nodeMonitorGracePeriod, nodeStartupGracePeriod, nodeMonitorPeriod, clusterCIDR,
serviceCIDR, nodeCIDRMaskSize, allocateNodeCIDRs)
nodeInformer := factory.Core().V1().Nodes()
daemonSetInformer := factory.Extensions().V1beta1().DaemonSets()
nc, err := NewNodeController(
factory.Core().V1().Pods(),
nodeInformer,
daemonSetInformer,
cloud,
kubeClient,
podEvictionTimeout,
evictionLimiterQPS,
secondaryEvictionLimiterQPS,
largeClusterThreshold,
unhealthyZoneThreshold,
nodeMonitorGracePeriod,
nodeStartupGracePeriod,
nodeMonitorPeriod,
clusterCIDR,
serviceCIDR,
nodeCIDRMaskSize,
allocateNodeCIDRs,
)
if err != nil {
return nil, err
}
return nc, nil
nc.podInformerSynced = alwaysReady
nc.nodeInformerSynced = alwaysReady
nc.daemonSetInformerSynced = alwaysReady
return &nodeController{nc, nodeInformer, daemonSetInformer}, nil
}
func syncNodeStore(nc *NodeController, fakeNodeHandler *testutil.FakeNodeHandler) error {
func syncNodeStore(nc *nodeController, fakeNodeHandler *testutil.FakeNodeHandler) error {
nodes, err := fakeNodeHandler.List(metav1.ListOptions{})
if err != nil {
return err
@@ -88,7 +120,7 @@ func syncNodeStore(nc *NodeController, fakeNodeHandler *testutil.FakeNodeHandler
for i := range nodes.Items {
newElems = append(newElems, &nodes.Items[i])
}
return nc.nodeStore.Replace(newElems, "newRV")
return nc.nodeInformer.Informer().GetStore().Replace(newElems, "newRV")
}
func TestMonitorNodeStatusEvictPods(t *testing.T) {
@@ -518,7 +550,7 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false)
nodeController.now = func() metav1.Time { return fakeNow }
for _, ds := range item.daemonSets {
nodeController.daemonSetStore.Add(&ds)
nodeController.daemonSetInformer.Informer().GetStore().Add(&ds)
}
if err := syncNodeStore(nodeController, item.fakeNodeHandler); err != nil {
t.Errorf("unexpected error: %v", err)
@@ -541,7 +573,7 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
for _, zone := range zones {
nodeController.zonePodEvictor[zone].Try(func(value TimedValue) (bool, time.Duration) {
nodeUid, _ := value.UID.(string)
deletePods(item.fakeNodeHandler, nodeController.recorder, value.Value, nodeUid, nodeController.daemonSetStore)
deletePods(item.fakeNodeHandler, nodeController.recorder, value.Value, nodeUid, nodeController.daemonSetInformer.Lister())
return true, 0
})
}
@@ -1910,8 +1942,7 @@ func TestCheckPod(t *testing.T) {
}
nc, _ := NewNodeControllerFromClient(nil, fake.NewSimpleClientset(), 0, 0, 0, 0, 0, 0, 0, 0, nil, nil, 0, false)
nc.nodeStore.Store = cache.NewStore(cache.MetaNamespaceKeyFunc)
nc.nodeStore.Store.Add(&v1.Node{
nc.nodeInformer.Informer().GetStore().Add(&v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "new",
},
@@ -1921,7 +1952,7 @@ func TestCheckPod(t *testing.T) {
},
},
})
nc.nodeStore.Store.Add(&v1.Node{
nc.nodeInformer.Informer().GetStore().Add(&v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "old",
},
@@ -1931,7 +1962,7 @@ func TestCheckPod(t *testing.T) {
},
},
})
nc.nodeStore.Store.Add(&v1.Node{
nc.nodeInformer.Informer().GetStore().Add(&v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "older",
},
@@ -1941,7 +1972,7 @@ func TestCheckPod(t *testing.T) {
},
},
})
nc.nodeStore.Store.Add(&v1.Node{
nc.nodeInformer.Informer().GetStore().Add(&v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "oldest",
},