Merge pull request #112789 from enj/enj/r/kms_load_once_v2

Load encryption config once (second approach)
This commit is contained in:
Kubernetes Prow Robot
2022-10-13 11:25:02 -07:00
committed by GitHub
21 changed files with 348 additions and 230 deletions

View File

@@ -28,13 +28,13 @@ import (
"github.com/spf13/pflag"
"k8s.io/apiextensions-apiserver/pkg/apiserver"
"k8s.io/apiextensions-apiserver/pkg/cmd/server/options"
"k8s.io/apimachinery/pkg/util/wait"
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/storage/storagebackend"
"k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
"k8s.io/klog/v2"
)
@@ -47,10 +47,11 @@ type TestServerInstanceOptions struct {
// TestServer return values supplied by kube-test-ApiServer
type TestServer struct {
ClientConfig *restclient.Config // Rest client config
ServerOpts *options.CustomResourceDefinitionsServerOptions // ServerOpts
TearDownFn TearDownFunc // TearDown function
TmpDir string // Temp Dir used, by the apiserver
ClientConfig *restclient.Config // Rest client config
ServerOpts *options.CustomResourceDefinitionsServerOptions // ServerOpts
TearDownFn TearDownFunc // TearDown function
TmpDir string // Temp Dir used, by the apiserver
CompletedConfig apiserver.CompletedConfig
}
// Logger allows t.Testing and b.Testing to be passed to StartTestServer and StartTestServerOrDie
@@ -144,7 +145,8 @@ func StartTestServer(t Logger, _ *TestServerInstanceOptions, customFlags []strin
if err != nil {
return result, fmt.Errorf("failed to create config from options: %v", err)
}
server, err := config.Complete().New(genericapiserver.NewEmptyDelegate())
completedConfig := config.Complete()
server, err := completedConfig.New(genericapiserver.NewEmptyDelegate())
if err != nil {
return result, fmt.Errorf("failed to create server: %v", err)
}
@@ -187,6 +189,7 @@ func StartTestServer(t Logger, _ *TestServerInstanceOptions, customFlags []strin
result.ClientConfig = server.GenericAPIServer.LoopbackClientConfig
result.ServerOpts = s
result.TearDownFn = tearDown
result.CompletedConfig = completedConfig
return result, nil
}

View File

@@ -26,21 +26,38 @@ import (
"go.etcd.io/etcd/client/pkg/v3/transport"
clientv3 "go.etcd.io/etcd/client/v3"
"google.golang.org/grpc"
serveroptions "k8s.io/apiextensions-apiserver/pkg/cmd/server/options"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiextensions-apiserver/pkg/apiserver"
"k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
serveroptions "k8s.io/apiextensions-apiserver/pkg/cmd/server/options"
servertesting "k8s.io/apiextensions-apiserver/pkg/cmd/server/testing"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/rest"
)
// StartDefaultServer starts a test server.
func StartDefaultServer(t servertesting.Logger, flags ...string) (func(), *rest.Config, *serveroptions.CustomResourceDefinitionsServerOptions, error) {
tearDownFn, s, err := startDefaultServer(t, flags...)
if err != nil {
return nil, nil, nil, err
}
return tearDownFn, s.ClientConfig, s.ServerOpts, nil
}
func StartDefaultServerWithConfigAccess(t servertesting.Logger, flags ...string) (func(), *rest.Config, apiserver.CompletedConfig, error) {
tearDownFn, s, err := startDefaultServer(t, flags...)
if err != nil {
return nil, nil, apiserver.CompletedConfig{}, err
}
return tearDownFn, s.ClientConfig, s.CompletedConfig, nil
}
func startDefaultServer(t servertesting.Logger, flags ...string) (func(), servertesting.TestServer, error) {
// create kubeconfig which will not actually be used. But authz/authn needs it to startup.
fakeKubeConfig, err := ioutil.TempFile("", "kubeconfig")
if err != nil {
return nil, nil, nil, err
return nil, servertesting.TestServer{}, err
}
fakeKubeConfig.WriteString(`
apiVersion: v1
@@ -75,7 +92,7 @@ users:
), nil)
if err != nil {
os.Remove(fakeKubeConfig.Name())
return nil, nil, nil, err
return nil, servertesting.TestServer{}, err
}
tearDownFn := func() {
@@ -83,7 +100,7 @@ users:
s.TearDownFn()
}
return tearDownFn, s.ClientConfig, s.ServerOpts, nil
return tearDownFn, s, nil
}
// StartDefaultServerWithClients starts a test server and returns clients for it.

View File

@@ -298,7 +298,7 @@ func TestPruningStatus(t *testing.T) {
}
func TestPruningFromStorage(t *testing.T) {
tearDown, config, options, err := fixtures.StartDefaultServer(t)
tearDown, config, completedConfig, err := fixtures.StartDefaultServerWithConfigAccess(t)
if err != nil {
t.Fatal(err)
}
@@ -314,11 +314,6 @@ func TestPruningFromStorage(t *testing.T) {
t.Fatal(err)
}
serverConfig, err := options.Config()
if err != nil {
t.Fatal(err)
}
crd := pruningFixture.DeepCopy()
crd.Spec.Versions[0].Schema = &apiextensionsv1.CustomResourceValidation{}
if err := yaml.Unmarshal([]byte(fooSchema), &crd.Spec.Versions[0].Schema.OpenAPIV3Schema); err != nil {
@@ -330,7 +325,7 @@ func TestPruningFromStorage(t *testing.T) {
t.Fatal(err)
}
restOptions, err := serverConfig.GenericConfig.RESTOptionsGetter.GetRESTOptions(schema.GroupResource{Group: crd.Spec.Group, Resource: crd.Spec.Names.Plural})
restOptions, err := completedConfig.GenericConfig.RESTOptionsGetter.GetRESTOptions(schema.GroupResource{Group: crd.Spec.Group, Resource: crd.Spec.Names.Plural})
if err != nil {
t.Fatal(err)
}

View File

@@ -43,10 +43,12 @@ const (
)
var (
aesKeySizes = []int{16, 24, 32}
// See https://golang.org/pkg/crypto/aes/#NewCipher for details on supported key sizes for AES.
secretBoxKeySizes = []int{32}
aesKeySizes = []int{16, 24, 32}
// See https://godoc.org/golang.org/x/crypto/nacl/secretbox#Open for details on the supported key sizes for Secretbox.
secretBoxKeySizes = []int{32}
root = field.NewPath("resources")
)

View File

@@ -94,16 +94,6 @@ func (p *kmsv2PluginProbe) toHealthzCheck(idx int) healthz.HealthChecker {
})
}
func GetKMSPluginHealthzCheckers(filepath string, stopCh <-chan struct{}) ([]healthz.HealthChecker, error) {
_, kmsHealthChecks, err := LoadEncryptionConfig(filepath, stopCh)
return kmsHealthChecks, err
}
func GetTransformerOverrides(filepath string, stopCh <-chan struct{}) (map[schema.GroupResource]value.Transformer, error) {
transformers, _, err := LoadEncryptionConfig(filepath, stopCh)
return transformers, err
}
func LoadEncryptionConfig(filepath string, stopCh <-chan struct{}) (map[schema.GroupResource]value.Transformer, []healthz.HealthChecker, error) {
config, err := loadConfig(filepath)
if err != nil {
@@ -159,7 +149,7 @@ func getTransformerOverridesAndKMSPluginProbes(config *apiserverconfig.Encryptio
for gr, transList := range resourceToPrefixTransformer {
gr := gr
transList := transList
transformers[gr] = value.NewMutableTransformer(value.NewPrefixTransformers(fmt.Errorf("no matching prefix found"), transList...))
transformers[gr] = value.NewPrefixTransformers(fmt.Errorf("no matching prefix found"), transList...)
}
return transformers, probes, nil
@@ -425,8 +415,8 @@ var (
// The factory to create kms service. This is to make writing test easier.
envelopeServiceFactory = envelope.NewGRPCService
// The factory to create kmsv2 service.
envelopeKMSv2ServiceFactory = envelopekmsv2.NewGRPCService
// The factory to create kmsv2 service. Exported for integration tests.
EnvelopeKMSv2ServiceFactory = envelopekmsv2.NewGRPCService
)
func kmsPrefixTransformer(config *apiserverconfig.KMSConfiguration, stopCh <-chan struct{}) (value.PrefixTransformer, healthChecker, error) {
@@ -458,7 +448,7 @@ func kmsPrefixTransformer(config *apiserverconfig.KMSConfiguration, stopCh <-cha
return value.PrefixTransformer{}, nil, fmt.Errorf("could not configure KMSv2 plugin %q, KMSv2 feature is not enabled", kmsName)
}
envelopeService, err := envelopeKMSv2ServiceFactory(ctx, config.Endpoint, config.Timeout.Duration)
envelopeService, err := EnvelopeKMSv2ServiceFactory(ctx, config.Endpoint, config.Timeout.Duration)
if err != nil {
return value.PrefixTransformer{}, nil, fmt.Errorf("could not configure KMSv2-Plugin's probe %q, error: %v", kmsName, err)
}

View File

@@ -163,12 +163,12 @@ func TestEncryptionProviderConfigCorrect(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.KMSv2, true)()
// Set factory for mock envelope service
factory := envelopeServiceFactory
factoryKMSv2 := envelopeKMSv2ServiceFactory
factoryKMSv2 := EnvelopeKMSv2ServiceFactory
envelopeServiceFactory = newMockEnvelopeService
envelopeKMSv2ServiceFactory = newMockEnvelopeKMSv2Service
EnvelopeKMSv2ServiceFactory = newMockEnvelopeKMSv2Service
defer func() {
envelopeServiceFactory = factory
envelopeKMSv2ServiceFactory = factoryKMSv2
EnvelopeKMSv2ServiceFactory = factoryKMSv2
}()
ctx := testContext(t)

View File

@@ -36,6 +36,7 @@ import (
"k8s.io/apiserver/pkg/storage/storagebackend"
storagefactory "k8s.io/apiserver/pkg/storage/storagebackend/factory"
"k8s.io/apiserver/pkg/storage/value"
flowcontrolrequest "k8s.io/apiserver/pkg/util/flowcontrol/request"
"k8s.io/klog/v2"
)
@@ -59,6 +60,15 @@ type EtcdOptions struct {
DefaultWatchCacheSize int
// WatchCacheSizes represents override to a given resource
WatchCacheSizes []string
// complete guards fields that must be initialized via Complete before the Apply methods can be used.
complete bool
transformerOverrides map[schema.GroupResource]value.Transformer
kmsPluginHealthzChecks []healthz.HealthChecker
// SkipHealthEndpoints, when true, causes the Apply methods to not set up health endpoints.
// This allows multiple invocations of the Apply methods without duplication of said endpoints.
SkipHealthEndpoints bool
}
var storageTypes = sets.NewString(
@@ -190,39 +200,65 @@ func (s *EtcdOptions) AddFlags(fs *pflag.FlagSet) {
"The time in seconds that each lease is reused. A lower value could avoid large number of objects reusing the same lease. Notice that a too small value may cause performance problems at storage layer.")
}
// Complete must be called exactly once before using any of the Apply methods. It is responsible for setting
// up objects that must be created once and reused across multiple invocations such as storage transformers.
// This method mutates the receiver (EtcdOptions). It must never mutate the inputs.
func (s *EtcdOptions) Complete(storageObjectCountTracker flowcontrolrequest.StorageObjectCountTracker, stopCh <-chan struct{}) error {
if s == nil {
return nil
}
if s.complete {
return fmt.Errorf("EtcdOptions.Complete called more than once")
}
if len(s.EncryptionProviderConfigFilepath) != 0 {
transformerOverrides, kmsPluginHealthzChecks, err := encryptionconfig.LoadEncryptionConfig(s.EncryptionProviderConfigFilepath, stopCh)
if err != nil {
return err
}
s.transformerOverrides = transformerOverrides
s.kmsPluginHealthzChecks = kmsPluginHealthzChecks
}
s.StorageConfig.StorageObjectCountTracker = storageObjectCountTracker
s.complete = true
return nil
}
// ApplyTo mutates the provided server.Config. It must never mutate the receiver (EtcdOptions).
func (s *EtcdOptions) ApplyTo(c *server.Config) error {
if s == nil {
return nil
}
if err := s.addEtcdHealthEndpoint(c); err != nil {
return err
return s.ApplyWithStorageFactoryTo(&SimpleStorageFactory{StorageConfig: s.StorageConfig}, c)
}
// ApplyWithStorageFactoryTo mutates the provided server.Config. It must never mutate the receiver (EtcdOptions).
func (s *EtcdOptions) ApplyWithStorageFactoryTo(factory serverstorage.StorageFactory, c *server.Config) error {
if s == nil {
return nil
}
transformerOverrides := make(map[schema.GroupResource]value.Transformer)
if len(s.EncryptionProviderConfigFilepath) > 0 {
var err error
transformerOverrides, err = encryptionconfig.GetTransformerOverrides(s.EncryptionProviderConfigFilepath, c.DrainedNotify())
if err != nil {
if !s.complete {
return fmt.Errorf("EtcdOptions.Apply called without completion")
}
if !s.SkipHealthEndpoints {
if err := s.addEtcdHealthEndpoint(c); err != nil {
return err
}
}
// use the StorageObjectCountTracker interface instance from server.Config
s.StorageConfig.StorageObjectCountTracker = c.StorageObjectCountTracker
c.RESTOptionsGetter = &SimpleRestOptionsFactory{
Options: *s,
TransformerOverrides: transformerOverrides,
if len(s.transformerOverrides) > 0 {
factory = &transformerStorageFactory{
delegate: factory,
transformerOverrides: s.transformerOverrides,
}
}
return nil
}
func (s *EtcdOptions) ApplyWithStorageFactoryTo(factory serverstorage.StorageFactory, c *server.Config) error {
if err := s.addEtcdHealthEndpoint(c); err != nil {
return err
}
// use the StorageObjectCountTracker interface instance from server.Config
s.StorageConfig.StorageObjectCountTracker = c.StorageObjectCountTracker
c.RESTOptionsGetter = &StorageFactoryRestOptionsFactory{Options: *s, StorageFactory: factory}
return nil
@@ -245,57 +281,11 @@ func (s *EtcdOptions) addEtcdHealthEndpoint(c *server.Config) error {
return readyCheck()
}))
if s.EncryptionProviderConfigFilepath != "" {
kmsPluginHealthzChecks, err := encryptionconfig.GetKMSPluginHealthzCheckers(s.EncryptionProviderConfigFilepath, c.DrainedNotify())
if err != nil {
return err
}
c.AddHealthChecks(kmsPluginHealthzChecks...)
}
c.AddHealthChecks(s.kmsPluginHealthzChecks...)
return nil
}
type SimpleRestOptionsFactory struct {
Options EtcdOptions
TransformerOverrides map[schema.GroupResource]value.Transformer
}
func (f *SimpleRestOptionsFactory) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error) {
ret := generic.RESTOptions{
StorageConfig: f.Options.StorageConfig.ForResource(resource),
Decorator: generic.UndecoratedStorage,
EnableGarbageCollection: f.Options.EnableGarbageCollection,
DeleteCollectionWorkers: f.Options.DeleteCollectionWorkers,
ResourcePrefix: resource.Group + "/" + resource.Resource,
CountMetricPollPeriod: f.Options.StorageConfig.CountMetricPollPeriod,
StorageObjectCountTracker: f.Options.StorageConfig.StorageObjectCountTracker,
}
if f.TransformerOverrides != nil {
if transformer, ok := f.TransformerOverrides[resource]; ok {
ret.StorageConfig.Transformer = transformer
}
}
if f.Options.EnableWatchCache {
sizes, err := ParseWatchCacheSizes(f.Options.WatchCacheSizes)
if err != nil {
return generic.RESTOptions{}, err
}
size, ok := sizes[resource]
if ok && size > 0 {
klog.Warningf("Dropping watch-cache-size for %v - watchCache size is now dynamic", resource)
}
if ok && size <= 0 {
klog.V(3).InfoS("Not using watch cache", "resource", resource)
ret.Decorator = generic.UndecoratedStorage
} else {
klog.V(3).InfoS("Using watch cache", "resource", resource)
ret.Decorator = genericregistry.StorageWithCacher()
}
}
return ret, nil
}
type StorageFactoryRestOptionsFactory struct {
Options EtcdOptions
StorageFactory serverstorage.StorageFactory
@@ -316,6 +306,7 @@ func (f *StorageFactoryRestOptionsFactory) GetRESTOptions(resource schema.GroupR
CountMetricPollPeriod: f.Options.StorageConfig.CountMetricPollPeriod,
StorageObjectCountTracker: f.Options.StorageConfig.StorageObjectCountTracker,
}
if f.Options.EnableWatchCache {
sizes, err := ParseWatchCacheSizes(f.Options.WatchCacheSizes)
if err != nil {
@@ -326,8 +317,10 @@ func (f *StorageFactoryRestOptionsFactory) GetRESTOptions(resource schema.GroupR
klog.Warningf("Dropping watch-cache-size for %v - watchCache size is now dynamic", resource)
}
if ok && size <= 0 {
klog.V(3).InfoS("Not using watch cache", "resource", resource)
ret.Decorator = generic.UndecoratedStorage
} else {
klog.V(3).InfoS("Using watch cache", "resource", resource)
ret.Decorator = genericregistry.StorageWithCacher()
}
}
@@ -369,3 +362,60 @@ func WriteWatchCacheSizes(watchCacheSizes map[schema.GroupResource]int) ([]strin
}
return cacheSizes, nil
}
var _ serverstorage.StorageFactory = &SimpleStorageFactory{}
// SimpleStorageFactory provides a StorageFactory implementation that should be used when different
// resources essentially share the same storage config (as defined by the given storagebackend.Config).
// It assumes the resources are stored at a path that is purely based on the schema.GroupResource.
// Users that need flexibility and per resource overrides should use DefaultStorageFactory instead.
type SimpleStorageFactory struct {
StorageConfig storagebackend.Config
}
func (s *SimpleStorageFactory) NewConfig(resource schema.GroupResource) (*storagebackend.ConfigForResource, error) {
return s.StorageConfig.ForResource(resource), nil
}
func (s *SimpleStorageFactory) ResourcePrefix(resource schema.GroupResource) string {
return resource.Group + "/" + resource.Resource
}
func (s *SimpleStorageFactory) Backends() []serverstorage.Backend {
// nothing should ever call this method but we still provide a functional implementation
return serverstorage.Backends(s.StorageConfig)
}
var _ serverstorage.StorageFactory = &transformerStorageFactory{}
type transformerStorageFactory struct {
delegate serverstorage.StorageFactory
transformerOverrides map[schema.GroupResource]value.Transformer
}
func (t *transformerStorageFactory) NewConfig(resource schema.GroupResource) (*storagebackend.ConfigForResource, error) {
config, err := t.delegate.NewConfig(resource)
if err != nil {
return nil, err
}
transformer, ok := t.transformerOverrides[resource]
if !ok {
return config, nil
}
configCopy := *config
resourceConfig := configCopy.Config
resourceConfig.Transformer = transformer
configCopy.Config = resourceConfig
return &configCopy, nil
}
func (t *transformerStorageFactory) ResourcePrefix(resource schema.GroupResource) string {
return t.delegate.ResourcePrefix(resource)
}
func (t *transformerStorageFactory) Backends() []serverstorage.Backend {
return t.delegate.Backends()
}

View File

@@ -204,6 +204,7 @@ func TestKMSHealthzEndpoint(t *testing.T) {
name string
encryptionConfigPath string
wantChecks []string
skipHealth bool
}{
{
name: "single kms-provider, expect single kms healthz check",
@@ -215,6 +216,12 @@ func TestKMSHealthzEndpoint(t *testing.T) {
encryptionConfigPath: "testdata/encryption-configs/multiple-kms-providers.yaml",
wantChecks: []string{"etcd", "kms-provider-0", "kms-provider-1"},
},
{
name: "two kms-providers with skip, expect zero kms healthz checks",
encryptionConfigPath: "testdata/encryption-configs/multiple-kms-providers.yaml",
wantChecks: nil,
skipHealth: true,
},
}
scheme := runtime.NewScheme()
@@ -225,8 +232,12 @@ func TestKMSHealthzEndpoint(t *testing.T) {
serverConfig := server.NewConfig(codecs)
etcdOptions := &EtcdOptions{
EncryptionProviderConfigFilepath: tc.encryptionConfigPath,
SkipHealthEndpoints: tc.skipHealth,
}
if err := etcdOptions.addEtcdHealthEndpoint(serverConfig); err != nil {
if err := etcdOptions.Complete(serverConfig.StorageObjectCountTracker, serverConfig.DrainedNotify()); err != nil {
t.Fatal(err)
}
if err := etcdOptions.ApplyTo(serverConfig); err != nil {
t.Fatalf("Failed to add healthz error: %v", err)
}
@@ -244,12 +255,19 @@ func TestReadinessCheck(t *testing.T) {
name string
wantReadyzChecks []string
wantHealthzChecks []string
skipHealth bool
}{
{
name: "Readyz should have etcd-readiness check",
wantReadyzChecks: []string{"etcd", "etcd-readiness"},
wantHealthzChecks: []string{"etcd"},
},
{
name: "skip health, Readyz should not have etcd-readiness check",
wantReadyzChecks: nil,
wantHealthzChecks: nil,
skipHealth: true,
},
}
scheme := runtime.NewScheme()
@@ -258,8 +276,11 @@ func TestReadinessCheck(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
serverConfig := server.NewConfig(codecs)
etcdOptions := &EtcdOptions{}
if err := etcdOptions.addEtcdHealthEndpoint(serverConfig); err != nil {
etcdOptions := &EtcdOptions{SkipHealthEndpoints: tc.skipHealth}
if err := etcdOptions.Complete(serverConfig.StorageObjectCountTracker, serverConfig.DrainedNotify()); err != nil {
t.Fatal(err)
}
if err := etcdOptions.ApplyTo(serverConfig); err != nil {
t.Fatalf("Failed to add healthz error: %v", err)
}

View File

@@ -101,6 +101,9 @@ func (o *RecommendedOptions) AddFlags(fs *pflag.FlagSet) {
// ApplyTo adds RecommendedOptions to the server configuration.
// pluginInitializers can be empty, it is only need for additional initializers.
func (o *RecommendedOptions) ApplyTo(config *server.RecommendedConfig) error {
if err := o.Etcd.Complete(config.Config.StorageObjectCountTracker, config.Config.DrainedNotify()); err != nil {
return err
}
if err := o.Etcd.ApplyTo(&config.Config); err != nil {
return err
}

View File

@@ -29,7 +29,6 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/storage/storagebackend"
"k8s.io/apiserver/pkg/storage/value"
utilfeature "k8s.io/apiserver/pkg/util/feature"
)
@@ -112,8 +111,6 @@ type groupResourceOverrides struct {
// decoderDecoratorFn is optional and may wrap the provided decoders (can add new decoders). The order of
// returned decoders will be priority for attempt to decode.
decoderDecoratorFn func([]runtime.Decoder) []runtime.Decoder
// transformer is optional and shall encrypt that resource at rest.
transformer value.Transformer
// disablePaging will prevent paging on the provided resource.
disablePaging bool
}
@@ -139,9 +136,6 @@ func (o groupResourceOverrides) Apply(config *storagebackend.Config, options *St
if o.decoderDecoratorFn != nil {
options.DecoderDecoratorFn = o.decoderDecoratorFn
}
if o.transformer != nil {
config.Transformer = o.transformer
}
if o.disablePaging {
config.Paging = false
}
@@ -210,12 +204,6 @@ func (s *DefaultStorageFactory) SetSerializer(groupResource schema.GroupResource
s.Overrides[groupResource] = overrides
}
func (s *DefaultStorageFactory) SetTransformer(groupResource schema.GroupResource, transformer value.Transformer) {
overrides := s.Overrides[groupResource]
overrides.transformer = transformer
s.Overrides[groupResource] = overrides
}
// AddCohabitatingResources links resources together the order of the slice matters! its the priority order of lookup for finding a storage location
func (s *DefaultStorageFactory) AddCohabitatingResources(groupResources ...schema.GroupResource) {
for _, groupResource := range groupResources {
@@ -291,25 +279,35 @@ func (s *DefaultStorageFactory) NewConfig(groupResource schema.GroupResource) (*
// Backends returns all backends for all registered storage destinations.
// Used for getting all instances for health validations.
func (s *DefaultStorageFactory) Backends() []Backend {
servers := sets.NewString(s.StorageConfig.Transport.ServerList...)
return backends(s.StorageConfig, s.Overrides)
}
for _, overrides := range s.Overrides {
// Backends returns all backends for all registered storage destinations.
// Used for getting all instances for health validations.
func Backends(storageConfig storagebackend.Config) []Backend {
return backends(storageConfig, nil)
}
func backends(storageConfig storagebackend.Config, grOverrides map[schema.GroupResource]groupResourceOverrides) []Backend {
servers := sets.NewString(storageConfig.Transport.ServerList...)
for _, overrides := range grOverrides {
servers.Insert(overrides.etcdLocation...)
}
tlsConfig := &tls.Config{
InsecureSkipVerify: true,
}
if len(s.StorageConfig.Transport.CertFile) > 0 && len(s.StorageConfig.Transport.KeyFile) > 0 {
cert, err := tls.LoadX509KeyPair(s.StorageConfig.Transport.CertFile, s.StorageConfig.Transport.KeyFile)
if len(storageConfig.Transport.CertFile) > 0 && len(storageConfig.Transport.KeyFile) > 0 {
cert, err := tls.LoadX509KeyPair(storageConfig.Transport.CertFile, storageConfig.Transport.KeyFile)
if err != nil {
klog.Errorf("failed to load key pair while getting backends: %s", err)
} else {
tlsConfig.Certificates = []tls.Certificate{cert}
}
}
if len(s.StorageConfig.Transport.TrustedCAFile) > 0 {
if caCert, err := ioutil.ReadFile(s.StorageConfig.Transport.TrustedCAFile); err != nil {
if len(storageConfig.Transport.TrustedCAFile) > 0 {
if caCert, err := ioutil.ReadFile(storageConfig.Transport.TrustedCAFile); err != nil {
klog.Errorf("failed to read ca file while getting backends: %s", err)
} else {
caPool := x509.NewCertPool()

View File

@@ -48,7 +48,7 @@ import (
"k8s.io/apiserver/pkg/storage/etcd3"
"k8s.io/apiserver/pkg/storage/etcd3/metrics"
"k8s.io/apiserver/pkg/storage/storagebackend"
"k8s.io/apiserver/pkg/storage/value"
"k8s.io/apiserver/pkg/storage/value/encrypt/identity"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/component-base/metrics/legacyregistry"
tracing "k8s.io/component-base/tracing"
@@ -395,7 +395,7 @@ func newETCD3Storage(c storagebackend.ConfigForResource, newFunc func() runtime.
}
transformer := c.Transformer
if transformer == nil {
transformer = value.IdentityTransformer
transformer = identity.NewEncryptCheckTransformer()
}
return etcd3.New(client, c.Codec, newFunc, c.Prefix, c.GroupResource, transformer, c.Paging, c.LeaseManagerConfig), destroyFunc, nil
}

View File

@@ -48,7 +48,7 @@ import (
"k8s.io/apiserver/pkg/storage/etcd3"
etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing"
storagetesting "k8s.io/apiserver/pkg/storage/testing"
"k8s.io/apiserver/pkg/storage/value"
"k8s.io/apiserver/pkg/storage/value/encrypt/identity"
utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/utils/clock"
@@ -107,7 +107,7 @@ func newPodList() runtime.Object { return &example.PodList{} }
func newEtcdTestStorage(t *testing.T, prefix string) (*etcd3testing.EtcdTestServer, storage.Interface) {
server, _ := etcd3testing.NewUnsecuredEtcd3TestClientServer(t)
storage := etcd3.New(server.V3Client, apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion), newPod, prefix, schema.GroupResource{Resource: "pods"}, value.IdentityTransformer, true, etcd3.NewDefaultLeaseManagerConfig())
storage := etcd3.New(server.V3Client, apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion), newPod, prefix, schema.GroupResource{Resource: "pods"}, identity.NewEncryptCheckTransformer(), true, etcd3.NewDefaultLeaseManagerConfig())
return server, storage
}

View File

@@ -24,6 +24,12 @@ import (
"k8s.io/apiserver/pkg/storage/value"
)
var (
transformer = identityTransformer{}
encryptedPrefix = []byte("k8s:enc:")
errEncryptedData = fmt.Errorf("identity transformer tried to read encrypted data")
)
// identityTransformer performs no transformation on provided data, but validates
// that the data is not encrypted data during TransformFromStorage
type identityTransformer struct{}
@@ -31,7 +37,7 @@ type identityTransformer struct{}
// NewEncryptCheckTransformer returns an identityTransformer which returns an error
// on attempts to read encrypted data
func NewEncryptCheckTransformer() value.Transformer {
return identityTransformer{}
return transformer
}
// TransformFromStorage returns the input bytes if the data is not encrypted
@@ -39,8 +45,8 @@ func (identityTransformer) TransformFromStorage(ctx context.Context, data []byte
// identityTransformer has to return an error if the data is encoded using another transformer.
// JSON data starts with '{'. Protobuf data has a prefix 'k8s[\x00-\xFF]'.
// Prefix 'k8s:enc:' is reserved for encrypted data on disk.
if bytes.HasPrefix(data, []byte("k8s:enc:")) {
return []byte{}, false, fmt.Errorf("identity transformer tried to read encrypted data")
if bytes.HasPrefix(data, encryptedPrefix) {
return nil, false, errEncryptedData
}
return data, false, nil
}

View File

@@ -21,7 +21,6 @@ import (
"bytes"
"context"
"fmt"
"sync"
"time"
"k8s.io/apimachinery/pkg/util/errors"
@@ -51,54 +50,11 @@ type Transformer interface {
TransformToStorage(ctx context.Context, data []byte, dataCtx Context) (out []byte, err error)
}
type identityTransformer struct{}
// IdentityTransformer performs no transformation of the provided data.
var IdentityTransformer Transformer = identityTransformer{}
func (identityTransformer) TransformFromStorage(ctx context.Context, data []byte, dataCtx Context) ([]byte, bool, error) {
return data, false, nil
}
func (identityTransformer) TransformToStorage(ctx context.Context, data []byte, dataCtx Context) ([]byte, error) {
return data, nil
}
// DefaultContext is a simple implementation of Context for a slice of bytes.
type DefaultContext []byte
// AuthenticatedData returns itself.
func (c DefaultContext) AuthenticatedData() []byte { return []byte(c) }
// MutableTransformer allows a transformer to be changed safely at runtime.
type MutableTransformer struct {
lock sync.RWMutex
transformer Transformer
}
// NewMutableTransformer creates a transformer that can be updated at any time by calling Set()
func NewMutableTransformer(transformer Transformer) *MutableTransformer {
return &MutableTransformer{transformer: transformer}
}
// Set updates the nested transformer.
func (t *MutableTransformer) Set(transformer Transformer) {
t.lock.Lock()
t.transformer = transformer
t.lock.Unlock()
}
func (t *MutableTransformer) TransformFromStorage(ctx context.Context, data []byte, dataCtx Context) (out []byte, stale bool, err error) {
t.lock.RLock()
transformer := t.transformer
t.lock.RUnlock()
return transformer.TransformFromStorage(ctx, data, dataCtx)
}
func (t *MutableTransformer) TransformToStorage(ctx context.Context, data []byte, dataCtx Context) (out []byte, err error) {
t.lock.RLock()
transformer := t.transformer
t.lock.RUnlock()
return transformer.TransformToStorage(ctx, data, dataCtx)
}
func (c DefaultContext) AuthenticatedData() []byte { return c }
// PrefixTransformer holds a transformer interface and the prefix that the transformation is located under.
type PrefixTransformer struct {