StatefulSet: Use ControllerRefManager to adopt/orphan.
This commit is contained in:
parent
b7163bdb75
commit
e4f67c8170
@ -32,6 +32,8 @@ go_library(
|
|||||||
"//vendor:github.com/golang/glog",
|
"//vendor:github.com/golang/glog",
|
||||||
"//vendor:k8s.io/apimachinery/pkg/api/errors",
|
"//vendor:k8s.io/apimachinery/pkg/api/errors",
|
||||||
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
|
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
|
||||||
|
"//vendor:k8s.io/apimachinery/pkg/labels",
|
||||||
|
"//vendor:k8s.io/apimachinery/pkg/runtime/schema",
|
||||||
"//vendor:k8s.io/apimachinery/pkg/util/errors",
|
"//vendor:k8s.io/apimachinery/pkg/util/errors",
|
||||||
"//vendor:k8s.io/apimachinery/pkg/util/runtime",
|
"//vendor:k8s.io/apimachinery/pkg/util/runtime",
|
||||||
"//vendor:k8s.io/apimachinery/pkg/util/wait",
|
"//vendor:k8s.io/apimachinery/pkg/util/wait",
|
||||||
|
@ -24,6 +24,7 @@ import (
|
|||||||
|
|
||||||
"k8s.io/apimachinery/pkg/api/errors"
|
"k8s.io/apimachinery/pkg/api/errors"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
|
"k8s.io/apimachinery/pkg/labels"
|
||||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||||
"k8s.io/apimachinery/pkg/util/wait"
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
|
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
|
||||||
@ -56,6 +57,8 @@ type StatefulSetController struct {
|
|||||||
// control returns an interface capable of syncing a stateful set.
|
// control returns an interface capable of syncing a stateful set.
|
||||||
// Abstracted out for testing.
|
// Abstracted out for testing.
|
||||||
control StatefulSetControlInterface
|
control StatefulSetControlInterface
|
||||||
|
// podControl is used for patching pods.
|
||||||
|
podControl controller.PodControlInterface
|
||||||
// podLister is able to list/get pods from a shared informer's store
|
// podLister is able to list/get pods from a shared informer's store
|
||||||
podLister corelisters.PodLister
|
podLister corelisters.PodLister
|
||||||
// podListerSynced returns true if the pod shared informer has synced at least once
|
// podListerSynced returns true if the pod shared informer has synced at least once
|
||||||
@ -95,6 +98,7 @@ func NewStatefulSetController(
|
|||||||
),
|
),
|
||||||
pvcListerSynced: pvcInformer.Informer().HasSynced,
|
pvcListerSynced: pvcInformer.Informer().HasSynced,
|
||||||
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "statefulset"),
|
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "statefulset"),
|
||||||
|
podControl: controller.RealPodControl{KubeClient: kubeClient, Recorder: recorder},
|
||||||
}
|
}
|
||||||
|
|
||||||
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||||
@ -210,13 +214,26 @@ func (ssc *StatefulSetController) deletePod(obj interface{}) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// getPodsForStatefulSets returns the pods that match the selectors of the given statefulset.
|
// getPodsForStatefulSet returns the Pods that a given StatefulSet should manage.
|
||||||
func (ssc *StatefulSetController) getPodsForStatefulSet(set *apps.StatefulSet) ([]*v1.Pod, error) {
|
// It also reconciles ControllerRef by adopting/orphaning.
|
||||||
sel, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
|
//
|
||||||
|
// NOTE: Returned Pods are pointers to objects from the cache.
|
||||||
|
// If you need to modify one, you need to copy it first.
|
||||||
|
func (ssc *StatefulSetController) getPodsForStatefulSet(set *apps.StatefulSet, selector labels.Selector) ([]*v1.Pod, error) {
|
||||||
|
// List all pods to include the pods that don't match the selector anymore but
|
||||||
|
// has a ControllerRef pointing to this StatefulSet.
|
||||||
|
pods, err := ssc.podLister.Pods(set.Namespace).List(labels.Everything())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return []*v1.Pod{}, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return ssc.podLister.Pods(set.Namespace).List(sel)
|
|
||||||
|
filter := func(pod *v1.Pod) bool {
|
||||||
|
// Only claim if it matches our StatefulSet name. Otherwise release/ignore.
|
||||||
|
return isMemberOf(set, pod)
|
||||||
|
}
|
||||||
|
|
||||||
|
cm := controller.NewPodControllerRefManager(ssc.podControl, set, selector, getSSKind())
|
||||||
|
return cm.ClaimPods(pods, filter)
|
||||||
}
|
}
|
||||||
|
|
||||||
// getStatefulSetForPod returns the StatefulSet managing the given pod.
|
// getStatefulSetForPod returns the StatefulSet managing the given pod.
|
||||||
@ -298,11 +315,17 @@ func (ssc *StatefulSetController) sync(key string) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
utilruntime.HandleError(fmt.Errorf("Unable to retrieve StatefulSet %v from store: %v", key, err))
|
utilruntime.HandleError(fmt.Errorf("unable to retrieve StatefulSet %v from store: %v", key, err))
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
|
||||||
|
if err != nil {
|
||||||
|
utilruntime.HandleError(fmt.Errorf("error converting StatefulSet %v selector: %v", key, err))
|
||||||
|
// This is a non-transient error, so don't retry.
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
pods, err := ssc.getPodsForStatefulSet(set)
|
pods, err := ssc.getPodsForStatefulSet(set, selector)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -74,6 +74,7 @@ func (ssc *defaultStatefulSetControl) UpdateStatefulSet(set *apps.StatefulSet, p
|
|||||||
// if the ordinal is greater than the number of replicas add it to the condemned list
|
// if the ordinal is greater than the number of replicas add it to the condemned list
|
||||||
condemned = append(condemned, pods[i])
|
condemned = append(condemned, pods[i])
|
||||||
}
|
}
|
||||||
|
// If the ordinal could not be parsed (ord < 0), ignore the Pod.
|
||||||
}
|
}
|
||||||
|
|
||||||
// for any empty indices in the sequence [0,set.Spec.Replicas) create a new Pod
|
// for any empty indices in the sequence [0,set.Spec.Replicas) create a new Pod
|
||||||
|
@ -17,6 +17,7 @@ limitations under the License.
|
|||||||
package statefulset
|
package statefulset
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"reflect"
|
||||||
"sort"
|
"sort"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
@ -345,6 +346,88 @@ func TestStatefulSetControllerGetStatefulSetForPodOverlapping(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestGetPodsForStatefulSetAdopt(t *testing.T) {
|
||||||
|
ssc, spc := newFakeStatefulSetController()
|
||||||
|
set := newStatefulSet(5)
|
||||||
|
pod1 := newStatefulSetPod(set, 1)
|
||||||
|
// pod2 is an orphan with matching labels and name.
|
||||||
|
pod2 := newStatefulSetPod(set, 2)
|
||||||
|
pod2.OwnerReferences = nil
|
||||||
|
// pod3 has wrong labels.
|
||||||
|
pod3 := newStatefulSetPod(set, 3)
|
||||||
|
pod3.OwnerReferences = nil
|
||||||
|
pod3.Labels = nil
|
||||||
|
// pod4 has wrong name.
|
||||||
|
pod4 := newStatefulSetPod(set, 4)
|
||||||
|
pod4.OwnerReferences = nil
|
||||||
|
pod4.Name = "x" + pod4.Name
|
||||||
|
|
||||||
|
spc.podsIndexer.Add(pod1)
|
||||||
|
spc.podsIndexer.Add(pod2)
|
||||||
|
spc.podsIndexer.Add(pod3)
|
||||||
|
spc.podsIndexer.Add(pod4)
|
||||||
|
selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
pods, err := ssc.getPodsForStatefulSet(set, selector)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("getPodsForStatefulSet() error: %v", err)
|
||||||
|
}
|
||||||
|
var got []string
|
||||||
|
for _, pod := range pods {
|
||||||
|
got = append(got, pod.Name)
|
||||||
|
}
|
||||||
|
|
||||||
|
// pod2 should be claimed, pod3 and pod4 ignored
|
||||||
|
want := []string{pod1.Name, pod2.Name}
|
||||||
|
sort.Strings(got)
|
||||||
|
sort.Strings(want)
|
||||||
|
if !reflect.DeepEqual(got, want) {
|
||||||
|
t.Errorf("getPodsForStatefulSet() = %v, want %v", got, want)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestGetPodsForStatefulSetRelease(t *testing.T) {
|
||||||
|
ssc, spc := newFakeStatefulSetController()
|
||||||
|
set := newStatefulSet(3)
|
||||||
|
pod1 := newStatefulSetPod(set, 1)
|
||||||
|
// pod2 is owned but has wrong name.
|
||||||
|
pod2 := newStatefulSetPod(set, 2)
|
||||||
|
pod2.Name = "x" + pod2.Name
|
||||||
|
// pod3 is owned but has wrong labels.
|
||||||
|
pod3 := newStatefulSetPod(set, 3)
|
||||||
|
pod3.Labels = nil
|
||||||
|
// pod4 is an orphan that doesn't match.
|
||||||
|
pod4 := newStatefulSetPod(set, 4)
|
||||||
|
pod4.OwnerReferences = nil
|
||||||
|
pod4.Labels = nil
|
||||||
|
|
||||||
|
spc.podsIndexer.Add(pod1)
|
||||||
|
spc.podsIndexer.Add(pod2)
|
||||||
|
spc.podsIndexer.Add(pod3)
|
||||||
|
selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
pods, err := ssc.getPodsForStatefulSet(set, selector)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("getPodsForStatefulSet() error: %v", err)
|
||||||
|
}
|
||||||
|
var got []string
|
||||||
|
for _, pod := range pods {
|
||||||
|
got = append(got, pod.Name)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Expect only pod1 (pod2 and pod3 should be released, pod4 ignored).
|
||||||
|
want := []string{pod1.Name}
|
||||||
|
sort.Strings(got)
|
||||||
|
sort.Strings(want)
|
||||||
|
if !reflect.DeepEqual(got, want) {
|
||||||
|
t.Errorf("getPodsForStatefulSet() = %v, want %v", got, want)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func newFakeStatefulSetController() (*StatefulSetController, *fakeStatefulPodControl) {
|
func newFakeStatefulSetController() (*StatefulSetController, *fakeStatefulPodControl) {
|
||||||
client := fake.NewSimpleClientset()
|
client := fake.NewSimpleClientset()
|
||||||
informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
|
informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
|
||||||
|
@ -22,6 +22,7 @@ import (
|
|||||||
"strconv"
|
"strconv"
|
||||||
|
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||||
"k8s.io/kubernetes/pkg/api/v1"
|
"k8s.io/kubernetes/pkg/api/v1"
|
||||||
podapi "k8s.io/kubernetes/pkg/api/v1/pod"
|
podapi "k8s.io/kubernetes/pkg/api/v1/pod"
|
||||||
apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1"
|
apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1"
|
||||||
@ -231,6 +232,10 @@ func isHealthy(pod *v1.Pod) bool {
|
|||||||
return isRunningAndReady(pod) && !isTerminated(pod)
|
return isRunningAndReady(pod) && !isTerminated(pod)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func getSSKind() schema.GroupVersionKind {
|
||||||
|
return apps.SchemeGroupVersion.WithKind("StatefulSet")
|
||||||
|
}
|
||||||
|
|
||||||
// newControllerRef returns an ControllerRef pointing to a given StatefulSet.
|
// newControllerRef returns an ControllerRef pointing to a given StatefulSet.
|
||||||
func newControllerRef(set *apps.StatefulSet) *metav1.OwnerReference {
|
func newControllerRef(set *apps.StatefulSet) *metav1.OwnerReference {
|
||||||
isController := true
|
isController := true
|
||||||
|
Loading…
Reference in New Issue
Block a user