Merge pull request #112050 from nilekhc/kms-hot-reload
Implements hot reload of the KMS `EncryptionConfiguration`
This commit is contained in:
@@ -20,6 +20,7 @@ import (
|
||||
"context"
|
||||
"crypto/aes"
|
||||
"crypto/cipher"
|
||||
"crypto/sha256"
|
||||
"encoding/base64"
|
||||
"errors"
|
||||
"fmt"
|
||||
@@ -27,6 +28,7 @@ import (
|
||||
"net/http"
|
||||
"os"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
@@ -59,6 +61,7 @@ const (
|
||||
kmsPluginHealthzPositiveTTL = 20 * time.Second
|
||||
kmsAPIVersionV1 = "v1"
|
||||
kmsAPIVersionV2 = "v2"
|
||||
kmsReloadHealthCheckName = "kms-providers"
|
||||
)
|
||||
|
||||
type kmsPluginHealthzResponse struct {
|
||||
@@ -85,7 +88,7 @@ type kmsv2PluginProbe struct {
|
||||
type kmsHealthChecker []healthz.HealthChecker
|
||||
|
||||
func (k kmsHealthChecker) Name() string {
|
||||
return "kms-providers"
|
||||
return kmsReloadHealthCheckName
|
||||
}
|
||||
|
||||
func (k kmsHealthChecker) Check(req *http.Request) error {
|
||||
@@ -113,25 +116,51 @@ func (h *kmsv2PluginProbe) toHealthzCheck(idx int) healthz.HealthChecker {
|
||||
})
|
||||
}
|
||||
|
||||
// EncryptionConfiguration represents the parsed and normalized encryption configuration for the apiserver.
|
||||
type EncryptionConfiguration struct {
|
||||
// Transformers is a list of value.Transformer that will be used to encrypt and decrypt data.
|
||||
Transformers map[schema.GroupResource]value.Transformer
|
||||
|
||||
// HealthChecks is a list of healthz.HealthChecker that will be used to check the health of the encryption providers.
|
||||
HealthChecks []healthz.HealthChecker
|
||||
|
||||
// EncryptionFileContentHash is the hash of the encryption config file.
|
||||
EncryptionFileContentHash string
|
||||
|
||||
// KMSCloseGracePeriod is the duration we will wait before closing old transformers.
|
||||
// We wait for any in-flight requests to finish by using the duration which is longer than their timeout.
|
||||
KMSCloseGracePeriod time.Duration
|
||||
}
|
||||
|
||||
// LoadEncryptionConfig parses and validates the encryption config specified by filepath.
|
||||
// It may launch multiple go routines whose lifecycle is controlled by stopCh.
|
||||
// If reload is true, or KMS v2 plugins are used with no KMS v1 plugins, the returned slice of health checkers will always be of length 1.
|
||||
func LoadEncryptionConfig(filepath string, reload bool, stopCh <-chan struct{}) (map[schema.GroupResource]value.Transformer, []healthz.HealthChecker, error) {
|
||||
config, err := loadConfig(filepath, reload)
|
||||
func LoadEncryptionConfig(filepath string, reload bool, stopCh <-chan struct{}) (*EncryptionConfiguration, error) {
|
||||
config, contentHash, err := loadConfig(filepath, reload)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("error while parsing file: %w", err)
|
||||
return nil, fmt.Errorf("error while parsing file: %w", err)
|
||||
}
|
||||
|
||||
transformers, kmsHealthChecks, kmsUsed, err := getTransformerOverridesAndKMSPluginHealthzCheckers(config, stopCh)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("error while building transformers: %w", err)
|
||||
return nil, fmt.Errorf("error while building transformers: %w", err)
|
||||
}
|
||||
|
||||
if reload || (kmsUsed.v2Used && !kmsUsed.v1Used) {
|
||||
kmsHealthChecks = []healthz.HealthChecker{kmsHealthChecker(kmsHealthChecks)}
|
||||
}
|
||||
|
||||
return transformers, kmsHealthChecks, nil
|
||||
// KMSTimeout is the duration we will wait before closing old transformers.
|
||||
// The way we calculate is as follows:
|
||||
// 1. Sum all timeouts across all KMS plugins. (check kmsPrefixTransformer for differences between v1 and v2)
|
||||
// 2. Multiply that by 2 (to allow for some buffer)
|
||||
// The reason we sum all timeout is because kmsHealthChecker() will run all health checks serially
|
||||
return &EncryptionConfiguration{
|
||||
Transformers: transformers,
|
||||
HealthChecks: kmsHealthChecks,
|
||||
EncryptionFileContentHash: contentHash,
|
||||
KMSCloseGracePeriod: 2 * kmsUsed.kmsTimeoutSum,
|
||||
}, err
|
||||
}
|
||||
|
||||
func getTransformerOverridesAndKMSPluginHealthzCheckers(config *apiserverconfig.EncryptionConfiguration, stopCh <-chan struct{}) (map[schema.GroupResource]value.Transformer, []healthz.HealthChecker, *kmsState, error) {
|
||||
@@ -168,6 +197,8 @@ func getTransformerOverridesAndKMSPluginProbes(config *apiserverconfig.Encryptio
|
||||
kmsUsed.v1Used = kmsUsed.v1Used || used.v1Used
|
||||
kmsUsed.v2Used = kmsUsed.v2Used || used.v2Used
|
||||
|
||||
kmsUsed.kmsTimeoutSum += used.kmsTimeoutSum
|
||||
|
||||
// For each resource, create a list of providers to use
|
||||
for _, resource := range resourceConfig.Resources {
|
||||
resource := resource
|
||||
@@ -262,19 +293,20 @@ func isKMSv2ProviderHealthy(name string, response *envelopekmsv2.StatusResponse)
|
||||
return nil
|
||||
}
|
||||
|
||||
func loadConfig(filepath string, reload bool) (*apiserverconfig.EncryptionConfiguration, error) {
|
||||
// loadConfig parses the encryption configuration file at filepath and returns the parsed config and hash of the file.
|
||||
func loadConfig(filepath string, reload bool) (*apiserverconfig.EncryptionConfiguration, string, error) {
|
||||
f, err := os.Open(filepath)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error opening encryption provider configuration file %q: %w", filepath, err)
|
||||
return nil, "", fmt.Errorf("error opening encryption provider configuration file %q: %w", filepath, err)
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
data, err := io.ReadAll(f)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not read contents: %w", err)
|
||||
return nil, "", fmt.Errorf("could not read contents: %w", err)
|
||||
}
|
||||
if len(data) == 0 {
|
||||
return nil, fmt.Errorf("encryption provider configuration file %q is empty", filepath)
|
||||
return nil, "", fmt.Errorf("encryption provider configuration file %q is empty", filepath)
|
||||
}
|
||||
|
||||
scheme := runtime.NewScheme()
|
||||
@@ -284,14 +316,14 @@ func loadConfig(filepath string, reload bool) (*apiserverconfig.EncryptionConfig
|
||||
|
||||
configObj, gvk, err := codecs.UniversalDecoder().Decode(data, nil, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, "", err
|
||||
}
|
||||
config, ok := configObj.(*apiserverconfig.EncryptionConfiguration)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("got unexpected config type: %v", gvk)
|
||||
return nil, "", fmt.Errorf("got unexpected config type: %v", gvk)
|
||||
}
|
||||
|
||||
return config, validation.ValidateEncryptionConfiguration(config, reload).ToAggregate()
|
||||
return config, computeEncryptionConfigHash(data), validation.ValidateEncryptionConfiguration(config, reload).ToAggregate()
|
||||
}
|
||||
|
||||
func prefixTransformersAndProbes(config apiserverconfig.ResourceConfiguration, stopCh <-chan struct{}) ([]value.PrefixTransformer, []healthChecker, *kmsState, error) {
|
||||
@@ -324,6 +356,9 @@ func prefixTransformersAndProbes(config apiserverconfig.ResourceConfiguration, s
|
||||
probes = append(probes, probe)
|
||||
kmsUsed.v1Used = kmsUsed.v1Used || used.v1Used
|
||||
kmsUsed.v2Used = kmsUsed.v2Used || used.v2Used
|
||||
|
||||
// calculate the maximum timeout for all KMS providers
|
||||
kmsUsed.kmsTimeoutSum += used.kmsTimeoutSum
|
||||
}
|
||||
|
||||
case provider.Identity != nil:
|
||||
@@ -459,6 +494,7 @@ var (
|
||||
|
||||
type kmsState struct {
|
||||
v1Used, v2Used bool
|
||||
kmsTimeoutSum time.Duration
|
||||
}
|
||||
|
||||
func kmsPrefixTransformer(config *apiserverconfig.KMSConfiguration, stopCh <-chan struct{}) (value.PrefixTransformer, healthChecker, *kmsState, error) {
|
||||
@@ -483,7 +519,11 @@ func kmsPrefixTransformer(config *apiserverconfig.KMSConfiguration, stopCh <-cha
|
||||
|
||||
transformer := envelopePrefixTransformer(config, envelopeService, kmsTransformerPrefixV1)
|
||||
|
||||
return transformer, probe, &kmsState{v1Used: true}, nil
|
||||
return transformer, probe, &kmsState{
|
||||
v1Used: true,
|
||||
// for v1 we will do encrypt and decrypt for health check. Since these are serial operations, we will double the timeout.
|
||||
kmsTimeoutSum: 2 * config.Timeout.Duration,
|
||||
}, nil
|
||||
|
||||
case kmsAPIVersionV2:
|
||||
if !utilfeature.DefaultFeatureGate.Enabled(features.KMSv2) {
|
||||
@@ -509,7 +549,10 @@ func kmsPrefixTransformer(config *apiserverconfig.KMSConfiguration, stopCh <-cha
|
||||
Prefix: []byte(kmsTransformerPrefixV2 + kmsName + ":"),
|
||||
}
|
||||
|
||||
return transformer, probe, &kmsState{v2Used: true}, nil
|
||||
return transformer, probe, &kmsState{
|
||||
v2Used: true,
|
||||
kmsTimeoutSum: config.Timeout.Duration,
|
||||
}, nil
|
||||
|
||||
default:
|
||||
return value.PrefixTransformer{}, nil, nil, fmt.Errorf("could not configure KMS plugin %q, unsupported KMS API version %q", kmsName, config.APIVersion)
|
||||
@@ -555,3 +598,133 @@ func (u unionTransformers) TransformFromStorage(ctx context.Context, data []byte
|
||||
func (u unionTransformers) TransformToStorage(ctx context.Context, data []byte, dataCtx value.Context) (out []byte, err error) {
|
||||
return u[0].TransformToStorage(ctx, data, dataCtx)
|
||||
}
|
||||
|
||||
// computeEncryptionConfigHash returns the expected hash for an encryption config file that has been loaded as bytes.
|
||||
// We use a hash instead of the raw file contents when tracking changes to avoid holding any encryption keys in memory outside of their associated transformers.
|
||||
// This hash must be used in-memory and not externalized to the process because it has no cross-release stability guarantees.
|
||||
func computeEncryptionConfigHash(data []byte) string {
|
||||
return fmt.Sprintf("%x", sha256.Sum256(data))
|
||||
}
|
||||
|
||||
var _ healthz.HealthChecker = &DynamicTransformers{}
|
||||
|
||||
// DynamicTransformers holds transformers that may be dynamically updated via a single external actor, likely a controller.
|
||||
// This struct must avoid locks (even read write locks) as it is inline to all calls to storage.
|
||||
type DynamicTransformers struct {
|
||||
transformTracker *atomic.Value
|
||||
}
|
||||
|
||||
type transformTracker struct {
|
||||
transformerOverrides map[schema.GroupResource]value.Transformer
|
||||
kmsPluginHealthzCheck healthz.HealthChecker
|
||||
closeTransformers context.CancelFunc
|
||||
kmsCloseGracePeriod time.Duration
|
||||
}
|
||||
|
||||
// NewDynamicTransformers returns transformers, health checks for kms providers and an ability to close transformers.
|
||||
func NewDynamicTransformers(
|
||||
transformerOverrides map[schema.GroupResource]value.Transformer,
|
||||
kmsPluginHealthzCheck healthz.HealthChecker,
|
||||
closeTransformers context.CancelFunc,
|
||||
kmsCloseGracePeriod time.Duration,
|
||||
) *DynamicTransformers {
|
||||
dynamicTransformers := &DynamicTransformers{
|
||||
transformTracker: &atomic.Value{},
|
||||
}
|
||||
|
||||
tracker := &transformTracker{
|
||||
transformerOverrides: transformerOverrides,
|
||||
kmsPluginHealthzCheck: kmsPluginHealthzCheck,
|
||||
closeTransformers: closeTransformers,
|
||||
kmsCloseGracePeriod: kmsCloseGracePeriod,
|
||||
}
|
||||
dynamicTransformers.transformTracker.Store(tracker)
|
||||
|
||||
return dynamicTransformers
|
||||
}
|
||||
|
||||
// Check implements healthz.HealthChecker
|
||||
func (d *DynamicTransformers) Check(req *http.Request) error {
|
||||
return d.transformTracker.Load().(*transformTracker).kmsPluginHealthzCheck.Check(req)
|
||||
}
|
||||
|
||||
// Name implements healthz.HealthChecker
|
||||
func (d *DynamicTransformers) Name() string {
|
||||
return kmsReloadHealthCheckName
|
||||
}
|
||||
|
||||
// TransformerForResource returns the transformer for the given resource.
|
||||
func (d *DynamicTransformers) TransformerForResource(resource schema.GroupResource) value.Transformer {
|
||||
return &resourceTransformer{
|
||||
resource: resource,
|
||||
transformTracker: d.transformTracker,
|
||||
}
|
||||
}
|
||||
|
||||
// Set sets the transformer overrides. This method is not go routine safe and must only be called by the same, single caller throughout the lifetime of this object.
|
||||
func (d *DynamicTransformers) Set(
|
||||
transformerOverrides map[schema.GroupResource]value.Transformer,
|
||||
closeTransformers context.CancelFunc,
|
||||
kmsPluginHealthzCheck healthz.HealthChecker,
|
||||
kmsCloseGracePeriod time.Duration,
|
||||
) {
|
||||
// store new values
|
||||
newTransformTracker := &transformTracker{
|
||||
transformerOverrides: transformerOverrides,
|
||||
closeTransformers: closeTransformers,
|
||||
kmsPluginHealthzCheck: kmsPluginHealthzCheck,
|
||||
kmsCloseGracePeriod: kmsCloseGracePeriod,
|
||||
}
|
||||
|
||||
// update new transformer overrides
|
||||
oldTransformTracker := d.transformTracker.Swap(newTransformTracker).(*transformTracker)
|
||||
|
||||
// close old transformers once we wait for grpc request to finish any in-flight requests.
|
||||
// by the time we spawn this go routine, the new transformers have already been set and will be used for new requests.
|
||||
// if the server starts shutting down during sleep duration then the transformers will correctly closed early because their lifetime is tied to the api-server drain notifier.
|
||||
go func() {
|
||||
time.Sleep(oldTransformTracker.kmsCloseGracePeriod)
|
||||
oldTransformTracker.closeTransformers()
|
||||
}()
|
||||
}
|
||||
|
||||
var _ value.Transformer = &resourceTransformer{}
|
||||
|
||||
type resourceTransformer struct {
|
||||
resource schema.GroupResource
|
||||
transformTracker *atomic.Value
|
||||
}
|
||||
|
||||
func (r *resourceTransformer) TransformFromStorage(ctx context.Context, data []byte, dataCtx value.Context) ([]byte, bool, error) {
|
||||
return r.transformer().TransformFromStorage(ctx, data, dataCtx)
|
||||
}
|
||||
|
||||
func (r *resourceTransformer) TransformToStorage(ctx context.Context, data []byte, dataCtx value.Context) ([]byte, error) {
|
||||
return r.transformer().TransformToStorage(ctx, data, dataCtx)
|
||||
}
|
||||
|
||||
func (r *resourceTransformer) transformer() value.Transformer {
|
||||
transformer := r.transformTracker.Load().(*transformTracker).transformerOverrides[r.resource]
|
||||
if transformer == nil {
|
||||
return identity.NewEncryptCheckTransformer()
|
||||
}
|
||||
return transformer
|
||||
}
|
||||
|
||||
type ResourceTransformers interface {
|
||||
TransformerForResource(resource schema.GroupResource) value.Transformer
|
||||
}
|
||||
|
||||
var _ ResourceTransformers = &DynamicTransformers{}
|
||||
var _ ResourceTransformers = &StaticTransformers{}
|
||||
|
||||
type StaticTransformers map[schema.GroupResource]value.Transformer
|
||||
|
||||
// StaticTransformers
|
||||
func (s StaticTransformers) TransformerForResource(resource schema.GroupResource) value.Transformer {
|
||||
transformer := s[resource]
|
||||
if transformer == nil {
|
||||
return identity.NewEncryptCheckTransformer()
|
||||
}
|
||||
return transformer
|
||||
}
|
||||
|
@@ -114,7 +114,7 @@ func newMockErrorEnvelopeKMSv2Service(endpoint string, timeout time.Duration) (e
|
||||
|
||||
func TestLegacyConfig(t *testing.T) {
|
||||
legacyV1Config := "testdata/valid-configs/legacy.yaml"
|
||||
legacyConfigObject, err := loadConfig(legacyV1Config, false)
|
||||
legacyConfigObject, _, err := loadConfig(legacyV1Config, false)
|
||||
cacheSize := int32(10)
|
||||
if err != nil {
|
||||
t.Fatalf("error while parsing configuration file: %s.\nThe file was:\n%s", err, legacyV1Config)
|
||||
@@ -177,48 +177,48 @@ func TestEncryptionProviderConfigCorrect(t *testing.T) {
|
||||
// Transforms data using one of them, and tries to untransform using the others.
|
||||
// Repeats this for all possible combinations.
|
||||
correctConfigWithIdentityFirst := "testdata/valid-configs/identity-first.yaml"
|
||||
identityFirstTransformerOverrides, _, err := LoadEncryptionConfig(correctConfigWithIdentityFirst, false, ctx.Done())
|
||||
identityFirstEncryptionConfiguration, err := LoadEncryptionConfig(correctConfigWithIdentityFirst, false, ctx.Done())
|
||||
if err != nil {
|
||||
t.Fatalf("error while parsing configuration file: %s.\nThe file was:\n%s", err, correctConfigWithIdentityFirst)
|
||||
}
|
||||
|
||||
correctConfigWithAesGcmFirst := "testdata/valid-configs/aes-gcm-first.yaml"
|
||||
aesGcmFirstTransformerOverrides, _, err := LoadEncryptionConfig(correctConfigWithAesGcmFirst, false, ctx.Done())
|
||||
aesGcmFirstEncryptionConfiguration, err := LoadEncryptionConfig(correctConfigWithAesGcmFirst, false, ctx.Done())
|
||||
if err != nil {
|
||||
t.Fatalf("error while parsing configuration file: %s.\nThe file was:\n%s", err, correctConfigWithAesGcmFirst)
|
||||
}
|
||||
|
||||
correctConfigWithAesCbcFirst := "testdata/valid-configs/aes-cbc-first.yaml"
|
||||
aesCbcFirstTransformerOverrides, _, err := LoadEncryptionConfig(correctConfigWithAesCbcFirst, false, ctx.Done())
|
||||
aesCbcFirstEncryptionConfiguration, err := LoadEncryptionConfig(correctConfigWithAesCbcFirst, false, ctx.Done())
|
||||
if err != nil {
|
||||
t.Fatalf("error while parsing configuration file: %s.\nThe file was:\n%s", err, correctConfigWithAesCbcFirst)
|
||||
}
|
||||
|
||||
correctConfigWithSecretboxFirst := "testdata/valid-configs/secret-box-first.yaml"
|
||||
secretboxFirstTransformerOverrides, _, err := LoadEncryptionConfig(correctConfigWithSecretboxFirst, false, ctx.Done())
|
||||
secretboxFirstEncryptionConfiguration, err := LoadEncryptionConfig(correctConfigWithSecretboxFirst, false, ctx.Done())
|
||||
if err != nil {
|
||||
t.Fatalf("error while parsing configuration file: %s.\nThe file was:\n%s", err, correctConfigWithSecretboxFirst)
|
||||
}
|
||||
|
||||
correctConfigWithKMSFirst := "testdata/valid-configs/kms-first.yaml"
|
||||
kmsFirstTransformerOverrides, _, err := LoadEncryptionConfig(correctConfigWithKMSFirst, false, ctx.Done())
|
||||
kmsFirstEncryptionConfiguration, err := LoadEncryptionConfig(correctConfigWithKMSFirst, false, ctx.Done())
|
||||
if err != nil {
|
||||
t.Fatalf("error while parsing configuration file: %s.\nThe file was:\n%s", err, correctConfigWithKMSFirst)
|
||||
}
|
||||
|
||||
correctConfigWithKMSv2First := "testdata/valid-configs/kmsv2-first.yaml"
|
||||
kmsv2FirstTransformerOverrides, _, err := LoadEncryptionConfig(correctConfigWithKMSv2First, false, ctx.Done())
|
||||
kmsv2FirstEncryptionConfiguration, err := LoadEncryptionConfig(correctConfigWithKMSv2First, false, ctx.Done())
|
||||
if err != nil {
|
||||
t.Fatalf("error while parsing configuration file: %s.\nThe file was:\n%s", err, correctConfigWithKMSv2First)
|
||||
}
|
||||
|
||||
// Pick the transformer for any of the returned resources.
|
||||
identityFirstTransformer := identityFirstTransformerOverrides[schema.ParseGroupResource("secrets")]
|
||||
aesGcmFirstTransformer := aesGcmFirstTransformerOverrides[schema.ParseGroupResource("secrets")]
|
||||
aesCbcFirstTransformer := aesCbcFirstTransformerOverrides[schema.ParseGroupResource("secrets")]
|
||||
secretboxFirstTransformer := secretboxFirstTransformerOverrides[schema.ParseGroupResource("secrets")]
|
||||
kmsFirstTransformer := kmsFirstTransformerOverrides[schema.ParseGroupResource("secrets")]
|
||||
kmsv2FirstTransformer := kmsv2FirstTransformerOverrides[schema.ParseGroupResource("secrets")]
|
||||
identityFirstTransformer := identityFirstEncryptionConfiguration.Transformers[schema.ParseGroupResource("secrets")]
|
||||
aesGcmFirstTransformer := aesGcmFirstEncryptionConfiguration.Transformers[schema.ParseGroupResource("secrets")]
|
||||
aesCbcFirstTransformer := aesCbcFirstEncryptionConfiguration.Transformers[schema.ParseGroupResource("secrets")]
|
||||
secretboxFirstTransformer := secretboxFirstEncryptionConfiguration.Transformers[schema.ParseGroupResource("secrets")]
|
||||
kmsFirstTransformer := kmsFirstEncryptionConfiguration.Transformers[schema.ParseGroupResource("secrets")]
|
||||
kmsv2FirstTransformer := kmsv2FirstEncryptionConfiguration.Transformers[schema.ParseGroupResource("secrets")]
|
||||
|
||||
dataCtx := value.DefaultContext([]byte(sampleContextText))
|
||||
originalText := []byte(sampleText)
|
||||
@@ -256,6 +256,222 @@ func TestEncryptionProviderConfigCorrect(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestKMSMaxTimeout(t *testing.T) {
|
||||
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.KMSv2, true)()
|
||||
|
||||
testCases := []struct {
|
||||
name string
|
||||
expectedTimeout time.Duration
|
||||
config apiserverconfig.EncryptionConfiguration
|
||||
}{
|
||||
{
|
||||
name: "default timeout",
|
||||
config: apiserverconfig.EncryptionConfiguration{
|
||||
Resources: []apiserverconfig.ResourceConfiguration{
|
||||
{
|
||||
Resources: []string{"secrets"},
|
||||
Providers: []apiserverconfig.ProviderConfiguration{
|
||||
{
|
||||
KMS: &apiserverconfig.KMSConfiguration{
|
||||
Name: "kms",
|
||||
APIVersion: "v1",
|
||||
Timeout: &metav1.Duration{
|
||||
// default timeout is 3s
|
||||
// this will be set automatically if not provided in config file
|
||||
Duration: 3 * time.Second,
|
||||
},
|
||||
Endpoint: "unix:///tmp/testprovider.sock",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
expectedTimeout: 6 * time.Second,
|
||||
},
|
||||
{
|
||||
name: "with v1 provider",
|
||||
config: apiserverconfig.EncryptionConfiguration{
|
||||
Resources: []apiserverconfig.ResourceConfiguration{
|
||||
{
|
||||
Resources: []string{"secrets"},
|
||||
Providers: []apiserverconfig.ProviderConfiguration{
|
||||
{
|
||||
KMS: &apiserverconfig.KMSConfiguration{
|
||||
Name: "kms",
|
||||
APIVersion: "v1",
|
||||
Timeout: &metav1.Duration{
|
||||
// default timeout is 3s
|
||||
// this will be set automatically if not provided in config file
|
||||
Duration: 3 * time.Second,
|
||||
},
|
||||
Endpoint: "unix:///tmp/testprovider.sock",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Resources: []string{"configmaps"},
|
||||
Providers: []apiserverconfig.ProviderConfiguration{
|
||||
{
|
||||
KMS: &apiserverconfig.KMSConfiguration{
|
||||
Name: "kms",
|
||||
APIVersion: "v1",
|
||||
Timeout: &metav1.Duration{
|
||||
// default timeout is 3s
|
||||
// this will be set automatically if not provided in config file
|
||||
Duration: 3 * time.Second,
|
||||
},
|
||||
Endpoint: "unix:///tmp/testprovider.sock",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
expectedTimeout: 12 * time.Second,
|
||||
},
|
||||
{
|
||||
name: "with v2 provider",
|
||||
config: apiserverconfig.EncryptionConfiguration{
|
||||
Resources: []apiserverconfig.ResourceConfiguration{
|
||||
{
|
||||
Resources: []string{"secrets"},
|
||||
Providers: []apiserverconfig.ProviderConfiguration{
|
||||
{
|
||||
KMS: &apiserverconfig.KMSConfiguration{
|
||||
Name: "kms",
|
||||
APIVersion: "v2",
|
||||
Timeout: &metav1.Duration{
|
||||
Duration: 15 * time.Second,
|
||||
},
|
||||
Endpoint: "unix:///tmp/testprovider.sock",
|
||||
},
|
||||
},
|
||||
{
|
||||
KMS: &apiserverconfig.KMSConfiguration{
|
||||
Name: "new-kms",
|
||||
APIVersion: "v2",
|
||||
Timeout: &metav1.Duration{
|
||||
Duration: 5 * time.Second,
|
||||
},
|
||||
Endpoint: "unix:///tmp/anothertestprovider.sock",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Resources: []string{"configmaps"},
|
||||
Providers: []apiserverconfig.ProviderConfiguration{
|
||||
{
|
||||
KMS: &apiserverconfig.KMSConfiguration{
|
||||
Name: "another-kms",
|
||||
APIVersion: "v2",
|
||||
Timeout: &metav1.Duration{
|
||||
Duration: 10 * time.Second,
|
||||
},
|
||||
Endpoint: "unix:///tmp/testprovider.sock",
|
||||
},
|
||||
},
|
||||
{
|
||||
KMS: &apiserverconfig.KMSConfiguration{
|
||||
Name: "yet-another-kms",
|
||||
APIVersion: "v2",
|
||||
Timeout: &metav1.Duration{
|
||||
Duration: 2 * time.Second,
|
||||
},
|
||||
Endpoint: "unix:///tmp/anothertestprovider.sock",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
expectedTimeout: 32 * time.Second,
|
||||
},
|
||||
{
|
||||
name: "with v1 and v2 provider",
|
||||
config: apiserverconfig.EncryptionConfiguration{
|
||||
Resources: []apiserverconfig.ResourceConfiguration{
|
||||
{
|
||||
Resources: []string{"secrets"},
|
||||
Providers: []apiserverconfig.ProviderConfiguration{
|
||||
{
|
||||
KMS: &apiserverconfig.KMSConfiguration{
|
||||
Name: "kms",
|
||||
APIVersion: "v1",
|
||||
Timeout: &metav1.Duration{
|
||||
Duration: 1 * time.Second,
|
||||
},
|
||||
Endpoint: "unix:///tmp/testprovider.sock",
|
||||
},
|
||||
},
|
||||
{
|
||||
KMS: &apiserverconfig.KMSConfiguration{
|
||||
Name: "another-kms",
|
||||
APIVersion: "v2",
|
||||
Timeout: &metav1.Duration{
|
||||
Duration: 1 * time.Second,
|
||||
},
|
||||
Endpoint: "unix:///tmp/anothertestprovider.sock",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Resources: []string{"configmaps"},
|
||||
Providers: []apiserverconfig.ProviderConfiguration{
|
||||
{
|
||||
KMS: &apiserverconfig.KMSConfiguration{
|
||||
Name: "kms",
|
||||
APIVersion: "v1",
|
||||
Timeout: &metav1.Duration{
|
||||
Duration: 4 * time.Second,
|
||||
},
|
||||
Endpoint: "unix:///tmp/testprovider.sock",
|
||||
},
|
||||
},
|
||||
{
|
||||
KMS: &apiserverconfig.KMSConfiguration{
|
||||
Name: "yet-another-kms",
|
||||
APIVersion: "v1",
|
||||
Timeout: &metav1.Duration{
|
||||
Duration: 2 * time.Second,
|
||||
},
|
||||
Endpoint: "unix:///tmp/anothertestprovider.sock",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
expectedTimeout: 15 * time.Second,
|
||||
},
|
||||
}
|
||||
|
||||
for _, testCase := range testCases {
|
||||
t.Run(testCase.name, func(t *testing.T) {
|
||||
cacheSize := int32(1000)
|
||||
for _, resource := range testCase.config.Resources {
|
||||
for _, provider := range resource.Providers {
|
||||
if provider.KMS != nil {
|
||||
provider.KMS.CacheSize = &cacheSize
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
_, _, kmsUsed, _ := getTransformerOverridesAndKMSPluginHealthzCheckers(&testCase.config, testContext(t).Done())
|
||||
if kmsUsed == nil {
|
||||
t.Fatal("kmsUsed should not be nil")
|
||||
}
|
||||
|
||||
if kmsUsed.kmsTimeoutSum != testCase.expectedTimeout {
|
||||
t.Fatalf("expected timeout %v, got %v", testCase.expectedTimeout, kmsUsed.kmsTimeoutSum)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestKMSPluginHealthz(t *testing.T) {
|
||||
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.KMSv2, true)()
|
||||
|
||||
@@ -323,7 +539,7 @@ func TestKMSPluginHealthz(t *testing.T) {
|
||||
|
||||
for _, tt := range testCases {
|
||||
t.Run(tt.desc, func(t *testing.T) {
|
||||
config, err := loadConfig(tt.config, false)
|
||||
config, _, err := loadConfig(tt.config, false)
|
||||
if errStr := errString(err); errStr != tt.wantErr {
|
||||
t.Fatalf("unexpected error state got=%s want=%s", errStr, tt.wantErr)
|
||||
}
|
||||
@@ -541,14 +757,14 @@ func getTransformerFromEncryptionConfig(t *testing.T, encryptionConfigPath strin
|
||||
ctx := testContext(t)
|
||||
|
||||
t.Helper()
|
||||
transformers, _, err := LoadEncryptionConfig(encryptionConfigPath, false, ctx.Done())
|
||||
encryptionConfiguration, err := LoadEncryptionConfig(encryptionConfigPath, false, ctx.Done())
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if len(transformers) != 1 {
|
||||
if len(encryptionConfiguration.Transformers) != 1 {
|
||||
t.Fatalf("input config does not have exactly one resource: %s", encryptionConfigPath)
|
||||
}
|
||||
for _, transformer := range transformers {
|
||||
for _, transformer := range encryptionConfiguration.Transformers {
|
||||
return transformer
|
||||
}
|
||||
panic("unreachable")
|
||||
@@ -602,3 +818,12 @@ func errString(err error) string {
|
||||
|
||||
return err.Error()
|
||||
}
|
||||
|
||||
func TestComputeEncryptionConfigHash(t *testing.T) {
|
||||
// hash the empty string to be sure that sha256 is being used
|
||||
expect := "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
|
||||
sum := computeEncryptionConfigHash([]byte(""))
|
||||
if expect != sum {
|
||||
t.Errorf("expected hash %q but got %q", expect, sum)
|
||||
}
|
||||
}
|
||||
|
@@ -0,0 +1,265 @@
|
||||
/*
|
||||
Copyright 2022 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package controller
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/fsnotify/fsnotify"
|
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/apiserver/pkg/server/healthz"
|
||||
"k8s.io/apiserver/pkg/server/options/encryptionconfig"
|
||||
"k8s.io/client-go/util/workqueue"
|
||||
"k8s.io/klog/v2"
|
||||
)
|
||||
|
||||
// workqueueKey is the dummy key used to process change in encryption config file.
|
||||
const workqueueKey = "key"
|
||||
|
||||
// DynamicKMSEncryptionConfigContent which can dynamically handle changes in encryption config file.
|
||||
type DynamicKMSEncryptionConfigContent struct {
|
||||
name string
|
||||
|
||||
// filePath is the path of the file to read.
|
||||
filePath string
|
||||
|
||||
// lastLoadedEncryptionConfigHash stores last successfully read encryption config file content.
|
||||
lastLoadedEncryptionConfigHash string
|
||||
|
||||
// queue for processing changes in encryption config file.
|
||||
queue workqueue.RateLimitingInterface
|
||||
|
||||
// dynamicTransformers updates the transformers when encryption config file changes.
|
||||
dynamicTransformers *encryptionconfig.DynamicTransformers
|
||||
|
||||
// stopCh used here is a lifecycle signal of genericapiserver already drained while shutting down.
|
||||
stopCh <-chan struct{}
|
||||
}
|
||||
|
||||
// NewDynamicKMSEncryptionConfiguration returns controller that dynamically reacts to changes in encryption config file.
|
||||
func NewDynamicKMSEncryptionConfiguration(
|
||||
name, filePath string,
|
||||
dynamicTransformers *encryptionconfig.DynamicTransformers,
|
||||
configContentHash string,
|
||||
stopCh <-chan struct{},
|
||||
) *DynamicKMSEncryptionConfigContent {
|
||||
encryptionConfig := &DynamicKMSEncryptionConfigContent{
|
||||
name: name,
|
||||
filePath: filePath,
|
||||
lastLoadedEncryptionConfigHash: configContentHash,
|
||||
dynamicTransformers: dynamicTransformers,
|
||||
stopCh: stopCh,
|
||||
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), fmt.Sprintf("%s-hot-reload", name)),
|
||||
}
|
||||
encryptionConfig.queue.Add(workqueueKey)
|
||||
|
||||
return encryptionConfig
|
||||
}
|
||||
|
||||
// Run starts the controller and blocks until stopCh is closed.
|
||||
func (d *DynamicKMSEncryptionConfigContent) Run(ctx context.Context) {
|
||||
defer utilruntime.HandleCrash()
|
||||
defer d.queue.ShutDown()
|
||||
|
||||
klog.InfoS("Starting controller", "name", d.name)
|
||||
defer klog.InfoS("Shutting down controller", "name", d.name)
|
||||
|
||||
// start worker for processing content
|
||||
go wait.Until(d.runWorker, time.Second, ctx.Done())
|
||||
|
||||
// start the loop that watches the encryption config file until stopCh is closed.
|
||||
go wait.Until(func() {
|
||||
if err := d.watchEncryptionConfigFile(ctx.Done()); err != nil {
|
||||
// if there is an error while setting up or handling the watches, this will ensure that we will process the config file.
|
||||
defer d.queue.Add(workqueueKey)
|
||||
klog.ErrorS(err, "Failed to watch encryption config file, will retry later")
|
||||
}
|
||||
}, time.Second, ctx.Done())
|
||||
|
||||
<-ctx.Done()
|
||||
}
|
||||
|
||||
func (d *DynamicKMSEncryptionConfigContent) watchEncryptionConfigFile(stopCh <-chan struct{}) error {
|
||||
watcher, err := fsnotify.NewWatcher()
|
||||
if err != nil {
|
||||
return fmt.Errorf("error creating fsnotify watcher: %w", err)
|
||||
}
|
||||
defer watcher.Close()
|
||||
|
||||
if err = watcher.Add(d.filePath); err != nil {
|
||||
return fmt.Errorf("error adding watch for file %s: %w", d.filePath, err)
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case event := <-watcher.Events:
|
||||
if err := d.handleWatchEvent(event, watcher); err != nil {
|
||||
return err
|
||||
}
|
||||
case err := <-watcher.Errors:
|
||||
return fmt.Errorf("received fsnotify error: %w", err)
|
||||
case <-stopCh:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (d *DynamicKMSEncryptionConfigContent) handleWatchEvent(event fsnotify.Event, watcher *fsnotify.Watcher) error {
|
||||
// This should be executed after restarting the watch (if applicable) to ensure no file event will be missing.
|
||||
defer d.queue.Add(workqueueKey)
|
||||
|
||||
// return if file has not been removed or renamed.
|
||||
if event.Op&(fsnotify.Remove|fsnotify.Rename) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := watcher.Remove(d.filePath); err != nil {
|
||||
klog.V(2).InfoS("Failed to remove file watch, it may have been deleted", "file", d.filePath, "err", err)
|
||||
}
|
||||
if err := watcher.Add(d.filePath); err != nil {
|
||||
return fmt.Errorf("error adding watch for file %s: %w", d.filePath, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// runWorker to process file content
|
||||
func (d *DynamicKMSEncryptionConfigContent) runWorker() {
|
||||
for d.processNextWorkItem() {
|
||||
}
|
||||
}
|
||||
|
||||
// processNextWorkItem processes file content when there is a message in the queue.
|
||||
func (d *DynamicKMSEncryptionConfigContent) processNextWorkItem() bool {
|
||||
// key here is dummy item in the queue to trigger file content processing.
|
||||
key, quit := d.queue.Get()
|
||||
if quit {
|
||||
return false
|
||||
}
|
||||
defer d.queue.Done(key)
|
||||
|
||||
var (
|
||||
updatedEffectiveConfig bool
|
||||
err error
|
||||
encryptionConfiguration *encryptionconfig.EncryptionConfiguration
|
||||
configChanged bool
|
||||
)
|
||||
|
||||
// get context to close the new transformers.
|
||||
ctx, closeTransformers := wait.ContextForChannel(d.stopCh)
|
||||
|
||||
defer func() {
|
||||
// TODO: increment success metric when updatedEffectiveConfig=true
|
||||
|
||||
if !updatedEffectiveConfig {
|
||||
// avoid leaking if we're not using the newly constructed transformers (due to an error or them not being changed)
|
||||
closeTransformers()
|
||||
}
|
||||
if err != nil {
|
||||
// TODO: increment failure metric
|
||||
utilruntime.HandleError(fmt.Errorf("error processing encryption config file %s: %v", d.filePath, err))
|
||||
// add dummy item back to the queue to trigger file content processing.
|
||||
d.queue.AddRateLimited(key)
|
||||
}
|
||||
}()
|
||||
|
||||
encryptionConfiguration, configChanged, err = d.processEncryptionConfig(ctx)
|
||||
if err != nil {
|
||||
return true
|
||||
}
|
||||
if !configChanged {
|
||||
return true
|
||||
}
|
||||
|
||||
if len(encryptionConfiguration.HealthChecks) != 1 {
|
||||
err = fmt.Errorf("unexpected number of healthz checks: %d. Should have only one", len(encryptionConfiguration.HealthChecks))
|
||||
return true
|
||||
}
|
||||
// get healthz checks for all new KMS plugins.
|
||||
if err = d.validateNewTransformersHealth(ctx, encryptionConfiguration.HealthChecks[0], encryptionConfiguration.KMSCloseGracePeriod); err != nil {
|
||||
return true
|
||||
}
|
||||
|
||||
// update transformers.
|
||||
// when reload=true there must always be one healthz check.
|
||||
d.dynamicTransformers.Set(
|
||||
encryptionConfiguration.Transformers,
|
||||
closeTransformers,
|
||||
encryptionConfiguration.HealthChecks[0],
|
||||
encryptionConfiguration.KMSCloseGracePeriod,
|
||||
)
|
||||
|
||||
// update local copy of recent config content once update is successful.
|
||||
d.lastLoadedEncryptionConfigHash = encryptionConfiguration.EncryptionFileContentHash
|
||||
klog.V(2).InfoS("Loaded new kms encryption config content", "name", d.name)
|
||||
|
||||
updatedEffectiveConfig = true
|
||||
return true
|
||||
}
|
||||
|
||||
// loadEncryptionConfig processes the next set of content from the file.
|
||||
func (d *DynamicKMSEncryptionConfigContent) processEncryptionConfig(ctx context.Context) (
|
||||
encryptionConfiguration *encryptionconfig.EncryptionConfiguration,
|
||||
configChanged bool,
|
||||
err error,
|
||||
) {
|
||||
// this code path will only execute if reload=true. So passing true explicitly.
|
||||
encryptionConfiguration, err = encryptionconfig.LoadEncryptionConfig(d.filePath, true, ctx.Done())
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
// check if encryptionConfig is different from the current. Do nothing if they are the same.
|
||||
if encryptionConfiguration.EncryptionFileContentHash == d.lastLoadedEncryptionConfigHash {
|
||||
klog.V(4).InfoS("Encryption config has not changed", "name", d.name)
|
||||
return nil, false, nil
|
||||
}
|
||||
return encryptionConfiguration, true, nil
|
||||
}
|
||||
|
||||
func (d *DynamicKMSEncryptionConfigContent) validateNewTransformersHealth(
|
||||
ctx context.Context,
|
||||
kmsPluginHealthzCheck healthz.HealthChecker,
|
||||
kmsPluginCloseGracePeriod time.Duration,
|
||||
) error {
|
||||
// test if new transformers are healthy
|
||||
var healthCheckError error
|
||||
|
||||
if kmsPluginCloseGracePeriod < 10*time.Second {
|
||||
kmsPluginCloseGracePeriod = 10 * time.Second
|
||||
}
|
||||
|
||||
pollErr := wait.PollImmediate(100*time.Millisecond, kmsPluginCloseGracePeriod, func() (bool, error) {
|
||||
// create a fake http get request to health check endpoint
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, fmt.Sprintf("/healthz/%s", kmsPluginHealthzCheck.Name()), nil)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
healthCheckError = kmsPluginHealthzCheck.Check(req)
|
||||
return healthCheckError == nil, nil
|
||||
})
|
||||
if pollErr != nil {
|
||||
return fmt.Errorf("health check for new transformers failed, polling error %v: %w", pollErr, healthCheckError)
|
||||
}
|
||||
klog.V(2).InfoS("Health check succeeded")
|
||||
return nil
|
||||
}
|
@@ -0,0 +1,172 @@
|
||||
/*
|
||||
Copyright 2022 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package controller
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestProcessEncryptionConfig(t *testing.T) {
|
||||
testCases := []struct {
|
||||
name string
|
||||
filePath string
|
||||
expectError bool
|
||||
}{
|
||||
{
|
||||
name: "empty config file",
|
||||
filePath: "testdata/empty_config.yaml",
|
||||
expectError: true,
|
||||
},
|
||||
}
|
||||
|
||||
for _, testCase := range testCases {
|
||||
t.Run(testCase.name, func(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
d := NewDynamicKMSEncryptionConfiguration(
|
||||
testCase.name,
|
||||
testCase.filePath,
|
||||
nil,
|
||||
"",
|
||||
ctx.Done(),
|
||||
)
|
||||
|
||||
_, _, err := d.processEncryptionConfig(ctx)
|
||||
if testCase.expectError && err == nil {
|
||||
t.Fatalf("expected error but got none")
|
||||
}
|
||||
if !testCase.expectError && err != nil {
|
||||
t.Fatalf("expected no error but got %v", err)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestWatchEncryptionConfigFile(t *testing.T) {
|
||||
testCases := []struct {
|
||||
name string
|
||||
generateEvent func(filePath string, cancel context.CancelFunc)
|
||||
expectError bool
|
||||
}{
|
||||
{
|
||||
name: "file not renamed or removed",
|
||||
expectError: false,
|
||||
generateEvent: func(filePath string, cancel context.CancelFunc) {
|
||||
os.Chtimes(filePath, time.Now(), time.Now())
|
||||
|
||||
// wait for the event to be handled
|
||||
time.Sleep(1 * time.Second)
|
||||
cancel()
|
||||
os.Remove(filePath)
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "file renamed",
|
||||
expectError: true,
|
||||
generateEvent: func(filePath string, cancel context.CancelFunc) {
|
||||
os.Rename(filePath, filePath+"1")
|
||||
|
||||
// wait for the event to be handled
|
||||
time.Sleep(1 * time.Second)
|
||||
os.Remove(filePath + "1")
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "file removed",
|
||||
expectError: true,
|
||||
generateEvent: func(filePath string, cancel context.CancelFunc) {
|
||||
// allow watcher handle to start
|
||||
time.Sleep(1 * time.Second)
|
||||
os.Remove(filePath)
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, testCase := range testCases {
|
||||
t.Run(testCase.name, func(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
testFilePath := copyFileForTest(t, "testdata/ec_config.yaml")
|
||||
|
||||
d := NewDynamicKMSEncryptionConfiguration(
|
||||
testCase.name,
|
||||
testFilePath,
|
||||
nil,
|
||||
"",
|
||||
ctx.Done(),
|
||||
)
|
||||
|
||||
errs := make(chan error, 1)
|
||||
go func() {
|
||||
err := d.watchEncryptionConfigFile(d.stopCh)
|
||||
errs <- err
|
||||
}()
|
||||
|
||||
testCase.generateEvent(d.filePath, cancel)
|
||||
|
||||
err := <-errs
|
||||
if testCase.expectError && err == nil {
|
||||
t.Fatalf("expected error but got none")
|
||||
}
|
||||
if !testCase.expectError && err != nil {
|
||||
t.Fatalf("expected no error but got %v", err)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func copyFileForTest(t *testing.T, srcFilePath string) string {
|
||||
t.Helper()
|
||||
|
||||
// get directory from source file path
|
||||
srcDir := filepath.Dir(srcFilePath)
|
||||
|
||||
// get file name from source file path
|
||||
srcFileName := filepath.Base(srcFilePath)
|
||||
|
||||
// set new file path
|
||||
dstFilePath := filepath.Join(srcDir, "test_"+srcFileName)
|
||||
|
||||
// copy src file to dst file
|
||||
r, err := os.Open(srcFilePath)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to open source file: %v", err)
|
||||
}
|
||||
defer r.Close()
|
||||
|
||||
w, err := os.Create(dstFilePath)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create destination file: %v", err)
|
||||
}
|
||||
defer w.Close()
|
||||
|
||||
// copy the file
|
||||
_, err = io.Copy(w, r)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to copy file: %v", err)
|
||||
}
|
||||
|
||||
err = w.Close()
|
||||
if err != nil {
|
||||
t.Fatalf("failed to close destination file: %v", err)
|
||||
}
|
||||
|
||||
return dstFilePath
|
||||
}
|
@@ -0,0 +1,9 @@
|
||||
kind: EncryptionConfiguration
|
||||
apiVersion: apiserver.config.k8s.io/v1
|
||||
resources:
|
||||
- resources:
|
||||
- secrets
|
||||
providers:
|
||||
- kms:
|
||||
name: foo
|
||||
endpoint: unix:///tmp/testprovider.sock
|
@@ -27,15 +27,16 @@ import (
|
||||
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/apiserver/pkg/registry/generic"
|
||||
genericregistry "k8s.io/apiserver/pkg/registry/generic/registry"
|
||||
"k8s.io/apiserver/pkg/server"
|
||||
"k8s.io/apiserver/pkg/server/healthz"
|
||||
"k8s.io/apiserver/pkg/server/options/encryptionconfig"
|
||||
kmsconfigcontroller "k8s.io/apiserver/pkg/server/options/encryptionconfig/controller"
|
||||
serverstorage "k8s.io/apiserver/pkg/server/storage"
|
||||
"k8s.io/apiserver/pkg/storage/storagebackend"
|
||||
storagefactory "k8s.io/apiserver/pkg/storage/storagebackend/factory"
|
||||
"k8s.io/apiserver/pkg/storage/value"
|
||||
flowcontrolrequest "k8s.io/apiserver/pkg/util/flowcontrol/request"
|
||||
"k8s.io/klog/v2"
|
||||
)
|
||||
@@ -64,7 +65,7 @@ type EtcdOptions struct {
|
||||
|
||||
// complete guards fields that must be initialized via Complete before the Apply methods can be used.
|
||||
complete bool
|
||||
transformerOverrides map[schema.GroupResource]value.Transformer
|
||||
resourceTransformers encryptionconfig.ResourceTransformers
|
||||
kmsPluginHealthzChecks []healthz.HealthChecker
|
||||
|
||||
// SkipHealthEndpoints, when true, causes the Apply methods to not set up health endpoints.
|
||||
@@ -125,7 +126,7 @@ func (s *EtcdOptions) Validate() []error {
|
||||
return allErrors
|
||||
}
|
||||
|
||||
// AddEtcdFlags adds flags related to etcd storage for a specific APIServer to the specified FlagSet
|
||||
// AddFlags adds flags related to etcd storage for a specific APIServer to the specified FlagSet
|
||||
func (s *EtcdOptions) AddFlags(fs *pflag.FlagSet) {
|
||||
if s == nil {
|
||||
return
|
||||
@@ -213,7 +214,11 @@ func (s *EtcdOptions) AddFlags(fs *pflag.FlagSet) {
|
||||
// Complete must be called exactly once before using any of the Apply methods. It is responsible for setting
|
||||
// up objects that must be created once and reused across multiple invocations such as storage transformers.
|
||||
// This method mutates the receiver (EtcdOptions). It must never mutate the inputs.
|
||||
func (s *EtcdOptions) Complete(storageObjectCountTracker flowcontrolrequest.StorageObjectCountTracker, stopCh <-chan struct{}) error {
|
||||
func (s *EtcdOptions) Complete(
|
||||
storageObjectCountTracker flowcontrolrequest.StorageObjectCountTracker,
|
||||
stopCh <-chan struct{},
|
||||
addPostStartHook func(name string, hook server.PostStartHookFunc) error,
|
||||
) error {
|
||||
if s == nil {
|
||||
return nil
|
||||
}
|
||||
@@ -223,12 +228,56 @@ func (s *EtcdOptions) Complete(storageObjectCountTracker flowcontrolrequest.Stor
|
||||
}
|
||||
|
||||
if len(s.EncryptionProviderConfigFilepath) != 0 {
|
||||
transformerOverrides, kmsPluginHealthzChecks, err := encryptionconfig.LoadEncryptionConfig(s.EncryptionProviderConfigFilepath, s.EncryptionProviderConfigAutomaticReload, stopCh)
|
||||
ctx, closeTransformers := wait.ContextForChannel(stopCh)
|
||||
|
||||
encryptionConfiguration, err := encryptionconfig.LoadEncryptionConfig(s.EncryptionProviderConfigFilepath, s.EncryptionProviderConfigAutomaticReload, ctx.Done())
|
||||
if err != nil {
|
||||
// in case of error, we want to close partially initialized (if any) transformers
|
||||
closeTransformers()
|
||||
return err
|
||||
}
|
||||
s.transformerOverrides = transformerOverrides
|
||||
s.kmsPluginHealthzChecks = kmsPluginHealthzChecks
|
||||
|
||||
// enable kms hot reload controller only if the config file is set to be automatically reloaded
|
||||
if s.EncryptionProviderConfigAutomaticReload {
|
||||
// with reload=true we will always have 1 health check
|
||||
if len(encryptionConfiguration.HealthChecks) != 1 {
|
||||
// in case of error, we want to close partially initialized (if any) transformers
|
||||
closeTransformers()
|
||||
return fmt.Errorf("failed to start kms encryption config hot reload controller. only 1 health check should be available when reload is enabled")
|
||||
}
|
||||
|
||||
dynamicTransformers := encryptionconfig.NewDynamicTransformers(encryptionConfiguration.Transformers, encryptionConfiguration.HealthChecks[0], closeTransformers, encryptionConfiguration.KMSCloseGracePeriod)
|
||||
|
||||
s.resourceTransformers = dynamicTransformers
|
||||
s.kmsPluginHealthzChecks = []healthz.HealthChecker{dynamicTransformers}
|
||||
|
||||
// add post start hook to start hot reload controller
|
||||
// adding this hook here will ensure that it gets configured exactly once
|
||||
err = addPostStartHook(
|
||||
"start-encryption-provider-config-automatic-reload",
|
||||
func(hookContext server.PostStartHookContext) error {
|
||||
kmsConfigController := kmsconfigcontroller.NewDynamicKMSEncryptionConfiguration(
|
||||
"kms-encryption-config",
|
||||
s.EncryptionProviderConfigFilepath,
|
||||
dynamicTransformers,
|
||||
encryptionConfiguration.EncryptionFileContentHash,
|
||||
ctx.Done(),
|
||||
)
|
||||
|
||||
go kmsConfigController.Run(ctx)
|
||||
|
||||
return nil
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
// in case of error, we want to close partially initialized (if any) transformers
|
||||
closeTransformers()
|
||||
return fmt.Errorf("failed to add post start hook for kms encryption config hot reload controller: %w", err)
|
||||
}
|
||||
} else {
|
||||
s.resourceTransformers = encryptionconfig.StaticTransformers(encryptionConfiguration.Transformers)
|
||||
s.kmsPluginHealthzChecks = encryptionConfiguration.HealthChecks
|
||||
}
|
||||
}
|
||||
|
||||
s.StorageConfig.StorageObjectCountTracker = storageObjectCountTracker
|
||||
@@ -263,10 +312,10 @@ func (s *EtcdOptions) ApplyWithStorageFactoryTo(factory serverstorage.StorageFac
|
||||
}
|
||||
}
|
||||
|
||||
if len(s.transformerOverrides) > 0 {
|
||||
if s.resourceTransformers != nil {
|
||||
factory = &transformerStorageFactory{
|
||||
delegate: factory,
|
||||
transformerOverrides: s.transformerOverrides,
|
||||
resourceTransformers: s.resourceTransformers,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -400,7 +449,7 @@ var _ serverstorage.StorageFactory = &transformerStorageFactory{}
|
||||
|
||||
type transformerStorageFactory struct {
|
||||
delegate serverstorage.StorageFactory
|
||||
transformerOverrides map[schema.GroupResource]value.Transformer
|
||||
resourceTransformers encryptionconfig.ResourceTransformers
|
||||
}
|
||||
|
||||
func (t *transformerStorageFactory) NewConfig(resource schema.GroupResource) (*storagebackend.ConfigForResource, error) {
|
||||
@@ -409,14 +458,9 @@ func (t *transformerStorageFactory) NewConfig(resource schema.GroupResource) (*s
|
||||
return nil, err
|
||||
}
|
||||
|
||||
transformer, ok := t.transformerOverrides[resource]
|
||||
if !ok {
|
||||
return config, nil
|
||||
}
|
||||
|
||||
configCopy := *config
|
||||
resourceConfig := configCopy.Config
|
||||
resourceConfig.Transformer = transformer
|
||||
resourceConfig.Transformer = t.resourceTransformers.TransformerForResource(resource)
|
||||
configCopy.Config = resourceConfig
|
||||
|
||||
return &configCopy, nil
|
||||
|
@@ -306,7 +306,7 @@ func TestKMSHealthzEndpoint(t *testing.T) {
|
||||
EncryptionProviderConfigAutomaticReload: tc.reload,
|
||||
SkipHealthEndpoints: tc.skipHealth,
|
||||
}
|
||||
if err := etcdOptions.Complete(serverConfig.StorageObjectCountTracker, serverConfig.DrainedNotify()); err != nil {
|
||||
if err := etcdOptions.Complete(serverConfig.StorageObjectCountTracker, serverConfig.DrainedNotify(), serverConfig.AddPostStartHook); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := etcdOptions.ApplyTo(serverConfig); err != nil {
|
||||
@@ -345,7 +345,7 @@ func TestReadinessCheck(t *testing.T) {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
serverConfig := server.NewConfig(codecs)
|
||||
etcdOptions := &EtcdOptions{SkipHealthEndpoints: tc.skipHealth}
|
||||
if err := etcdOptions.Complete(serverConfig.StorageObjectCountTracker, serverConfig.DrainedNotify()); err != nil {
|
||||
if err := etcdOptions.Complete(serverConfig.StorageObjectCountTracker, serverConfig.DrainedNotify(), serverConfig.AddPostStartHook); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := etcdOptions.ApplyTo(serverConfig); err != nil {
|
||||
|
@@ -101,7 +101,7 @@ func (o *RecommendedOptions) AddFlags(fs *pflag.FlagSet) {
|
||||
// ApplyTo adds RecommendedOptions to the server configuration.
|
||||
// pluginInitializers can be empty, it is only need for additional initializers.
|
||||
func (o *RecommendedOptions) ApplyTo(config *server.RecommendedConfig) error {
|
||||
if err := o.Etcd.Complete(config.Config.StorageObjectCountTracker, config.Config.DrainedNotify()); err != nil {
|
||||
if err := o.Etcd.Complete(config.Config.StorageObjectCountTracker, config.Config.DrainedNotify(), config.Config.AddPostStartHook); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := o.Etcd.ApplyTo(&config.Config); err != nil {
|
||||
|
Reference in New Issue
Block a user