add flags to enable etcd3

This commit is contained in:
Hongchao Deng 2016-04-20 12:11:39 +08:00
parent 51db4170c2
commit c0071a1595
17 changed files with 267 additions and 153 deletions

View File

@ -31,7 +31,7 @@ import (
"k8s.io/kubernetes/pkg/genericapiserver"
kubeletclient "k8s.io/kubernetes/pkg/kubelet/client"
"k8s.io/kubernetes/pkg/master/ports"
etcdstorage "k8s.io/kubernetes/pkg/storage/etcd"
"k8s.io/kubernetes/pkg/storage/storagebackend"
"github.com/spf13/pflag"
)
@ -50,7 +50,7 @@ type APIServer struct {
DeleteCollectionWorkers int
DeprecatedStorageVersion string
EtcdServersOverrides []string
EtcdConfig etcdstorage.EtcdConfig
StorageConfig storagebackend.Config
EventTTL time.Duration
KeystoneURL string
KubeletConfig kubeletclient.KubeletClientConfig
@ -81,7 +81,7 @@ func NewAPIServer() *APIServer {
AdmissionControl: "AlwaysAdmit",
AuthorizationMode: "AlwaysAllow",
DeleteCollectionWorkers: 1,
EtcdConfig: etcdstorage.EtcdConfig{
StorageConfig: storagebackend.Config{
Prefix: genericapiserver.DefaultEtcdPathPrefix,
DeserializationCacheSize: genericapiserver.DefaultDeserializationCacheSize,
},
@ -183,14 +183,15 @@ func (s *APIServer) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&s.AuthorizationConfig.WebhookConfigFile, "authorization-webhook-config-file", s.AuthorizationConfig.WebhookConfigFile, "File with webhook configuration in kubeconfig format, used with --authorization-mode=Webhook. The API server will query the remote service to determine access on the API server's secure port.")
fs.StringVar(&s.AdmissionControl, "admission-control", s.AdmissionControl, "Ordered list of plug-ins to do admission control of resources into cluster. Comma-delimited list of: "+strings.Join(admission.GetPlugins(), ", "))
fs.StringVar(&s.AdmissionControlConfigFile, "admission-control-config-file", s.AdmissionControlConfigFile, "File with admission control configuration.")
fs.StringSliceVar(&s.EtcdConfig.ServerList, "etcd-servers", s.EtcdConfig.ServerList, "List of etcd servers to watch (http://ip:port), comma separated.")
fs.StringVar(&s.StorageConfig.Type, "storage-backend", s.StorageConfig.Type, "The storage backend for persistence. Options: 'etcd2' (default), 'etcd3'.")
fs.StringSliceVar(&s.StorageConfig.ServerList, "etcd-servers", s.StorageConfig.ServerList, "List of etcd servers to connect with (http://ip:port), comma separated.")
fs.StringSliceVar(&s.EtcdServersOverrides, "etcd-servers-overrides", s.EtcdServersOverrides, "Per-resource etcd servers overrides, comma separated. The individual override format: group/resource#servers, where servers are http://ip:port, semicolon separated.")
fs.StringVar(&s.EtcdConfig.Prefix, "etcd-prefix", s.EtcdConfig.Prefix, "The prefix for all resource paths in etcd.")
fs.StringVar(&s.EtcdConfig.KeyFile, "etcd-keyfile", s.EtcdConfig.KeyFile, "SSL key file used to secure etcd communication")
fs.StringVar(&s.EtcdConfig.CertFile, "etcd-certfile", s.EtcdConfig.CertFile, "SSL certification file used to secure etcd communication")
fs.StringVar(&s.EtcdConfig.CAFile, "etcd-cafile", s.EtcdConfig.CAFile, "SSL Certificate Authority file used to secure etcd communication")
fs.BoolVar(&s.EtcdConfig.Quorum, "etcd-quorum-read", s.EtcdConfig.Quorum, "If true, enable quorum read")
fs.IntVar(&s.EtcdConfig.DeserializationCacheSize, "deserialization-cache-size", s.EtcdConfig.DeserializationCacheSize, "Number of deserialized json objects to cache in memory.")
fs.StringVar(&s.StorageConfig.Prefix, "etcd-prefix", s.StorageConfig.Prefix, "The prefix for all resource paths in etcd.")
fs.StringVar(&s.StorageConfig.KeyFile, "etcd-keyfile", s.StorageConfig.KeyFile, "SSL key file used to secure etcd communication")
fs.StringVar(&s.StorageConfig.CertFile, "etcd-certfile", s.StorageConfig.CertFile, "SSL certification file used to secure etcd communication")
fs.StringVar(&s.StorageConfig.CAFile, "etcd-cafile", s.StorageConfig.CAFile, "SSL Certificate Authority file used to secure etcd communication")
fs.BoolVar(&s.StorageConfig.Quorum, "etcd-quorum-read", s.StorageConfig.Quorum, "If true, enable quorum read")
fs.IntVar(&s.StorageConfig.DeserializationCacheSize, "deserialization-cache-size", s.StorageConfig.DeserializationCacheSize, "Number of deserialized json objects to cache in memory.")
fs.BoolVar(&s.AllowPrivileged, "allow-privileged", s.AllowPrivileged, "If true, allow privileged containers.")
fs.StringVar(&s.MasterServiceNamespace, "master-service-namespace", s.MasterServiceNamespace, "The namespace from which the kubernetes master services should be injected into pods")
fs.IntVar(&s.DeleteCollectionWorkers, "delete-collection-workers", s.DeleteCollectionWorkers, "Number of workers spawned for DeleteCollection call. These are used to speed up namespace cleanup.")

View File

@ -80,7 +80,7 @@ cluster's shared state through which all other components interact.`,
func Run(s *options.APIServer) error {
genericapiserver.DefaultAndValidateRunOptions(s.ServerRunOptions)
if len(s.EtcdConfig.ServerList) == 0 {
if len(s.StorageConfig.ServerList) == 0 {
glog.Fatalf("--etcd-servers must be specified")
}
@ -173,7 +173,7 @@ func Run(s *options.APIServer) error {
resourceEncoding.SetVersionEncoding(group, storageEncodingVersion, unversioned.GroupVersion{Group: group, Version: runtime.APIVersionInternal})
}
storageFactory := genericapiserver.NewDefaultStorageFactory(s.EtcdConfig, api.Codecs, resourceEncoding, apiResourceConfigSource)
storageFactory := genericapiserver.NewDefaultStorageFactory(s.StorageConfig, api.Codecs, resourceEncoding, apiResourceConfigSource)
storageFactory.AddCohabitatingResources(batch.Resource("jobs"), extensions.Resource("jobs"))
storageFactory.AddCohabitatingResources(autoscaling.Resource("horizontalpodautoscalers"), extensions.Resource("horizontalpodautoscalers"))
for _, override := range s.EtcdServersOverrides {

View File

@ -74,7 +74,7 @@ kube-apiserver
--etcd-keyfile="": SSL key file used to secure etcd communication
--etcd-prefix="/registry": The prefix for all resource paths in etcd.
--etcd-quorum-read[=false]: If true, enable quorum read
--etcd-servers=[]: List of etcd servers to watch (http://ip:port), comma separated.
--etcd-servers=[]: List of etcd servers to connect with (http://ip:port), comma separated.
--etcd-servers-overrides=[]: Per-resource etcd servers overrides, comma separated. The individual override format: group/resource#servers, where servers are http://ip:port, semicolon separated.
--event-ttl=1h0m0s: Amount of time to retain events. Default 1 hour.
--experimental-keystone-url="": If passed, activates the keystone authentication plugin
@ -109,6 +109,7 @@ kube-apiserver
--service-node-port-range=: A port range to reserve for services with NodePort visibility. Example: '30000-32767'. Inclusive at both ends of the range.
--ssh-keyfile="": If non-empty, use secure SSH proxy to the nodes, using this user keyfile
--ssh-user="": If non-empty, use secure SSH proxy to the nodes, using this user name
--storage-backend="": The storage backend for persistence. Options: 'etcd2' (default), 'etcd3'.
--storage-versions="apps/v1alpha1,authorization.k8s.io/v1beta1,autoscaling/v1,batch/v1,componentconfig/v1alpha1,extensions/v1beta1,metrics/v1alpha1,v1": The per-group version to store resources in. Specified in the format "group1/version1,group2/version2,...". In the case where objects are moved from one group to the other, you may specify the format "group1=group2/v1beta1,group3/v1beta1,...". You only need to pass the groups you wish to change from the defaults. It defaults to a list of preferred versions of all registered groups, which is derived from the KUBE_API_VERSIONS environment variable.
--tls-cert-file="": File containing x509 Certificate for HTTPS. (CA cert, if any, concatenated after server cert). If HTTPS serving is enabled, and --tls-cert-file and --tls-private-key-file are not provided, a self-signed certificate and key are generated for the public address and saved to /var/run/kubernetes.
--tls-private-key-file="": File containing x509 private key matching --tls-cert-file.
@ -117,7 +118,7 @@ kube-apiserver
--watch-cache-sizes=[]: List of watch cache sizes for every resource (pods, nodes, etc.), comma separated. The individual override format: resource#size, where size is a number. It takes effect when watch-cache is enabled.
```
###### Auto generated by spf13/cobra on 16-Apr-2016
###### Auto generated by spf13/cobra on 28-Apr-2016
<!-- BEGIN MUNGE: GENERATED_ANALYTICS -->

View File

@ -27,7 +27,7 @@ import (
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apimachinery/registered"
"k8s.io/kubernetes/pkg/genericapiserver"
etcdstorage "k8s.io/kubernetes/pkg/storage/etcd"
"k8s.io/kubernetes/pkg/storage/storagebackend"
// Install the testgroup API
_ "k8s.io/kubernetes/cmd/libs/go2idl/client-gen/testdata/apis/testgroup.k8s.io/install"
@ -41,11 +41,11 @@ const (
)
func newStorageFactory() genericapiserver.StorageFactory {
etcdConfig := etcdstorage.EtcdConfig{
config := storagebackend.Config{
Prefix: genericapiserver.DefaultEtcdPathPrefix,
ServerList: []string{"http://127.0.0.1:4001"},
}
storageFactory := genericapiserver.NewDefaultStorageFactory(etcdConfig, api.Codecs, genericapiserver.NewDefaultResourceEncodingConfig(), genericapiserver.NewResourceConfig())
storageFactory := genericapiserver.NewDefaultStorageFactory(config, api.Codecs, genericapiserver.NewDefaultResourceEncodingConfig(), genericapiserver.NewResourceConfig())
return storageFactory
}

View File

@ -32,7 +32,7 @@ import (
"k8s.io/kubernetes/pkg/genericapiserver"
kubeletclient "k8s.io/kubernetes/pkg/kubelet/client"
"k8s.io/kubernetes/pkg/master/ports"
etcdstorage "k8s.io/kubernetes/pkg/storage/etcd"
"k8s.io/kubernetes/pkg/storage/storagebackend"
"k8s.io/kubernetes/pkg/util/config"
utilnet "k8s.io/kubernetes/pkg/util/net"
@ -61,7 +61,7 @@ type APIServer struct {
EnableWatchCache bool
EnableSwaggerUI bool
EtcdServersOverrides []string
EtcdConfig etcdstorage.EtcdConfig
StorageConfig storagebackend.Config
EventTTL time.Duration
ExternalHost string
KeystoneURL string
@ -100,7 +100,7 @@ func NewAPIServer() *APIServer {
AuthorizationMode: "AlwaysAllow",
DeleteCollectionWorkers: 1,
EnableLogsSupport: true,
EtcdConfig: etcdstorage.EtcdConfig{
StorageConfig: storagebackend.Config{
Prefix: genericapiserver.DefaultEtcdPathPrefix,
},
EventTTL: 1 * time.Hour,
@ -234,13 +234,14 @@ func (s *APIServer) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&s.AuthorizationConfig.WebhookConfigFile, "authorization-webhook-config-file", s.AuthorizationConfig.WebhookConfigFile, "File with webhook configuration in kubeconfig format, used with --authorization-mode=Webhook. The API server will query the remote service to determine access on the API server's secure port.")
fs.StringVar(&s.AdmissionControl, "admission-control", s.AdmissionControl, "Ordered list of plug-ins to do admission control of resources into cluster. Comma-delimited list of: "+strings.Join(admission.GetPlugins(), ", "))
fs.StringVar(&s.AdmissionControlConfigFile, "admission-control-config-file", s.AdmissionControlConfigFile, "File with admission control configuration.")
fs.StringSliceVar(&s.EtcdConfig.ServerList, "etcd-servers", s.EtcdConfig.ServerList, "List of etcd servers to watch (http://ip:port), comma separated.")
fs.StringVar(&s.StorageConfig.Type, "storage-backend", s.StorageConfig.Type, "The storage backend for persistence. Options: 'etcd2', 'etcd3'.")
fs.StringSliceVar(&s.StorageConfig.ServerList, "etcd-servers", s.StorageConfig.ServerList, "List of etcd servers to watch (http://ip:port), comma separated.")
fs.StringSliceVar(&s.EtcdServersOverrides, "etcd-servers-overrides", s.EtcdServersOverrides, "Per-resource etcd servers overrides, comma separated. The individual override format: group/resource#servers, where servers are http://ip:port, semicolon separated.")
fs.StringVar(&s.EtcdConfig.Prefix, "etcd-prefix", s.EtcdConfig.Prefix, "The prefix for all resource paths in etcd.")
fs.StringVar(&s.EtcdConfig.KeyFile, "etcd-keyfile", s.EtcdConfig.KeyFile, "SSL key file used to secure etcd communication")
fs.StringVar(&s.EtcdConfig.CertFile, "etcd-certfile", s.EtcdConfig.CertFile, "SSL certification file used to secure etcd communication")
fs.StringVar(&s.EtcdConfig.CAFile, "etcd-cafile", s.EtcdConfig.CAFile, "SSL Certificate Authority file used to secure etcd communication")
fs.BoolVar(&s.EtcdConfig.Quorum, "etcd-quorum-read", s.EtcdConfig.Quorum, "If true, enable quorum read")
fs.StringVar(&s.StorageConfig.Prefix, "etcd-prefix", s.StorageConfig.Prefix, "The prefix for all resource paths in etcd.")
fs.StringVar(&s.StorageConfig.KeyFile, "etcd-keyfile", s.StorageConfig.KeyFile, "SSL key file used to secure etcd communication")
fs.StringVar(&s.StorageConfig.CertFile, "etcd-certfile", s.StorageConfig.CertFile, "SSL certification file used to secure etcd communication")
fs.StringVar(&s.StorageConfig.CAFile, "etcd-cafile", s.StorageConfig.CAFile, "SSL Certificate Authority file used to secure etcd communication")
fs.BoolVar(&s.StorageConfig.Quorum, "etcd-quorum-read", s.StorageConfig.Quorum, "If true, enable quorum read")
fs.StringSliceVar(&s.CorsAllowedOriginList, "cors-allowed-origins", s.CorsAllowedOriginList, "List of allowed origins for CORS, comma separated. An allowed origin can be a regular expression to support subdomain matching. If this list is empty CORS will not be enabled.")
fs.BoolVar(&s.AllowPrivileged, "allow-privileged", s.AllowPrivileged, "If true, allow privileged containers.")
fs.IPNetVar(&s.ServiceClusterIPRange, "service-cluster-ip-range", s.ServiceClusterIPRange, "A CIDR notation IP range from which to assign service cluster IPs. This must not overlap with any IP ranges assigned to nodes for pods.")

View File

@ -92,7 +92,7 @@ func Run(s *options.APIServer) error {
}
glog.Infof("Will report %v as public IP address.", s.AdvertiseAddress)
if len(s.EtcdConfig.ServerList) == 0 {
if len(s.StorageConfig.ServerList) == 0 {
glog.Fatalf("--etcd-servers must be specified")
}
@ -186,7 +186,7 @@ func Run(s *options.APIServer) error {
resourceEncoding.SetVersionEncoding(group, storageEncodingVersion, unversioned.GroupVersion{Group: group, Version: runtime.APIVersionInternal})
}
storageFactory := genericapiserver.NewDefaultStorageFactory(s.EtcdConfig, api.Codecs, resourceEncoding, apiResourceConfigSource)
storageFactory := genericapiserver.NewDefaultStorageFactory(s.StorageConfig, api.Codecs, resourceEncoding, apiResourceConfigSource)
for _, override := range s.EtcdServersOverrides {
tokens := strings.Split(override, "#")
if len(tokens) != 2 {

View File

@ -22,14 +22,15 @@ import (
"encoding/json"
"fmt"
"github.com/stretchr/testify/assert"
"io/ioutil"
fed_v1a1 "k8s.io/kubernetes/federation/apis/federation/v1alpha1"
"k8s.io/kubernetes/federation/cmd/federated-apiserver/app/options"
"k8s.io/kubernetes/pkg/api/unversioned"
"net"
"net/http"
"time"
"github.com/stretchr/testify/assert"
fed_v1a1 "k8s.io/kubernetes/federation/apis/federation/v1alpha1"
"k8s.io/kubernetes/federation/cmd/federated-apiserver/app/options"
"k8s.io/kubernetes/pkg/api/unversioned"
)
func TestLongRunningRequestRegexp(t *testing.T) {
@ -82,7 +83,7 @@ func TestRun(t *testing.T) {
s.InsecurePort = insecurePort
_, ipNet, _ := net.ParseCIDR("10.10.10.0/24")
s.ServiceClusterIPRange = *ipNet
s.EtcdConfig.ServerList = []string{"http://localhost:4001"}
s.StorageConfig.ServerList = []string{"http://localhost:4001"}
go func() {
if err := Run(s); err != nil {
t.Fatalf("Error in bringing up the server: %v", err)

View File

@ -391,6 +391,7 @@ start-services
static-pods-config
stats-port
stop-services
storage-backend
storage-version
storage-versions
streaming-connection-idle-timeout

View File

@ -23,7 +23,7 @@ import (
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/runtime/serializer/versioning"
"k8s.io/kubernetes/pkg/storage"
etcdstorage "k8s.io/kubernetes/pkg/storage/etcd"
"k8s.io/kubernetes/pkg/storage/storagebackend"
"k8s.io/kubernetes/pkg/util/sets"
"github.com/golang/glog"
@ -44,9 +44,9 @@ type StorageFactory interface {
// 2. Resource encodings for storage: group,version,kind to store as
// 3. Cohabitating default: some resources like hpa are exposed through multiple APIs. They must agree on 1 and 2
type DefaultStorageFactory struct {
// DefaultEtcdConfig describes how to connect to etcd in general. It's authentication information will be used for
// every storage.Interface returned.
DefaultEtcdConfig etcdstorage.EtcdConfig
// StorageConfig describes how to create a storage backend in general.
// Its authentication information will be used for every storage.Interface returned.
StorageConfig storagebackend.Config
Overrides map[unversioned.GroupResource]groupResourceOverrides
@ -62,12 +62,12 @@ type DefaultStorageFactory struct {
APIResourceConfigSource APIResourceConfigSource
// newEtcdFn exists to be overwritten for unit testing. You should never set this in a normal world.
newEtcdFn func(ns runtime.NegotiatedSerializer, storageVersion, memoryVersion unversioned.GroupVersion, etcdConfig etcdstorage.EtcdConfig) (etcdStorage storage.Interface, err error)
newEtcdFn func(ns runtime.NegotiatedSerializer, storageVersion, memoryVersion unversioned.GroupVersion, config storagebackend.Config) (etcdStorage storage.Interface, err error)
}
type groupResourceOverrides struct {
// etcdLocation contains the list of "special" locations that are used for particular GroupResources
// These are merged on top of the default DefaultEtcdConfig when requesting the storage.Interface for a given GroupResource
// These are merged on top of the StorageConfig when requesting the storage.Interface for a given GroupResource
etcdLocation []string
// etcdPrefix contains the list of "special" prefixes for a GroupResource. Resource=* means for the entire group
etcdPrefix string
@ -83,9 +83,9 @@ var _ StorageFactory = &DefaultStorageFactory{}
const AllResources = "*"
func NewDefaultStorageFactory(defaultEtcdConfig etcdstorage.EtcdConfig, defaultSerializer runtime.NegotiatedSerializer, resourceEncodingConfig ResourceEncodingConfig, resourceConfig APIResourceConfigSource) *DefaultStorageFactory {
func NewDefaultStorageFactory(config storagebackend.Config, defaultSerializer runtime.NegotiatedSerializer, resourceEncodingConfig ResourceEncodingConfig, resourceConfig APIResourceConfigSource) *DefaultStorageFactory {
return &DefaultStorageFactory{
DefaultEtcdConfig: defaultEtcdConfig,
StorageConfig: config,
Overrides: map[unversioned.GroupResource]groupResourceOverrides{},
DefaultSerializer: defaultSerializer,
ResourceEncodingConfig: resourceEncodingConfig,
@ -152,7 +152,7 @@ func (s *DefaultStorageFactory) New(groupResource unversioned.GroupResource) (st
overriddenEtcdLocations = exactResourceOverride.etcdLocation
}
etcdPrefix := s.DefaultEtcdConfig.Prefix
etcdPrefix := s.StorageConfig.Prefix
if len(groupOverride.etcdPrefix) > 0 {
etcdPrefix = groupOverride.etcdPrefix
}
@ -168,10 +168,10 @@ func (s *DefaultStorageFactory) New(groupResource unversioned.GroupResource) (st
etcdSerializer = exactResourceOverride.serializer
}
// operate on copy
etcdConfig := s.DefaultEtcdConfig
etcdConfig.Prefix = etcdPrefix
config := s.StorageConfig
config.Prefix = etcdPrefix
if len(overriddenEtcdLocations) > 0 {
etcdConfig.ServerList = overriddenEtcdLocations
config.ServerList = overriddenEtcdLocations
}
storageEncodingVersion, err := s.ResourceEncodingConfig.StoragageEncodingFor(chosenStorageResource)
@ -183,13 +183,11 @@ func (s *DefaultStorageFactory) New(groupResource unversioned.GroupResource) (st
return nil, err
}
glog.V(3).Infof("storing %v in %v, reading as %v from %v", groupResource, storageEncodingVersion, internalVersion, etcdConfig)
return s.newEtcdFn(etcdSerializer, storageEncodingVersion, internalVersion, etcdConfig)
glog.V(3).Infof("storing %v in %v, reading as %v from %v", groupResource, storageEncodingVersion, internalVersion, config)
return s.newEtcdFn(etcdSerializer, storageEncodingVersion, internalVersion, config)
}
func newEtcd(ns runtime.NegotiatedSerializer, storageVersion, memoryVersion unversioned.GroupVersion, etcdConfig etcdstorage.EtcdConfig) (etcdStorage storage.Interface, err error) {
var storageConfig etcdstorage.EtcdStorageConfig
storageConfig.Config = etcdConfig
func newEtcd(ns runtime.NegotiatedSerializer, storageVersion, memoryVersion unversioned.GroupVersion, config storagebackend.Config) (etcdStorage storage.Interface, err error) {
s, ok := ns.SerializerForMediaType("application/json", nil)
if !ok {
return nil, fmt.Errorf("unable to find serializer for JSON")
@ -205,14 +203,14 @@ func newEtcd(ns runtime.NegotiatedSerializer, storageVersion, memoryVersion unve
return nil, fmt.Errorf("error setting up decoder from %v to %v: %v", storageVersion, memoryVersion, err)
}
}
storageConfig.Codec = runtime.NewCodec(encoder, decoder)
return storageConfig.NewStorage()
config.Codec = runtime.NewCodec(encoder, decoder)
return storagebackend.Create(config)
}
// Get all backends for all registered storage destinations.
// Used for getting all instances for health validations.
func (s *DefaultStorageFactory) Backends() []string {
backends := sets.NewString(s.DefaultEtcdConfig.ServerList...)
backends := sets.NewString(s.StorageConfig.ServerList...)
for _, overrides := range s.Overrides {
backends.Insert(overrides.etcdLocation...)

View File

@ -25,7 +25,7 @@ import (
"k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
etcdstorage "k8s.io/kubernetes/pkg/storage/etcd"
"k8s.io/kubernetes/pkg/storage/storagebackend"
)
func TestUpdateEtcdOverrides(t *testing.T) {
@ -49,17 +49,17 @@ func TestUpdateEtcdOverrides(t *testing.T) {
defaultEtcdLocation := []string{"http://127.0.0.1"}
for i, test := range testCases {
actualEtcdConfig := etcdstorage.EtcdConfig{}
newEtcdFn := func(ns runtime.NegotiatedSerializer, storageVersion, memoryVersion unversioned.GroupVersion, etcdConfig etcdstorage.EtcdConfig) (etcdStorage storage.Interface, err error) {
actualEtcdConfig = etcdConfig
actualConfig := storagebackend.Config{}
newEtcdFn := func(ns runtime.NegotiatedSerializer, storageVersion, memoryVersion unversioned.GroupVersion, config storagebackend.Config) (etcdStorage storage.Interface, err error) {
actualConfig = config
return nil, nil
}
defaultEtcdConfig := etcdstorage.EtcdConfig{
defaultConfig := storagebackend.Config{
Prefix: DefaultEtcdPathPrefix,
ServerList: defaultEtcdLocation,
}
storageFactory := NewDefaultStorageFactory(defaultEtcdConfig, api.Codecs, NewDefaultResourceEncodingConfig(), NewResourceConfig())
storageFactory := NewDefaultStorageFactory(defaultConfig, api.Codecs, NewDefaultResourceEncodingConfig(), NewResourceConfig())
storageFactory.newEtcdFn = newEtcdFn
storageFactory.SetEtcdLocation(test.resource, test.servers)
@ -69,8 +69,8 @@ func TestUpdateEtcdOverrides(t *testing.T) {
t.Errorf("%d: unexpected error %v", i, err)
continue
}
if !reflect.DeepEqual(actualEtcdConfig.ServerList, test.servers) {
t.Errorf("%d: expected %v, got %v", i, test.servers, actualEtcdConfig.ServerList)
if !reflect.DeepEqual(actualConfig.ServerList, test.servers) {
t.Errorf("%d: expected %v, got %v", i, test.servers, actualConfig.ServerList)
continue
}
@ -79,8 +79,8 @@ func TestUpdateEtcdOverrides(t *testing.T) {
t.Errorf("%d: unexpected error %v", i, err)
continue
}
if !reflect.DeepEqual(actualEtcdConfig.ServerList, defaultEtcdLocation) {
t.Errorf("%d: expected %v, got %v", i, defaultEtcdLocation, actualEtcdConfig.ServerList)
if !reflect.DeepEqual(actualConfig.ServerList, defaultEtcdLocation) {
t.Errorf("%d: expected %v, got %v", i, defaultEtcdLocation, actualConfig.ServerList)
continue
}

View File

@ -32,10 +32,6 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/testapi"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apiserver"
utilnet "k8s.io/kubernetes/pkg/util/net"
"k8s.io/kubernetes/pkg/util/sets"
apiv1 "k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/apimachinery/registered"
"k8s.io/kubernetes/pkg/apis/apps"
@ -46,6 +42,7 @@ import (
batchapiv1 "k8s.io/kubernetes/pkg/apis/batch/v1"
"k8s.io/kubernetes/pkg/apis/extensions"
extensionsapiv1beta1 "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
"k8s.io/kubernetes/pkg/apiserver"
"k8s.io/kubernetes/pkg/genericapiserver"
"k8s.io/kubernetes/pkg/kubelet/client"
"k8s.io/kubernetes/pkg/registry/endpoint"
@ -53,10 +50,12 @@ import (
"k8s.io/kubernetes/pkg/registry/registrytest"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
etcdstorage "k8s.io/kubernetes/pkg/storage/etcd"
"k8s.io/kubernetes/pkg/storage/etcd/etcdtest"
etcdtesting "k8s.io/kubernetes/pkg/storage/etcd/testing"
"k8s.io/kubernetes/pkg/storage/storagebackend"
"k8s.io/kubernetes/pkg/util/intstr"
utilnet "k8s.io/kubernetes/pkg/util/net"
"k8s.io/kubernetes/pkg/util/sets"
"github.com/stretchr/testify/assert"
"golang.org/x/net/context"
@ -73,14 +72,14 @@ func setUp(t *testing.T) (*Master, *etcdtesting.EtcdTestServer, Config, *assert.
Config: &genericapiserver.Config{},
}
etcdConfig := etcdstorage.EtcdConfig{
storageConfig := storagebackend.Config{
Prefix: etcdtest.PathPrefix(),
CAFile: server.CAFile,
KeyFile: server.KeyFile,
CertFile: server.CertFile,
}
for _, url := range server.ClientURLs {
etcdConfig.ServerList = append(etcdConfig.ServerList, url.String())
storageConfig.ServerList = append(storageConfig.ServerList, url.String())
}
resourceEncoding := genericapiserver.NewDefaultResourceEncodingConfig()
@ -89,7 +88,7 @@ func setUp(t *testing.T) (*Master, *etcdtesting.EtcdTestServer, Config, *assert.
resourceEncoding.SetVersionEncoding(batch.GroupName, *testapi.Batch.GroupVersion(), unversioned.GroupVersion{Group: batch.GroupName, Version: runtime.APIVersionInternal})
resourceEncoding.SetVersionEncoding(apps.GroupName, *testapi.Apps.GroupVersion(), unversioned.GroupVersion{Group: apps.GroupName, Version: runtime.APIVersionInternal})
resourceEncoding.SetVersionEncoding(extensions.GroupName, *testapi.Extensions.GroupVersion(), unversioned.GroupVersion{Group: extensions.GroupName, Version: runtime.APIVersionInternal})
storageFactory := genericapiserver.NewDefaultStorageFactory(etcdConfig, api.Codecs, resourceEncoding, DefaultAPIResourceConfigSource())
storageFactory := genericapiserver.NewDefaultStorageFactory(storageConfig, api.Codecs, resourceEncoding, DefaultAPIResourceConfigSource())
config.StorageFactory = storageFactory
config.APIResourceConfigSource = DefaultAPIResourceConfigSource()

View File

@ -19,8 +19,6 @@ package etcd
import (
"errors"
"fmt"
"net"
"net/http"
"path"
"reflect"
"strings"
@ -35,90 +33,13 @@ import (
etcdutil "k8s.io/kubernetes/pkg/storage/etcd/util"
"k8s.io/kubernetes/pkg/util"
utilcache "k8s.io/kubernetes/pkg/util/cache"
utilnet "k8s.io/kubernetes/pkg/util/net"
"k8s.io/kubernetes/pkg/watch"
etcd "github.com/coreos/etcd/client"
"github.com/coreos/etcd/pkg/transport"
"github.com/golang/glog"
"golang.org/x/net/context"
)
// storage.Config object for etcd.
type EtcdStorageConfig struct {
Config EtcdConfig
Codec runtime.Codec
}
// implements storage.Config
func (c *EtcdStorageConfig) GetType() string {
return "etcd"
}
// implements storage.Config
func (c *EtcdStorageConfig) NewStorage() (storage.Interface, error) {
etcdClient, err := c.Config.newEtcdClient()
if err != nil {
return nil, err
}
return NewEtcdStorage(etcdClient, c.Codec, c.Config.Prefix, c.Config.Quorum, c.Config.DeserializationCacheSize), nil
}
// Configuration object for constructing etcd.Config
type EtcdConfig struct {
Prefix string
ServerList []string
KeyFile string
CertFile string
CAFile string
Quorum bool
DeserializationCacheSize int
}
func (c *EtcdConfig) newEtcdClient() (etcd.Client, error) {
t, err := c.newHttpTransport()
if err != nil {
return nil, err
}
cli, err := etcd.New(etcd.Config{
Endpoints: c.ServerList,
Transport: t,
})
if err != nil {
return nil, err
}
return cli, nil
}
func (c *EtcdConfig) newHttpTransport() (*http.Transport, error) {
info := transport.TLSInfo{
CertFile: c.CertFile,
KeyFile: c.KeyFile,
CAFile: c.CAFile,
}
cfg, err := info.ClientConfig()
if err != nil {
return nil, err
}
// Copied from etcd.DefaultTransport declaration.
// TODO: Determine if transport needs optimization
tr := utilnet.SetTransportDefaults(&http.Transport{
Proxy: http.ProxyFromEnvironment,
Dial: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}).Dial,
TLSHandshakeTimeout: 10 * time.Second,
MaxIdleConnsPerHost: 500,
TLSClientConfig: cfg,
})
return tr, nil
}
// Creates a new storage interface from the client
// TODO: deprecate in favor of storage.Config abstraction over time
func NewEtcdStorage(client etcd.Client, codec runtime.Codec, prefix string, quorum bool, cacheSize int) storage.Interface {

View File

@ -57,6 +57,11 @@ type objState struct {
data []byte
}
// New returns an etcd3 implementation of storage.Interface.
func New(c *clientv3.Client, codec runtime.Codec, prefix string) storage.Interface {
return newStore(c, codec, prefix)
}
func newStore(c *clientv3.Client, codec runtime.Codec, prefix string) *store {
versioner := etcd.APIObjectVersioner{}
return &store{

View File

@ -0,0 +1,68 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package storagebackend
import (
"fmt"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
)
const (
StorageTypeUnset = ""
StorageTypeETCD2 = "etcd2"
StorageTypeETCD3 = "etcd3"
)
// Config is configuration for creating a storage backend.
type Config struct {
// Type defines the type of storage backend, e.g. "etcd2", etcd3". Default ("") is "etcd2".
Type string
// Codec is used to serialize/deserialize objects.
Codec runtime.Codec
// Prefix is the prefix to all keys passed to storage.Interface methods.
Prefix string
// ServerList is the list of storage servers to connect with.
ServerList []string
// TLS credentials
KeyFile string
CertFile string
CAFile string
// Quorum indicates that whether read operations should be quorum-level consistent.
Quorum bool
// DeserializationCacheSize is the size of cache of deserialized objects.
// Currently this is only supported in etcd2.
// We will drop the cache once using protobuf.
DeserializationCacheSize int
}
// Create creates a storage backend based on given config.
func Create(c Config) (storage.Interface, error) {
switch c.Type {
case StorageTypeUnset, StorageTypeETCD2:
return newETCD2Storage(c)
case StorageTypeETCD3:
// TODO: We have the following features to implement:
// - Support secure connection by using key, cert, and CA files.
// - Honor "https" scheme to support secure connection in gRPC.
// - Support non-quorum read.
return newETCD3Storage(c)
default:
return nil, fmt.Errorf("unknown storage type: %s", c.Type)
}
}

View File

@ -0,0 +1,40 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package storagebackend
import (
"strings"
"github.com/coreos/etcd/clientv3"
"k8s.io/kubernetes/pkg/storage"
"k8s.io/kubernetes/pkg/storage/etcd3"
)
func newETCD3Storage(c Config) (storage.Interface, error) {
endpoints := c.ServerList
for i, s := range endpoints {
endpoints[i] = strings.TrimLeft(s, "http://")
}
cfg := clientv3.Config{
Endpoints: endpoints,
}
client, err := clientv3.New(cfg)
if err != nil {
return nil, err
}
return etcd3.New(client, c.Codec, c.Prefix), nil
}

View File

@ -0,0 +1,78 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package storagebackend
import (
"net"
"net/http"
"time"
etcd2client "github.com/coreos/etcd/client"
"github.com/coreos/etcd/pkg/transport"
"k8s.io/kubernetes/pkg/storage"
"k8s.io/kubernetes/pkg/storage/etcd"
utilnet "k8s.io/kubernetes/pkg/util/net"
)
func newETCD2Storage(c Config) (storage.Interface, error) {
tr, err := newTransportForETCD2(c.CertFile, c.KeyFile, c.CAFile)
if err != nil {
return nil, err
}
client, err := newETCD2Client(tr, c.ServerList)
if err != nil {
return nil, err
}
return etcd.NewEtcdStorage(client, c.Codec, c.Prefix, c.Quorum, c.DeserializationCacheSize), nil
}
func newETCD2Client(tr *http.Transport, serverList []string) (etcd2client.Client, error) {
cli, err := etcd2client.New(etcd2client.Config{
Endpoints: serverList,
Transport: tr,
})
if err != nil {
return nil, err
}
return cli, nil
}
func newTransportForETCD2(certFile, keyFile, caFile string) (*http.Transport, error) {
info := transport.TLSInfo{
CertFile: certFile,
KeyFile: keyFile,
CAFile: caFile,
}
cfg, err := info.ClientConfig()
if err != nil {
return nil, err
}
// Copied from etcd.DefaultTransport declaration.
// TODO: Determine if transport needs optimization
tr := utilnet.SetTransportDefaults(&http.Transport{
Proxy: http.ProxyFromEnvironment,
Dial: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}).Dial,
TLSHandshakeTimeout: 10 * time.Second,
MaxIdleConnsPerHost: 500,
TLSClientConfig: cfg,
})
return tr, nil
}

View File

@ -47,8 +47,8 @@ import (
kubeletclient "k8s.io/kubernetes/pkg/kubelet/client"
"k8s.io/kubernetes/pkg/master"
"k8s.io/kubernetes/pkg/runtime"
etcdstorage "k8s.io/kubernetes/pkg/storage/etcd"
"k8s.io/kubernetes/pkg/storage/etcd/etcdtest"
"k8s.io/kubernetes/pkg/storage/storagebackend"
"k8s.io/kubernetes/plugin/pkg/admission/admit"
)
@ -150,14 +150,14 @@ func startMasterOrDie(masterConfig *master.Config) (*master.Master, *httptest.Se
// Returns a basic master config.
func NewMasterConfig() *master.Config {
etcdConfig := etcdstorage.EtcdConfig{
config := storagebackend.Config{
ServerList: []string{"http://127.0.0.1:4001"},
Prefix: etcdtest.PathPrefix(),
}
negotiatedSerializer := NewSingleContentTypeSerializer(api.Scheme, testapi.Default.Codec(), "application/json")
storageFactory := genericapiserver.NewDefaultStorageFactory(etcdConfig, negotiatedSerializer, genericapiserver.NewDefaultResourceEncodingConfig(), master.DefaultAPIResourceConfigSource())
storageFactory := genericapiserver.NewDefaultStorageFactory(config, negotiatedSerializer, genericapiserver.NewDefaultResourceEncodingConfig(), master.DefaultAPIResourceConfigSource())
storageFactory.SetSerializer(
unversioned.GroupResource{Group: api.GroupName, Resource: genericapiserver.AllResources},
NewSingleContentTypeSerializer(api.Scheme, testapi.Default.Codec(), "application/json"))