diff --git a/cmd/kube-apiserver/app/options/options.go b/cmd/kube-apiserver/app/options/options.go index 05eac20d183..c291f6dd750 100644 --- a/cmd/kube-apiserver/app/options/options.go +++ b/cmd/kube-apiserver/app/options/options.go @@ -31,7 +31,7 @@ import ( "k8s.io/kubernetes/pkg/genericapiserver" kubeletclient "k8s.io/kubernetes/pkg/kubelet/client" "k8s.io/kubernetes/pkg/master/ports" - etcdstorage "k8s.io/kubernetes/pkg/storage/etcd" + "k8s.io/kubernetes/pkg/storage/storagebackend" "github.com/spf13/pflag" ) @@ -50,7 +50,7 @@ type APIServer struct { DeleteCollectionWorkers int DeprecatedStorageVersion string EtcdServersOverrides []string - EtcdConfig etcdstorage.EtcdConfig + StorageConfig storagebackend.Config EventTTL time.Duration KeystoneURL string KubeletConfig kubeletclient.KubeletClientConfig @@ -81,7 +81,7 @@ func NewAPIServer() *APIServer { AdmissionControl: "AlwaysAdmit", AuthorizationMode: "AlwaysAllow", DeleteCollectionWorkers: 1, - EtcdConfig: etcdstorage.EtcdConfig{ + StorageConfig: storagebackend.Config{ Prefix: genericapiserver.DefaultEtcdPathPrefix, DeserializationCacheSize: genericapiserver.DefaultDeserializationCacheSize, }, @@ -183,14 +183,15 @@ func (s *APIServer) AddFlags(fs *pflag.FlagSet) { fs.StringVar(&s.AuthorizationConfig.WebhookConfigFile, "authorization-webhook-config-file", s.AuthorizationConfig.WebhookConfigFile, "File with webhook configuration in kubeconfig format, used with --authorization-mode=Webhook. The API server will query the remote service to determine access on the API server's secure port.") fs.StringVar(&s.AdmissionControl, "admission-control", s.AdmissionControl, "Ordered list of plug-ins to do admission control of resources into cluster. Comma-delimited list of: "+strings.Join(admission.GetPlugins(), ", ")) fs.StringVar(&s.AdmissionControlConfigFile, "admission-control-config-file", s.AdmissionControlConfigFile, "File with admission control configuration.") - fs.StringSliceVar(&s.EtcdConfig.ServerList, "etcd-servers", s.EtcdConfig.ServerList, "List of etcd servers to watch (http://ip:port), comma separated.") + fs.StringVar(&s.StorageConfig.Type, "storage-backend", s.StorageConfig.Type, "The storage backend for persistence. Options: 'etcd2' (default), 'etcd3'.") + fs.StringSliceVar(&s.StorageConfig.ServerList, "etcd-servers", s.StorageConfig.ServerList, "List of etcd servers to connect with (http://ip:port), comma separated.") fs.StringSliceVar(&s.EtcdServersOverrides, "etcd-servers-overrides", s.EtcdServersOverrides, "Per-resource etcd servers overrides, comma separated. The individual override format: group/resource#servers, where servers are http://ip:port, semicolon separated.") - fs.StringVar(&s.EtcdConfig.Prefix, "etcd-prefix", s.EtcdConfig.Prefix, "The prefix for all resource paths in etcd.") - fs.StringVar(&s.EtcdConfig.KeyFile, "etcd-keyfile", s.EtcdConfig.KeyFile, "SSL key file used to secure etcd communication") - fs.StringVar(&s.EtcdConfig.CertFile, "etcd-certfile", s.EtcdConfig.CertFile, "SSL certification file used to secure etcd communication") - fs.StringVar(&s.EtcdConfig.CAFile, "etcd-cafile", s.EtcdConfig.CAFile, "SSL Certificate Authority file used to secure etcd communication") - fs.BoolVar(&s.EtcdConfig.Quorum, "etcd-quorum-read", s.EtcdConfig.Quorum, "If true, enable quorum read") - fs.IntVar(&s.EtcdConfig.DeserializationCacheSize, "deserialization-cache-size", s.EtcdConfig.DeserializationCacheSize, "Number of deserialized json objects to cache in memory.") + fs.StringVar(&s.StorageConfig.Prefix, "etcd-prefix", s.StorageConfig.Prefix, "The prefix for all resource paths in etcd.") + fs.StringVar(&s.StorageConfig.KeyFile, "etcd-keyfile", s.StorageConfig.KeyFile, "SSL key file used to secure etcd communication") + fs.StringVar(&s.StorageConfig.CertFile, "etcd-certfile", s.StorageConfig.CertFile, "SSL certification file used to secure etcd communication") + fs.StringVar(&s.StorageConfig.CAFile, "etcd-cafile", s.StorageConfig.CAFile, "SSL Certificate Authority file used to secure etcd communication") + fs.BoolVar(&s.StorageConfig.Quorum, "etcd-quorum-read", s.StorageConfig.Quorum, "If true, enable quorum read") + fs.IntVar(&s.StorageConfig.DeserializationCacheSize, "deserialization-cache-size", s.StorageConfig.DeserializationCacheSize, "Number of deserialized json objects to cache in memory.") fs.BoolVar(&s.AllowPrivileged, "allow-privileged", s.AllowPrivileged, "If true, allow privileged containers.") fs.StringVar(&s.MasterServiceNamespace, "master-service-namespace", s.MasterServiceNamespace, "The namespace from which the kubernetes master services should be injected into pods") fs.IntVar(&s.DeleteCollectionWorkers, "delete-collection-workers", s.DeleteCollectionWorkers, "Number of workers spawned for DeleteCollection call. These are used to speed up namespace cleanup.") diff --git a/cmd/kube-apiserver/app/server.go b/cmd/kube-apiserver/app/server.go index c90296e4286..b27e2c67279 100644 --- a/cmd/kube-apiserver/app/server.go +++ b/cmd/kube-apiserver/app/server.go @@ -80,7 +80,7 @@ cluster's shared state through which all other components interact.`, func Run(s *options.APIServer) error { genericapiserver.DefaultAndValidateRunOptions(s.ServerRunOptions) - if len(s.EtcdConfig.ServerList) == 0 { + if len(s.StorageConfig.ServerList) == 0 { glog.Fatalf("--etcd-servers must be specified") } @@ -173,7 +173,7 @@ func Run(s *options.APIServer) error { resourceEncoding.SetVersionEncoding(group, storageEncodingVersion, unversioned.GroupVersion{Group: group, Version: runtime.APIVersionInternal}) } - storageFactory := genericapiserver.NewDefaultStorageFactory(s.EtcdConfig, api.Codecs, resourceEncoding, apiResourceConfigSource) + storageFactory := genericapiserver.NewDefaultStorageFactory(s.StorageConfig, api.Codecs, resourceEncoding, apiResourceConfigSource) storageFactory.AddCohabitatingResources(batch.Resource("jobs"), extensions.Resource("jobs")) storageFactory.AddCohabitatingResources(autoscaling.Resource("horizontalpodautoscalers"), extensions.Resource("horizontalpodautoscalers")) for _, override := range s.EtcdServersOverrides { diff --git a/docs/admin/kube-apiserver.md b/docs/admin/kube-apiserver.md index 71c5c6c60a4..e4c0c5af6db 100644 --- a/docs/admin/kube-apiserver.md +++ b/docs/admin/kube-apiserver.md @@ -74,7 +74,7 @@ kube-apiserver --etcd-keyfile="": SSL key file used to secure etcd communication --etcd-prefix="/registry": The prefix for all resource paths in etcd. --etcd-quorum-read[=false]: If true, enable quorum read - --etcd-servers=[]: List of etcd servers to watch (http://ip:port), comma separated. + --etcd-servers=[]: List of etcd servers to connect with (http://ip:port), comma separated. --etcd-servers-overrides=[]: Per-resource etcd servers overrides, comma separated. The individual override format: group/resource#servers, where servers are http://ip:port, semicolon separated. --event-ttl=1h0m0s: Amount of time to retain events. Default 1 hour. --experimental-keystone-url="": If passed, activates the keystone authentication plugin @@ -109,6 +109,7 @@ kube-apiserver --service-node-port-range=: A port range to reserve for services with NodePort visibility. Example: '30000-32767'. Inclusive at both ends of the range. --ssh-keyfile="": If non-empty, use secure SSH proxy to the nodes, using this user keyfile --ssh-user="": If non-empty, use secure SSH proxy to the nodes, using this user name + --storage-backend="": The storage backend for persistence. Options: 'etcd2' (default), 'etcd3'. --storage-versions="apps/v1alpha1,authorization.k8s.io/v1beta1,autoscaling/v1,batch/v1,componentconfig/v1alpha1,extensions/v1beta1,metrics/v1alpha1,v1": The per-group version to store resources in. Specified in the format "group1/version1,group2/version2,...". In the case where objects are moved from one group to the other, you may specify the format "group1=group2/v1beta1,group3/v1beta1,...". You only need to pass the groups you wish to change from the defaults. It defaults to a list of preferred versions of all registered groups, which is derived from the KUBE_API_VERSIONS environment variable. --tls-cert-file="": File containing x509 Certificate for HTTPS. (CA cert, if any, concatenated after server cert). If HTTPS serving is enabled, and --tls-cert-file and --tls-private-key-file are not provided, a self-signed certificate and key are generated for the public address and saved to /var/run/kubernetes. --tls-private-key-file="": File containing x509 private key matching --tls-cert-file. @@ -117,7 +118,7 @@ kube-apiserver --watch-cache-sizes=[]: List of watch cache sizes for every resource (pods, nodes, etc.), comma separated. The individual override format: resource#size, where size is a number. It takes effect when watch-cache is enabled. ``` -###### Auto generated by spf13/cobra on 16-Apr-2016 +###### Auto generated by spf13/cobra on 28-Apr-2016 diff --git a/examples/apiserver/apiserver.go b/examples/apiserver/apiserver.go index 4f1cea9b889..d2b875dcd4b 100644 --- a/examples/apiserver/apiserver.go +++ b/examples/apiserver/apiserver.go @@ -27,7 +27,7 @@ import ( "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/apimachinery/registered" "k8s.io/kubernetes/pkg/genericapiserver" - etcdstorage "k8s.io/kubernetes/pkg/storage/etcd" + "k8s.io/kubernetes/pkg/storage/storagebackend" // Install the testgroup API _ "k8s.io/kubernetes/cmd/libs/go2idl/client-gen/testdata/apis/testgroup.k8s.io/install" @@ -41,11 +41,11 @@ const ( ) func newStorageFactory() genericapiserver.StorageFactory { - etcdConfig := etcdstorage.EtcdConfig{ + config := storagebackend.Config{ Prefix: genericapiserver.DefaultEtcdPathPrefix, ServerList: []string{"http://127.0.0.1:4001"}, } - storageFactory := genericapiserver.NewDefaultStorageFactory(etcdConfig, api.Codecs, genericapiserver.NewDefaultResourceEncodingConfig(), genericapiserver.NewResourceConfig()) + storageFactory := genericapiserver.NewDefaultStorageFactory(config, api.Codecs, genericapiserver.NewDefaultResourceEncodingConfig(), genericapiserver.NewResourceConfig()) return storageFactory } diff --git a/federation/cmd/federated-apiserver/app/options/options.go b/federation/cmd/federated-apiserver/app/options/options.go index 0ab9ab391af..5656447b0aa 100644 --- a/federation/cmd/federated-apiserver/app/options/options.go +++ b/federation/cmd/federated-apiserver/app/options/options.go @@ -32,7 +32,7 @@ import ( "k8s.io/kubernetes/pkg/genericapiserver" kubeletclient "k8s.io/kubernetes/pkg/kubelet/client" "k8s.io/kubernetes/pkg/master/ports" - etcdstorage "k8s.io/kubernetes/pkg/storage/etcd" + "k8s.io/kubernetes/pkg/storage/storagebackend" "k8s.io/kubernetes/pkg/util/config" utilnet "k8s.io/kubernetes/pkg/util/net" @@ -61,7 +61,7 @@ type APIServer struct { EnableWatchCache bool EnableSwaggerUI bool EtcdServersOverrides []string - EtcdConfig etcdstorage.EtcdConfig + StorageConfig storagebackend.Config EventTTL time.Duration ExternalHost string KeystoneURL string @@ -100,7 +100,7 @@ func NewAPIServer() *APIServer { AuthorizationMode: "AlwaysAllow", DeleteCollectionWorkers: 1, EnableLogsSupport: true, - EtcdConfig: etcdstorage.EtcdConfig{ + StorageConfig: storagebackend.Config{ Prefix: genericapiserver.DefaultEtcdPathPrefix, }, EventTTL: 1 * time.Hour, @@ -234,13 +234,14 @@ func (s *APIServer) AddFlags(fs *pflag.FlagSet) { fs.StringVar(&s.AuthorizationConfig.WebhookConfigFile, "authorization-webhook-config-file", s.AuthorizationConfig.WebhookConfigFile, "File with webhook configuration in kubeconfig format, used with --authorization-mode=Webhook. The API server will query the remote service to determine access on the API server's secure port.") fs.StringVar(&s.AdmissionControl, "admission-control", s.AdmissionControl, "Ordered list of plug-ins to do admission control of resources into cluster. Comma-delimited list of: "+strings.Join(admission.GetPlugins(), ", ")) fs.StringVar(&s.AdmissionControlConfigFile, "admission-control-config-file", s.AdmissionControlConfigFile, "File with admission control configuration.") - fs.StringSliceVar(&s.EtcdConfig.ServerList, "etcd-servers", s.EtcdConfig.ServerList, "List of etcd servers to watch (http://ip:port), comma separated.") + fs.StringVar(&s.StorageConfig.Type, "storage-backend", s.StorageConfig.Type, "The storage backend for persistence. Options: 'etcd2', 'etcd3'.") + fs.StringSliceVar(&s.StorageConfig.ServerList, "etcd-servers", s.StorageConfig.ServerList, "List of etcd servers to watch (http://ip:port), comma separated.") fs.StringSliceVar(&s.EtcdServersOverrides, "etcd-servers-overrides", s.EtcdServersOverrides, "Per-resource etcd servers overrides, comma separated. The individual override format: group/resource#servers, where servers are http://ip:port, semicolon separated.") - fs.StringVar(&s.EtcdConfig.Prefix, "etcd-prefix", s.EtcdConfig.Prefix, "The prefix for all resource paths in etcd.") - fs.StringVar(&s.EtcdConfig.KeyFile, "etcd-keyfile", s.EtcdConfig.KeyFile, "SSL key file used to secure etcd communication") - fs.StringVar(&s.EtcdConfig.CertFile, "etcd-certfile", s.EtcdConfig.CertFile, "SSL certification file used to secure etcd communication") - fs.StringVar(&s.EtcdConfig.CAFile, "etcd-cafile", s.EtcdConfig.CAFile, "SSL Certificate Authority file used to secure etcd communication") - fs.BoolVar(&s.EtcdConfig.Quorum, "etcd-quorum-read", s.EtcdConfig.Quorum, "If true, enable quorum read") + fs.StringVar(&s.StorageConfig.Prefix, "etcd-prefix", s.StorageConfig.Prefix, "The prefix for all resource paths in etcd.") + fs.StringVar(&s.StorageConfig.KeyFile, "etcd-keyfile", s.StorageConfig.KeyFile, "SSL key file used to secure etcd communication") + fs.StringVar(&s.StorageConfig.CertFile, "etcd-certfile", s.StorageConfig.CertFile, "SSL certification file used to secure etcd communication") + fs.StringVar(&s.StorageConfig.CAFile, "etcd-cafile", s.StorageConfig.CAFile, "SSL Certificate Authority file used to secure etcd communication") + fs.BoolVar(&s.StorageConfig.Quorum, "etcd-quorum-read", s.StorageConfig.Quorum, "If true, enable quorum read") fs.StringSliceVar(&s.CorsAllowedOriginList, "cors-allowed-origins", s.CorsAllowedOriginList, "List of allowed origins for CORS, comma separated. An allowed origin can be a regular expression to support subdomain matching. If this list is empty CORS will not be enabled.") fs.BoolVar(&s.AllowPrivileged, "allow-privileged", s.AllowPrivileged, "If true, allow privileged containers.") fs.IPNetVar(&s.ServiceClusterIPRange, "service-cluster-ip-range", s.ServiceClusterIPRange, "A CIDR notation IP range from which to assign service cluster IPs. This must not overlap with any IP ranges assigned to nodes for pods.") diff --git a/federation/cmd/federated-apiserver/app/server.go b/federation/cmd/federated-apiserver/app/server.go index 103f651464c..79aefb2b8cf 100644 --- a/federation/cmd/federated-apiserver/app/server.go +++ b/federation/cmd/federated-apiserver/app/server.go @@ -92,7 +92,7 @@ func Run(s *options.APIServer) error { } glog.Infof("Will report %v as public IP address.", s.AdvertiseAddress) - if len(s.EtcdConfig.ServerList) == 0 { + if len(s.StorageConfig.ServerList) == 0 { glog.Fatalf("--etcd-servers must be specified") } @@ -186,7 +186,7 @@ func Run(s *options.APIServer) error { resourceEncoding.SetVersionEncoding(group, storageEncodingVersion, unversioned.GroupVersion{Group: group, Version: runtime.APIVersionInternal}) } - storageFactory := genericapiserver.NewDefaultStorageFactory(s.EtcdConfig, api.Codecs, resourceEncoding, apiResourceConfigSource) + storageFactory := genericapiserver.NewDefaultStorageFactory(s.StorageConfig, api.Codecs, resourceEncoding, apiResourceConfigSource) for _, override := range s.EtcdServersOverrides { tokens := strings.Split(override, "#") if len(tokens) != 2 { diff --git a/federation/cmd/federated-apiserver/app/server_test.go b/federation/cmd/federated-apiserver/app/server_test.go index c8d61f0a575..6b74464dd48 100644 --- a/federation/cmd/federated-apiserver/app/server_test.go +++ b/federation/cmd/federated-apiserver/app/server_test.go @@ -22,14 +22,15 @@ import ( "encoding/json" "fmt" - "github.com/stretchr/testify/assert" "io/ioutil" - fed_v1a1 "k8s.io/kubernetes/federation/apis/federation/v1alpha1" - "k8s.io/kubernetes/federation/cmd/federated-apiserver/app/options" - "k8s.io/kubernetes/pkg/api/unversioned" "net" "net/http" "time" + + "github.com/stretchr/testify/assert" + fed_v1a1 "k8s.io/kubernetes/federation/apis/federation/v1alpha1" + "k8s.io/kubernetes/federation/cmd/federated-apiserver/app/options" + "k8s.io/kubernetes/pkg/api/unversioned" ) func TestLongRunningRequestRegexp(t *testing.T) { @@ -82,7 +83,7 @@ func TestRun(t *testing.T) { s.InsecurePort = insecurePort _, ipNet, _ := net.ParseCIDR("10.10.10.0/24") s.ServiceClusterIPRange = *ipNet - s.EtcdConfig.ServerList = []string{"http://localhost:4001"} + s.StorageConfig.ServerList = []string{"http://localhost:4001"} go func() { if err := Run(s); err != nil { t.Fatalf("Error in bringing up the server: %v", err) diff --git a/hack/verify-flags/known-flags.txt b/hack/verify-flags/known-flags.txt index 4bc5d56cf6d..0e584bd7896 100644 --- a/hack/verify-flags/known-flags.txt +++ b/hack/verify-flags/known-flags.txt @@ -391,6 +391,7 @@ start-services static-pods-config stats-port stop-services +storage-backend storage-version storage-versions streaming-connection-idle-timeout diff --git a/pkg/genericapiserver/storage_factory.go b/pkg/genericapiserver/storage_factory.go index be54b10b6e5..4e1f4c97e97 100644 --- a/pkg/genericapiserver/storage_factory.go +++ b/pkg/genericapiserver/storage_factory.go @@ -23,7 +23,7 @@ import ( "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/runtime/serializer/versioning" "k8s.io/kubernetes/pkg/storage" - etcdstorage "k8s.io/kubernetes/pkg/storage/etcd" + "k8s.io/kubernetes/pkg/storage/storagebackend" "k8s.io/kubernetes/pkg/util/sets" "github.com/golang/glog" @@ -44,9 +44,9 @@ type StorageFactory interface { // 2. Resource encodings for storage: group,version,kind to store as // 3. Cohabitating default: some resources like hpa are exposed through multiple APIs. They must agree on 1 and 2 type DefaultStorageFactory struct { - // DefaultEtcdConfig describes how to connect to etcd in general. It's authentication information will be used for - // every storage.Interface returned. - DefaultEtcdConfig etcdstorage.EtcdConfig + // StorageConfig describes how to create a storage backend in general. + // Its authentication information will be used for every storage.Interface returned. + StorageConfig storagebackend.Config Overrides map[unversioned.GroupResource]groupResourceOverrides @@ -62,12 +62,12 @@ type DefaultStorageFactory struct { APIResourceConfigSource APIResourceConfigSource // newEtcdFn exists to be overwritten for unit testing. You should never set this in a normal world. - newEtcdFn func(ns runtime.NegotiatedSerializer, storageVersion, memoryVersion unversioned.GroupVersion, etcdConfig etcdstorage.EtcdConfig) (etcdStorage storage.Interface, err error) + newEtcdFn func(ns runtime.NegotiatedSerializer, storageVersion, memoryVersion unversioned.GroupVersion, config storagebackend.Config) (etcdStorage storage.Interface, err error) } type groupResourceOverrides struct { // etcdLocation contains the list of "special" locations that are used for particular GroupResources - // These are merged on top of the default DefaultEtcdConfig when requesting the storage.Interface for a given GroupResource + // These are merged on top of the StorageConfig when requesting the storage.Interface for a given GroupResource etcdLocation []string // etcdPrefix contains the list of "special" prefixes for a GroupResource. Resource=* means for the entire group etcdPrefix string @@ -83,9 +83,9 @@ var _ StorageFactory = &DefaultStorageFactory{} const AllResources = "*" -func NewDefaultStorageFactory(defaultEtcdConfig etcdstorage.EtcdConfig, defaultSerializer runtime.NegotiatedSerializer, resourceEncodingConfig ResourceEncodingConfig, resourceConfig APIResourceConfigSource) *DefaultStorageFactory { +func NewDefaultStorageFactory(config storagebackend.Config, defaultSerializer runtime.NegotiatedSerializer, resourceEncodingConfig ResourceEncodingConfig, resourceConfig APIResourceConfigSource) *DefaultStorageFactory { return &DefaultStorageFactory{ - DefaultEtcdConfig: defaultEtcdConfig, + StorageConfig: config, Overrides: map[unversioned.GroupResource]groupResourceOverrides{}, DefaultSerializer: defaultSerializer, ResourceEncodingConfig: resourceEncodingConfig, @@ -152,7 +152,7 @@ func (s *DefaultStorageFactory) New(groupResource unversioned.GroupResource) (st overriddenEtcdLocations = exactResourceOverride.etcdLocation } - etcdPrefix := s.DefaultEtcdConfig.Prefix + etcdPrefix := s.StorageConfig.Prefix if len(groupOverride.etcdPrefix) > 0 { etcdPrefix = groupOverride.etcdPrefix } @@ -168,10 +168,10 @@ func (s *DefaultStorageFactory) New(groupResource unversioned.GroupResource) (st etcdSerializer = exactResourceOverride.serializer } // operate on copy - etcdConfig := s.DefaultEtcdConfig - etcdConfig.Prefix = etcdPrefix + config := s.StorageConfig + config.Prefix = etcdPrefix if len(overriddenEtcdLocations) > 0 { - etcdConfig.ServerList = overriddenEtcdLocations + config.ServerList = overriddenEtcdLocations } storageEncodingVersion, err := s.ResourceEncodingConfig.StoragageEncodingFor(chosenStorageResource) @@ -183,13 +183,11 @@ func (s *DefaultStorageFactory) New(groupResource unversioned.GroupResource) (st return nil, err } - glog.V(3).Infof("storing %v in %v, reading as %v from %v", groupResource, storageEncodingVersion, internalVersion, etcdConfig) - return s.newEtcdFn(etcdSerializer, storageEncodingVersion, internalVersion, etcdConfig) + glog.V(3).Infof("storing %v in %v, reading as %v from %v", groupResource, storageEncodingVersion, internalVersion, config) + return s.newEtcdFn(etcdSerializer, storageEncodingVersion, internalVersion, config) } -func newEtcd(ns runtime.NegotiatedSerializer, storageVersion, memoryVersion unversioned.GroupVersion, etcdConfig etcdstorage.EtcdConfig) (etcdStorage storage.Interface, err error) { - var storageConfig etcdstorage.EtcdStorageConfig - storageConfig.Config = etcdConfig +func newEtcd(ns runtime.NegotiatedSerializer, storageVersion, memoryVersion unversioned.GroupVersion, config storagebackend.Config) (etcdStorage storage.Interface, err error) { s, ok := ns.SerializerForMediaType("application/json", nil) if !ok { return nil, fmt.Errorf("unable to find serializer for JSON") @@ -205,14 +203,14 @@ func newEtcd(ns runtime.NegotiatedSerializer, storageVersion, memoryVersion unve return nil, fmt.Errorf("error setting up decoder from %v to %v: %v", storageVersion, memoryVersion, err) } } - storageConfig.Codec = runtime.NewCodec(encoder, decoder) - return storageConfig.NewStorage() + config.Codec = runtime.NewCodec(encoder, decoder) + return storagebackend.Create(config) } // Get all backends for all registered storage destinations. // Used for getting all instances for health validations. func (s *DefaultStorageFactory) Backends() []string { - backends := sets.NewString(s.DefaultEtcdConfig.ServerList...) + backends := sets.NewString(s.StorageConfig.ServerList...) for _, overrides := range s.Overrides { backends.Insert(overrides.etcdLocation...) diff --git a/pkg/genericapiserver/storage_factory_test.go b/pkg/genericapiserver/storage_factory_test.go index 90540681799..1f727fb086e 100644 --- a/pkg/genericapiserver/storage_factory_test.go +++ b/pkg/genericapiserver/storage_factory_test.go @@ -25,7 +25,7 @@ import ( "k8s.io/kubernetes/pkg/apis/extensions" "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/storage" - etcdstorage "k8s.io/kubernetes/pkg/storage/etcd" + "k8s.io/kubernetes/pkg/storage/storagebackend" ) func TestUpdateEtcdOverrides(t *testing.T) { @@ -49,17 +49,17 @@ func TestUpdateEtcdOverrides(t *testing.T) { defaultEtcdLocation := []string{"http://127.0.0.1"} for i, test := range testCases { - actualEtcdConfig := etcdstorage.EtcdConfig{} - newEtcdFn := func(ns runtime.NegotiatedSerializer, storageVersion, memoryVersion unversioned.GroupVersion, etcdConfig etcdstorage.EtcdConfig) (etcdStorage storage.Interface, err error) { - actualEtcdConfig = etcdConfig + actualConfig := storagebackend.Config{} + newEtcdFn := func(ns runtime.NegotiatedSerializer, storageVersion, memoryVersion unversioned.GroupVersion, config storagebackend.Config) (etcdStorage storage.Interface, err error) { + actualConfig = config return nil, nil } - defaultEtcdConfig := etcdstorage.EtcdConfig{ + defaultConfig := storagebackend.Config{ Prefix: DefaultEtcdPathPrefix, ServerList: defaultEtcdLocation, } - storageFactory := NewDefaultStorageFactory(defaultEtcdConfig, api.Codecs, NewDefaultResourceEncodingConfig(), NewResourceConfig()) + storageFactory := NewDefaultStorageFactory(defaultConfig, api.Codecs, NewDefaultResourceEncodingConfig(), NewResourceConfig()) storageFactory.newEtcdFn = newEtcdFn storageFactory.SetEtcdLocation(test.resource, test.servers) @@ -69,8 +69,8 @@ func TestUpdateEtcdOverrides(t *testing.T) { t.Errorf("%d: unexpected error %v", i, err) continue } - if !reflect.DeepEqual(actualEtcdConfig.ServerList, test.servers) { - t.Errorf("%d: expected %v, got %v", i, test.servers, actualEtcdConfig.ServerList) + if !reflect.DeepEqual(actualConfig.ServerList, test.servers) { + t.Errorf("%d: expected %v, got %v", i, test.servers, actualConfig.ServerList) continue } @@ -79,8 +79,8 @@ func TestUpdateEtcdOverrides(t *testing.T) { t.Errorf("%d: unexpected error %v", i, err) continue } - if !reflect.DeepEqual(actualEtcdConfig.ServerList, defaultEtcdLocation) { - t.Errorf("%d: expected %v, got %v", i, defaultEtcdLocation, actualEtcdConfig.ServerList) + if !reflect.DeepEqual(actualConfig.ServerList, defaultEtcdLocation) { + t.Errorf("%d: expected %v, got %v", i, defaultEtcdLocation, actualConfig.ServerList) continue } diff --git a/pkg/master/master_test.go b/pkg/master/master_test.go index 9fa84e8a575..33f222888d6 100644 --- a/pkg/master/master_test.go +++ b/pkg/master/master_test.go @@ -32,10 +32,6 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/testapi" "k8s.io/kubernetes/pkg/api/unversioned" - "k8s.io/kubernetes/pkg/apiserver" - utilnet "k8s.io/kubernetes/pkg/util/net" - "k8s.io/kubernetes/pkg/util/sets" - apiv1 "k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/apimachinery/registered" "k8s.io/kubernetes/pkg/apis/apps" @@ -46,6 +42,7 @@ import ( batchapiv1 "k8s.io/kubernetes/pkg/apis/batch/v1" "k8s.io/kubernetes/pkg/apis/extensions" extensionsapiv1beta1 "k8s.io/kubernetes/pkg/apis/extensions/v1beta1" + "k8s.io/kubernetes/pkg/apiserver" "k8s.io/kubernetes/pkg/genericapiserver" "k8s.io/kubernetes/pkg/kubelet/client" "k8s.io/kubernetes/pkg/registry/endpoint" @@ -53,10 +50,12 @@ import ( "k8s.io/kubernetes/pkg/registry/registrytest" "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/storage" - etcdstorage "k8s.io/kubernetes/pkg/storage/etcd" "k8s.io/kubernetes/pkg/storage/etcd/etcdtest" etcdtesting "k8s.io/kubernetes/pkg/storage/etcd/testing" + "k8s.io/kubernetes/pkg/storage/storagebackend" "k8s.io/kubernetes/pkg/util/intstr" + utilnet "k8s.io/kubernetes/pkg/util/net" + "k8s.io/kubernetes/pkg/util/sets" "github.com/stretchr/testify/assert" "golang.org/x/net/context" @@ -73,14 +72,14 @@ func setUp(t *testing.T) (*Master, *etcdtesting.EtcdTestServer, Config, *assert. Config: &genericapiserver.Config{}, } - etcdConfig := etcdstorage.EtcdConfig{ + storageConfig := storagebackend.Config{ Prefix: etcdtest.PathPrefix(), CAFile: server.CAFile, KeyFile: server.KeyFile, CertFile: server.CertFile, } for _, url := range server.ClientURLs { - etcdConfig.ServerList = append(etcdConfig.ServerList, url.String()) + storageConfig.ServerList = append(storageConfig.ServerList, url.String()) } resourceEncoding := genericapiserver.NewDefaultResourceEncodingConfig() @@ -89,7 +88,7 @@ func setUp(t *testing.T) (*Master, *etcdtesting.EtcdTestServer, Config, *assert. resourceEncoding.SetVersionEncoding(batch.GroupName, *testapi.Batch.GroupVersion(), unversioned.GroupVersion{Group: batch.GroupName, Version: runtime.APIVersionInternal}) resourceEncoding.SetVersionEncoding(apps.GroupName, *testapi.Apps.GroupVersion(), unversioned.GroupVersion{Group: apps.GroupName, Version: runtime.APIVersionInternal}) resourceEncoding.SetVersionEncoding(extensions.GroupName, *testapi.Extensions.GroupVersion(), unversioned.GroupVersion{Group: extensions.GroupName, Version: runtime.APIVersionInternal}) - storageFactory := genericapiserver.NewDefaultStorageFactory(etcdConfig, api.Codecs, resourceEncoding, DefaultAPIResourceConfigSource()) + storageFactory := genericapiserver.NewDefaultStorageFactory(storageConfig, api.Codecs, resourceEncoding, DefaultAPIResourceConfigSource()) config.StorageFactory = storageFactory config.APIResourceConfigSource = DefaultAPIResourceConfigSource() diff --git a/pkg/storage/etcd/etcd_helper.go b/pkg/storage/etcd/etcd_helper.go index 46213769ff6..42f2bc83c50 100644 --- a/pkg/storage/etcd/etcd_helper.go +++ b/pkg/storage/etcd/etcd_helper.go @@ -19,8 +19,6 @@ package etcd import ( "errors" "fmt" - "net" - "net/http" "path" "reflect" "strings" @@ -35,90 +33,13 @@ import ( etcdutil "k8s.io/kubernetes/pkg/storage/etcd/util" "k8s.io/kubernetes/pkg/util" utilcache "k8s.io/kubernetes/pkg/util/cache" - utilnet "k8s.io/kubernetes/pkg/util/net" "k8s.io/kubernetes/pkg/watch" etcd "github.com/coreos/etcd/client" - "github.com/coreos/etcd/pkg/transport" "github.com/golang/glog" "golang.org/x/net/context" ) -// storage.Config object for etcd. -type EtcdStorageConfig struct { - Config EtcdConfig - Codec runtime.Codec -} - -// implements storage.Config -func (c *EtcdStorageConfig) GetType() string { - return "etcd" -} - -// implements storage.Config -func (c *EtcdStorageConfig) NewStorage() (storage.Interface, error) { - etcdClient, err := c.Config.newEtcdClient() - if err != nil { - return nil, err - } - return NewEtcdStorage(etcdClient, c.Codec, c.Config.Prefix, c.Config.Quorum, c.Config.DeserializationCacheSize), nil -} - -// Configuration object for constructing etcd.Config -type EtcdConfig struct { - Prefix string - ServerList []string - KeyFile string - CertFile string - CAFile string - Quorum bool - DeserializationCacheSize int -} - -func (c *EtcdConfig) newEtcdClient() (etcd.Client, error) { - t, err := c.newHttpTransport() - if err != nil { - return nil, err - } - - cli, err := etcd.New(etcd.Config{ - Endpoints: c.ServerList, - Transport: t, - }) - if err != nil { - return nil, err - } - - return cli, nil -} - -func (c *EtcdConfig) newHttpTransport() (*http.Transport, error) { - info := transport.TLSInfo{ - CertFile: c.CertFile, - KeyFile: c.KeyFile, - CAFile: c.CAFile, - } - cfg, err := info.ClientConfig() - if err != nil { - return nil, err - } - - // Copied from etcd.DefaultTransport declaration. - // TODO: Determine if transport needs optimization - tr := utilnet.SetTransportDefaults(&http.Transport{ - Proxy: http.ProxyFromEnvironment, - Dial: (&net.Dialer{ - Timeout: 30 * time.Second, - KeepAlive: 30 * time.Second, - }).Dial, - TLSHandshakeTimeout: 10 * time.Second, - MaxIdleConnsPerHost: 500, - TLSClientConfig: cfg, - }) - - return tr, nil -} - // Creates a new storage interface from the client // TODO: deprecate in favor of storage.Config abstraction over time func NewEtcdStorage(client etcd.Client, codec runtime.Codec, prefix string, quorum bool, cacheSize int) storage.Interface { diff --git a/pkg/storage/etcd3/store.go b/pkg/storage/etcd3/store.go index f7a25987685..4758331cbbd 100644 --- a/pkg/storage/etcd3/store.go +++ b/pkg/storage/etcd3/store.go @@ -57,6 +57,11 @@ type objState struct { data []byte } +// New returns an etcd3 implementation of storage.Interface. +func New(c *clientv3.Client, codec runtime.Codec, prefix string) storage.Interface { + return newStore(c, codec, prefix) +} + func newStore(c *clientv3.Client, codec runtime.Codec, prefix string) *store { versioner := etcd.APIObjectVersioner{} return &store{ diff --git a/pkg/storage/storagebackend/config.go b/pkg/storage/storagebackend/config.go new file mode 100644 index 00000000000..6d3b127e778 --- /dev/null +++ b/pkg/storage/storagebackend/config.go @@ -0,0 +1,68 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package storagebackend + +import ( + "fmt" + + "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/storage" +) + +const ( + StorageTypeUnset = "" + StorageTypeETCD2 = "etcd2" + StorageTypeETCD3 = "etcd3" +) + +// Config is configuration for creating a storage backend. +type Config struct { + // Type defines the type of storage backend, e.g. "etcd2", etcd3". Default ("") is "etcd2". + Type string + // Codec is used to serialize/deserialize objects. + Codec runtime.Codec + // Prefix is the prefix to all keys passed to storage.Interface methods. + Prefix string + // ServerList is the list of storage servers to connect with. + ServerList []string + // TLS credentials + KeyFile string + CertFile string + CAFile string + // Quorum indicates that whether read operations should be quorum-level consistent. + Quorum bool + // DeserializationCacheSize is the size of cache of deserialized objects. + // Currently this is only supported in etcd2. + // We will drop the cache once using protobuf. + DeserializationCacheSize int +} + +// Create creates a storage backend based on given config. +func Create(c Config) (storage.Interface, error) { + switch c.Type { + case StorageTypeUnset, StorageTypeETCD2: + return newETCD2Storage(c) + case StorageTypeETCD3: + // TODO: We have the following features to implement: + // - Support secure connection by using key, cert, and CA files. + // - Honor "https" scheme to support secure connection in gRPC. + // - Support non-quorum read. + return newETCD3Storage(c) + default: + return nil, fmt.Errorf("unknown storage type: %s", c.Type) + } +} diff --git a/pkg/storage/storagebackend/etcd3.go b/pkg/storage/storagebackend/etcd3.go new file mode 100644 index 00000000000..5be33b02d1e --- /dev/null +++ b/pkg/storage/storagebackend/etcd3.go @@ -0,0 +1,40 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package storagebackend + +import ( + "strings" + + "github.com/coreos/etcd/clientv3" + "k8s.io/kubernetes/pkg/storage" + "k8s.io/kubernetes/pkg/storage/etcd3" +) + +func newETCD3Storage(c Config) (storage.Interface, error) { + endpoints := c.ServerList + for i, s := range endpoints { + endpoints[i] = strings.TrimLeft(s, "http://") + } + cfg := clientv3.Config{ + Endpoints: endpoints, + } + client, err := clientv3.New(cfg) + if err != nil { + return nil, err + } + return etcd3.New(client, c.Codec, c.Prefix), nil +} diff --git a/pkg/storage/storagebackend/etdc2.go b/pkg/storage/storagebackend/etdc2.go new file mode 100644 index 00000000000..b1176042eed --- /dev/null +++ b/pkg/storage/storagebackend/etdc2.go @@ -0,0 +1,78 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package storagebackend + +import ( + "net" + "net/http" + "time" + + etcd2client "github.com/coreos/etcd/client" + "github.com/coreos/etcd/pkg/transport" + "k8s.io/kubernetes/pkg/storage" + "k8s.io/kubernetes/pkg/storage/etcd" + utilnet "k8s.io/kubernetes/pkg/util/net" +) + +func newETCD2Storage(c Config) (storage.Interface, error) { + tr, err := newTransportForETCD2(c.CertFile, c.KeyFile, c.CAFile) + if err != nil { + return nil, err + } + client, err := newETCD2Client(tr, c.ServerList) + if err != nil { + return nil, err + } + return etcd.NewEtcdStorage(client, c.Codec, c.Prefix, c.Quorum, c.DeserializationCacheSize), nil +} + +func newETCD2Client(tr *http.Transport, serverList []string) (etcd2client.Client, error) { + cli, err := etcd2client.New(etcd2client.Config{ + Endpoints: serverList, + Transport: tr, + }) + if err != nil { + return nil, err + } + + return cli, nil +} + +func newTransportForETCD2(certFile, keyFile, caFile string) (*http.Transport, error) { + info := transport.TLSInfo{ + CertFile: certFile, + KeyFile: keyFile, + CAFile: caFile, + } + cfg, err := info.ClientConfig() + if err != nil { + return nil, err + } + // Copied from etcd.DefaultTransport declaration. + // TODO: Determine if transport needs optimization + tr := utilnet.SetTransportDefaults(&http.Transport{ + Proxy: http.ProxyFromEnvironment, + Dial: (&net.Dialer{ + Timeout: 30 * time.Second, + KeepAlive: 30 * time.Second, + }).Dial, + TLSHandshakeTimeout: 10 * time.Second, + MaxIdleConnsPerHost: 500, + TLSClientConfig: cfg, + }) + return tr, nil +} diff --git a/test/integration/framework/master_utils.go b/test/integration/framework/master_utils.go index c0fd44801ff..3e60bac3544 100644 --- a/test/integration/framework/master_utils.go +++ b/test/integration/framework/master_utils.go @@ -47,8 +47,8 @@ import ( kubeletclient "k8s.io/kubernetes/pkg/kubelet/client" "k8s.io/kubernetes/pkg/master" "k8s.io/kubernetes/pkg/runtime" - etcdstorage "k8s.io/kubernetes/pkg/storage/etcd" "k8s.io/kubernetes/pkg/storage/etcd/etcdtest" + "k8s.io/kubernetes/pkg/storage/storagebackend" "k8s.io/kubernetes/plugin/pkg/admission/admit" ) @@ -150,14 +150,14 @@ func startMasterOrDie(masterConfig *master.Config) (*master.Master, *httptest.Se // Returns a basic master config. func NewMasterConfig() *master.Config { - etcdConfig := etcdstorage.EtcdConfig{ + config := storagebackend.Config{ ServerList: []string{"http://127.0.0.1:4001"}, Prefix: etcdtest.PathPrefix(), } negotiatedSerializer := NewSingleContentTypeSerializer(api.Scheme, testapi.Default.Codec(), "application/json") - storageFactory := genericapiserver.NewDefaultStorageFactory(etcdConfig, negotiatedSerializer, genericapiserver.NewDefaultResourceEncodingConfig(), master.DefaultAPIResourceConfigSource()) + storageFactory := genericapiserver.NewDefaultStorageFactory(config, negotiatedSerializer, genericapiserver.NewDefaultResourceEncodingConfig(), master.DefaultAPIResourceConfigSource()) storageFactory.SetSerializer( unversioned.GroupResource{Group: api.GroupName, Resource: genericapiserver.AllResources}, NewSingleContentTypeSerializer(api.Scheme, testapi.Default.Codec(), "application/json"))