Merge branch 'master' into upgrade_aliases_branch
This commit is contained in:
@@ -133,6 +133,7 @@ filegroup(
|
||||
"//pkg/controller/volume/events:all-srcs",
|
||||
"//pkg/controller/volume/expand:all-srcs",
|
||||
"//pkg/controller/volume/persistentvolume:all-srcs",
|
||||
"//pkg/controller/volume/pvcprotection:all-srcs",
|
||||
],
|
||||
tags = ["automanaged"],
|
||||
)
|
||||
|
||||
@@ -16,12 +16,15 @@ go_test(
|
||||
importpath = "k8s.io/kubernetes/pkg/controller/node/ipam",
|
||||
library = ":go_default_library",
|
||||
deps = [
|
||||
"//pkg/controller:go_default_library",
|
||||
"//pkg/controller/node/ipam/cidrset:go_default_library",
|
||||
"//pkg/controller/node/ipam/test:go_default_library",
|
||||
"//pkg/controller/testutil:go_default_library",
|
||||
"//vendor/k8s.io/api/core/v1:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
||||
"//vendor/k8s.io/client-go/informers:go_default_library",
|
||||
"//vendor/k8s.io/client-go/informers/core/v1:go_default_library",
|
||||
"//vendor/k8s.io/client-go/kubernetes/fake:go_default_library",
|
||||
],
|
||||
)
|
||||
@@ -41,6 +44,7 @@ go_library(
|
||||
deps = [
|
||||
"//pkg/cloudprovider:go_default_library",
|
||||
"//pkg/cloudprovider/providers/gce:go_default_library",
|
||||
"//pkg/controller:go_default_library",
|
||||
"//pkg/controller/node/ipam/cidrset:go_default_library",
|
||||
"//pkg/controller/node/ipam/sync:go_default_library",
|
||||
"//pkg/controller/node/util:go_default_library",
|
||||
@@ -52,12 +56,14 @@ go_library(
|
||||
"//vendor/k8s.io/apimachinery/pkg/fields:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
||||
"//vendor/k8s.io/client-go/informers/core/v1:go_default_library",
|
||||
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
|
||||
"//vendor/k8s.io/client-go/kubernetes/scheme:go_default_library",
|
||||
"//vendor/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
|
||||
"//vendor/k8s.io/client-go/listers/core/v1:go_default_library",
|
||||
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
|
||||
"//vendor/k8s.io/client-go/tools/record:go_default_library",
|
||||
"//vendor/k8s.io/metrics/pkg/client/clientset_generated/clientset/scheme:go_default_library",
|
||||
|
||||
@@ -80,12 +80,12 @@ type CIDRAllocator interface {
|
||||
AllocateOrOccupyCIDR(node *v1.Node) error
|
||||
// ReleaseCIDR releases the CIDR of the removed node
|
||||
ReleaseCIDR(node *v1.Node) error
|
||||
// Register allocator with the nodeInformer for updates.
|
||||
Register(nodeInformer informers.NodeInformer)
|
||||
// Run starts all the working logic of the allocator.
|
||||
Run(stopCh <-chan struct{})
|
||||
}
|
||||
|
||||
// New creates a new CIDR range allocator.
|
||||
func New(kubeClient clientset.Interface, cloud cloudprovider.Interface, allocatorType CIDRAllocatorType, clusterCIDR, serviceCIDR *net.IPNet, nodeCIDRMaskSize int) (CIDRAllocator, error) {
|
||||
func New(kubeClient clientset.Interface, cloud cloudprovider.Interface, nodeInformer informers.NodeInformer, allocatorType CIDRAllocatorType, clusterCIDR, serviceCIDR *net.IPNet, nodeCIDRMaskSize int) (CIDRAllocator, error) {
|
||||
nodeList, err := listNodes(kubeClient)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -93,9 +93,9 @@ func New(kubeClient clientset.Interface, cloud cloudprovider.Interface, allocato
|
||||
|
||||
switch allocatorType {
|
||||
case RangeAllocatorType:
|
||||
return NewCIDRRangeAllocator(kubeClient, clusterCIDR, serviceCIDR, nodeCIDRMaskSize, nodeList)
|
||||
return NewCIDRRangeAllocator(kubeClient, nodeInformer, clusterCIDR, serviceCIDR, nodeCIDRMaskSize, nodeList)
|
||||
case CloudAllocatorType:
|
||||
return NewCloudCIDRAllocator(kubeClient, cloud)
|
||||
return NewCloudCIDRAllocator(kubeClient, cloud, nodeInformer)
|
||||
default:
|
||||
return nil, fmt.Errorf("Invalid CIDR allocator type: %v", allocatorType)
|
||||
}
|
||||
|
||||
@@ -25,9 +25,10 @@ import (
|
||||
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
informers "k8s.io/client-go/informers/core/v1"
|
||||
corelisters "k8s.io/client-go/listers/core/v1"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
"k8s.io/client-go/tools/record"
|
||||
|
||||
@@ -37,6 +38,7 @@ import (
|
||||
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
|
||||
"k8s.io/kubernetes/pkg/cloudprovider"
|
||||
"k8s.io/kubernetes/pkg/cloudprovider/providers/gce"
|
||||
"k8s.io/kubernetes/pkg/controller"
|
||||
"k8s.io/kubernetes/pkg/controller/node/util"
|
||||
nodeutil "k8s.io/kubernetes/pkg/util/node"
|
||||
)
|
||||
@@ -49,6 +51,12 @@ type cloudCIDRAllocator struct {
|
||||
client clientset.Interface
|
||||
cloud *gce.GCECloud
|
||||
|
||||
// nodeLister is able to list/get nodes and is populated by the shared informer passed to
|
||||
// NewCloudCIDRAllocator.
|
||||
nodeLister corelisters.NodeLister
|
||||
// nodesSynced returns true if the node shared informer has been synced at least once.
|
||||
nodesSynced cache.InformerSynced
|
||||
|
||||
// Channel that is used to pass updating Nodes to the background.
|
||||
// This increases the throughput of CIDR assignment by parallelization
|
||||
// and not blocking on long operations (which shouldn't be done from
|
||||
@@ -64,7 +72,7 @@ type cloudCIDRAllocator struct {
|
||||
var _ CIDRAllocator = (*cloudCIDRAllocator)(nil)
|
||||
|
||||
// NewCloudCIDRAllocator creates a new cloud CIDR allocator.
|
||||
func NewCloudCIDRAllocator(client clientset.Interface, cloud cloudprovider.Interface) (CIDRAllocator, error) {
|
||||
func NewCloudCIDRAllocator(client clientset.Interface, cloud cloudprovider.Interface, nodeInformer informers.NodeInformer) (CIDRAllocator, error) {
|
||||
if client == nil {
|
||||
glog.Fatalf("kubeClient is nil when starting NodeController")
|
||||
}
|
||||
@@ -84,20 +92,45 @@ func NewCloudCIDRAllocator(client clientset.Interface, cloud cloudprovider.Inter
|
||||
ca := &cloudCIDRAllocator{
|
||||
client: client,
|
||||
cloud: gceCloud,
|
||||
nodeLister: nodeInformer.Lister(),
|
||||
nodesSynced: nodeInformer.Informer().HasSynced,
|
||||
nodeUpdateChannel: make(chan string, cidrUpdateQueueSize),
|
||||
recorder: recorder,
|
||||
nodesInProcessing: sets.NewString(),
|
||||
}
|
||||
|
||||
for i := 0; i < cidrUpdateWorkers; i++ {
|
||||
// TODO: Take stopChan as an argument to NewCloudCIDRAllocator and pass it to the worker.
|
||||
go ca.worker(wait.NeverStop)
|
||||
}
|
||||
nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: util.CreateAddNodeHandler(ca.AllocateOrOccupyCIDR),
|
||||
UpdateFunc: util.CreateUpdateNodeHandler(func(_, newNode *v1.Node) error {
|
||||
if newNode.Spec.PodCIDR == "" {
|
||||
return ca.AllocateOrOccupyCIDR(newNode)
|
||||
}
|
||||
return nil
|
||||
}),
|
||||
DeleteFunc: util.CreateDeleteNodeHandler(ca.ReleaseCIDR),
|
||||
})
|
||||
|
||||
glog.V(0).Infof("Using cloud CIDR allocator (provider: %v)", cloud.ProviderName())
|
||||
return ca, nil
|
||||
}
|
||||
|
||||
func (ca *cloudCIDRAllocator) Run(stopCh <-chan struct{}) {
|
||||
defer utilruntime.HandleCrash()
|
||||
|
||||
glog.Infof("Starting cloud CIDR allocator")
|
||||
defer glog.Infof("Shutting down cloud CIDR allocator")
|
||||
|
||||
if !controller.WaitForCacheSync("cidrallocator", stopCh, ca.nodesSynced) {
|
||||
return
|
||||
}
|
||||
|
||||
for i := 0; i < cidrUpdateWorkers; i++ {
|
||||
go ca.worker(stopCh)
|
||||
}
|
||||
|
||||
<-stopCh
|
||||
}
|
||||
|
||||
func (ca *cloudCIDRAllocator) worker(stopChan <-chan struct{}) {
|
||||
for {
|
||||
select {
|
||||
@@ -169,7 +202,7 @@ func (ca *cloudCIDRAllocator) updateCIDRAllocation(nodeName string) error {
|
||||
|
||||
for rep := 0; rep < cidrUpdateRetries; rep++ {
|
||||
// TODO: change it to using PATCH instead of full Node updates.
|
||||
node, err = ca.client.CoreV1().Nodes().Get(nodeName, metav1.GetOptions{})
|
||||
node, err = ca.nodeLister.Get(nodeName)
|
||||
if err != nil {
|
||||
glog.Errorf("Failed while getting node %v to retry updating Node.Spec.PodCIDR: %v", nodeName, err)
|
||||
continue
|
||||
@@ -218,16 +251,3 @@ func (ca *cloudCIDRAllocator) ReleaseCIDR(node *v1.Node) error {
|
||||
node.Name, node.Spec.PodCIDR)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ca *cloudCIDRAllocator) Register(nodeInformer informers.NodeInformer) {
|
||||
nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: util.CreateAddNodeHandler(ca.AllocateOrOccupyCIDR),
|
||||
UpdateFunc: util.CreateUpdateNodeHandler(func(_, newNode *v1.Node) error {
|
||||
if newNode.Spec.PodCIDR == "" {
|
||||
return ca.AllocateOrOccupyCIDR(newNode)
|
||||
}
|
||||
return nil
|
||||
}),
|
||||
DeleteFunc: util.CreateDeleteNodeHandler(ca.ReleaseCIDR),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -25,16 +25,16 @@ import (
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
informers "k8s.io/client-go/informers/core/v1"
|
||||
clientset "k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/kubernetes/scheme"
|
||||
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
|
||||
corelisters "k8s.io/client-go/listers/core/v1"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
"k8s.io/client-go/tools/record"
|
||||
|
||||
"k8s.io/kubernetes/pkg/controller"
|
||||
"k8s.io/kubernetes/pkg/controller/node/ipam/cidrset"
|
||||
"k8s.io/kubernetes/pkg/controller/node/util"
|
||||
)
|
||||
@@ -45,6 +45,12 @@ type rangeAllocator struct {
|
||||
clusterCIDR *net.IPNet
|
||||
maxCIDRs int
|
||||
|
||||
// nodeLister is able to list/get nodes and is populated by the shared informer passed to
|
||||
// NewCloudCIDRAllocator.
|
||||
nodeLister corelisters.NodeLister
|
||||
// nodesSynced returns true if the node shared informer has been synced at least once.
|
||||
nodesSynced cache.InformerSynced
|
||||
|
||||
// Channel that is used to pass updating Nodes with assigned CIDRs to the background
|
||||
// This increases a throughput of CIDR assignment by not blocking on long operations.
|
||||
nodeCIDRUpdateChannel chan nodeAndCIDR
|
||||
@@ -59,7 +65,7 @@ type rangeAllocator struct {
|
||||
// Caller must ensure subNetMaskSize is not less than cluster CIDR mask size.
|
||||
// Caller must always pass in a list of existing nodes so the new allocator
|
||||
// can initialize its CIDR map. NodeList is only nil in testing.
|
||||
func NewCIDRRangeAllocator(client clientset.Interface, clusterCIDR *net.IPNet, serviceCIDR *net.IPNet, subNetMaskSize int, nodeList *v1.NodeList) (CIDRAllocator, error) {
|
||||
func NewCIDRRangeAllocator(client clientset.Interface, nodeInformer informers.NodeInformer, clusterCIDR *net.IPNet, serviceCIDR *net.IPNet, subNetMaskSize int, nodeList *v1.NodeList) (CIDRAllocator, error) {
|
||||
if client == nil {
|
||||
glog.Fatalf("kubeClient is nil when starting NodeController")
|
||||
}
|
||||
@@ -78,6 +84,8 @@ func NewCIDRRangeAllocator(client clientset.Interface, clusterCIDR *net.IPNet, s
|
||||
client: client,
|
||||
cidrs: set,
|
||||
clusterCIDR: clusterCIDR,
|
||||
nodeLister: nodeInformer.Lister(),
|
||||
nodesSynced: nodeInformer.Informer().HasSynced,
|
||||
nodeCIDRUpdateChannel: make(chan nodeAndCIDR, cidrUpdateQueueSize),
|
||||
recorder: recorder,
|
||||
nodesInProcessing: sets.NewString(),
|
||||
@@ -107,14 +115,57 @@ func NewCIDRRangeAllocator(client clientset.Interface, clusterCIDR *net.IPNet, s
|
||||
}
|
||||
}
|
||||
}
|
||||
for i := 0; i < cidrUpdateWorkers; i++ {
|
||||
// TODO: Take stopChan as an argument to NewCIDRRangeAllocator and pass it to the worker.
|
||||
go ra.worker(wait.NeverStop)
|
||||
}
|
||||
|
||||
nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: util.CreateAddNodeHandler(ra.AllocateOrOccupyCIDR),
|
||||
UpdateFunc: util.CreateUpdateNodeHandler(func(_, newNode *v1.Node) error {
|
||||
// If the PodCIDR is not empty we either:
|
||||
// - already processed a Node that already had a CIDR after NC restarted
|
||||
// (cidr is marked as used),
|
||||
// - already processed a Node successfully and allocated a CIDR for it
|
||||
// (cidr is marked as used),
|
||||
// - already processed a Node but we did saw a "timeout" response and
|
||||
// request eventually got through in this case we haven't released
|
||||
// the allocated CIDR (cidr is still marked as used).
|
||||
// There's a possible error here:
|
||||
// - NC sees a new Node and assigns a CIDR X to it,
|
||||
// - Update Node call fails with a timeout,
|
||||
// - Node is updated by some other component, NC sees an update and
|
||||
// assigns CIDR Y to the Node,
|
||||
// - Both CIDR X and CIDR Y are marked as used in the local cache,
|
||||
// even though Node sees only CIDR Y
|
||||
// The problem here is that in in-memory cache we see CIDR X as marked,
|
||||
// which prevents it from being assigned to any new node. The cluster
|
||||
// state is correct.
|
||||
// Restart of NC fixes the issue.
|
||||
if newNode.Spec.PodCIDR == "" {
|
||||
return ra.AllocateOrOccupyCIDR(newNode)
|
||||
}
|
||||
return nil
|
||||
}),
|
||||
DeleteFunc: util.CreateDeleteNodeHandler(ra.ReleaseCIDR),
|
||||
})
|
||||
|
||||
return ra, nil
|
||||
}
|
||||
|
||||
func (r *rangeAllocator) Run(stopCh <-chan struct{}) {
|
||||
defer utilruntime.HandleCrash()
|
||||
|
||||
glog.Infof("Starting range CIDR allocator")
|
||||
defer glog.Infof("Shutting down range CIDR allocator")
|
||||
|
||||
if !controller.WaitForCacheSync("cidrallocator", stopCh, r.nodesSynced) {
|
||||
return
|
||||
}
|
||||
|
||||
for i := 0; i < cidrUpdateWorkers; i++ {
|
||||
go r.worker(stopCh)
|
||||
}
|
||||
|
||||
<-stopCh
|
||||
}
|
||||
|
||||
func (r *rangeAllocator) worker(stopChan <-chan struct{}) {
|
||||
for {
|
||||
select {
|
||||
@@ -232,7 +283,7 @@ func (r *rangeAllocator) updateCIDRAllocation(data nodeAndCIDR) error {
|
||||
podCIDR := data.cidr.String()
|
||||
for rep := 0; rep < cidrUpdateRetries; rep++ {
|
||||
// TODO: change it to using PATCH instead of full Node updates.
|
||||
node, err = r.client.CoreV1().Nodes().Get(data.nodeName, metav1.GetOptions{})
|
||||
node, err = r.nodeLister.Get(data.nodeName)
|
||||
if err != nil {
|
||||
glog.Errorf("Failed while getting node %v to retry updating Node.Spec.PodCIDR: %v", data.nodeName, err)
|
||||
continue
|
||||
@@ -269,35 +320,3 @@ func (r *rangeAllocator) updateCIDRAllocation(data nodeAndCIDR) error {
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (r *rangeAllocator) Register(nodeInformer informers.NodeInformer) {
|
||||
nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: util.CreateAddNodeHandler(r.AllocateOrOccupyCIDR),
|
||||
UpdateFunc: util.CreateUpdateNodeHandler(func(_, newNode *v1.Node) error {
|
||||
// If the PodCIDR is not empty we either:
|
||||
// - already processed a Node that already had a CIDR after NC restarted
|
||||
// (cidr is marked as used),
|
||||
// - already processed a Node successfully and allocated a CIDR for it
|
||||
// (cidr is marked as used),
|
||||
// - already processed a Node but we did saw a "timeout" response and
|
||||
// request eventually got through in this case we haven't released
|
||||
// the allocated CIDR (cidr is still marked as used).
|
||||
// There's a possible error here:
|
||||
// - NC sees a new Node and assigns a CIDR X to it,
|
||||
// - Update Node call fails with a timeout,
|
||||
// - Node is updated by some other component, NC sees an update and
|
||||
// assigns CIDR Y to the Node,
|
||||
// - Both CIDR X and CIDR Y are marked as used in the local cache,
|
||||
// even though Node sees only CIDR Y
|
||||
// The problem here is that in in-memory cache we see CIDR X as marked,
|
||||
// which prevents it from being assigned to any new node. The cluster
|
||||
// state is correct.
|
||||
// Restart of NC fixes the issue.
|
||||
if newNode.Spec.PodCIDR == "" {
|
||||
return r.AllocateOrOccupyCIDR(newNode)
|
||||
}
|
||||
return nil
|
||||
}),
|
||||
DeleteFunc: util.CreateDeleteNodeHandler(r.ReleaseCIDR),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -24,7 +24,10 @@ import (
|
||||
"k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/client-go/informers"
|
||||
coreinformers "k8s.io/client-go/informers/core/v1"
|
||||
"k8s.io/client-go/kubernetes/fake"
|
||||
"k8s.io/kubernetes/pkg/controller"
|
||||
"k8s.io/kubernetes/pkg/controller/testutil"
|
||||
)
|
||||
|
||||
@@ -32,6 +35,8 @@ const (
|
||||
nodePollInterval = 100 * time.Millisecond
|
||||
)
|
||||
|
||||
var alwaysReady = func() bool { return true }
|
||||
|
||||
func waitForUpdatedNodeWithTimeout(nodeHandler *testutil.FakeNodeHandler, number int, timeout time.Duration) error {
|
||||
return wait.Poll(nodePollInterval, timeout, func() (bool, error) {
|
||||
if len(nodeHandler.GetUpdatedNodesCopy()) >= number {
|
||||
@@ -41,6 +46,19 @@ func waitForUpdatedNodeWithTimeout(nodeHandler *testutil.FakeNodeHandler, number
|
||||
})
|
||||
}
|
||||
|
||||
// Creates a fakeNodeInformer using the provided fakeNodeHandler.
|
||||
func getFakeNodeInformer(fakeNodeHandler *testutil.FakeNodeHandler) coreinformers.NodeInformer {
|
||||
fakeClient := &fake.Clientset{}
|
||||
fakeInformerFactory := informers.NewSharedInformerFactory(fakeClient, controller.NoResyncPeriodFunc())
|
||||
fakeNodeInformer := fakeInformerFactory.Core().V1().Nodes()
|
||||
|
||||
for _, node := range fakeNodeHandler.Existing {
|
||||
fakeNodeInformer.Informer().GetStore().Add(node)
|
||||
}
|
||||
|
||||
return fakeNodeInformer
|
||||
}
|
||||
|
||||
func TestAllocateOrOccupyCIDRSuccess(t *testing.T) {
|
||||
testCases := []struct {
|
||||
description string
|
||||
@@ -130,19 +148,23 @@ func TestAllocateOrOccupyCIDRSuccess(t *testing.T) {
|
||||
expectedAllocatedCIDR string
|
||||
allocatedCIDRs []string
|
||||
}) {
|
||||
allocator, _ := NewCIDRRangeAllocator(tc.fakeNodeHandler, tc.clusterCIDR, tc.serviceCIDR, tc.subNetMaskSize, nil)
|
||||
// Initialize the range allocator.
|
||||
allocator, _ := NewCIDRRangeAllocator(tc.fakeNodeHandler, getFakeNodeInformer(tc.fakeNodeHandler), tc.clusterCIDR, tc.serviceCIDR, tc.subNetMaskSize, nil)
|
||||
rangeAllocator, ok := allocator.(*rangeAllocator)
|
||||
if !ok {
|
||||
t.Logf("%v: found non-default implementation of CIDRAllocator, skipping white-box test...", tc.description)
|
||||
return
|
||||
}
|
||||
rangeAllocator.nodesSynced = alwaysReady
|
||||
rangeAllocator.recorder = testutil.NewFakeRecorder()
|
||||
go allocator.Run(wait.NeverStop)
|
||||
|
||||
// this is a bit of white box testing
|
||||
for _, allocated := range tc.allocatedCIDRs {
|
||||
_, cidr, err := net.ParseCIDR(allocated)
|
||||
if err != nil {
|
||||
t.Fatalf("%v: unexpected error when parsing CIDR %v: %v", tc.description, allocated, err)
|
||||
}
|
||||
rangeAllocator, ok := allocator.(*rangeAllocator)
|
||||
if !ok {
|
||||
t.Logf("%v: found non-default implementation of CIDRAllocator, skipping white-box test...", tc.description)
|
||||
return
|
||||
}
|
||||
rangeAllocator.recorder = testutil.NewFakeRecorder()
|
||||
if err = rangeAllocator.cidrs.Occupy(cidr); err != nil {
|
||||
t.Fatalf("%v: unexpected error when occupying CIDR %v: %v", tc.description, allocated, err)
|
||||
}
|
||||
@@ -212,19 +234,23 @@ func TestAllocateOrOccupyCIDRFailure(t *testing.T) {
|
||||
subNetMaskSize int
|
||||
allocatedCIDRs []string
|
||||
}) {
|
||||
allocator, _ := NewCIDRRangeAllocator(tc.fakeNodeHandler, tc.clusterCIDR, tc.serviceCIDR, tc.subNetMaskSize, nil)
|
||||
// Initialize the range allocator.
|
||||
allocator, _ := NewCIDRRangeAllocator(tc.fakeNodeHandler, getFakeNodeInformer(tc.fakeNodeHandler), tc.clusterCIDR, tc.serviceCIDR, tc.subNetMaskSize, nil)
|
||||
rangeAllocator, ok := allocator.(*rangeAllocator)
|
||||
if !ok {
|
||||
t.Logf("%v: found non-default implementation of CIDRAllocator, skipping white-box test...", tc.description)
|
||||
return
|
||||
}
|
||||
rangeAllocator.nodesSynced = alwaysReady
|
||||
rangeAllocator.recorder = testutil.NewFakeRecorder()
|
||||
go allocator.Run(wait.NeverStop)
|
||||
|
||||
// this is a bit of white box testing
|
||||
for _, allocated := range tc.allocatedCIDRs {
|
||||
_, cidr, err := net.ParseCIDR(allocated)
|
||||
if err != nil {
|
||||
t.Fatalf("%v: unexpected error when parsing CIDR %v: %v", tc.description, allocated, err)
|
||||
}
|
||||
rangeAllocator, ok := allocator.(*rangeAllocator)
|
||||
if !ok {
|
||||
t.Logf("%v: found non-default implementation of CIDRAllocator, skipping white-box test...", tc.description)
|
||||
return
|
||||
}
|
||||
rangeAllocator.recorder = testutil.NewFakeRecorder()
|
||||
err = rangeAllocator.cidrs.Occupy(cidr)
|
||||
if err != nil {
|
||||
t.Fatalf("%v: unexpected error when occupying CIDR %v: %v", tc.description, allocated, err)
|
||||
@@ -324,19 +350,23 @@ func TestReleaseCIDRSuccess(t *testing.T) {
|
||||
allocatedCIDRs []string
|
||||
cidrsToRelease []string
|
||||
}) {
|
||||
allocator, _ := NewCIDRRangeAllocator(tc.fakeNodeHandler, tc.clusterCIDR, tc.serviceCIDR, tc.subNetMaskSize, nil)
|
||||
// Initialize the range allocator.
|
||||
allocator, _ := NewCIDRRangeAllocator(tc.fakeNodeHandler, getFakeNodeInformer(tc.fakeNodeHandler), tc.clusterCIDR, tc.serviceCIDR, tc.subNetMaskSize, nil)
|
||||
rangeAllocator, ok := allocator.(*rangeAllocator)
|
||||
if !ok {
|
||||
t.Logf("%v: found non-default implementation of CIDRAllocator, skipping white-box test...", tc.description)
|
||||
return
|
||||
}
|
||||
rangeAllocator.nodesSynced = alwaysReady
|
||||
rangeAllocator.recorder = testutil.NewFakeRecorder()
|
||||
go allocator.Run(wait.NeverStop)
|
||||
|
||||
// this is a bit of white box testing
|
||||
for _, allocated := range tc.allocatedCIDRs {
|
||||
_, cidr, err := net.ParseCIDR(allocated)
|
||||
if err != nil {
|
||||
t.Fatalf("%v: unexpected error when parsing CIDR %v: %v", tc.description, allocated, err)
|
||||
}
|
||||
rangeAllocator, ok := allocator.(*rangeAllocator)
|
||||
if !ok {
|
||||
t.Logf("%v: found non-default implementation of CIDRAllocator, skipping white-box test...", tc.description)
|
||||
return
|
||||
}
|
||||
rangeAllocator.recorder = testutil.NewFakeRecorder()
|
||||
err = rangeAllocator.cidrs.Occupy(cidr)
|
||||
if err != nil {
|
||||
t.Fatalf("%v: unexpected error when occupying CIDR %v: %v", tc.description, allocated, err)
|
||||
|
||||
@@ -360,11 +360,10 @@ func NewNodeController(
|
||||
} else {
|
||||
var err error
|
||||
nc.cidrAllocator, err = ipam.New(
|
||||
kubeClient, cloud, nc.allocatorType, nc.clusterCIDR, nc.serviceCIDR, nodeCIDRMaskSize)
|
||||
kubeClient, cloud, nodeInformer, nc.allocatorType, nc.clusterCIDR, nc.serviceCIDR, nodeCIDRMaskSize)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
nc.cidrAllocator.Register(nodeInformer)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -585,6 +584,12 @@ func (nc *Controller) Run(stopCh <-chan struct{}) {
|
||||
go wait.Until(nc.doEvictionPass, scheduler.NodeEvictionPeriod, wait.NeverStop)
|
||||
}
|
||||
|
||||
if nc.allocateNodeCIDRs {
|
||||
if nc.allocatorType != ipam.IPAMFromClusterAllocatorType && nc.allocatorType != ipam.IPAMFromCloudAllocatorType {
|
||||
go nc.cidrAllocator.Run(wait.NeverStop)
|
||||
}
|
||||
}
|
||||
|
||||
<-stopCh
|
||||
}
|
||||
|
||||
|
||||
61
pkg/controller/volume/pvcprotection/BUILD
Normal file
61
pkg/controller/volume/pvcprotection/BUILD
Normal file
@@ -0,0 +1,61 @@
|
||||
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
|
||||
|
||||
go_library(
|
||||
name = "go_default_library",
|
||||
srcs = ["pvc_protection_controller.go"],
|
||||
importpath = "k8s.io/kubernetes/pkg/controller/volume/pvcprotection",
|
||||
visibility = ["//visibility:public"],
|
||||
deps = [
|
||||
"//pkg/controller:go_default_library",
|
||||
"//pkg/util/metrics:go_default_library",
|
||||
"//pkg/volume/util:go_default_library",
|
||||
"//pkg/volume/util/volumehelper:go_default_library",
|
||||
"//vendor/github.com/golang/glog:go_default_library",
|
||||
"//vendor/k8s.io/api/core/v1:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
||||
"//vendor/k8s.io/client-go/informers/core/v1:go_default_library",
|
||||
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
|
||||
"//vendor/k8s.io/client-go/listers/core/v1:go_default_library",
|
||||
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
|
||||
"//vendor/k8s.io/client-go/util/workqueue:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
go_test(
|
||||
name = "go_default_test",
|
||||
srcs = ["pvc_protection_controller_test.go"],
|
||||
importpath = "k8s.io/kubernetes/pkg/controller/volume/pvcprotection",
|
||||
library = ":go_default_library",
|
||||
deps = [
|
||||
"//pkg/controller:go_default_library",
|
||||
"//pkg/volume/util:go_default_library",
|
||||
"//vendor/github.com/davecgh/go-spew/spew:go_default_library",
|
||||
"//vendor/github.com/golang/glog:go_default_library",
|
||||
"//vendor/k8s.io/api/core/v1:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/api/meta:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
|
||||
"//vendor/k8s.io/client-go/informers:go_default_library",
|
||||
"//vendor/k8s.io/client-go/kubernetes/fake:go_default_library",
|
||||
"//vendor/k8s.io/client-go/testing:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
filegroup(
|
||||
name = "package-srcs",
|
||||
srcs = glob(["**"]),
|
||||
tags = ["automanaged"],
|
||||
visibility = ["//visibility:private"],
|
||||
)
|
||||
|
||||
filegroup(
|
||||
name = "all-srcs",
|
||||
srcs = [":package-srcs"],
|
||||
tags = ["automanaged"],
|
||||
visibility = ["//visibility:public"],
|
||||
)
|
||||
284
pkg/controller/volume/pvcprotection/pvc_protection_controller.go
Normal file
284
pkg/controller/volume/pvcprotection/pvc_protection_controller.go
Normal file
@@ -0,0 +1,284 @@
|
||||
/*
|
||||
Copyright 2017 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package pvcprotection
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/golang/glog"
|
||||
"k8s.io/api/core/v1"
|
||||
apierrs "k8s.io/apimachinery/pkg/api/errors"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
coreinformers "k8s.io/client-go/informers/core/v1"
|
||||
clientset "k8s.io/client-go/kubernetes"
|
||||
corelisters "k8s.io/client-go/listers/core/v1"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
"k8s.io/client-go/util/workqueue"
|
||||
"k8s.io/kubernetes/pkg/controller"
|
||||
"k8s.io/kubernetes/pkg/util/metrics"
|
||||
volumeutil "k8s.io/kubernetes/pkg/volume/util"
|
||||
"k8s.io/kubernetes/pkg/volume/util/volumehelper"
|
||||
)
|
||||
|
||||
// Controller is controller that removes PVCProtectionFinalizer
|
||||
// from PVCs that are used by no pods.
|
||||
type Controller struct {
|
||||
client clientset.Interface
|
||||
|
||||
pvcLister corelisters.PersistentVolumeClaimLister
|
||||
pvcListerSynced cache.InformerSynced
|
||||
|
||||
podLister corelisters.PodLister
|
||||
podListerSynced cache.InformerSynced
|
||||
|
||||
queue workqueue.RateLimitingInterface
|
||||
}
|
||||
|
||||
// NewPVCProtectionController returns a new *{VCProtectionController.
|
||||
func NewPVCProtectionController(pvcInformer coreinformers.PersistentVolumeClaimInformer, podInformer coreinformers.PodInformer, cl clientset.Interface) *Controller {
|
||||
e := &Controller{
|
||||
client: cl,
|
||||
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "pvcprotection"),
|
||||
}
|
||||
if cl != nil && cl.CoreV1().RESTClient().GetRateLimiter() != nil {
|
||||
metrics.RegisterMetricAndTrackRateLimiterUsage("persistentvolumeclaim_protection_controller", cl.CoreV1().RESTClient().GetRateLimiter())
|
||||
}
|
||||
|
||||
e.pvcLister = pvcInformer.Lister()
|
||||
e.pvcListerSynced = pvcInformer.Informer().HasSynced
|
||||
pvcInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: e.pvcAddedUpdated,
|
||||
UpdateFunc: func(old, new interface{}) {
|
||||
e.pvcAddedUpdated(new)
|
||||
},
|
||||
})
|
||||
|
||||
e.podLister = podInformer.Lister()
|
||||
e.podListerSynced = podInformer.Informer().HasSynced
|
||||
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(obj interface{}) {
|
||||
e.podAddedDeletedUpdated(obj, false)
|
||||
},
|
||||
DeleteFunc: func(obj interface{}) {
|
||||
e.podAddedDeletedUpdated(obj, true)
|
||||
},
|
||||
UpdateFunc: func(old, new interface{}) {
|
||||
e.podAddedDeletedUpdated(new, false)
|
||||
},
|
||||
})
|
||||
|
||||
return e
|
||||
}
|
||||
|
||||
// Run runs the controller goroutines.
|
||||
func (c *Controller) Run(workers int, stopCh <-chan struct{}) {
|
||||
defer utilruntime.HandleCrash()
|
||||
defer c.queue.ShutDown()
|
||||
|
||||
glog.Infof("Starting PVC protection controller")
|
||||
defer glog.Infof("Shutting down PVC protection controller")
|
||||
|
||||
if !controller.WaitForCacheSync("PVC protection", stopCh, c.pvcListerSynced, c.podListerSynced) {
|
||||
return
|
||||
}
|
||||
|
||||
for i := 0; i < workers; i++ {
|
||||
go wait.Until(c.runWorker, time.Second, stopCh)
|
||||
}
|
||||
|
||||
<-stopCh
|
||||
}
|
||||
|
||||
func (c *Controller) runWorker() {
|
||||
for c.processNextWorkItem() {
|
||||
}
|
||||
}
|
||||
|
||||
// processNextWorkItem deals with one pvcKey off the queue. It returns false when it's time to quit.
|
||||
func (c *Controller) processNextWorkItem() bool {
|
||||
pvcKey, quit := c.queue.Get()
|
||||
if quit {
|
||||
return false
|
||||
}
|
||||
defer c.queue.Done(pvcKey)
|
||||
|
||||
pvcNamespace, pvcName, err := cache.SplitMetaNamespaceKey(pvcKey.(string))
|
||||
if err != nil {
|
||||
utilruntime.HandleError(fmt.Errorf("Error parsing PVC key %q: %v", pvcKey, err))
|
||||
return true
|
||||
}
|
||||
|
||||
err = c.processPVC(pvcNamespace, pvcName)
|
||||
if err == nil {
|
||||
c.queue.Forget(pvcKey)
|
||||
return true
|
||||
}
|
||||
|
||||
utilruntime.HandleError(fmt.Errorf("PVC %v failed with : %v", pvcKey, err))
|
||||
c.queue.AddRateLimited(pvcKey)
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
func (c *Controller) processPVC(pvcNamespace, pvcName string) error {
|
||||
glog.V(4).Infof("Processing PVC %s/%s", pvcNamespace, pvcName)
|
||||
startTime := time.Now()
|
||||
defer func() {
|
||||
glog.V(4).Infof("Finished processing PVC %s/%s (%v)", pvcNamespace, pvcName, time.Now().Sub(startTime))
|
||||
}()
|
||||
|
||||
pvc, err := c.pvcLister.PersistentVolumeClaims(pvcNamespace).Get(pvcName)
|
||||
if apierrs.IsNotFound(err) {
|
||||
glog.V(4).Infof("PVC %s/%s not found, ignoring", pvcNamespace, pvcName)
|
||||
return nil
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if volumeutil.IsPVCBeingDeleted(pvc) && volumeutil.IsProtectionFinalizerPresent(pvc) {
|
||||
// PVC should be deleted. Check if it's used and remove finalizer if
|
||||
// it's not.
|
||||
isUsed, err := c.isBeingUsed(pvc)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !isUsed {
|
||||
return c.removeFinalizer(pvc)
|
||||
}
|
||||
}
|
||||
|
||||
if !volumeutil.IsPVCBeingDeleted(pvc) && !volumeutil.IsProtectionFinalizerPresent(pvc) {
|
||||
// PVC is not being deleted -> it should have the finalizer. The
|
||||
// finalizer should be added by admission plugin, this is just to add
|
||||
// the finalizer to old PVCs that were created before the admission
|
||||
// plugin was enabled.
|
||||
return c.addFinalizer(pvc)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Controller) addFinalizer(pvc *v1.PersistentVolumeClaim) error {
|
||||
claimClone := pvc.DeepCopy()
|
||||
volumeutil.AddProtectionFinalizer(claimClone)
|
||||
_, err := c.client.CoreV1().PersistentVolumeClaims(claimClone.Namespace).Update(claimClone)
|
||||
if err != nil {
|
||||
glog.V(3).Infof("Error adding protection finalizer to PVC %s/%s: %v", pvc.Namespace, pvc.Name)
|
||||
return err
|
||||
}
|
||||
glog.V(3).Infof("Added protection finalizer to PVC %s/%s", pvc.Namespace, pvc.Name)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Controller) removeFinalizer(pvc *v1.PersistentVolumeClaim) error {
|
||||
claimClone := pvc.DeepCopy()
|
||||
volumeutil.RemoveProtectionFinalizer(claimClone)
|
||||
_, err := c.client.CoreV1().PersistentVolumeClaims(claimClone.Namespace).Update(claimClone)
|
||||
if err != nil {
|
||||
glog.V(3).Infof("Error removing protection finalizer from PVC %s/%s: %v", pvc.Namespace, pvc.Name, err)
|
||||
return err
|
||||
}
|
||||
glog.V(3).Infof("Removed protection finalizer from PVC %s/%s", pvc.Namespace, pvc.Name)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Controller) isBeingUsed(pvc *v1.PersistentVolumeClaim) (bool, error) {
|
||||
pods, err := c.podLister.Pods(pvc.Namespace).List(labels.Everything())
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
for _, pod := range pods {
|
||||
if pod.Spec.NodeName == "" {
|
||||
// This pod is not scheduled. We have a predicated in scheduler that
|
||||
// prevents scheduling pods with deletion timestamp, so we can be
|
||||
// pretty sure it won't be scheduled in parallel to this check.
|
||||
// Therefore this pod does not block the PVC from deletion.
|
||||
glog.V(4).Infof("Skipping unscheduled pod %s when checking PVC %s/%s", pod.Name, pvc.Namespace, pvc.Name)
|
||||
continue
|
||||
}
|
||||
if volumehelper.IsPodTerminated(pod, pod.Status) {
|
||||
// This pod is being unmounted/detached or is already
|
||||
// unmounted/detached. It does not block the PVC from deletion.
|
||||
continue
|
||||
}
|
||||
for _, volume := range pod.Spec.Volumes {
|
||||
if volume.PersistentVolumeClaim == nil {
|
||||
continue
|
||||
}
|
||||
if volume.PersistentVolumeClaim.ClaimName == pvc.Name {
|
||||
glog.V(2).Infof("Keeping PVC %s/%s, it is used by pod %s/%s", pvc.Namespace, pvc.Name, pod.Namespace, pod.Name)
|
||||
return true, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
glog.V(3).Infof("PVC %s/%s is unused", pvc.Namespace, pvc.Name)
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// pvcAddedUpdated reacts to pvc added/updated/deleted events
|
||||
func (c *Controller) pvcAddedUpdated(obj interface{}) {
|
||||
pvc, ok := obj.(*v1.PersistentVolumeClaim)
|
||||
if !ok {
|
||||
utilruntime.HandleError(fmt.Errorf("PVC informer returned non-PVC object: %#v", obj))
|
||||
return
|
||||
}
|
||||
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(pvc)
|
||||
if err != nil {
|
||||
utilruntime.HandleError(fmt.Errorf("Couldn't get key for Persistent Volume Claim %#v: %v", pvc, err))
|
||||
return
|
||||
}
|
||||
glog.V(4).Infof("Got event on PVC %s", key)
|
||||
|
||||
if (!volumeutil.IsPVCBeingDeleted(pvc) && !volumeutil.IsProtectionFinalizerPresent(pvc)) || (volumeutil.IsPVCBeingDeleted(pvc) && volumeutil.IsProtectionFinalizerPresent(pvc)) {
|
||||
c.queue.Add(key)
|
||||
}
|
||||
}
|
||||
|
||||
// podAddedDeletedUpdated reacts to Pod events
|
||||
func (c *Controller) podAddedDeletedUpdated(obj interface{}, deleted bool) {
|
||||
pod, ok := obj.(*v1.Pod)
|
||||
if !ok {
|
||||
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
|
||||
if !ok {
|
||||
utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj))
|
||||
return
|
||||
}
|
||||
pod, ok = tombstone.Obj.(*v1.Pod)
|
||||
if !ok {
|
||||
utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a Pod %#v", obj))
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Filter out pods that can't help us to remove a finalizer on PVC
|
||||
if !deleted && !volumehelper.IsPodTerminated(pod, pod.Status) && pod.Spec.NodeName != "" {
|
||||
return
|
||||
}
|
||||
|
||||
glog.V(4).Infof("Got event on pod %s/%s", pod.Namespace, pod.Name)
|
||||
|
||||
// Enqueue all PVCs that the pod uses
|
||||
for _, volume := range pod.Spec.Volumes {
|
||||
if volume.PersistentVolumeClaim != nil {
|
||||
c.queue.Add(pod.Namespace + "/" + volume.PersistentVolumeClaim.ClaimName)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,397 @@
|
||||
/*
|
||||
Copyright 2017 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package pvcprotection
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/davecgh/go-spew/spew"
|
||||
"github.com/golang/glog"
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
"k8s.io/apimachinery/pkg/api/meta"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/client-go/informers"
|
||||
"k8s.io/client-go/kubernetes/fake"
|
||||
clienttesting "k8s.io/client-go/testing"
|
||||
"k8s.io/kubernetes/pkg/controller"
|
||||
volumeutil "k8s.io/kubernetes/pkg/volume/util"
|
||||
)
|
||||
|
||||
type reaction struct {
|
||||
verb string
|
||||
resource string
|
||||
reactorfn clienttesting.ReactionFunc
|
||||
}
|
||||
|
||||
const (
|
||||
defaultNS = "default"
|
||||
defaultPVCName = "pvc1"
|
||||
defaultPodName = "pod1"
|
||||
defaultNodeName = "node1"
|
||||
)
|
||||
|
||||
func pod() *v1.Pod {
|
||||
return &v1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: defaultPodName,
|
||||
Namespace: defaultNS,
|
||||
},
|
||||
Spec: v1.PodSpec{
|
||||
NodeName: defaultNodeName,
|
||||
},
|
||||
Status: v1.PodStatus{
|
||||
Phase: v1.PodPending,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func unscheduled(pod *v1.Pod) *v1.Pod {
|
||||
pod.Spec.NodeName = ""
|
||||
return pod
|
||||
}
|
||||
|
||||
func withPVC(pvcName string, pod *v1.Pod) *v1.Pod {
|
||||
volume := v1.Volume{
|
||||
Name: pvcName,
|
||||
VolumeSource: v1.VolumeSource{
|
||||
PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
|
||||
ClaimName: pvcName,
|
||||
},
|
||||
},
|
||||
}
|
||||
pod.Spec.Volumes = append(pod.Spec.Volumes, volume)
|
||||
return pod
|
||||
}
|
||||
|
||||
func withEmptyDir(pod *v1.Pod) *v1.Pod {
|
||||
volume := v1.Volume{
|
||||
Name: "emptyDir",
|
||||
VolumeSource: v1.VolumeSource{
|
||||
EmptyDir: &v1.EmptyDirVolumeSource{},
|
||||
},
|
||||
}
|
||||
pod.Spec.Volumes = append(pod.Spec.Volumes, volume)
|
||||
return pod
|
||||
}
|
||||
|
||||
func withStatus(phase v1.PodPhase, pod *v1.Pod) *v1.Pod {
|
||||
pod.Status.Phase = phase
|
||||
return pod
|
||||
}
|
||||
|
||||
func pvc() *v1.PersistentVolumeClaim {
|
||||
return &v1.PersistentVolumeClaim{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: defaultPVCName,
|
||||
Namespace: defaultNS,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func withProtectionFinalizer(pvc *v1.PersistentVolumeClaim) *v1.PersistentVolumeClaim {
|
||||
pvc.Finalizers = append(pvc.Finalizers, volumeutil.PVCProtectionFinalizer)
|
||||
return pvc
|
||||
}
|
||||
|
||||
func deleted(pvc *v1.PersistentVolumeClaim) *v1.PersistentVolumeClaim {
|
||||
pvc.DeletionTimestamp = &metav1.Time{}
|
||||
return pvc
|
||||
}
|
||||
|
||||
func generateUpdateErrorFunc(t *testing.T, failures int) clienttesting.ReactionFunc {
|
||||
i := 0
|
||||
return func(action clienttesting.Action) (bool, runtime.Object, error) {
|
||||
i++
|
||||
if i <= failures {
|
||||
// Update fails
|
||||
update, ok := action.(clienttesting.UpdateAction)
|
||||
|
||||
if !ok {
|
||||
t.Fatalf("Reactor got non-update action: %+v", action)
|
||||
}
|
||||
acc, _ := meta.Accessor(update.GetObject())
|
||||
return true, nil, apierrors.NewForbidden(update.GetResource().GroupResource(), acc.GetName(), errors.New("Mock error"))
|
||||
}
|
||||
// Update succeeds
|
||||
return false, nil, nil
|
||||
}
|
||||
}
|
||||
|
||||
func TestPVCProtectionController(t *testing.T) {
|
||||
pvcVer := schema.GroupVersionResource{
|
||||
Group: v1.GroupName,
|
||||
Version: "v1",
|
||||
Resource: "persistentvolumeclaims",
|
||||
}
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
// Object to insert into fake kubeclient before the test starts.
|
||||
initialObjects []runtime.Object
|
||||
// Optional client reactors.
|
||||
reactors []reaction
|
||||
// PVC event to simulate. This PVC will be automatically added to
|
||||
// initalObjects.
|
||||
updatedPVC *v1.PersistentVolumeClaim
|
||||
// Pod event to simulate. This Pod will be automatically added to
|
||||
// initalObjects.
|
||||
updatedPod *v1.Pod
|
||||
// Pod event to similate. This Pod is *not* added to
|
||||
// initalObjects.
|
||||
deletedPod *v1.Pod
|
||||
// List of expected kubeclient actions that should happen during the
|
||||
// test.
|
||||
expectedActions []clienttesting.Action
|
||||
}{
|
||||
//
|
||||
// PVC events
|
||||
//
|
||||
{
|
||||
name: "PVC without finalizer -> finalizer is added",
|
||||
updatedPVC: pvc(),
|
||||
expectedActions: []clienttesting.Action{
|
||||
clienttesting.NewUpdateAction(pvcVer, defaultNS, withProtectionFinalizer(pvc())),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "PVC with finalizer -> no action",
|
||||
updatedPVC: withProtectionFinalizer(pvc()),
|
||||
expectedActions: []clienttesting.Action{},
|
||||
},
|
||||
{
|
||||
name: "saving PVC finalizer fails -> controller retries",
|
||||
updatedPVC: pvc(),
|
||||
reactors: []reaction{
|
||||
{
|
||||
verb: "update",
|
||||
resource: "persistentvolumeclaims",
|
||||
reactorfn: generateUpdateErrorFunc(t, 2 /* update fails twice*/),
|
||||
},
|
||||
},
|
||||
expectedActions: []clienttesting.Action{
|
||||
// This fails
|
||||
clienttesting.NewUpdateAction(pvcVer, defaultNS, withProtectionFinalizer(pvc())),
|
||||
// This fails too
|
||||
clienttesting.NewUpdateAction(pvcVer, defaultNS, withProtectionFinalizer(pvc())),
|
||||
// This succeeds
|
||||
clienttesting.NewUpdateAction(pvcVer, defaultNS, withProtectionFinalizer(pvc())),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "deleted PVC with finalizer -> finalizer is removed",
|
||||
updatedPVC: deleted(withProtectionFinalizer(pvc())),
|
||||
expectedActions: []clienttesting.Action{
|
||||
clienttesting.NewUpdateAction(pvcVer, defaultNS, deleted(pvc())),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "finalizer removal fails -> controller retries",
|
||||
updatedPVC: deleted(withProtectionFinalizer(pvc())),
|
||||
reactors: []reaction{
|
||||
{
|
||||
verb: "update",
|
||||
resource: "persistentvolumeclaims",
|
||||
reactorfn: generateUpdateErrorFunc(t, 2 /* update fails twice*/),
|
||||
},
|
||||
},
|
||||
expectedActions: []clienttesting.Action{
|
||||
// Fails
|
||||
clienttesting.NewUpdateAction(pvcVer, defaultNS, deleted(pvc())),
|
||||
// Fails too
|
||||
clienttesting.NewUpdateAction(pvcVer, defaultNS, deleted(pvc())),
|
||||
// Succeeds
|
||||
clienttesting.NewUpdateAction(pvcVer, defaultNS, deleted(pvc())),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "deleted PVC with finalizer + pods with the PVC exists -> finalizer is not removed",
|
||||
initialObjects: []runtime.Object{
|
||||
withPVC(defaultPVCName, pod()),
|
||||
},
|
||||
updatedPVC: deleted(withProtectionFinalizer(pvc())),
|
||||
expectedActions: []clienttesting.Action{},
|
||||
},
|
||||
{
|
||||
name: "deleted PVC with finalizer + pods with unrelated PVC and EmptyDir exists -> finalizer is removed",
|
||||
initialObjects: []runtime.Object{
|
||||
withEmptyDir(withPVC("unrelatedPVC", pod())),
|
||||
},
|
||||
updatedPVC: deleted(withProtectionFinalizer(pvc())),
|
||||
expectedActions: []clienttesting.Action{
|
||||
clienttesting.NewUpdateAction(pvcVer, defaultNS, deleted(pvc())),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "deleted PVC with finalizer + pods with the PVC andis finished -> finalizer is removed",
|
||||
initialObjects: []runtime.Object{
|
||||
withStatus(v1.PodFailed, withPVC(defaultPVCName, pod())),
|
||||
},
|
||||
updatedPVC: deleted(withProtectionFinalizer(pvc())),
|
||||
expectedActions: []clienttesting.Action{
|
||||
clienttesting.NewUpdateAction(pvcVer, defaultNS, deleted(pvc())),
|
||||
},
|
||||
},
|
||||
//
|
||||
// Pod events
|
||||
//
|
||||
{
|
||||
name: "updated running Pod -> no action",
|
||||
initialObjects: []runtime.Object{
|
||||
deleted(withProtectionFinalizer(pvc())),
|
||||
},
|
||||
updatedPod: withStatus(v1.PodRunning, withPVC(defaultPVCName, pod())),
|
||||
expectedActions: []clienttesting.Action{},
|
||||
},
|
||||
{
|
||||
name: "updated finished Pod -> finalizer is removed",
|
||||
initialObjects: []runtime.Object{
|
||||
deleted(withProtectionFinalizer(pvc())),
|
||||
},
|
||||
updatedPod: withStatus(v1.PodSucceeded, withPVC(defaultPVCName, pod())),
|
||||
expectedActions: []clienttesting.Action{
|
||||
clienttesting.NewUpdateAction(pvcVer, defaultNS, deleted(pvc())),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "updated unscheduled Pod -> finalizer is removed",
|
||||
initialObjects: []runtime.Object{
|
||||
deleted(withProtectionFinalizer(pvc())),
|
||||
},
|
||||
updatedPod: unscheduled(withPVC(defaultPVCName, pod())),
|
||||
expectedActions: []clienttesting.Action{
|
||||
clienttesting.NewUpdateAction(pvcVer, defaultNS, deleted(pvc())),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "deleted running Pod -> finalizer is removed",
|
||||
initialObjects: []runtime.Object{
|
||||
deleted(withProtectionFinalizer(pvc())),
|
||||
},
|
||||
deletedPod: withStatus(v1.PodRunning, withPVC(defaultPVCName, pod())),
|
||||
expectedActions: []clienttesting.Action{
|
||||
clienttesting.NewUpdateAction(pvcVer, defaultNS, deleted(pvc())),
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
// Create client with initial data
|
||||
objs := test.initialObjects
|
||||
if test.updatedPVC != nil {
|
||||
objs = append(objs, test.updatedPVC)
|
||||
}
|
||||
if test.updatedPod != nil {
|
||||
objs = append(objs, test.updatedPod)
|
||||
}
|
||||
client := fake.NewSimpleClientset(objs...)
|
||||
|
||||
// Create informers
|
||||
informers := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
|
||||
pvcInformer := informers.Core().V1().PersistentVolumeClaims()
|
||||
podInformer := informers.Core().V1().Pods()
|
||||
|
||||
// Populate the informers with initial objects so the controller can
|
||||
// Get() and List() it.
|
||||
for _, obj := range objs {
|
||||
switch obj.(type) {
|
||||
case *v1.PersistentVolumeClaim:
|
||||
pvcInformer.Informer().GetStore().Add(obj)
|
||||
case *v1.Pod:
|
||||
podInformer.Informer().GetStore().Add(obj)
|
||||
default:
|
||||
t.Fatalf("Unknown initalObject type: %+v", obj)
|
||||
}
|
||||
}
|
||||
|
||||
// Add reactor to inject test errors.
|
||||
for _, reactor := range test.reactors {
|
||||
client.Fake.PrependReactor(reactor.verb, reactor.resource, reactor.reactorfn)
|
||||
}
|
||||
|
||||
// Create the controller
|
||||
ctrl := NewPVCProtectionController(pvcInformer, podInformer, client)
|
||||
|
||||
// Start the test by simulating an event
|
||||
if test.updatedPVC != nil {
|
||||
ctrl.pvcAddedUpdated(test.updatedPVC)
|
||||
}
|
||||
if test.updatedPod != nil {
|
||||
ctrl.podAddedDeletedUpdated(test.updatedPod, false)
|
||||
}
|
||||
if test.deletedPod != nil {
|
||||
ctrl.podAddedDeletedUpdated(test.deletedPod, true)
|
||||
}
|
||||
|
||||
// Process the controller queue until we get expected results
|
||||
timeout := time.Now().Add(10 * time.Second)
|
||||
lastReportedActionCount := 0
|
||||
for {
|
||||
if time.Now().After(timeout) {
|
||||
t.Errorf("Test %q: timed out", test.name)
|
||||
break
|
||||
}
|
||||
if ctrl.queue.Len() > 0 {
|
||||
glog.V(5).Infof("Test %q: %d events queue, processing one", test.name, ctrl.queue.Len())
|
||||
ctrl.processNextWorkItem()
|
||||
}
|
||||
if ctrl.queue.Len() > 0 {
|
||||
// There is still some work in the queue, process it now
|
||||
continue
|
||||
}
|
||||
currentActionCount := len(client.Actions())
|
||||
if currentActionCount < len(test.expectedActions) {
|
||||
// Do not log evey wait, only when the action count changes.
|
||||
if lastReportedActionCount < currentActionCount {
|
||||
glog.V(5).Infof("Test %q: got %d actions out of %d, waiting for the rest", test.name, currentActionCount, len(test.expectedActions))
|
||||
lastReportedActionCount = currentActionCount
|
||||
}
|
||||
// The test expected more to happen, wait for the actions.
|
||||
// Most probably it's exponential backoff
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
continue
|
||||
}
|
||||
break
|
||||
}
|
||||
actions := client.Actions()
|
||||
for i, action := range actions {
|
||||
if len(test.expectedActions) < i+1 {
|
||||
t.Errorf("Test %q: %d unexpected actions: %+v", test.name, len(actions)-len(test.expectedActions), spew.Sdump(actions[i:]))
|
||||
break
|
||||
}
|
||||
|
||||
expectedAction := test.expectedActions[i]
|
||||
if !reflect.DeepEqual(expectedAction, action) {
|
||||
t.Errorf("Test %q: action %d\nExpected:\n%s\ngot:\n%s", test.name, i, spew.Sdump(expectedAction), spew.Sdump(action))
|
||||
}
|
||||
}
|
||||
|
||||
if len(test.expectedActions) > len(actions) {
|
||||
t.Errorf("Test %q: %d additional expected actions", test.name, len(test.expectedActions)-len(actions))
|
||||
for _, a := range test.expectedActions[len(actions):] {
|
||||
t.Logf(" %+v", a)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user