Merge pull request #33512 from deads2k/api-15-move-core-reststorage

Automatic merge from submit-queue

move core storage out of master.go

Moves the core resource creation out of master.go and makes it more congruent to the other storages.

WIP because I haven't run tests yet, but I figured I'd see what breaks in the morning.
This commit is contained in:
Kubernetes Submit Queue 2016-09-29 14:35:20 -07:00 committed by GitHub
commit a1b1a1a728
5 changed files with 522 additions and 947 deletions

View File

@ -74,37 +74,14 @@ import (
autoscalingrest "k8s.io/kubernetes/pkg/registry/autoscaling/rest"
batchrest "k8s.io/kubernetes/pkg/registry/batch/rest"
certificatesrest "k8s.io/kubernetes/pkg/registry/certificates/rest"
corerest "k8s.io/kubernetes/pkg/registry/core/rest"
extensionsrest "k8s.io/kubernetes/pkg/registry/extensions/rest"
policyrest "k8s.io/kubernetes/pkg/registry/policy/rest"
rbacrest "k8s.io/kubernetes/pkg/registry/rbac/rest"
storagerest "k8s.io/kubernetes/pkg/registry/storage/rest"
// direct etcd registry dependencies
"k8s.io/kubernetes/pkg/registry/core/componentstatus"
configmapetcd "k8s.io/kubernetes/pkg/registry/core/configmap/etcd"
controlleretcd "k8s.io/kubernetes/pkg/registry/core/controller/etcd"
"k8s.io/kubernetes/pkg/registry/core/endpoint"
endpointsetcd "k8s.io/kubernetes/pkg/registry/core/endpoint/etcd"
eventetcd "k8s.io/kubernetes/pkg/registry/core/event/etcd"
limitrangeetcd "k8s.io/kubernetes/pkg/registry/core/limitrange/etcd"
"k8s.io/kubernetes/pkg/registry/core/namespace"
namespaceetcd "k8s.io/kubernetes/pkg/registry/core/namespace/etcd"
"k8s.io/kubernetes/pkg/registry/core/node"
nodeetcd "k8s.io/kubernetes/pkg/registry/core/node/etcd"
pvetcd "k8s.io/kubernetes/pkg/registry/core/persistentvolume/etcd"
pvcetcd "k8s.io/kubernetes/pkg/registry/core/persistentvolumeclaim/etcd"
podetcd "k8s.io/kubernetes/pkg/registry/core/pod/etcd"
podtemplateetcd "k8s.io/kubernetes/pkg/registry/core/podtemplate/etcd"
"k8s.io/kubernetes/pkg/registry/core/rangeallocation"
resourcequotaetcd "k8s.io/kubernetes/pkg/registry/core/resourcequota/etcd"
secretetcd "k8s.io/kubernetes/pkg/registry/core/secret/etcd"
"k8s.io/kubernetes/pkg/registry/core/service"
"k8s.io/kubernetes/pkg/registry/core/service/allocator"
etcdallocator "k8s.io/kubernetes/pkg/registry/core/service/allocator/etcd"
serviceetcd "k8s.io/kubernetes/pkg/registry/core/service/etcd"
ipallocator "k8s.io/kubernetes/pkg/registry/core/service/ipallocator"
"k8s.io/kubernetes/pkg/registry/core/service/portallocator"
serviceaccountetcd "k8s.io/kubernetes/pkg/registry/core/serviceaccount/etcd"
"k8s.io/kubernetes/pkg/registry/extensions/thirdpartyresourcedata"
thirdpartyresourcedataetcd "k8s.io/kubernetes/pkg/registry/extensions/thirdpartyresourcedata/etcd"
)
@ -146,20 +123,11 @@ type EndpointReconcilerConfig struct {
type Master struct {
*genericapiserver.GenericAPIServer
// Map of v1 resources to their REST storages.
v1ResourcesStorage map[string]rest.Storage
legacyRESTStorageProvider corerest.LegacyRESTStorageProvider
legacyRESTStorage corerest.LegacyRESTStorage
enableCoreControllers bool
deleteCollectionWorkers int
// registries are internal client APIs for accessing the storage layer
// TODO: define the internal typed interface in a way that clients can
// also be replaced
nodeRegistry node.Registry
namespaceRegistry namespace.Registry
serviceRegistry service.Registry
endpointRegistry endpoint.Registry
serviceClusterIPAllocator rangeallocation.RangeRegistry
serviceNodePortAllocator rangeallocation.RangeRegistry
// storage for third party objects
thirdPartyStorageConfig *storagebackend.Config
@ -243,6 +211,16 @@ func (c completedConfig) New() (*Master, error) {
enableGarbageCollection: c.GenericConfig.EnableGarbageCollection,
storageFactory: c.StorageFactory,
},
legacyRESTStorageProvider: corerest.LegacyRESTStorageProvider{
StorageFactory: c.StorageFactory,
ProxyTransport: s.ProxyTransport,
KubeletClient: c.KubeletClient,
EventTTL: c.EventTTL,
ServiceClusterIPRange: c.GenericConfig.ServiceClusterIPRange,
ServiceNodePortRange: c.GenericConfig.ServiceNodePortRange,
ComponentStatusServerFunc: func() map[string]apiserver.Server { return getServersToValidate(c.StorageFactory) },
},
}
if c.EnableWatchCache {
@ -279,29 +257,20 @@ func (c completedConfig) New() (*Master, error) {
}
func (m *Master) InstallAPIs(c *Config) {
restOptionsGetter := func(resource unversioned.GroupResource) generic.RESTOptions {
return m.restOptionsFactory.NewFor(resource)
}
apiGroupsInfo := []genericapiserver.APIGroupInfo{}
// Install v1 unless disabled.
if c.GenericConfig.APIResourceConfigSource.AnyResourcesForVersionEnabled(apiv1.SchemeGroupVersion) {
// Install v1 API.
m.initV1ResourcesStorage(c)
apiGroupInfo := genericapiserver.APIGroupInfo{
GroupMeta: *registered.GroupOrDie(api.GroupName),
VersionedResourcesStorageMap: map[string]map[string]rest.Storage{
"v1": m.v1ResourcesStorage,
},
IsLegacyGroup: true,
Scheme: api.Scheme,
ParameterCodec: api.ParameterCodec,
NegotiatedSerializer: api.Codecs,
SubresourceGroupVersionKind: map[string]unversioned.GroupVersionKind{},
}
if autoscalingGroupVersion := (unversioned.GroupVersion{Group: "autoscaling", Version: "v1"}); registered.IsEnabledVersion(autoscalingGroupVersion) {
apiGroupInfo.SubresourceGroupVersionKind["replicationcontrollers/scale"] = autoscalingGroupVersion.WithKind("Scale")
}
if policyGroupVersion := (unversioned.GroupVersion{Group: "policy", Version: "v1alpha1"}); registered.IsEnabledVersion(policyGroupVersion) {
apiGroupInfo.SubresourceGroupVersionKind["pods/eviction"] = policyGroupVersion.WithKind("Eviction")
legacyRESTStorage, apiGroupInfo, err := m.legacyRESTStorageProvider.NewLegacyRESTStorage(restOptionsGetter)
if err != nil {
glog.Fatalf("Error building core storage: %v", err)
}
m.legacyRESTStorage = legacyRESTStorage
apiGroupsInfo = append(apiGroupsInfo, apiGroupInfo)
}
@ -334,10 +303,6 @@ func (m *Master) InstallAPIs(c *Config) {
m.thirdPartyResources = map[string]*thirdPartyEntry{}
}
restOptionsGetter := func(resource unversioned.GroupResource) generic.RESTOptions {
return m.restOptionsFactory.NewFor(resource)
}
// stabilize order.
// TODO find a better way to configure priority of groups
for _, group := range sets.StringKeySet(c.RESTStorageProviders).List() {
@ -386,133 +351,11 @@ func (m *Master) InstallAPIs(c *Config) {
}
}
func (m *Master) initV1ResourcesStorage(c *Config) {
restOptions := func(resource string) generic.RESTOptions {
return m.restOptionsFactory.NewFor(api.Resource(resource))
}
podTemplateStorage := podtemplateetcd.NewREST(restOptions("podTemplates"))
eventStorage := eventetcd.NewREST(restOptions("events"), uint64(c.EventTTL.Seconds()))
limitRangeStorage := limitrangeetcd.NewREST(restOptions("limitRanges"))
resourceQuotaStorage, resourceQuotaStatusStorage := resourcequotaetcd.NewREST(restOptions("resourceQuotas"))
secretStorage := secretetcd.NewREST(restOptions("secrets"))
serviceAccountStorage := serviceaccountetcd.NewREST(restOptions("serviceAccounts"))
persistentVolumeStorage, persistentVolumeStatusStorage := pvetcd.NewREST(restOptions("persistentVolumes"))
persistentVolumeClaimStorage, persistentVolumeClaimStatusStorage := pvcetcd.NewREST(restOptions("persistentVolumeClaims"))
configMapStorage := configmapetcd.NewREST(restOptions("configMaps"))
namespaceStorage, namespaceStatusStorage, namespaceFinalizeStorage := namespaceetcd.NewREST(restOptions("namespaces"))
m.namespaceRegistry = namespace.NewRegistry(namespaceStorage)
endpointsStorage := endpointsetcd.NewREST(restOptions("endpoints"))
m.endpointRegistry = endpoint.NewRegistry(endpointsStorage)
nodeStorage := nodeetcd.NewStorage(restOptions("nodes"), c.KubeletClient, m.ProxyTransport)
m.nodeRegistry = node.NewRegistry(nodeStorage.Node)
podStorage := podetcd.NewStorage(
restOptions("pods"),
kubeletclient.ConnectionInfoGetter(nodeStorage.Node),
m.ProxyTransport,
)
serviceRESTStorage, serviceStatusStorage := serviceetcd.NewREST(restOptions("services"))
m.serviceRegistry = service.NewRegistry(serviceRESTStorage)
var serviceClusterIPRegistry rangeallocation.RangeRegistry
serviceClusterIPRange := m.ServiceClusterIPRange
if serviceClusterIPRange == nil {
glog.Fatalf("service clusterIPRange is nil")
return
}
serviceStorageConfig, err := c.StorageFactory.NewConfig(api.Resource("services"))
if err != nil {
glog.Fatal(err.Error())
}
serviceClusterIPAllocator := ipallocator.NewAllocatorCIDRRange(serviceClusterIPRange, func(max int, rangeSpec string) allocator.Interface {
mem := allocator.NewAllocationMap(max, rangeSpec)
// TODO etcdallocator package to return a storage interface via the storageFactory
etcd := etcdallocator.NewEtcd(mem, "/ranges/serviceips", api.Resource("serviceipallocations"), serviceStorageConfig)
serviceClusterIPRegistry = etcd
return etcd
})
m.serviceClusterIPAllocator = serviceClusterIPRegistry
var serviceNodePortRegistry rangeallocation.RangeRegistry
serviceNodePortAllocator := portallocator.NewPortAllocatorCustom(m.ServiceNodePortRange, func(max int, rangeSpec string) allocator.Interface {
mem := allocator.NewAllocationMap(max, rangeSpec)
// TODO etcdallocator package to return a storage interface via the storageFactory
etcd := etcdallocator.NewEtcd(mem, "/ranges/servicenodeports", api.Resource("servicenodeportallocations"), serviceStorageConfig)
serviceNodePortRegistry = etcd
return etcd
})
m.serviceNodePortAllocator = serviceNodePortRegistry
controllerStorage := controlleretcd.NewStorage(restOptions("replicationControllers"))
serviceRest := service.NewStorage(m.serviceRegistry, m.endpointRegistry, serviceClusterIPAllocator, serviceNodePortAllocator, m.ProxyTransport)
// TODO: Factor out the core API registration
m.v1ResourcesStorage = map[string]rest.Storage{
"pods": podStorage.Pod,
"pods/attach": podStorage.Attach,
"pods/status": podStorage.Status,
"pods/log": podStorage.Log,
"pods/exec": podStorage.Exec,
"pods/portforward": podStorage.PortForward,
"pods/proxy": podStorage.Proxy,
"pods/binding": podStorage.Binding,
"bindings": podStorage.Binding,
"podTemplates": podTemplateStorage,
"replicationControllers": controllerStorage.Controller,
"replicationControllers/status": controllerStorage.Status,
"services": serviceRest.Service,
"services/proxy": serviceRest.Proxy,
"services/status": serviceStatusStorage,
"endpoints": endpointsStorage,
"nodes": nodeStorage.Node,
"nodes/status": nodeStorage.Status,
"nodes/proxy": nodeStorage.Proxy,
"events": eventStorage,
"limitRanges": limitRangeStorage,
"resourceQuotas": resourceQuotaStorage,
"resourceQuotas/status": resourceQuotaStatusStorage,
"namespaces": namespaceStorage,
"namespaces/status": namespaceStatusStorage,
"namespaces/finalize": namespaceFinalizeStorage,
"secrets": secretStorage,
"serviceAccounts": serviceAccountStorage,
"persistentVolumes": persistentVolumeStorage,
"persistentVolumes/status": persistentVolumeStatusStorage,
"persistentVolumeClaims": persistentVolumeClaimStorage,
"persistentVolumeClaims/status": persistentVolumeClaimStatusStorage,
"configMaps": configMapStorage,
"componentStatuses": componentstatus.NewStorage(func() map[string]apiserver.Server { return m.getServersToValidate(c) }),
}
if registered.IsEnabledVersion(unversioned.GroupVersion{Group: "autoscaling", Version: "v1"}) {
m.v1ResourcesStorage["replicationControllers/scale"] = controllerStorage.Scale
}
if registered.IsEnabledVersion(unversioned.GroupVersion{Group: "policy", Version: "v1alpha1"}) {
m.v1ResourcesStorage["pods/eviction"] = podStorage.Eviction
}
}
// NewBootstrapController returns a controller for watching the core capabilities of the master. If
// endpointReconcilerConfig.Interval is 0, the default value of DefaultEndpointReconcilerInterval
// will be used instead. If endpointReconcilerConfig.Reconciler is nil, the default
// MasterCountEndpointReconciler will be used.
// TODO this should be kicked off as a server PostHook
func (m *Master) NewBootstrapController(endpointReconcilerConfig EndpointReconcilerConfig) *Controller {
if endpointReconcilerConfig.Interval == 0 {
endpointReconcilerConfig.Interval = DefaultEndpointReconcilerInterval
@ -521,12 +364,12 @@ func (m *Master) NewBootstrapController(endpointReconcilerConfig EndpointReconci
if endpointReconcilerConfig.Reconciler == nil {
// use a default endpoint reconciler if nothing is set
// m.endpointRegistry is set via m.InstallAPIs -> m.initV1ResourcesStorage
endpointReconcilerConfig.Reconciler = NewMasterCountEndpointReconciler(m.MasterCount, m.endpointRegistry)
endpointReconcilerConfig.Reconciler = NewMasterCountEndpointReconciler(m.MasterCount, m.legacyRESTStorage.EndpointRegistry)
}
return &Controller{
NamespaceRegistry: m.namespaceRegistry,
ServiceRegistry: m.serviceRegistry,
NamespaceRegistry: m.legacyRESTStorage.NamespaceRegistry,
ServiceRegistry: m.legacyRESTStorage.ServiceRegistry,
EndpointReconciler: endpointReconcilerConfig.Reconciler,
EndpointInterval: endpointReconcilerConfig.Interval,
@ -534,12 +377,12 @@ func (m *Master) NewBootstrapController(endpointReconcilerConfig EndpointReconci
SystemNamespaces: []string{api.NamespaceSystem},
SystemNamespacesInterval: 1 * time.Minute,
ServiceClusterIPRegistry: m.serviceClusterIPAllocator,
ServiceClusterIPRange: m.ServiceClusterIPRange,
ServiceClusterIPRegistry: m.legacyRESTStorage.ServiceClusterIPAllocator,
ServiceClusterIPRange: m.legacyRESTStorageProvider.ServiceClusterIPRange,
ServiceClusterIPInterval: 3 * time.Minute,
ServiceNodePortRegistry: m.serviceNodePortAllocator,
ServiceNodePortRange: m.ServiceNodePortRange,
ServiceNodePortRegistry: m.legacyRESTStorage.ServiceNodePortAllocator,
ServiceNodePortRange: m.legacyRESTStorageProvider.ServiceNodePortRange,
ServiceNodePortInterval: 3 * time.Minute,
PublicIP: m.ClusterIP,
@ -553,13 +396,13 @@ func (m *Master) NewBootstrapController(endpointReconcilerConfig EndpointReconci
}
}
func (m *Master) getServersToValidate(c *Config) map[string]apiserver.Server {
func getServersToValidate(storageFactory genericapiserver.StorageFactory) map[string]apiserver.Server {
serversToValidate := map[string]apiserver.Server{
"controller-manager": {Addr: "127.0.0.1", Port: ports.ControllerManagerPort, Path: "/healthz"},
"scheduler": {Addr: "127.0.0.1", Port: ports.SchedulerPort, Path: "/healthz"},
}
for ix, machine := range c.StorageFactory.Backends() {
for ix, machine := range storageFactory.Backends() {
etcdUrl, err := url.Parse(machine)
if err != nil {
glog.Errorf("Failed to parse etcd url for validation: %v", err)
@ -867,7 +710,7 @@ func findExternalAddress(node *api.Node) (string, error) {
}
func (m *Master) getNodeAddresses() ([]string, error) {
nodes, err := m.nodeRegistry.ListNodes(api.NewDefaultContext(), nil)
nodes, err := m.legacyRESTStorage.NodeRegistry.ListNodes(api.NewDefaultContext(), nil)
if err != nil {
return nil, err
}

View File

@ -17,7 +17,6 @@ limitations under the License.
package master
import (
"bytes"
"crypto/tls"
"encoding/json"
"fmt"
@ -28,10 +27,8 @@ import (
"reflect"
"strings"
"testing"
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/meta"
"k8s.io/kubernetes/pkg/api/testapi"
"k8s.io/kubernetes/pkg/api/unversioned"
apiv1 "k8s.io/kubernetes/pkg/api/v1"
@ -47,22 +44,13 @@ import (
"k8s.io/kubernetes/pkg/apis/extensions"
extensionsapiv1beta1 "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
"k8s.io/kubernetes/pkg/apis/rbac"
"k8s.io/kubernetes/pkg/apiserver/request"
"k8s.io/kubernetes/pkg/generated/openapi"
"k8s.io/kubernetes/pkg/genericapiserver"
"k8s.io/kubernetes/pkg/kubelet/client"
"k8s.io/kubernetes/pkg/registry/core/endpoint"
"k8s.io/kubernetes/pkg/registry/core/namespace"
ipallocator "k8s.io/kubernetes/pkg/registry/core/service/ipallocator"
extensionsrest "k8s.io/kubernetes/pkg/registry/extensions/rest"
"k8s.io/kubernetes/pkg/registry/extensions/thirdpartyresourcedata"
"k8s.io/kubernetes/pkg/registry/generic"
"k8s.io/kubernetes/pkg/registry/registrytest"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
"k8s.io/kubernetes/pkg/storage/etcd/etcdtest"
etcdtesting "k8s.io/kubernetes/pkg/storage/etcd/testing"
"k8s.io/kubernetes/pkg/util/intstr"
utilnet "k8s.io/kubernetes/pkg/util/net"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/version"
@ -72,7 +60,6 @@ import (
"github.com/go-openapi/strfmt"
"github.com/go-openapi/validate"
"github.com/stretchr/testify/assert"
"golang.org/x/net/context"
)
// setUp is a convience function for setting up for (most) tests.
@ -82,7 +69,7 @@ func setUp(t *testing.T) (*Master, *etcdtesting.EtcdTestServer, Config, *assert.
master := &Master{
GenericAPIServer: &genericapiserver.GenericAPIServer{},
}
config := Config{
config := &Config{
GenericConfig: &genericapiserver.Config{},
}
@ -108,6 +95,7 @@ func setUp(t *testing.T) (*Master, *etcdtesting.EtcdTestServer, Config, *assert.
config.GenericConfig.ProxyTLSClientConfig = &tls.Config{}
config.GenericConfig.RequestContextMapper = api.NewRequestContextMapper()
config.GenericConfig.EnableVersion = true
config.EnableCoreControllers = false
// TODO: this is kind of hacky. The trouble is that the sync loop
// runs in a go-routine and there is no way to validate in the test
@ -117,9 +105,14 @@ func setUp(t *testing.T) (*Master, *etcdtesting.EtcdTestServer, Config, *assert.
// run the sync routine and register types manually.
config.disableThirdPartyControllerForTesting = true
master.nodeRegistry = registrytest.NewNodeRegistry([]string{"node1", "node2"}, api.NodeResources{})
master, err := config.Complete().New()
if err != nil {
t.Fatal(err)
}
return master, server, config, assert.New(t)
master.legacyRESTStorage.NodeRegistry = registrytest.NewNodeRegistry([]string{"node1", "node2"}, api.NodeResources{})
return master, server, *config, assert.New(t)
}
func newMaster(t *testing.T) (*Master, *etcdtesting.EtcdTestServer, Config, *assert.Assertions) {
@ -187,26 +180,6 @@ func TestNew(t *testing.T) {
assert.Equal(master.ProxyTransport.(*http.Transport).TLSClientConfig, config.GenericConfig.ProxyTLSClientConfig)
}
// TestNamespaceSubresources ensures the namespace subresource parsing in apiserver/handlers.go doesn't drift
func TestNamespaceSubresources(t *testing.T) {
master, etcdserver, _, _ := newMaster(t)
defer etcdserver.Terminate(t)
expectedSubresources := request.NamespaceSubResourcesForTest
foundSubresources := sets.NewString()
for k := range master.v1ResourcesStorage {
parts := strings.Split(k, "/")
if len(parts) == 2 && parts[0] == "namespaces" {
foundSubresources.Insert(parts[1])
}
}
if !reflect.DeepEqual(expectedSubresources.List(), foundSubresources.List()) {
t.Errorf("Expected namespace subresources %#v, got %#v. Update apiserver/handlers.go#namespaceSubresources", expectedSubresources.List(), foundSubresources.List())
}
}
// TestVersion tests /version
func TestVersion(t *testing.T) {
s, etcdserver, _, _ := newMaster(t)
@ -232,10 +205,10 @@ func TestVersion(t *testing.T) {
// TestGetServersToValidate verifies the unexported getServersToValidate function
func TestGetServersToValidate(t *testing.T) {
master, etcdserver, config, assert := setUp(t)
_, etcdserver, config, assert := setUp(t)
defer etcdserver.Terminate(t)
servers := master.getServersToValidate(&config)
servers := getServersToValidate(config.StorageFactory)
// Expected servers to validate: scheduler, controller-manager and etcd.
assert.Equal(3, len(servers), "unexpected server list: %#v", servers)
@ -276,74 +249,6 @@ func (*fakeEndpointReconciler) ReconcileEndpoints(serviceName string, ip net.IP,
return nil
}
// TestNewBootstrapController verifies master fields are properly copied into controller
func TestNewBootstrapController(t *testing.T) {
// Tests a subset of inputs to ensure they are set properly in the controller
master, etcdserver, _, assert := setUp(t)
defer etcdserver.Terminate(t)
portRange := utilnet.PortRange{Base: 10, Size: 10}
master.namespaceRegistry = namespace.NewRegistry(nil)
master.serviceRegistry = registrytest.NewServiceRegistry()
master.endpointRegistry = endpoint.NewRegistry(nil)
master.ServiceNodePortRange = portRange
master.MasterCount = 1
master.ServiceReadWritePort = 1000
master.PublicReadWritePort = 1010
// test with an empty EndpointReconcilerConfig to ensure the defaults are applied
controller := master.NewBootstrapController(EndpointReconcilerConfig{})
assert.Equal(controller.NamespaceRegistry, master.namespaceRegistry)
assert.Equal(controller.EndpointReconciler, NewMasterCountEndpointReconciler(master.MasterCount, master.endpointRegistry))
assert.Equal(controller.EndpointInterval, DefaultEndpointReconcilerInterval)
assert.Equal(controller.ServiceRegistry, master.serviceRegistry)
assert.Equal(controller.ServiceNodePortRange, portRange)
assert.Equal(controller.ServicePort, master.ServiceReadWritePort)
assert.Equal(controller.PublicServicePort, master.PublicReadWritePort)
// test with a filled-in EndpointReconcilerConfig to make sure its values are used
controller = master.NewBootstrapController(EndpointReconcilerConfig{
Reconciler: &fakeEndpointReconciler{},
Interval: 5 * time.Second,
})
assert.Equal(controller.EndpointReconciler, &fakeEndpointReconciler{})
assert.Equal(controller.EndpointInterval, 5*time.Second)
}
// TestControllerServicePorts verifies master extraServicePorts are
// correctly copied into controller
func TestControllerServicePorts(t *testing.T) {
master, etcdserver, _, assert := setUp(t)
defer etcdserver.Terminate(t)
master.namespaceRegistry = namespace.NewRegistry(nil)
master.serviceRegistry = registrytest.NewServiceRegistry()
master.endpointRegistry = endpoint.NewRegistry(nil)
master.ExtraServicePorts = []api.ServicePort{
{
Name: "additional-port-1",
Port: 1000,
Protocol: api.ProtocolTCP,
TargetPort: intstr.FromInt(1000),
},
{
Name: "additional-port-2",
Port: 1010,
Protocol: api.ProtocolTCP,
TargetPort: intstr.FromInt(1010),
},
}
controller := master.NewBootstrapController(EndpointReconcilerConfig{})
assert.Equal(int32(1000), controller.ExtraServicePorts[0].Port)
assert.Equal(int32(1010), controller.ExtraServicePorts[1].Port)
}
// TestGetNodeAddresses verifies that proper results are returned
// when requesting node addresses.
func TestGetNodeAddresses(t *testing.T) {
@ -351,14 +256,14 @@ func TestGetNodeAddresses(t *testing.T) {
defer etcdserver.Terminate(t)
// Fail case (no addresses associated with nodes)
nodes, _ := master.nodeRegistry.ListNodes(api.NewDefaultContext(), nil)
nodes, _ := master.legacyRESTStorage.NodeRegistry.ListNodes(api.NewDefaultContext(), nil)
addrs, err := master.getNodeAddresses()
assert.Error(err, "getNodeAddresses should have caused an error as there are no addresses.")
assert.Equal([]string(nil), addrs)
// Pass case with External type IP
nodes, _ = master.nodeRegistry.ListNodes(api.NewDefaultContext(), nil)
nodes, _ = master.legacyRESTStorage.NodeRegistry.ListNodes(api.NewDefaultContext(), nil)
for index := range nodes.Items {
nodes.Items[index].Status.Addresses = []api.NodeAddress{{Type: api.NodeExternalIP, Address: "127.0.0.1"}}
}
@ -367,7 +272,7 @@ func TestGetNodeAddresses(t *testing.T) {
assert.Equal([]string{"127.0.0.1", "127.0.0.1"}, addrs)
// Pass case with LegacyHost type IP
nodes, _ = master.nodeRegistry.ListNodes(api.NewDefaultContext(), nil)
nodes, _ = master.legacyRESTStorage.NodeRegistry.ListNodes(api.NewDefaultContext(), nil)
for index := range nodes.Items {
nodes.Items[index].Status.Addresses = []api.NodeAddress{{Type: api.NodeLegacyHostIP, Address: "127.0.0.2"}}
}
@ -376,6 +281,19 @@ func TestGetNodeAddresses(t *testing.T) {
assert.Equal([]string{"127.0.0.2", "127.0.0.2"}, addrs)
}
func decodeResponse(resp *http.Response, obj interface{}) error {
defer resp.Body.Close()
data, err := ioutil.ReadAll(resp.Body)
if err != nil {
return err
}
if err := json.Unmarshal(data, obj); err != nil {
return err
}
return nil
}
// Because we need to be backwards compatible with release 1.1, at endpoints
// that exist in release 1.1, the responses should have empty APIVersion.
func TestAPIVersionOfDiscoveryEndpoints(t *testing.T) {
@ -564,613 +482,6 @@ func TestDiscoveryAtAPIS(t *testing.T) {
}
}
var versionsToTest = []string{"v1", "v3"}
type Foo struct {
unversioned.TypeMeta `json:",inline"`
api.ObjectMeta `json:"metadata,omitempty" description:"standard object metadata"`
SomeField string `json:"someField"`
OtherField int `json:"otherField"`
}
type FooList struct {
unversioned.TypeMeta `json:",inline"`
unversioned.ListMeta `json:"metadata,omitempty" description:"standard list metadata; see http://releases.k8s.io/HEAD/docs/devel/api-conventions.md#metadata"`
Items []Foo `json:"items"`
}
func initThirdParty(t *testing.T, version, name string) (*Master, *etcdtesting.EtcdTestServer, *httptest.Server, *assert.Assertions) {
return initThirdPartyMultiple(t, []string{version}, []string{name})
}
func initThirdPartyMultiple(t *testing.T, versions, names []string) (*Master, *etcdtesting.EtcdTestServer, *httptest.Server, *assert.Assertions) {
master, etcdserver, _, assert := newMaster(t)
_, master.ServiceClusterIPRange, _ = net.ParseCIDR("10.0.0.0/24")
for ix := range names {
api := &extensions.ThirdPartyResource{
ObjectMeta: api.ObjectMeta{
Name: names[ix],
},
Versions: []extensions.APIVersion{
{
Name: versions[ix],
},
},
}
hasRsrc, err := master.HasThirdPartyResource(api)
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
if !hasRsrc {
err := master.InstallThirdPartyResource(api)
if !assert.NoError(err) {
t.Errorf("Failed to install API: %v", err)
t.FailNow()
}
} else {
t.Errorf("Expected %s: %v not to be present!", names[ix], api)
}
}
server := httptest.NewServer(master.HandlerContainer.ServeMux)
return master, etcdserver, server, assert
}
func TestInstallMultipleAPIs(t *testing.T) {
names := []string{"foo.company.com", "bar.company.com"}
versions := []string{"v1", "v1"}
_, etcdserver, server, assert := initThirdPartyMultiple(t, versions, names)
defer server.Close()
defer etcdserver.Terminate(t)
for ix := range names {
kind, group, err := thirdpartyresourcedata.ExtractApiGroupAndKind(
&extensions.ThirdPartyResource{ObjectMeta: api.ObjectMeta{Name: names[ix]}})
assert.NoError(err, "Failed to extract group & kind")
plural, _ := meta.KindToResource(unversioned.GroupVersionKind{
Group: group,
Version: versions[ix],
Kind: kind,
})
resp, err := http.Get(
fmt.Sprintf("%s/apis/%s/%s/namespaces/default/%s", server.URL, group, versions[ix], plural.Resource))
if !assert.NoError(err, "Failed to do HTTP GET") {
return
}
defer resp.Body.Close()
assert.Equal(http.StatusOK, resp.StatusCode)
data, err := ioutil.ReadAll(resp.Body)
assert.NoError(err)
obj := map[string]interface{}{}
if err = json.Unmarshal(data, &obj); err != nil {
assert.NoError(err, fmt.Sprintf("unexpected error: %v", err))
}
kindOut, found := obj["kind"]
if !found {
t.Errorf("Missing 'kind' in %v", obj)
}
assert.Equal(kindOut, kind+"List")
}
}
func TestInstallThirdPartyAPIList(t *testing.T) {
for _, version := range versionsToTest {
testInstallThirdPartyAPIListVersion(t, version)
}
}
func testInstallThirdPartyAPIListVersion(t *testing.T, version string) {
tests := []struct {
items []Foo
name string
test string
}{
{
name: "foo.company.com",
test: "null",
},
{
items: []Foo{},
name: "foo.company.com",
test: "empty",
},
{
items: []Foo{},
name: "policy.company.com",
test: "plurals",
},
{
items: []Foo{
{
ObjectMeta: api.ObjectMeta{
Name: "test",
},
TypeMeta: unversioned.TypeMeta{
Kind: "Foo",
APIVersion: version,
},
SomeField: "test field",
OtherField: 10,
},
{
ObjectMeta: api.ObjectMeta{
Name: "bar",
},
TypeMeta: unversioned.TypeMeta{
Kind: "Foo",
APIVersion: version,
},
SomeField: "test field another",
OtherField: 20,
},
},
name: "foo.company.com",
test: "real list",
},
}
for _, test := range tests {
func() {
master, etcdserver, server, assert := initThirdParty(t, version, test.name)
defer server.Close()
defer etcdserver.Terminate(t)
kind, group, err := thirdpartyresourcedata.ExtractApiGroupAndKind(
&extensions.ThirdPartyResource{ObjectMeta: api.ObjectMeta{Name: test.name}})
assert.NoError(err, test.test)
plural, _ := meta.KindToResource(unversioned.GroupVersionKind{
Group: group,
Version: version,
Kind: kind,
})
if test.items != nil {
s, destroyFunc := generic.NewRawStorage(master.thirdPartyStorageConfig)
defer destroyFunc()
err := createThirdPartyList(
s,
fmt.Sprintf("/ThirdPartyResourceData/%s/%s/default", group, plural.Resource),
test.items)
if !assert.NoError(err, test.test) {
return
}
}
resp, err := http.Get(
fmt.Sprintf("%s/apis/%s/%s/namespaces/default/%s", server.URL, group, version, plural.Resource))
if !assert.NoError(err, test.test) {
return
}
defer resp.Body.Close()
assert.Equal(http.StatusOK, resp.StatusCode, test.test)
data, err := ioutil.ReadAll(resp.Body)
assert.NoError(err, test.test)
list := FooList{}
if err = json.Unmarshal(data, &list); err != nil {
assert.NoError(err, "unexpected error: %v %s", err, test.test)
}
if test.items == nil {
if len(list.Items) != 0 {
assert.NoError(err, "expected no items, saw: %v %s", err, list.Items, test.test)
}
return
}
if len(list.Items) != len(test.items) {
t.Fatalf("(%s) unexpected length: %d vs %d", test.name, len(list.Items), len(test.items))
}
// The order of elements in LIST is not guaranteed.
mapping := make(map[string]int)
for ix := range test.items {
mapping[test.items[ix].Name] = ix
}
for ix := range list.Items {
// Copy things that are set dynamically on the server
expectedObj := test.items[mapping[list.Items[ix].Name]]
expectedObj.SelfLink = list.Items[ix].SelfLink
expectedObj.ResourceVersion = list.Items[ix].ResourceVersion
expectedObj.Namespace = list.Items[ix].Namespace
expectedObj.UID = list.Items[ix].UID
expectedObj.CreationTimestamp = list.Items[ix].CreationTimestamp
// We endure the order of items by sorting them (using 'mapping')
// so that this function passes.
if !reflect.DeepEqual(list.Items[ix], expectedObj) {
t.Errorf("(%s) expected:\n%#v\nsaw:\n%#v\n", test.name, expectedObj, list.Items[ix])
}
}
}()
}
}
func encodeToThirdParty(name string, obj interface{}) (runtime.Object, error) {
serial, err := json.Marshal(obj)
if err != nil {
return nil, err
}
thirdPartyData := extensions.ThirdPartyResourceData{
ObjectMeta: api.ObjectMeta{Name: name},
Data: serial,
}
return &thirdPartyData, nil
}
func createThirdPartyObject(s storage.Interface, path, name string, obj interface{}) error {
data, err := encodeToThirdParty(name, obj)
if err != nil {
return err
}
return s.Create(context.TODO(), etcdtest.AddPrefix(path), data, nil, 0)
}
func createThirdPartyList(s storage.Interface, path string, list []Foo) error {
for _, obj := range list {
if err := createThirdPartyObject(s, path+"/"+obj.Name, obj.Name, obj); err != nil {
return err
}
}
return nil
}
func decodeResponse(resp *http.Response, obj interface{}) error {
defer resp.Body.Close()
data, err := ioutil.ReadAll(resp.Body)
if err != nil {
return err
}
if err := json.Unmarshal(data, obj); err != nil {
return err
}
return nil
}
func writeResponseToFile(resp *http.Response, filename string) error {
defer resp.Body.Close()
data, err := ioutil.ReadAll(resp.Body)
if err != nil {
return err
}
return ioutil.WriteFile(filename, data, 0755)
}
func TestInstallThirdPartyAPIGet(t *testing.T) {
for _, version := range versionsToTest {
testInstallThirdPartyAPIGetVersion(t, version)
}
}
func testInstallThirdPartyAPIGetVersion(t *testing.T, version string) {
master, etcdserver, server, assert := initThirdParty(t, version, "foo.company.com")
defer server.Close()
defer etcdserver.Terminate(t)
expectedObj := Foo{
ObjectMeta: api.ObjectMeta{
Name: "test",
},
TypeMeta: unversioned.TypeMeta{
Kind: "Foo",
APIVersion: version,
},
SomeField: "test field",
OtherField: 10,
}
s, destroyFunc := generic.NewRawStorage(master.thirdPartyStorageConfig)
defer destroyFunc()
if !assert.NoError(createThirdPartyObject(s, "/ThirdPartyResourceData/company.com/foos/default/test", "test", expectedObj)) {
t.FailNow()
return
}
resp, err := http.Get(server.URL + "/apis/company.com/" + version + "/namespaces/default/foos/test")
if !assert.NoError(err) {
return
}
assert.Equal(http.StatusOK, resp.StatusCode)
item := Foo{}
assert.NoError(decodeResponse(resp, &item))
if !assert.False(reflect.DeepEqual(item, expectedObj)) {
t.Errorf("expected objects to not be equal:\n%v\nsaw:\n%v\n", expectedObj, item)
}
// Fill in data that the apiserver injects
expectedObj.SelfLink = item.SelfLink
expectedObj.ResourceVersion = item.ResourceVersion
if !assert.True(reflect.DeepEqual(item, expectedObj)) {
t.Errorf("expected:\n%#v\nsaw:\n%#v\n", expectedObj, item)
}
}
func TestInstallThirdPartyAPIPost(t *testing.T) {
registered.AddThirdPartyAPIGroupVersions(unversioned.GroupVersion{Group: "company.com", Version: "v1"}, unversioned.GroupVersion{Group: "company.com", Version: "v3"})
for _, version := range versionsToTest {
testInstallThirdPartyAPIPostForVersion(t, version)
}
}
func testInstallThirdPartyAPIPostForVersion(t *testing.T, version string) {
master, etcdserver, server, assert := initThirdParty(t, version, "foo.company.com")
defer server.Close()
defer etcdserver.Terminate(t)
inputObj := Foo{
ObjectMeta: api.ObjectMeta{
Name: "test",
},
TypeMeta: unversioned.TypeMeta{
Kind: "Foo",
APIVersion: "company.com/" + version,
},
SomeField: "test field",
OtherField: 10,
}
data, err := json.Marshal(inputObj)
if !assert.NoError(err) {
return
}
resp, err := http.Post(server.URL+"/apis/company.com/"+version+"/namespaces/default/foos", "application/json", bytes.NewBuffer(data))
if !assert.NoError(err) {
t.Fatalf("unexpected error: %v", err)
}
assert.Equal(http.StatusCreated, resp.StatusCode)
item := Foo{}
assert.NoError(decodeResponse(resp, &item))
// fill in fields set by the apiserver
expectedObj := inputObj
expectedObj.SelfLink = item.SelfLink
expectedObj.ResourceVersion = item.ResourceVersion
expectedObj.Namespace = item.Namespace
expectedObj.UID = item.UID
expectedObj.CreationTimestamp = item.CreationTimestamp
if !assert.True(reflect.DeepEqual(item, expectedObj)) {
t.Errorf("expected:\n%v\nsaw:\n%v\n", expectedObj, item)
}
thirdPartyObj := extensions.ThirdPartyResourceData{}
s, destroyFunc := generic.NewRawStorage(master.thirdPartyStorageConfig)
defer destroyFunc()
err = s.Get(context.TODO(), etcdtest.AddPrefix("/ThirdPartyResourceData/company.com/foos/default/test"), &thirdPartyObj, false)
if !assert.NoError(err) {
t.FailNow()
}
item = Foo{}
assert.NoError(json.Unmarshal(thirdPartyObj.Data, &item))
if !assert.True(reflect.DeepEqual(item, inputObj)) {
t.Errorf("expected:\n%v\nsaw:\n%v\n", inputObj, item)
}
}
func TestInstallThirdPartyAPIDelete(t *testing.T) {
for _, version := range versionsToTest {
testInstallThirdPartyAPIDeleteVersion(t, version)
}
}
func testInstallThirdPartyAPIDeleteVersion(t *testing.T, version string) {
master, etcdserver, server, assert := initThirdParty(t, version, "foo.company.com")
defer server.Close()
defer etcdserver.Terminate(t)
expectedObj := Foo{
ObjectMeta: api.ObjectMeta{
Name: "test",
Namespace: "default",
},
TypeMeta: unversioned.TypeMeta{
Kind: "Foo",
},
SomeField: "test field",
OtherField: 10,
}
s, destroyFunc := generic.NewRawStorage(master.thirdPartyStorageConfig)
defer destroyFunc()
if !assert.NoError(createThirdPartyObject(s, "/ThirdPartyResourceData/company.com/foos/default/test", "test", expectedObj)) {
t.FailNow()
return
}
resp, err := http.Get(server.URL + "/apis/company.com/" + version + "/namespaces/default/foos/test")
if !assert.NoError(err) {
return
}
assert.Equal(http.StatusOK, resp.StatusCode)
item := Foo{}
assert.NoError(decodeResponse(resp, &item))
// Fill in fields set by the apiserver
expectedObj.SelfLink = item.SelfLink
expectedObj.ResourceVersion = item.ResourceVersion
expectedObj.Namespace = item.Namespace
if !assert.True(reflect.DeepEqual(item, expectedObj)) {
t.Errorf("expected:\n%v\nsaw:\n%v\n", expectedObj, item)
}
resp, err = httpDelete(server.URL + "/apis/company.com/" + version + "/namespaces/default/foos/test")
if !assert.NoError(err) {
return
}
assert.Equal(http.StatusOK, resp.StatusCode)
resp, err = http.Get(server.URL + "/apis/company.com/" + version + "/namespaces/default/foos/test")
if !assert.NoError(err) {
return
}
assert.Equal(http.StatusNotFound, resp.StatusCode)
expectedDeletedKey := etcdtest.AddPrefix("ThirdPartyResourceData/company.com/foos/default/test")
thirdPartyObj := extensions.ThirdPartyResourceData{}
err = s.Get(context.TODO(), expectedDeletedKey, &thirdPartyObj, false)
if !storage.IsNotFound(err) {
t.Errorf("expected deletion didn't happen: %v", err)
}
}
func httpDelete(url string) (*http.Response, error) {
req, err := http.NewRequest("DELETE", url, nil)
if err != nil {
return nil, err
}
client := &http.Client{}
return client.Do(req)
}
func TestInstallThirdPartyAPIListOptions(t *testing.T) {
for _, version := range versionsToTest {
testInstallThirdPartyAPIListOptionsForVersion(t, version)
}
}
func testInstallThirdPartyAPIListOptionsForVersion(t *testing.T, version string) {
_, etcdserver, server, assert := initThirdParty(t, version, "foo.company.com")
defer server.Close()
defer etcdserver.Terminate(t)
// send a GET request with query parameter
resp, err := httpGetWithRV(server.URL + "/apis/company.com/" + version + "/namespaces/default/foos")
if !assert.NoError(err) {
t.Fatalf("unexpected error: %v", err)
}
assert.Equal(http.StatusOK, resp.StatusCode)
}
func httpGetWithRV(url string) (*http.Response, error) {
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return nil, err
}
q := req.URL.Query()
// resourceversion is part of a ListOptions
q.Add("resourceversion", "0")
req.URL.RawQuery = q.Encode()
client := &http.Client{}
return client.Do(req)
}
func TestInstallThirdPartyResourceRemove(t *testing.T) {
for _, version := range versionsToTest {
testInstallThirdPartyResourceRemove(t, version)
}
}
func testInstallThirdPartyResourceRemove(t *testing.T, version string) {
master, etcdserver, server, assert := initThirdParty(t, version, "foo.company.com")
defer server.Close()
defer etcdserver.Terminate(t)
expectedObj := Foo{
ObjectMeta: api.ObjectMeta{
Name: "test",
},
TypeMeta: unversioned.TypeMeta{
Kind: "Foo",
},
SomeField: "test field",
OtherField: 10,
}
s, destroyFunc := generic.NewRawStorage(master.thirdPartyStorageConfig)
defer destroyFunc()
if !assert.NoError(createThirdPartyObject(s, "/ThirdPartyResourceData/company.com/foos/default/test", "test", expectedObj)) {
t.FailNow()
return
}
secondObj := expectedObj
secondObj.Name = "bar"
if !assert.NoError(createThirdPartyObject(s, "/ThirdPartyResourceData/company.com/foos/default/bar", "bar", secondObj)) {
t.FailNow()
return
}
resp, err := http.Get(server.URL + "/apis/company.com/" + version + "/namespaces/default/foos/test")
if !assert.NoError(err) {
t.FailNow()
return
}
if resp.StatusCode != http.StatusOK {
t.Errorf("unexpected status: %v", resp)
}
item := Foo{}
if err := decodeResponse(resp, &item); err != nil {
t.Errorf("unexpected error: %v", err)
}
// TODO: validate etcd set things here
item.ObjectMeta = expectedObj.ObjectMeta
if !assert.True(reflect.DeepEqual(item, expectedObj)) {
t.Errorf("expected:\n%v\nsaw:\n%v\n", expectedObj, item)
}
path := extensionsrest.MakeThirdPartyPath("company.com")
master.RemoveThirdPartyResource(path + "/foos")
resp, err = http.Get(server.URL + "/apis/company.com/" + version + "/namespaces/default/foos/test")
if !assert.NoError(err) {
return
}
if resp.StatusCode != http.StatusNotFound {
t.Errorf("unexpected status: %v", resp)
}
expectedDeletedKeys := []string{
etcdtest.AddPrefix("/ThirdPartyResourceData/company.com/foos/default/test"),
etcdtest.AddPrefix("/ThirdPartyResourceData/company.com/foos/default/bar"),
}
for _, key := range expectedDeletedKeys {
thirdPartyObj := extensions.ThirdPartyResourceData{}
s, destroyFunc := generic.NewRawStorage(master.thirdPartyStorageConfig)
err := s.Get(context.TODO(), key, &thirdPartyObj, false)
if !storage.IsNotFound(err) {
t.Errorf("expected deletion didn't happen: %v", err)
}
destroyFunc()
}
installed := master.ListThirdPartyResources()
if len(installed) != 0 {
t.Errorf("Resource(s) still installed: %v", installed)
}
services := master.HandlerContainer.RegisteredWebServices()
for ix := range services {
if strings.HasPrefix(services[ix].RootPath(), "/apis/company.com") {
t.Errorf("Web service still installed at %s: %#v", services[ix].RootPath(), services[ix])
}
}
}
func TestThirdPartyDiscovery(t *testing.T) {
for _, version := range versionsToTest {
testThirdPartyDiscovery(t, version)
}
}
type FakeTunneler struct {
SecondsSinceSyncValue int64
SecondsSinceSSHKeySyncValue int64
@ -1203,48 +514,14 @@ func TestIsTunnelSyncHealthy(t *testing.T) {
assert.Error(err, "IsTunnelSyncHealthy() should have returned an error.")
}
func testThirdPartyDiscovery(t *testing.T, version string) {
_, etcdserver, server, assert := initThirdParty(t, version, "foo.company.com")
defer server.Close()
defer etcdserver.Terminate(t)
func writeResponseToFile(resp *http.Response, filename string) error {
defer resp.Body.Close()
resp, err := http.Get(server.URL + "/apis/company.com/")
if !assert.NoError(err) {
return
data, err := ioutil.ReadAll(resp.Body)
if err != nil {
return err
}
assert.Equal(http.StatusOK, resp.StatusCode)
group := unversioned.APIGroup{}
assert.NoError(decodeResponse(resp, &group))
assert.Equal(group.APIVersion, "v1")
assert.Equal(group.Kind, "APIGroup")
assert.Equal(group.Name, "company.com")
expectedVersion := unversioned.GroupVersionForDiscovery{
GroupVersion: "company.com/" + version,
Version: version,
}
assert.Equal(group.Versions, []unversioned.GroupVersionForDiscovery{expectedVersion})
assert.Equal(group.PreferredVersion, expectedVersion)
resp, err = http.Get(server.URL + "/apis/company.com/" + version)
if !assert.NoError(err) {
return
}
assert.Equal(http.StatusOK, resp.StatusCode)
resourceList := unversioned.APIResourceList{}
assert.NoError(decodeResponse(resp, &resourceList))
assert.Equal(resourceList.APIVersion, "v1")
assert.Equal(resourceList.Kind, "APIResourceList")
assert.Equal(resourceList.GroupVersion, "company.com/"+version)
assert.Equal(resourceList.APIResources, []unversioned.APIResource{
{
Name: "foos",
Namespaced: true,
Kind: "Foo",
},
})
return ioutil.WriteFile(filename, data, 0755)
}
// TestValidOpenAPISpec verifies that the open api is added

View File

@ -0,0 +1,226 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package rest
import (
"fmt"
"net"
"net/http"
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/rest"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apimachinery/registered"
"k8s.io/kubernetes/pkg/apiserver"
"k8s.io/kubernetes/pkg/genericapiserver"
kubeletclient "k8s.io/kubernetes/pkg/kubelet/client"
"k8s.io/kubernetes/pkg/registry/core/componentstatus"
configmapetcd "k8s.io/kubernetes/pkg/registry/core/configmap/etcd"
controlleretcd "k8s.io/kubernetes/pkg/registry/core/controller/etcd"
"k8s.io/kubernetes/pkg/registry/core/endpoint"
endpointsetcd "k8s.io/kubernetes/pkg/registry/core/endpoint/etcd"
eventetcd "k8s.io/kubernetes/pkg/registry/core/event/etcd"
limitrangeetcd "k8s.io/kubernetes/pkg/registry/core/limitrange/etcd"
"k8s.io/kubernetes/pkg/registry/core/namespace"
namespaceetcd "k8s.io/kubernetes/pkg/registry/core/namespace/etcd"
"k8s.io/kubernetes/pkg/registry/core/node"
nodeetcd "k8s.io/kubernetes/pkg/registry/core/node/etcd"
pvetcd "k8s.io/kubernetes/pkg/registry/core/persistentvolume/etcd"
pvcetcd "k8s.io/kubernetes/pkg/registry/core/persistentvolumeclaim/etcd"
podetcd "k8s.io/kubernetes/pkg/registry/core/pod/etcd"
podtemplateetcd "k8s.io/kubernetes/pkg/registry/core/podtemplate/etcd"
"k8s.io/kubernetes/pkg/registry/core/rangeallocation"
resourcequotaetcd "k8s.io/kubernetes/pkg/registry/core/resourcequota/etcd"
secretetcd "k8s.io/kubernetes/pkg/registry/core/secret/etcd"
"k8s.io/kubernetes/pkg/registry/core/service"
"k8s.io/kubernetes/pkg/registry/core/service/allocator"
etcdallocator "k8s.io/kubernetes/pkg/registry/core/service/allocator/etcd"
serviceetcd "k8s.io/kubernetes/pkg/registry/core/service/etcd"
ipallocator "k8s.io/kubernetes/pkg/registry/core/service/ipallocator"
"k8s.io/kubernetes/pkg/registry/core/service/portallocator"
serviceaccountetcd "k8s.io/kubernetes/pkg/registry/core/serviceaccount/etcd"
utilnet "k8s.io/kubernetes/pkg/util/net"
)
// LegacyRESTStorageProvider provides information needed to build RESTStorage for core, but
// does NOT implement the "normal" RESTStorageProvider (yet!)
type LegacyRESTStorageProvider struct {
StorageFactory genericapiserver.StorageFactory
// Used for custom proxy dialing, and proxy TLS options
ProxyTransport http.RoundTripper
KubeletClient kubeletclient.KubeletClient
EventTTL time.Duration
// ServiceClusterIPRange is used to build cluster IPs for discovery.
ServiceClusterIPRange *net.IPNet
ServiceNodePortRange utilnet.PortRange
// ComponentStatusServerFunc is a func used to locate servers to back component status
ComponentStatusServerFunc ComponentStatusServerFunc
}
type ComponentStatusServerFunc func() map[string]apiserver.Server
// LegacyRESTStorage returns stateful information about particular instances of REST storage to
// master.go for wiring controllers.
// TODO remove this by running the controller as a poststarthook
type LegacyRESTStorage struct {
NodeRegistry node.Registry
NamespaceRegistry namespace.Registry
ServiceRegistry service.Registry
EndpointRegistry endpoint.Registry
ServiceClusterIPAllocator rangeallocation.RangeRegistry
ServiceNodePortAllocator rangeallocation.RangeRegistry
}
func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter genericapiserver.RESTOptionsGetter) (LegacyRESTStorage, genericapiserver.APIGroupInfo, error) {
apiGroupInfo := genericapiserver.APIGroupInfo{
GroupMeta: *registered.GroupOrDie(api.GroupName),
VersionedResourcesStorageMap: map[string]map[string]rest.Storage{},
IsLegacyGroup: true,
Scheme: api.Scheme,
ParameterCodec: api.ParameterCodec,
NegotiatedSerializer: api.Codecs,
SubresourceGroupVersionKind: map[string]unversioned.GroupVersionKind{},
}
if autoscalingGroupVersion := (unversioned.GroupVersion{Group: "autoscaling", Version: "v1"}); registered.IsEnabledVersion(autoscalingGroupVersion) {
apiGroupInfo.SubresourceGroupVersionKind["replicationcontrollers/scale"] = autoscalingGroupVersion.WithKind("Scale")
}
if policyGroupVersion := (unversioned.GroupVersion{Group: "policy", Version: "v1alpha1"}); registered.IsEnabledVersion(policyGroupVersion) {
apiGroupInfo.SubresourceGroupVersionKind["pods/eviction"] = policyGroupVersion.WithKind("Eviction")
}
restStorage := LegacyRESTStorage{}
podTemplateStorage := podtemplateetcd.NewREST(restOptionsGetter(api.Resource("podTemplates")))
eventStorage := eventetcd.NewREST(restOptionsGetter(api.Resource("events")), uint64(c.EventTTL.Seconds()))
limitRangeStorage := limitrangeetcd.NewREST(restOptionsGetter(api.Resource("limitRanges")))
resourceQuotaStorage, resourceQuotaStatusStorage := resourcequotaetcd.NewREST(restOptionsGetter(api.Resource("resourceQuotas")))
secretStorage := secretetcd.NewREST(restOptionsGetter(api.Resource("secrets")))
serviceAccountStorage := serviceaccountetcd.NewREST(restOptionsGetter(api.Resource("serviceAccounts")))
persistentVolumeStorage, persistentVolumeStatusStorage := pvetcd.NewREST(restOptionsGetter(api.Resource("persistentVolumes")))
persistentVolumeClaimStorage, persistentVolumeClaimStatusStorage := pvcetcd.NewREST(restOptionsGetter(api.Resource("persistentVolumeClaims")))
configMapStorage := configmapetcd.NewREST(restOptionsGetter(api.Resource("configMaps")))
namespaceStorage, namespaceStatusStorage, namespaceFinalizeStorage := namespaceetcd.NewREST(restOptionsGetter(api.Resource("namespaces")))
restStorage.NamespaceRegistry = namespace.NewRegistry(namespaceStorage)
endpointsStorage := endpointsetcd.NewREST(restOptionsGetter(api.Resource("endpoints")))
restStorage.EndpointRegistry = endpoint.NewRegistry(endpointsStorage)
nodeStorage := nodeetcd.NewStorage(restOptionsGetter(api.Resource("nodes")), c.KubeletClient, c.ProxyTransport)
restStorage.NodeRegistry = node.NewRegistry(nodeStorage.Node)
podStorage := podetcd.NewStorage(
restOptionsGetter(api.Resource("pods")),
kubeletclient.ConnectionInfoGetter(nodeStorage.Node),
c.ProxyTransport,
)
serviceRESTStorage, serviceStatusStorage := serviceetcd.NewREST(restOptionsGetter(api.Resource("services")))
restStorage.ServiceRegistry = service.NewRegistry(serviceRESTStorage)
var serviceClusterIPRegistry rangeallocation.RangeRegistry
serviceClusterIPRange := c.ServiceClusterIPRange
if serviceClusterIPRange == nil {
return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, fmt.Errorf("service clusterIPRange is nil")
}
serviceStorageConfig, err := c.StorageFactory.NewConfig(api.Resource("services"))
if err != nil {
return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
}
ServiceClusterIPAllocator := ipallocator.NewAllocatorCIDRRange(serviceClusterIPRange, func(max int, rangeSpec string) allocator.Interface {
mem := allocator.NewAllocationMap(max, rangeSpec)
// TODO etcdallocator package to return a storage interface via the storageFactory
etcd := etcdallocator.NewEtcd(mem, "/ranges/serviceips", api.Resource("serviceipallocations"), serviceStorageConfig)
serviceClusterIPRegistry = etcd
return etcd
})
restStorage.ServiceClusterIPAllocator = serviceClusterIPRegistry
var serviceNodePortRegistry rangeallocation.RangeRegistry
ServiceNodePortAllocator := portallocator.NewPortAllocatorCustom(c.ServiceNodePortRange, func(max int, rangeSpec string) allocator.Interface {
mem := allocator.NewAllocationMap(max, rangeSpec)
// TODO etcdallocator package to return a storage interface via the storageFactory
etcd := etcdallocator.NewEtcd(mem, "/ranges/servicenodeports", api.Resource("servicenodeportallocations"), serviceStorageConfig)
serviceNodePortRegistry = etcd
return etcd
})
restStorage.ServiceNodePortAllocator = serviceNodePortRegistry
controllerStorage := controlleretcd.NewStorage(restOptionsGetter(api.Resource("replicationControllers")))
serviceRest := service.NewStorage(restStorage.ServiceRegistry, restStorage.EndpointRegistry, ServiceClusterIPAllocator, ServiceNodePortAllocator, c.ProxyTransport)
restStorageMap := map[string]rest.Storage{
"pods": podStorage.Pod,
"pods/attach": podStorage.Attach,
"pods/status": podStorage.Status,
"pods/log": podStorage.Log,
"pods/exec": podStorage.Exec,
"pods/portforward": podStorage.PortForward,
"pods/proxy": podStorage.Proxy,
"pods/binding": podStorage.Binding,
"bindings": podStorage.Binding,
"podTemplates": podTemplateStorage,
"replicationControllers": controllerStorage.Controller,
"replicationControllers/status": controllerStorage.Status,
"services": serviceRest.Service,
"services/proxy": serviceRest.Proxy,
"services/status": serviceStatusStorage,
"endpoints": endpointsStorage,
"nodes": nodeStorage.Node,
"nodes/status": nodeStorage.Status,
"nodes/proxy": nodeStorage.Proxy,
"events": eventStorage,
"limitRanges": limitRangeStorage,
"resourceQuotas": resourceQuotaStorage,
"resourceQuotas/status": resourceQuotaStatusStorage,
"namespaces": namespaceStorage,
"namespaces/status": namespaceStatusStorage,
"namespaces/finalize": namespaceFinalizeStorage,
"secrets": secretStorage,
"serviceAccounts": serviceAccountStorage,
"persistentVolumes": persistentVolumeStorage,
"persistentVolumes/status": persistentVolumeStatusStorage,
"persistentVolumeClaims": persistentVolumeClaimStorage,
"persistentVolumeClaims/status": persistentVolumeClaimStatusStorage,
"configMaps": configMapStorage,
"componentStatuses": componentstatus.NewStorage(c.ComponentStatusServerFunc),
}
if registered.IsEnabledVersion(unversioned.GroupVersion{Group: "autoscaling", Version: "v1"}) {
restStorageMap["replicationControllers/scale"] = controllerStorage.Scale
}
if registered.IsEnabledVersion(unversioned.GroupVersion{Group: "policy", Version: "v1alpha1"}) {
restStorageMap["pods/eviction"] = podStorage.Eviction
}
apiGroupInfo.VersionedResourcesStorageMap["v1"] = restStorageMap
return restStorage, apiGroupInfo, nil
}

View File

@ -51,7 +51,7 @@ type ThirdPartyController struct {
thirdPartyResourceRegistry *thirdpartyresourceetcd.REST
}
// Synchronize a single resource with RESTful resources on the master
// SyncOneResource synchronizes a single resource with RESTful resources on the master
func (t *ThirdPartyController) SyncOneResource(rsrc *extensions.ThirdPartyResource) error {
// TODO: we also need to test if the existing installed resource matches the resource we are sync-ing.
// Currently, if there is an older, incompatible resource installed, we won't remove it. We should detect

View File

@ -0,0 +1,229 @@
// +build integration,!no-etcd
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package thirdparty
// This file contains tests for the storage classes API resource.
import (
"encoding/json"
"reflect"
"testing"
"time"
"k8s.io/kubernetes/pkg/api"
apierrors "k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/client/restclient"
"k8s.io/kubernetes/pkg/util/diff"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/test/integration/framework"
)
// TODO these tests will eventually be runnable in a single test
func TestThirdPartyDelete(t *testing.T) {
_, s := framework.RunAMaster(framework.NewIntegrationTestMasterConfig())
defer s.Close()
clientConfig := &restclient.Config{Host: s.URL, ContentConfig: restclient.ContentConfig{NegotiatedSerializer: api.Codecs}}
client := internalclientset.NewForConfigOrDie(clientConfig)
DoTestInstallThirdPartyAPIDelete(t, client, clientConfig)
}
func TestThirdPartyMultiple(t *testing.T) {
_, s := framework.RunAMaster(framework.NewIntegrationTestMasterConfig())
defer s.Close()
clientConfig := &restclient.Config{Host: s.URL, ContentConfig: restclient.ContentConfig{NegotiatedSerializer: api.Codecs}}
client := internalclientset.NewForConfigOrDie(clientConfig)
DoTestInstallMultipleAPIs(t, client, clientConfig)
}
// TODO make multiple versions work. they've been broken
var versionsToTest = []string{"v1"}
type Foo struct {
unversioned.TypeMeta `json:",inline"`
api.ObjectMeta `json:"metadata,omitempty" description:"standard object metadata"`
SomeField string `json:"someField"`
OtherField int `json:"otherField"`
}
type FooList struct {
unversioned.TypeMeta `json:",inline"`
unversioned.ListMeta `json:"metadata,omitempty" description:"standard list metadata; see http://releases.k8s.io/HEAD/docs/devel/api-conventions.md#metadata"`
Items []Foo `json:"items"`
}
// installThirdParty installs a third party resoure and returns a defer func
func installThirdParty(t *testing.T, client internalclientset.Interface, clientConfig *restclient.Config, tpr *extensions.ThirdPartyResource, group, version, resource string) func() {
var err error
_, err = client.Extensions().ThirdPartyResources().Create(tpr)
if err != nil {
t.Fatal(err)
}
fooClientConfig := *clientConfig
fooClientConfig.APIPath = "apis"
fooClientConfig.GroupVersion = &unversioned.GroupVersion{Group: group, Version: version}
fooClient, err := restclient.RESTClientFor(&fooClientConfig)
if err != nil {
t.Fatal(err)
}
err = wait.Poll(100*time.Millisecond, 60*time.Second, func() (bool, error) {
_, err := fooClient.Get().Namespace("default").Resource(resource).DoRaw()
if err == nil {
return true, nil
}
if apierrors.IsNotFound(err) {
return false, nil
}
return false, err
})
if err != nil {
t.Fatal(err)
}
return func() {
client.Extensions().ThirdPartyResources().Delete(tpr.Name, nil)
err = wait.Poll(100*time.Millisecond, 60*time.Second, func() (bool, error) {
_, err := fooClient.Get().Namespace("default").Resource(resource).DoRaw()
if apierrors.IsNotFound(err) {
return true, nil
}
return false, err
})
if err != nil {
t.Fatal(err)
}
}
}
func DoTestInstallMultipleAPIs(t *testing.T, client internalclientset.Interface, clientConfig *restclient.Config) {
group := "company.com"
version := "v1"
defer installThirdParty(t, client, clientConfig,
&extensions.ThirdPartyResource{
ObjectMeta: api.ObjectMeta{Name: "foo.company.com"},
Versions: []extensions.APIVersion{{Name: version}},
}, group, version, "foos",
)()
// TODO make multiple resources in one version work
// defer installThirdParty(t, client, clientConfig,
// &extensions.ThirdPartyResource{
// ObjectMeta: api.ObjectMeta{Name: "bar.company.com"},
// Versions: []extensions.APIVersion{{Name: version}},
// }, group, version, "bars",
// )()
}
func DoTestInstallThirdPartyAPIDelete(t *testing.T, client internalclientset.Interface, clientConfig *restclient.Config) {
for _, version := range versionsToTest {
testInstallThirdPartyAPIDeleteVersion(t, client, clientConfig, version)
}
}
func testInstallThirdPartyAPIDeleteVersion(t *testing.T, client internalclientset.Interface, clientConfig *restclient.Config, version string) {
group := "company.com"
defer installThirdParty(t, client, clientConfig,
&extensions.ThirdPartyResource{
ObjectMeta: api.ObjectMeta{Name: "foo.company.com"},
Versions: []extensions.APIVersion{{Name: version}},
}, group, version, "foos",
)()
fooClientConfig := *clientConfig
fooClientConfig.APIPath = "apis"
fooClientConfig.GroupVersion = &unversioned.GroupVersion{Group: group, Version: version}
fooClient, err := restclient.RESTClientFor(&fooClientConfig)
if err != nil {
t.Fatal(err)
}
expectedObj := Foo{
ObjectMeta: api.ObjectMeta{
Name: "test",
Namespace: "default",
},
TypeMeta: unversioned.TypeMeta{
Kind: "Foo",
},
SomeField: "test field",
OtherField: 10,
}
objBytes, err := json.Marshal(&expectedObj)
if err != nil {
t.Fatal(err)
}
if _, err := fooClient.Post().Namespace("default").Resource("foos").Body(objBytes).DoRaw(); err != nil {
t.Fatal(err)
}
apiBytes, err := fooClient.Get().Namespace("default").Resource("foos").Name("test").DoRaw()
if err != nil {
t.Fatal(err)
}
item := Foo{}
err = json.Unmarshal(apiBytes, &item)
if err != nil {
t.Fatal(err)
}
// Fill in fields set by the apiserver
item.SelfLink = expectedObj.SelfLink
item.ResourceVersion = expectedObj.ResourceVersion
item.Namespace = expectedObj.Namespace
item.UID = expectedObj.UID
item.CreationTimestamp = expectedObj.CreationTimestamp
if !reflect.DeepEqual(item, expectedObj) {
t.Fatalf("expected:\n%v\n", diff.ObjectGoPrintSideBySide(expectedObj, item))
}
listBytes, err := fooClient.Get().Namespace("default").Resource("foos").DoRaw()
if err != nil {
t.Fatal(err)
}
list := FooList{}
err = json.Unmarshal(listBytes, &list)
if err != nil {
t.Fatal(err)
}
if len(list.Items) != 1 {
t.Fatalf("wrong item: %v", list)
}
if _, err := fooClient.Delete().Namespace("default").Resource("foos").Name("test").DoRaw(); err != nil {
t.Fatal(err)
}
if _, err := fooClient.Get().Namespace("default").Resource("foos").Name("test").DoRaw(); !apierrors.IsNotFound(err) {
t.Fatal(err)
}
}