New Job resource

This commit is contained in:
Maciej Szulik
2015-08-18 16:39:49 +02:00
parent 90ba96d486
commit e55c59e8c2
15 changed files with 1338 additions and 0 deletions

19
pkg/registry/job/doc.go Normal file
View File

@@ -0,0 +1,19 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package controller provides Registry interface and it's RESTStorage
// implementation for storing Job api objects.
package job

View File

@@ -0,0 +1,77 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package etcd
import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/expapi"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/registry/generic"
etcdgeneric "k8s.io/kubernetes/pkg/registry/generic/etcd"
"k8s.io/kubernetes/pkg/registry/job"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
)
// rest implements a RESTStorage for jobs against etcd
type REST struct {
*etcdgeneric.Etcd
}
// jobPrefix is the location for jobs in etcd, only exposed
// for testing
var jobPrefix = "/jobs"
// NewREST returns a RESTStorage object that will work against Jobs.
func NewREST(s storage.Interface) *REST {
store := &etcdgeneric.Etcd{
NewFunc: func() runtime.Object { return &expapi.Job{} },
// NewListFunc returns an object capable of storing results of an etcd list.
NewListFunc: func() runtime.Object { return &expapi.JobList{} },
// Produces a path that etcd understands, to the root of the resource
// by combining the namespace in the context with the given prefix
KeyRootFunc: func(ctx api.Context) string {
return etcdgeneric.NamespaceKeyRootFunc(ctx, jobPrefix)
},
// Produces a path that etcd understands, to the resource by combining
// the namespace in the context with the given prefix
KeyFunc: func(ctx api.Context, name string) (string, error) {
return etcdgeneric.NamespaceKeyFunc(ctx, jobPrefix, name)
},
// Retrieve the name field of a job
ObjectNameFunc: func(obj runtime.Object) (string, error) {
return obj.(*expapi.Job).Name, nil
},
// Used to match objects based on labels/fields for list and watch
PredicateFunc: func(label labels.Selector, field fields.Selector) generic.Matcher {
return job.MatchJob(label, field)
},
EndpointName: "jobs",
// Used to validate job creation
CreateStrategy: job.Strategy,
// Used to validate job updates
UpdateStrategy: job.Strategy,
Storage: s,
}
return &REST{store}
}

View File

@@ -0,0 +1,148 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package etcd
import (
"testing"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/expapi"
// Ensure that expapi/v1 package is initialized.
_ "k8s.io/kubernetes/pkg/expapi/v1"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/registry/registrytest"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/tools"
)
func newStorage(t *testing.T) (*REST, *tools.FakeEtcdClient) {
etcdStorage, fakeClient := registrytest.NewEtcdStorage(t, "experimental")
return NewREST(etcdStorage), fakeClient
}
func validNewJob() *expapi.Job {
completions := 1
parallelism := 1
return &expapi.Job{
ObjectMeta: api.ObjectMeta{
Name: "foo",
Namespace: "default",
},
Spec: expapi.JobSpec{
Completions: &completions,
Parallelism: &parallelism,
Selector: map[string]string{"a": "b"},
Template: &api.PodTemplateSpec{
ObjectMeta: api.ObjectMeta{
Labels: map[string]string{"a": "b"},
},
Spec: api.PodSpec{
Containers: []api.Container{
{
Name: "test",
Image: "test_image",
ImagePullPolicy: api.PullIfNotPresent,
},
},
RestartPolicy: api.RestartPolicyOnFailure,
DNSPolicy: api.DNSClusterFirst,
},
},
},
}
}
func TestCreate(t *testing.T) {
storage, fakeClient := newStorage(t)
test := registrytest.New(t, fakeClient, storage.Etcd)
validJob := validNewJob()
validJob.ObjectMeta = api.ObjectMeta{}
test.TestCreate(
// valid
validJob,
// invalid (empty selector)
&expapi.Job{
Spec: expapi.JobSpec{
Completions: validJob.Spec.Completions,
Selector: map[string]string{},
Template: validJob.Spec.Template,
},
},
)
}
func TestUpdate(t *testing.T) {
storage, fakeClient := newStorage(t)
test := registrytest.New(t, fakeClient, storage.Etcd)
completions := 2
test.TestUpdate(
// valid
validNewJob(),
// updateFunc
func(obj runtime.Object) runtime.Object {
object := obj.(*expapi.Job)
object.Spec.Completions = &completions
return object
},
// invalid updateFunc
func(obj runtime.Object) runtime.Object {
object := obj.(*expapi.Job)
object.Spec.Selector = map[string]string{}
return object
},
)
}
func TestDelete(t *testing.T) {
storage, fakeClient := newStorage(t)
test := registrytest.New(t, fakeClient, storage.Etcd)
test.TestDelete(validNewJob())
}
func TestGet(t *testing.T) {
storage, fakeClient := newStorage(t)
test := registrytest.New(t, fakeClient, storage.Etcd)
test.TestGet(validNewJob())
}
func TestList(t *testing.T) {
storage, fakeClient := newStorage(t)
test := registrytest.New(t, fakeClient, storage.Etcd)
test.TestList(validNewJob())
}
func TestWatch(t *testing.T) {
storage, fakeClient := newStorage(t)
test := registrytest.New(t, fakeClient, storage.Etcd)
test.TestWatch(
validNewJob(),
// matching labels
[]labels.Set{},
// not matching labels
[]labels.Set{
{"x": "y"},
},
// matching fields
[]fields.Set{},
// not matching fields
[]fields.Set{
{"metadata.name": "xyz"},
{"name": "foo"},
},
)
}

View File

@@ -0,0 +1,99 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package job
import (
"fmt"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/rest"
"k8s.io/kubernetes/pkg/expapi"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/watch"
)
// Registry is an interface for things that know how to store Jobs.
type Registry interface {
// ListJobs obtains a list of Jobs having labels and fields which match selector.
ListJobs(ctx api.Context, label labels.Selector, field fields.Selector) (*expapi.JobList, error)
// WatchJobs watch for new/changed/deleted Jobs.
WatchJobs(ctx api.Context, label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error)
// GetJobs gets a specific Job.
GetJob(ctx api.Context, name string) (*expapi.Job, error)
// CreateJob creates a Job based on a specification.
CreateJob(ctx api.Context, job *expapi.Job) (*expapi.Job, error)
// UpdateJob updates an existing Job.
UpdateJob(ctx api.Context, job *expapi.Job) (*expapi.Job, error)
// DeleteJob deletes an existing Job.
DeleteJob(ctx api.Context, name string) error
}
// storage puts strong typing around storage calls
type storage struct {
rest.StandardStorage
}
// NewRegistry returns a new Registry interface for the given Storage. Any mismatched
// types will panic.
func NewRegistry(s rest.StandardStorage) Registry {
return &storage{s}
}
func (s *storage) ListJobs(ctx api.Context, label labels.Selector, field fields.Selector) (*expapi.JobList, error) {
if !field.Empty() {
return nil, fmt.Errorf("field selector not supported yet")
}
obj, err := s.List(ctx, label, field)
if err != nil {
return nil, err
}
return obj.(*expapi.JobList), err
}
func (s *storage) WatchJobs(ctx api.Context, label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) {
return s.Watch(ctx, label, field, resourceVersion)
}
func (s *storage) GetJob(ctx api.Context, name string) (*expapi.Job, error) {
obj, err := s.Get(ctx, name)
if err != nil {
return nil, err
}
return obj.(*expapi.Job), nil
}
func (s *storage) CreateJob(ctx api.Context, job *expapi.Job) (*expapi.Job, error) {
obj, err := s.Create(ctx, job)
if err != nil {
return nil, err
}
return obj.(*expapi.Job), nil
}
func (s *storage) UpdateJob(ctx api.Context, job *expapi.Job) (*expapi.Job, error) {
obj, _, err := s.Update(ctx, job)
if err != nil {
return nil, err
}
return obj.(*expapi.Job), nil
}
func (s *storage) DeleteJob(ctx api.Context, name string) error {
_, err := s.Delete(ctx, name, nil)
return err
}

View File

@@ -0,0 +1,105 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package job
import (
"fmt"
"strconv"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/expapi"
"k8s.io/kubernetes/pkg/expapi/validation"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/registry/generic"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/fielderrors"
)
// jobStrategy implements verification logic for Replication Controllers.
type jobStrategy struct {
runtime.ObjectTyper
api.NameGenerator
}
// Strategy is the default logic that applies when creating and updating Replication Controller objects.
var Strategy = jobStrategy{api.Scheme, api.SimpleNameGenerator}
// NamespaceScoped returns true because all jobs need to be within a namespace.
func (jobStrategy) NamespaceScoped() bool {
return true
}
// PrepareForCreate clears the status of a job before creation.
func (jobStrategy) PrepareForCreate(obj runtime.Object) {
job := obj.(*expapi.Job)
job.Status = expapi.JobStatus{}
}
// PrepareForUpdate clears fields that are not allowed to be set by end users on update.
func (jobStrategy) PrepareForUpdate(obj, old runtime.Object) {
newJob := obj.(*expapi.Job)
oldJob := old.(*expapi.Job)
newJob.Status = oldJob.Status
}
// Validate validates a new job.
func (jobStrategy) Validate(ctx api.Context, obj runtime.Object) fielderrors.ValidationErrorList {
job := obj.(*expapi.Job)
return validation.ValidateJob(job)
}
func (jobStrategy) AllowUnconditionalUpdate() bool {
return true
}
// AllowCreateOnUpdate is false for jobs; this means a POST is needed to create one.
func (jobStrategy) AllowCreateOnUpdate() bool {
return false
}
// ValidateUpdate is the default update validation for an end user.
func (jobStrategy) ValidateUpdate(ctx api.Context, obj, old runtime.Object) fielderrors.ValidationErrorList {
validationErrorList := validation.ValidateJob(obj.(*expapi.Job))
updateErrorList := validation.ValidateJobUpdate(old.(*expapi.Job), obj.(*expapi.Job))
return append(validationErrorList, updateErrorList...)
}
// JobSelectableFields returns a field set that represents the object for matching purposes.
func JobToSelectableFields(job *expapi.Job) fields.Set {
return fields.Set{
"metadata.name": job.Name,
"status.successful": strconv.Itoa(job.Status.Successful),
}
}
// MatchJob is the filter used by the generic etcd backend to route
// watch events from etcd to clients of the apiserver only interested in specific
// labels/fields.
func MatchJob(label labels.Selector, field fields.Selector) generic.Matcher {
return &generic.SelectionPredicate{
Label: label,
Field: field,
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) {
job, ok := obj.(*expapi.Job)
if !ok {
return nil, nil, fmt.Errorf("Given object is not a job.")
}
return labels.Set(job.ObjectMeta.Labels), JobToSelectableFields(job), nil
},
}
}