Merge pull request #101601 from AliceZhang2016/indexed-job-stable-hostname

specify pod name and hostname in indexed job
This commit is contained in:
Kubernetes Prow Robot
2021-05-20 01:41:05 -07:00
committed by GitHub
9 changed files with 151 additions and 24 deletions

View File

@@ -449,6 +449,8 @@ type PodControlInterface interface {
CreatePods(namespace string, template *v1.PodTemplateSpec, object runtime.Object) error
// CreatePodsWithControllerRef creates new pods according to the spec, and sets object as the pod's controller.
CreatePodsWithControllerRef(namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error
// CreatePodsWithControllerRefAndGenerateName creates new pods according to the spec, sets object as the pod's controller and sets pod's generateName.
CreatePodsWithControllerRefAndGenerateName(namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference, generateName string) error
// DeletePod deletes the pod identified by podID.
DeletePod(namespace string, podID string, object runtime.Object) error
// PatchPod patches the pod.
@@ -514,14 +516,29 @@ func validateControllerRef(controllerRef *metav1.OwnerReference) error {
}
func (r RealPodControl) CreatePods(namespace string, template *v1.PodTemplateSpec, object runtime.Object) error {
return r.createPods(namespace, template, object, nil)
pod, err := GetPodFromTemplate(template, object, nil)
if err != nil {
return err
}
return r.createPods(namespace, pod, object)
}
func (r RealPodControl) CreatePodsWithControllerRef(namespace string, template *v1.PodTemplateSpec, controllerObject runtime.Object, controllerRef *metav1.OwnerReference) error {
return r.CreatePodsWithControllerRefAndGenerateName(namespace, template, controllerObject, controllerRef, "")
}
func (r RealPodControl) CreatePodsWithControllerRefAndGenerateName(namespace string, template *v1.PodTemplateSpec, controllerObject runtime.Object, controllerRef *metav1.OwnerReference, generateName string) error {
if err := validateControllerRef(controllerRef); err != nil {
return err
}
return r.createPods(namespace, template, controllerObject, controllerRef)
pod, err := GetPodFromTemplate(template, controllerObject, controllerRef)
if err != nil {
return err
}
if len(generateName) > 0 {
pod.ObjectMeta.GenerateName = generateName
}
return r.createPods(namespace, pod, controllerObject)
}
func (r RealPodControl) PatchPod(namespace, name string, data []byte) error {
@@ -554,11 +571,7 @@ func GetPodFromTemplate(template *v1.PodTemplateSpec, parentObject runtime.Objec
return pod, nil
}
func (r RealPodControl) createPods(namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error {
pod, err := GetPodFromTemplate(template, object, controllerRef)
if err != nil {
return err
}
func (r RealPodControl) createPods(namespace string, pod *v1.Pod, object runtime.Object) error {
if len(labels.Set(pod.Labels)) == 0 {
return fmt.Errorf("unable to create pods, no labels")
}
@@ -652,6 +665,21 @@ func (f *FakePodControl) CreatePodsWithControllerRef(namespace string, spec *v1.
return nil
}
func (f *FakePodControl) CreatePodsWithControllerRefAndGenerateName(namespace string, spec *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference, generateNamePrefix string) error {
f.Lock()
defer f.Unlock()
f.CreateCallCount++
if f.CreateLimit != 0 && f.CreateCallCount > f.CreateLimit {
return fmt.Errorf("not creating pod, limit %d already reached (create call %d)", f.CreateLimit, f.CreateCallCount)
}
f.Templates = append(f.Templates, *spec)
f.ControllerRefs = append(f.ControllerRefs, *controllerRef)
if f.Err != nil {
return f.Err
}
return nil
}
func (f *FakePodControl) DeletePod(namespace string, podID string, object runtime.Object) error {
f.Lock()
defer f.Unlock()

View File

@@ -29,6 +29,7 @@ import (
"time"
apps "k8s.io/api/apps/v1"
batchv1 "k8s.io/api/batch/v1"
v1 "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -320,6 +321,47 @@ func TestCreatePods(t *testing.T) {
"Body: %s", fakeHandler.RequestBody)
}
func TestCreatePodsWithControllerRefAndGenerateName(t *testing.T) {
ns := metav1.NamespaceDefault
body := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "empty_pod"}})
fakeHandler := utiltesting.FakeHandler{
StatusCode: 200,
ResponseBody: string(body),
}
testServer := httptest.NewServer(&fakeHandler)
defer testServer.Close()
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
podControl := RealPodControl{
KubeClient: clientset,
Recorder: &record.FakeRecorder{},
}
controllerSpec := newReplicationController(1)
controllerRef := metav1.NewControllerRef(controllerSpec, batchv1.SchemeGroupVersion.WithKind("Job"))
// Make sure createReplica sends a POST to the apiserver with a pod from the controllers pod template
generateName := "hello-"
err := podControl.CreatePodsWithControllerRefAndGenerateName(ns, controllerSpec.Spec.Template, controllerSpec, controllerRef, generateName)
assert.NoError(t, err, "unexpected error: %v", err)
expectedPod := v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Labels: controllerSpec.Spec.Template.Labels,
GenerateName: generateName,
OwnerReferences: []metav1.OwnerReference{*controllerRef},
},
Spec: controllerSpec.Spec.Template.Spec,
}
fakeHandler.ValidateRequest(t, "/api/v1/namespaces/default/pods", "POST", nil)
var actualPod = &v1.Pod{}
err = json.Unmarshal([]byte(fakeHandler.RequestBody), actualPod)
assert.NoError(t, err, "unexpected error: %v", err)
assert.True(t, apiequality.Semantic.DeepDerivative(&expectedPod, actualPod),
"Body: %s", fakeHandler.RequestBody)
}
func TestDeletePodsAllowsMissing(t *testing.T) {
fakeClient := fake.NewSimpleClientset()
podControl := RealPodControl{

View File

@@ -26,6 +26,7 @@ import (
batch "k8s.io/api/batch/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apiserver/pkg/storage/names"
"k8s.io/kubernetes/pkg/controller"
)
@@ -216,6 +217,15 @@ func addCompletionIndexAnnotation(template *v1.PodTemplateSpec, index int) {
template.Annotations[batch.JobCompletionIndexAnnotation] = strconv.Itoa(index)
}
func podGenerateNameWithIndex(jobName string, index int) string {
appendIndex := "-" + strconv.Itoa(index) + "-"
generateNamePrefix := jobName + appendIndex
if len(generateNamePrefix) > names.MaxGeneratedNameLength {
generateNamePrefix = generateNamePrefix[:names.MaxGeneratedNameLength-len(appendIndex)] + appendIndex
}
return generateNamePrefix
}
type byCompletionIndex []*v1.Pod
func (bci byCompletionIndex) Less(i, j int) bool {

View File

@@ -269,6 +269,38 @@ func TestAppendDuplicatedIndexPodsForRemoval(t *testing.T) {
}
}
func TestPodGenerateNameWithIndex(t *testing.T) {
cases := map[string]struct {
jobname string
index int
wantPodGenerateName string
}{
"short job name": {
jobname: "indexed-job",
index: 1,
wantPodGenerateName: "indexed-job-1-",
},
"job name exceeds MaxGeneneratedNameLength": {
jobname: "hhhhhooooohhhhhooooohhhhhooooohhhhhooooohhhhhooooohhhhhooooohhhhhooooo",
index: 1,
wantPodGenerateName: "hhhhhooooohhhhhooooohhhhhooooohhhhhooooohhhhhooooohhhhh-1-",
},
"job name with index suffix exceeds MaxGeneratedNameLength": {
jobname: "hhhhhooooohhhhhooooohhhhhooooohhhhhooooohhhhhooooohhhhhoo",
index: 1,
wantPodGenerateName: "hhhhhooooohhhhhooooohhhhhooooohhhhhooooohhhhhooooohhhhh-1-",
},
}
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
podGenerateName := podGenerateNameWithIndex(tc.jobname, tc.index)
if diff := cmp.Equal(tc.wantPodGenerateName, podGenerateName); !diff {
t.Errorf("Got pod generateName %s, want %s", podGenerateName, tc.wantPodGenerateName)
}
})
}
}
func hollowPodsWithIndexPhase(descs []indexPhase) []*v1.Pod {
pods := make([]*v1.Pod, 0, len(descs))
for _, desc := range descs {

View File

@@ -871,9 +871,11 @@ func (jm *Controller) manageJob(job *batch.Job, activePods []*v1.Pod, succeeded
if completionIndex != unknownCompletionIndex {
template = podTemplate.DeepCopy()
addCompletionIndexAnnotation(template, completionIndex)
template.Spec.Hostname = fmt.Sprintf("%s-%d", job.Name, completionIndex)
}
defer wait.Done()
err := jm.podControl.CreatePodsWithControllerRef(job.Namespace, template, job, metav1.NewControllerRef(job, controllerKind))
generateName := podGenerateNameWithIndex(job.Name, completionIndex)
err := jm.podControl.CreatePodsWithControllerRefAndGenerateName(job.Namespace, template, job, metav1.NewControllerRef(job, controllerKind), generateName)
if err != nil {
if apierrors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
// If the namespace is being torn down, we can safely ignore

View File

@@ -151,6 +151,7 @@ func setPodsStatusesWithIndexes(podIndexer cache.Indexer, job *batch.Job, status
p.Annotations = map[string]string{
batch.JobCompletionIndexAnnotation: s.Index,
}
p.Spec.Hostname = fmt.Sprintf("%s-%s", job.Name, s.Index)
}
podIndexer.Add(p)
}
@@ -735,7 +736,7 @@ func TestControllerSyncJob(t *testing.T) {
t.Errorf("Unexpected number of creates. Expected %d, saw %d\n", tc.expectedCreations, len(fakePodControl.Templates))
}
if tc.completionMode == batch.IndexedCompletion {
checkCompletionIndexesInPods(t, &fakePodControl, tc.expectedCreatedIndexes)
checkIndexedJobPods(t, &fakePodControl, tc.expectedCreatedIndexes, job.Name)
}
if int32(len(fakePodControl.DeletePodName)) != tc.expectedDeletions {
t.Errorf("Unexpected number of deletes. Expected %d, saw %d\n", tc.expectedDeletions, len(fakePodControl.DeletePodName))
@@ -806,7 +807,7 @@ func TestControllerSyncJob(t *testing.T) {
}
}
func checkCompletionIndexesInPods(t *testing.T, control *controller.FakePodControl, wantIndexes sets.Int) {
func checkIndexedJobPods(t *testing.T, control *controller.FakePodControl, wantIndexes sets.Int, jobName string) {
t.Helper()
gotIndexes := sets.NewInt()
for _, p := range control.Templates {
@@ -817,6 +818,10 @@ func checkCompletionIndexesInPods(t *testing.T, control *controller.FakePodContr
} else {
gotIndexes.Insert(ix)
}
expectedName := fmt.Sprintf("%s-%d", jobName, ix)
if diff := cmp.Equal(expectedName, p.Spec.Hostname); !diff {
t.Errorf("Got pod hostname %s, want %s", p.Spec.Hostname, expectedName)
}
}
if diff := cmp.Diff(wantIndexes.List(), gotIndexes.List()); diff != "" {
t.Errorf("Unexpected created completion indexes (-want,+got):\n%s", diff)