remove storage dependency on api
This commit is contained in:
@@ -33,10 +33,8 @@ import (
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/api/testapi"
|
||||
apitesting "k8s.io/kubernetes/pkg/api/testing"
|
||||
corepod "k8s.io/kubernetes/pkg/registry/core/pod"
|
||||
"k8s.io/client-go/pkg/api"
|
||||
"k8s.io/client-go/pkg/api/v1"
|
||||
"k8s.io/kubernetes/pkg/storage"
|
||||
etcdstorage "k8s.io/kubernetes/pkg/storage/etcd"
|
||||
"k8s.io/kubernetes/pkg/storage/etcd/etcdtest"
|
||||
@@ -44,8 +42,52 @@ import (
|
||||
"k8s.io/kubernetes/pkg/storage/etcd3"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
|
||||
_ "k8s.io/client-go/pkg/api/install"
|
||||
)
|
||||
|
||||
func DeepEqualSafePodSpec() api.PodSpec {
|
||||
grace := int64(30)
|
||||
return api.PodSpec{
|
||||
RestartPolicy: api.RestartPolicyAlways,
|
||||
DNSPolicy: api.DNSClusterFirst,
|
||||
TerminationGracePeriodSeconds: &grace,
|
||||
SecurityContext: &api.PodSecurityContext{},
|
||||
SchedulerName: api.DefaultSchedulerName,
|
||||
}
|
||||
}
|
||||
|
||||
// GetAttrs returns labels and fields of a given object for filtering purposes.
|
||||
func GetAttrs(obj runtime.Object) (labels.Set, fields.Set, error) {
|
||||
pod, ok := obj.(*api.Pod)
|
||||
if !ok {
|
||||
return nil, nil, fmt.Errorf("not a pod")
|
||||
}
|
||||
return labels.Set(pod.ObjectMeta.Labels), PodToSelectableFields(pod), nil
|
||||
}
|
||||
|
||||
// PodToSelectableFields returns a field set that represents the object
|
||||
// TODO: fields are not labels, and the validation rules for them do not apply.
|
||||
func PodToSelectableFields(pod *api.Pod) fields.Set {
|
||||
// The purpose of allocation with a given number of elements is to reduce
|
||||
// amount of allocations needed to create the fields.Set. If you add any
|
||||
// field here or the number of object-meta related fields changes, this should
|
||||
// be adjusted.
|
||||
podSpecificFieldsSet := make(fields.Set, 5)
|
||||
podSpecificFieldsSet["spec.nodeName"] = pod.Spec.NodeName
|
||||
podSpecificFieldsSet["spec.restartPolicy"] = string(pod.Spec.RestartPolicy)
|
||||
podSpecificFieldsSet["status.phase"] = string(pod.Status.Phase)
|
||||
return AddObjectMetaFieldsSet(podSpecificFieldsSet, &pod.ObjectMeta, true)
|
||||
}
|
||||
|
||||
func AddObjectMetaFieldsSet(source fields.Set, objectMeta *metav1.ObjectMeta, hasNamespaceField bool) fields.Set {
|
||||
source["metadata.name"] = objectMeta.Name
|
||||
if hasNamespaceField {
|
||||
source["metadata.namespace"] = objectMeta.Namespace
|
||||
}
|
||||
return source
|
||||
}
|
||||
|
||||
func newEtcdTestStorage(t *testing.T, codec runtime.Codec, prefix string) (*etcdtesting.EtcdTestServer, storage.Interface) {
|
||||
server, _ := etcdtesting.NewUnsecuredEtcd3TestClientServer(t)
|
||||
storage := etcd3.New(server.V3Client, codec, prefix)
|
||||
@@ -58,12 +100,13 @@ func newTestCacher(s storage.Interface, cap int) *storage.Cacher {
|
||||
CacheCapacity: cap,
|
||||
Storage: s,
|
||||
Versioner: etcdstorage.APIObjectVersioner{},
|
||||
Copier: api.Scheme,
|
||||
Type: &api.Pod{},
|
||||
ResourcePrefix: prefix,
|
||||
KeyFunc: func(obj runtime.Object) (string, error) { return storage.NamespaceKeyFunc(prefix, obj) },
|
||||
GetAttrsFunc: corepod.GetAttrs,
|
||||
GetAttrsFunc: GetAttrs,
|
||||
NewListFunc: func() runtime.Object { return &api.PodList{} },
|
||||
Codec: testapi.Default.Codec(),
|
||||
Codec: api.Codecs.LegacyCodec(v1.SchemeGroupVersion),
|
||||
}
|
||||
return storage.NewCacherFromConfig(config)
|
||||
}
|
||||
@@ -71,7 +114,7 @@ func newTestCacher(s storage.Interface, cap int) *storage.Cacher {
|
||||
func makeTestPod(name string) *api.Pod {
|
||||
return &api.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: name},
|
||||
Spec: apitesting.DeepEqualSafePodSpec(),
|
||||
Spec: DeepEqualSafePodSpec(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -97,7 +140,7 @@ func updatePod(t *testing.T, s storage.Interface, obj, old *api.Pod) *api.Pod {
|
||||
}
|
||||
|
||||
func TestGet(t *testing.T) {
|
||||
server, etcdStorage := newEtcdTestStorage(t, testapi.Default.Codec(), etcdtest.PathPrefix())
|
||||
server, etcdStorage := newEtcdTestStorage(t, api.Codecs.LegacyCodec(v1.SchemeGroupVersion), etcdtest.PathPrefix())
|
||||
defer server.Terminate(t)
|
||||
cacher := newTestCacher(etcdStorage, 10)
|
||||
defer cacher.Stop()
|
||||
@@ -128,7 +171,7 @@ func TestGet(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestList(t *testing.T) {
|
||||
server, etcdStorage := newEtcdTestStorage(t, testapi.Default.Codec(), etcdtest.PathPrefix())
|
||||
server, etcdStorage := newEtcdTestStorage(t, api.Codecs.LegacyCodec(v1.SchemeGroupVersion), etcdtest.PathPrefix())
|
||||
defer server.Terminate(t)
|
||||
cacher := newTestCacher(etcdStorage, 10)
|
||||
defer cacher.Stop()
|
||||
@@ -209,7 +252,7 @@ func TestList(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestInfiniteList(t *testing.T) {
|
||||
server, etcdStorage := newEtcdTestStorage(t, testapi.Default.Codec(), etcdtest.PathPrefix())
|
||||
server, etcdStorage := newEtcdTestStorage(t, api.Codecs.LegacyCodec(v1.SchemeGroupVersion), etcdtest.PathPrefix())
|
||||
defer server.Terminate(t)
|
||||
cacher := newTestCacher(etcdStorage, 10)
|
||||
defer cacher.Stop()
|
||||
@@ -263,7 +306,7 @@ func (self *injectListError) List(ctx context.Context, key string, resourceVersi
|
||||
}
|
||||
|
||||
func TestWatch(t *testing.T) {
|
||||
server, etcdStorage := newEtcdTestStorage(t, testapi.Default.Codec(), etcdtest.PathPrefix())
|
||||
server, etcdStorage := newEtcdTestStorage(t, api.Codecs.LegacyCodec(v1.SchemeGroupVersion), etcdtest.PathPrefix())
|
||||
// Inject one list error to make sure we test the relist case.
|
||||
etcdStorage = &injectListError{errors: 1, Interface: etcdStorage}
|
||||
defer server.Terminate(t)
|
||||
@@ -340,7 +383,7 @@ func TestWatch(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestWatcherTimeout(t *testing.T) {
|
||||
server, etcdStorage := newEtcdTestStorage(t, testapi.Default.Codec(), etcdtest.PathPrefix())
|
||||
server, etcdStorage := newEtcdTestStorage(t, api.Codecs.LegacyCodec(v1.SchemeGroupVersion), etcdtest.PathPrefix())
|
||||
defer server.Terminate(t)
|
||||
cacher := newTestCacher(etcdStorage, 10)
|
||||
defer cacher.Stop()
|
||||
@@ -382,7 +425,7 @@ func TestWatcherTimeout(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestFiltering(t *testing.T) {
|
||||
server, etcdStorage := newEtcdTestStorage(t, testapi.Default.Codec(), etcdtest.PathPrefix())
|
||||
server, etcdStorage := newEtcdTestStorage(t, api.Codecs.LegacyCodec(v1.SchemeGroupVersion), etcdtest.PathPrefix())
|
||||
defer server.Terminate(t)
|
||||
cacher := newTestCacher(etcdStorage, 10)
|
||||
defer cacher.Stop()
|
||||
@@ -444,7 +487,7 @@ func TestFiltering(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestStartingResourceVersion(t *testing.T) {
|
||||
server, etcdStorage := newEtcdTestStorage(t, testapi.Default.Codec(), etcdtest.PathPrefix())
|
||||
server, etcdStorage := newEtcdTestStorage(t, api.Codecs.LegacyCodec(v1.SchemeGroupVersion), etcdtest.PathPrefix())
|
||||
defer server.Terminate(t)
|
||||
cacher := newTestCacher(etcdStorage, 10)
|
||||
defer cacher.Stop()
|
||||
@@ -492,7 +535,7 @@ func TestStartingResourceVersion(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestRandomWatchDeliver(t *testing.T) {
|
||||
server, etcdStorage := newEtcdTestStorage(t, testapi.Default.Codec(), etcdtest.PathPrefix())
|
||||
server, etcdStorage := newEtcdTestStorage(t, api.Codecs.LegacyCodec(v1.SchemeGroupVersion), etcdtest.PathPrefix())
|
||||
defer server.Terminate(t)
|
||||
cacher := newTestCacher(etcdStorage, 10)
|
||||
defer cacher.Stop()
|
||||
|
||||
Reference in New Issue
Block a user