Merge pull request #122919 from alexzielenski/apiserver/policy/mutating-initial

Refactor AdmissionPolicy for code sharing with mutating
This commit is contained in:
Kubernetes Prow Robot
2024-02-14 14:52:26 -08:00
committed by GitHub
43 changed files with 2361 additions and 1989 deletions

View File

@@ -20,7 +20,7 @@ import (
"context"
apiextensionsscheme "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/scheme"
pluginvalidatingadmissionpolicy "k8s.io/apiserver/pkg/admission/plugin/validatingadmissionpolicy"
pluginvalidatingadmissionpolicy "k8s.io/apiserver/pkg/admission/plugin/policy/validating"
"k8s.io/apiserver/pkg/cel/openapi/resolver"
genericfeatures "k8s.io/apiserver/pkg/features"
k8sscheme "k8s.io/client-go/kubernetes/scheme"

View File

@@ -31,7 +31,7 @@ import (
utilvalidation "k8s.io/apimachinery/pkg/util/validation"
"k8s.io/apimachinery/pkg/util/validation/field"
plugincel "k8s.io/apiserver/pkg/admission/plugin/cel"
"k8s.io/apiserver/pkg/admission/plugin/validatingadmissionpolicy"
validatingadmissionpolicy "k8s.io/apiserver/pkg/admission/plugin/policy/validating"
"k8s.io/apiserver/pkg/admission/plugin/webhook/matchconditions"
"k8s.io/apiserver/pkg/cel"
"k8s.io/apiserver/pkg/cel/environment"

View File

@@ -26,7 +26,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/admission/plugin/validatingadmissionpolicy"
validatingadmissionpolicy "k8s.io/apiserver/pkg/admission/plugin/policy/validating"
admissionregistrationv1beta1apply "k8s.io/client-go/applyconfigurations/admissionregistration/v1beta1"
informerv1beta1 "k8s.io/client-go/informers/admissionregistration/v1beta1"
admissionregistrationv1beta1 "k8s.io/client-go/kubernetes/typed/admissionregistration/v1beta1"

View File

@@ -27,7 +27,7 @@ import (
"k8s.io/apimachinery/pkg/api/meta/testrestmapper"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/admission/plugin/validatingadmissionpolicy"
validatingadmissionpolicy "k8s.io/apiserver/pkg/admission/plugin/policy/validating"
"k8s.io/apiserver/pkg/cel/openapi/resolver"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"

View File

@@ -20,7 +20,7 @@ package options
// This should probably be part of some configuration fed into the build for a
// given binary target.
import (
"k8s.io/apiserver/pkg/admission/plugin/validatingadmissionpolicy"
validatingadmissionpolicy "k8s.io/apiserver/pkg/admission/plugin/policy/validating"
// Admission policies
"k8s.io/kubernetes/plugin/pkg/admission/admit"
"k8s.io/kubernetes/plugin/pkg/admission/alwayspullimages"

View File

@@ -54,13 +54,22 @@ func NewCompositedCompiler(envSet *environment.EnvSet) (*CompositedCompiler, err
if err != nil {
return nil, err
}
compiler := NewCompiler(compositionContext.EnvSet)
filterCompiler := NewFilterCompiler(compositionContext.EnvSet)
return NewCompositedCompilerFromTemplate(compositionContext), nil
}
func NewCompositedCompilerFromTemplate(context *CompositionEnv) *CompositedCompiler {
context = &CompositionEnv{
MapType: context.MapType,
EnvSet: context.EnvSet,
CompiledVariables: map[string]CompilationResult{},
}
compiler := NewCompiler(context.EnvSet)
filterCompiler := NewFilterCompiler(context.EnvSet)
return &CompositedCompiler{
Compiler: compiler,
FilterCompiler: filterCompiler,
CompositionEnv: compositionContext,
}, nil
CompositionEnv: context,
}
}
func (c *CompositedCompiler) CompileAndStoreVariables(variables []NamedExpressionAccessor, options OptionalVariableDeclarations, mode environment.Type) {

View File

@@ -0,0 +1,38 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package generic
import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
)
type PolicyAccessor interface {
GetName() string
GetNamespace() string
GetParamKind() *schema.GroupVersionKind
}
type BindingAccessor interface {
GetName() string
GetNamespace() string
// GetPolicyName returns the name of the (Validating/Mutating)AdmissionPolicy,
// which is cluster-scoped, so namespace is usually left blank.
// But we leave the door open to add a namespaced vesion in the future
GetPolicyName() types.NamespacedName
}

View File

@@ -0,0 +1,64 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package generic
import (
"context"
"k8s.io/apiserver/pkg/admission"
)
// Hook represents a dynamic admission hook. The hook may be a webhook or a
// policy. For webhook, the Hook may describe how to contact the endpoint, expected
// cert, etc. For policies, the hook may describe a compiled policy-binding pair.
type Hook interface {
// All hooks are expected to contain zero or more match conditions, object
// selectors, namespace selectors to help the dispatcher decide when to apply
// the hook.
//
// Methods of matching logic is applied are specific to the hook and left up
// to the implementation.
}
// Source can list dynamic admission plugins.
type Source[H Hook] interface {
// Hooks returns the list of currently known admission hooks.
Hooks() []H
// Run the source. This method should be called only once at startup.
Run(ctx context.Context) error
// HasSynced returns true if the source has completed its initial sync.
HasSynced() bool
}
// Dispatcher dispatches evaluates an admission request against the currently
// active hooks returned by the source.
type Dispatcher[H Hook] interface {
// Dispatch a request to the policies. Dispatcher may choose not to
// call a hook, either because the rules of the hook does not match, or
// the namespaceSelector or the objectSelector of the hook does not
// match. A non-nil error means the request is rejected.
Dispatch(ctx context.Context, a admission.Attributes, o admission.ObjectInterfaces, hooks []H) error
}
// An evaluator represents a compiled CEL expression that can be evaluated a
// given a set of inputs used by the generic PolicyHook for Mutating and
// ValidatingAdmissionPolicy.
// Mutating and Validating may have different forms of evaluators
type Evaluator interface {
}

View File

@@ -0,0 +1,206 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package generic
import (
"context"
"errors"
"fmt"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime/schema"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apiserver/pkg/admission"
"k8s.io/apiserver/pkg/admission/initializer"
"k8s.io/apiserver/pkg/admission/plugin/policy/matching"
"k8s.io/apiserver/pkg/authorization/authorizer"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
)
// H is the Hook type generated by the source and consumed by the dispatcher.
type sourceFactory[H any] func(informers.SharedInformerFactory, kubernetes.Interface, dynamic.Interface, meta.RESTMapper) Source[H]
type dispatcherFactory[H any] func(authorizer.Authorizer, *matching.Matcher) Dispatcher[H]
type Invocation struct {
Resource schema.GroupVersionResource
Subresource string
Kind schema.GroupVersionKind
}
// AdmissionPolicyManager is an abstract admission plugin with all the
// infrastructure to define Admit or Validate on-top.
type Plugin[H any] struct {
*admission.Handler
sourceFactory sourceFactory[H]
dispatcherFactory dispatcherFactory[H]
source Source[H]
dispatcher Dispatcher[H]
matcher *matching.Matcher
informerFactory informers.SharedInformerFactory
client kubernetes.Interface
restMapper meta.RESTMapper
dynamicClient dynamic.Interface
stopCh <-chan struct{}
authorizer authorizer.Authorizer
enabled bool
}
var (
_ initializer.WantsExternalKubeInformerFactory = &Plugin[any]{}
_ initializer.WantsExternalKubeClientSet = &Plugin[any]{}
_ initializer.WantsRESTMapper = &Plugin[any]{}
_ initializer.WantsDynamicClient = &Plugin[any]{}
_ initializer.WantsDrainedNotification = &Plugin[any]{}
_ initializer.WantsAuthorizer = &Plugin[any]{}
_ admission.InitializationValidator = &Plugin[any]{}
)
func NewPlugin[H any](
handler *admission.Handler,
sourceFactory sourceFactory[H],
dispatcherFactory dispatcherFactory[H],
) *Plugin[H] {
return &Plugin[H]{
Handler: handler,
sourceFactory: sourceFactory,
dispatcherFactory: dispatcherFactory,
}
}
func (c *Plugin[H]) SetExternalKubeInformerFactory(f informers.SharedInformerFactory) {
c.informerFactory = f
}
func (c *Plugin[H]) SetExternalKubeClientSet(client kubernetes.Interface) {
c.client = client
}
func (c *Plugin[H]) SetRESTMapper(mapper meta.RESTMapper) {
c.restMapper = mapper
}
func (c *Plugin[H]) SetDynamicClient(client dynamic.Interface) {
c.dynamicClient = client
}
func (c *Plugin[H]) SetDrainedNotification(stopCh <-chan struct{}) {
c.stopCh = stopCh
}
func (c *Plugin[H]) SetAuthorizer(authorizer authorizer.Authorizer) {
c.authorizer = authorizer
}
func (c *Plugin[H]) SetMatcher(matcher *matching.Matcher) {
c.matcher = matcher
}
func (c *Plugin[H]) SetEnabled(enabled bool) {
c.enabled = enabled
}
// ValidateInitialization - once clientset and informer factory are provided, creates and starts the admission controller
func (c *Plugin[H]) ValidateInitialization() error {
// By default enabled is set to false. It is up to types which embed this
// struct to set it to true (if feature gate is enabled, or other conditions)
if !c.enabled {
return nil
}
if c.Handler == nil {
return errors.New("missing handler")
}
if c.informerFactory == nil {
return errors.New("missing informer factory")
}
if c.client == nil {
return errors.New("missing kubernetes client")
}
if c.restMapper == nil {
return errors.New("missing rest mapper")
}
if c.dynamicClient == nil {
return errors.New("missing dynamic client")
}
if c.stopCh == nil {
return errors.New("missing stop channel")
}
if c.authorizer == nil {
return errors.New("missing authorizer")
}
// Use default matcher
namespaceInformer := c.informerFactory.Core().V1().Namespaces()
c.matcher = matching.NewMatcher(namespaceInformer.Lister(), c.client)
if err := c.matcher.ValidateInitialization(); err != nil {
return err
}
c.source = c.sourceFactory(c.informerFactory, c.client, c.dynamicClient, c.restMapper)
c.dispatcher = c.dispatcherFactory(c.authorizer, c.matcher)
pluginContext, pluginContextCancel := context.WithCancel(context.Background())
go func() {
defer pluginContextCancel()
<-c.stopCh
}()
go func() {
err := c.source.Run(pluginContext)
if err != nil && !errors.Is(err, context.Canceled) {
utilruntime.HandleError(fmt.Errorf("policy source context unexpectedly closed: %v", err))
}
}()
c.SetReadyFunc(func() bool {
return namespaceInformer.Informer().HasSynced() && c.source.HasSynced()
})
return nil
}
func (c *Plugin[H]) Dispatch(
ctx context.Context,
a admission.Attributes,
o admission.ObjectInterfaces,
) (err error) {
if !c.enabled {
return nil
} else if isPolicyResource(a) {
return nil
} else if !c.WaitForReady() {
return admission.NewForbidden(a, fmt.Errorf("not yet ready to handle request"))
}
return c.dispatcher.Dispatch(ctx, a, o, c.source.Hooks())
}
func isPolicyResource(attr admission.Attributes) bool {
gvk := attr.GetResource()
if gvk.Group == "admissionregistration.k8s.io" {
if gvk.Resource == "validatingadmissionpolicies" || gvk.Resource == "validatingadmissionpolicybindings" {
return true
} else if gvk.Resource == "mutatingadmissionpolicies" || gvk.Resource == "mutatingadmissionpolicybindings" {
return true
}
}
return false
}

View File

@@ -0,0 +1,461 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package generic
import (
"context"
goerrors "errors"
"fmt"
"sync"
"sync/atomic"
"time"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/admission/plugin/policy/internal/generic"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/client-go/informers"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
)
type policySource[P runtime.Object, B runtime.Object, E Evaluator] struct {
ctx context.Context
policyInformer generic.Informer[P]
bindingInformer generic.Informer[B]
restMapper meta.RESTMapper
newPolicyAccessor func(P) PolicyAccessor
newBindingAccessor func(B) BindingAccessor
informerFactory informers.SharedInformerFactory
dynamicClient dynamic.Interface
compiler func(P) E
// Currently compiled list of valid/active policy-binding pairs
policies atomic.Pointer[[]PolicyHook[P, B, E]]
// Whether the cache of policies is dirty and needs to be recompiled
policiesDirty atomic.Bool
lock sync.Mutex
compiledPolicies map[types.NamespacedName]compiledPolicyEntry[E]
// Temporary until we use the dynamic informer factory
paramsCRDControllers map[schema.GroupVersionKind]*paramInfo
}
type paramInfo struct {
mapping meta.RESTMapping
// When the param is changed, or the informer is done being used, the cancel
// func should be called to stop/cleanup the original informer
cancelFunc func()
// The lister for this param
informer informers.GenericInformer
}
type compiledPolicyEntry[E Evaluator] struct {
policyVersion string
evaluator E
}
type PolicyHook[P runtime.Object, B runtime.Object, E Evaluator] struct {
Policy P
Bindings []B
ParamInformer informers.GenericInformer
ParamScope meta.RESTScope
Evaluator E
ConfigurationError error
}
var _ Source[PolicyHook[runtime.Object, runtime.Object, Evaluator]] = &policySource[runtime.Object, runtime.Object, Evaluator]{}
func NewPolicySource[P runtime.Object, B runtime.Object, E Evaluator](
policyInformer cache.SharedIndexInformer,
bindingInformer cache.SharedIndexInformer,
newPolicyAccessor func(P) PolicyAccessor,
newBindingAccessor func(B) BindingAccessor,
compiler func(P) E,
paramInformerFactory informers.SharedInformerFactory,
dynamicClient dynamic.Interface,
restMapper meta.RESTMapper,
) Source[PolicyHook[P, B, E]] {
res := &policySource[P, B, E]{
compiler: compiler,
policyInformer: generic.NewInformer[P](policyInformer),
bindingInformer: generic.NewInformer[B](bindingInformer),
compiledPolicies: map[types.NamespacedName]compiledPolicyEntry[E]{},
newPolicyAccessor: newPolicyAccessor,
newBindingAccessor: newBindingAccessor,
paramsCRDControllers: map[schema.GroupVersionKind]*paramInfo{},
informerFactory: paramInformerFactory,
dynamicClient: dynamicClient,
restMapper: restMapper,
}
return res
}
func (s *policySource[P, B, E]) Run(ctx context.Context) error {
if s.ctx != nil {
return fmt.Errorf("policy source already running")
}
// Wait for initial cache sync of policies and informers before reconciling
// any
if !cache.WaitForNamedCacheSync(fmt.Sprintf("%T", s), ctx.Done(), s.UpstreamHasSynced) {
err := ctx.Err()
if err == nil {
err = fmt.Errorf("initial cache sync for %T failed", s)
}
return err
}
s.ctx = ctx
// Perform initial policy compilation after initial list has finished
s.notify()
s.refreshPolicies()
notifyFuncs := cache.ResourceEventHandlerFuncs{
AddFunc: func(_ interface{}) {
s.notify()
},
UpdateFunc: func(_, _ interface{}) {
s.notify()
},
DeleteFunc: func(_ interface{}) {
s.notify()
},
}
handle, err := s.policyInformer.AddEventHandler(notifyFuncs)
if err != nil {
return err
}
defer func() {
if err := s.policyInformer.RemoveEventHandler(handle); err != nil {
utilruntime.HandleError(fmt.Errorf("failed to remove policy event handler: %v", err))
}
}()
bindingHandle, err := s.bindingInformer.AddEventHandler(notifyFuncs)
if err != nil {
return err
}
defer func() {
if err := s.bindingInformer.RemoveEventHandler(bindingHandle); err != nil {
utilruntime.HandleError(fmt.Errorf("failed to remove binding event handler: %v", err))
}
}()
// Start a worker that checks every second to see if policy data is dirty
// and needs to be recompiled
go func() {
// Loop every 1 second until context is cancelled, refreshing policies
wait.Until(s.refreshPolicies, 1*time.Second, ctx.Done())
}()
<-ctx.Done()
return nil
}
func (s *policySource[P, B, E]) UpstreamHasSynced() bool {
return s.policyInformer.HasSynced() && s.bindingInformer.HasSynced()
}
// HasSynced implements Source.
func (s *policySource[P, B, E]) HasSynced() bool {
// As an invariant we never store `nil` into the atomic list of processed
// policy hooks. If it is nil, then we haven't compiled all the policies
// and stored them yet.
return s.Hooks() != nil
}
// Hooks implements Source.
func (s *policySource[P, B, E]) Hooks() []PolicyHook[P, B, E] {
res := s.policies.Load()
// Error case should not happen since evaluation function never
// returns error
if res == nil {
// Not yet synced
return nil
}
return *res
}
func (s *policySource[P, B, E]) refreshPolicies() {
if !s.UpstreamHasSynced() {
return
} else if !s.policiesDirty.Swap(false) {
return
}
// It is ok the cache gets marked dirty again between us clearing the
// flag and us calculating the policies. The dirty flag would be marked again,
// and we'd have a no-op after comparing resource versions on the next sync.
klog.Infof("refreshing policies")
policies, err := s.calculatePolicyData()
// Intentionally store policy list regardless of error. There may be
// an error returned if there was a configuration error in one of the policies,
// but we would still want those policies evaluated
// (for instance to return error on failaction). Or if there was an error
// listing all policies at all, we would want to wipe the list.
s.policies.Store(&policies)
if err != nil {
// An error was generated while syncing policies. Mark it as dirty again
// so we can retry later
utilruntime.HandleError(fmt.Errorf("encountered error syncing policies: %v. Rescheduling policy sync", err))
s.notify()
}
}
func (s *policySource[P, B, E]) notify() {
s.policiesDirty.Store(true)
}
// calculatePolicyData calculates the list of policies and bindings for each
// policy. If there is an error in generation, it will return the error and
// the partial list of policies that were able to be generated. Policies that
// have an error will have a non-nil ConfigurationError field, but still be
// included in the result.
//
// This function caches the result of the intermediate compilations
func (s *policySource[P, B, E]) calculatePolicyData() ([]PolicyHook[P, B, E], error) {
if !s.UpstreamHasSynced() {
return nil, fmt.Errorf("cannot calculate policy data until upstream has synced")
}
// Fat-fingered lock that can be made more fine-tuned if required
s.lock.Lock()
defer s.lock.Unlock()
// Create a local copy of all policies and bindings
policiesToBindings := map[types.NamespacedName][]B{}
bindingList, err := s.bindingInformer.List(labels.Everything())
if err != nil {
// This should never happen unless types are misconfigured
// (can't use meta.accessor on them)
return nil, err
}
// Gather a list of all active policy bindings
for _, bindingSpec := range bindingList {
bindingAccessor := s.newBindingAccessor(bindingSpec)
policyKey := bindingAccessor.GetPolicyName()
// Add this binding to the list of bindings for this policy
policiesToBindings[policyKey] = append(policiesToBindings[policyKey], bindingSpec)
}
result := make([]PolicyHook[P, B, E], 0, len(bindingList))
usedParams := map[schema.GroupVersionKind]struct{}{}
var errs []error
for policyKey, bindingSpecs := range policiesToBindings {
var inf generic.NamespacedLister[P] = s.policyInformer
if len(policyKey.Namespace) > 0 {
inf = s.policyInformer.Namespaced(policyKey.Namespace)
}
policySpec, err := inf.Get(policyKey.Name)
if errors.IsNotFound(err) {
// Policy for bindings doesn't exist. This can happen if the policy
// was deleted before the binding, or the binding was created first.
//
// Just skip bindings that refer to non-existent policies
// If the policy is recreated, the cache will be marked dirty and
// this function will run again.
continue
} else if err != nil {
// This should never happen since fetching from a cache should never
// fail and this function checks that the cache was synced before
// even getting to this point.
errs = append(errs, err)
continue
}
policyAccessor := s.newPolicyAccessor(policySpec)
paramInformer, paramScope, configurationError := s.ensureParamsForPolicyLocked(policyAccessor.GetParamKind())
result = append(result, PolicyHook[P, B, E]{
Policy: policySpec,
Bindings: bindingSpecs,
Evaluator: s.compilePolicyLocked(policySpec),
ParamInformer: paramInformer,
ParamScope: paramScope,
ConfigurationError: configurationError,
})
// TEMPORARY UNTIL WE HAVE SHARED PARAM INFORMERS
if paramKind := policyAccessor.GetParamKind(); paramKind != nil {
usedParams[*paramKind] = struct{}{}
}
// Should queue a re-sync for policy sync error. If our shared param
// informer can notify us when CRD discovery changes we can remove this
// and just rely on the informer to notify us when the CRDs change
if configurationError != nil {
errs = append(errs, configurationError)
}
}
// Clean up orphaned policies by replacing the old cache of compiled policies
// (the map of used policies is updated by `compilePolicy`)
for policyKey := range s.compiledPolicies {
if _, wasSeen := policiesToBindings[policyKey]; !wasSeen {
delete(s.compiledPolicies, policyKey)
}
}
// Clean up orphaned param informers
for paramKind, info := range s.paramsCRDControllers {
if _, wasSeen := usedParams[paramKind]; !wasSeen {
info.cancelFunc()
delete(s.paramsCRDControllers, paramKind)
}
}
err = nil
if len(errs) > 0 {
err = goerrors.Join(errs...)
}
return result, err
}
// ensureParamsForPolicyLocked ensures that the informer for the paramKind is
// started and returns the informer and the scope of the paramKind.
//
// Must be called under write lock
func (s *policySource[P, B, E]) ensureParamsForPolicyLocked(paramSource *schema.GroupVersionKind) (informers.GenericInformer, meta.RESTScope, error) {
if paramSource == nil {
return nil, nil, nil
} else if info, ok := s.paramsCRDControllers[*paramSource]; ok {
return info.informer, info.mapping.Scope, nil
}
mapping, err := s.restMapper.RESTMapping(schema.GroupKind{
Group: paramSource.Group,
Kind: paramSource.Kind,
}, paramSource.Version)
if err != nil {
// Failed to resolve. Return error so we retry again (rate limited)
// Save a record of this definition with an evaluator that unconditionally
return nil, nil, fmt.Errorf("failed to find resource referenced by paramKind: '%v'", *paramSource)
}
// We are not watching this param. Start an informer for it.
instanceContext, instanceCancel := context.WithCancel(s.ctx)
var informer informers.GenericInformer
// Try to see if our provided informer factory has an informer for this type.
// We assume the informer is already started, and starts all types associated
// with it.
if genericInformer, err := s.informerFactory.ForResource(mapping.Resource); err == nil {
informer = genericInformer
// Start the informer
s.informerFactory.Start(instanceContext.Done())
} else {
// Dynamic JSON informer fallback.
// Cannot use shared dynamic informer since it would be impossible
// to clean CRD informers properly with multiple dependents
// (cannot start ahead of time, and cannot track dependencies via stopCh)
informer = dynamicinformer.NewFilteredDynamicInformer(
s.dynamicClient,
mapping.Resource,
corev1.NamespaceAll,
// Use same interval as is used for k8s typed sharedInformerFactory
// https://github.com/kubernetes/kubernetes/blob/7e0923899fed622efbc8679cca6b000d43633e38/cmd/kube-apiserver/app/server.go#L430
10*time.Minute,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
nil,
)
go informer.Informer().Run(instanceContext.Done())
}
klog.Infof("informer started for %v", *paramSource)
ret := &paramInfo{
mapping: *mapping,
cancelFunc: instanceCancel,
informer: informer,
}
s.paramsCRDControllers[*paramSource] = ret
return ret.informer, mapping.Scope, nil
}
// For testing
func (s *policySource[P, B, E]) getParamInformer(param schema.GroupVersionKind) (informers.GenericInformer, meta.RESTScope) {
s.lock.Lock()
defer s.lock.Unlock()
if info, ok := s.paramsCRDControllers[param]; ok {
return info.informer, info.mapping.Scope
}
return nil, nil
}
// compilePolicyLocked compiles the policy and returns the evaluator for it.
// If the policy has not changed since the last compilation, it will return
// the cached evaluator.
//
// Must be called under write lock
func (s *policySource[P, B, E]) compilePolicyLocked(policySpec P) E {
policyMeta, err := meta.Accessor(policySpec)
if err != nil {
// This should not happen if P, and B have ObjectMeta, but
// unfortunately there is no way to express "able to call
// meta.Accessor" as a type constraint
utilruntime.HandleError(err)
var emptyEvaluator E
return emptyEvaluator
}
key := types.NamespacedName{
Namespace: policyMeta.GetNamespace(),
Name: policyMeta.GetName(),
}
compiledPolicy, wasCompiled := s.compiledPolicies[key]
// If the policy or binding has changed since it was last compiled,
// and if there is no configuration error (like a missing param CRD)
// then we recompile
if !wasCompiled ||
compiledPolicy.policyVersion != policyMeta.GetResourceVersion() {
compiledPolicy = compiledPolicyEntry[E]{
policyVersion: policyMeta.GetResourceVersion(),
evaluator: s.compiler(policySpec),
}
s.compiledPolicies[key] = compiledPolicy
}
return compiledPolicy.evaluator
}

View File

@@ -0,0 +1,233 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package generic_test
import (
"testing"
"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apiserver/pkg/admission/plugin/policy/generic"
"k8s.io/apiserver/pkg/admission/plugin/policy/matching"
"k8s.io/apiserver/pkg/authorization/authorizer"
"k8s.io/client-go/tools/cache"
)
func makeTestDispatcher(authorizer.Authorizer, *matching.Matcher) generic.Dispatcher[generic.PolicyHook[*FakePolicy, *FakeBinding, generic.Evaluator]] {
return nil
}
func TestPolicySourceHasSyncedEmpty(t *testing.T) {
testContext, testCancel, err := generic.NewPolicyTestContext(
func(fp *FakePolicy) generic.PolicyAccessor { return fp },
func(fb *FakeBinding) generic.BindingAccessor { return fb },
func(fp *FakePolicy) generic.Evaluator { return nil },
makeTestDispatcher,
nil,
nil,
)
require.NoError(t, err)
defer testCancel()
require.NoError(t, testContext.Start())
// Should be able to wait for cache sync
require.True(t, cache.WaitForCacheSync(testContext.Done(), testContext.Source.HasSynced), "cache should sync after informer running")
}
func TestPolicySourceHasSyncedInitialList(t *testing.T) {
// Create a list of fake policies
initialObjects := []runtime.Object{
&FakePolicy{
ObjectMeta: metav1.ObjectMeta{
Name: "policy1",
},
},
&FakeBinding{
ObjectMeta: metav1.ObjectMeta{
Name: "binding1",
},
PolicyName: "policy1",
},
}
testContext, testCancel, err := generic.NewPolicyTestContext(
func(fp *FakePolicy) generic.PolicyAccessor { return fp },
func(fb *FakeBinding) generic.BindingAccessor { return fb },
func(fp *FakePolicy) generic.Evaluator { return nil },
makeTestDispatcher,
initialObjects,
nil,
)
require.NoError(t, err)
defer testCancel()
require.NoError(t, testContext.Start())
// Should be able to wait for cache sync
require.Len(t, testContext.Source.Hooks(), 1, "should have one policy")
require.NoError(t, testContext.UpdateAndWait(
&FakePolicy{
ObjectMeta: metav1.ObjectMeta{
Name: "policy2",
},
},
&FakeBinding{
ObjectMeta: metav1.ObjectMeta{
Name: "binding2",
},
PolicyName: "policy2",
},
))
require.Len(t, testContext.Source.Hooks(), 2, "should have two policies")
require.NoError(t, testContext.UpdateAndWait(
&FakePolicy{
ObjectMeta: metav1.ObjectMeta{
Name: "policy3",
},
},
&FakeBinding{
ObjectMeta: metav1.ObjectMeta{
Name: "binding3",
},
PolicyName: "policy3",
},
&FakePolicy{
ObjectMeta: metav1.ObjectMeta{
Name: "policy2",
},
ParamKind: &schema.GroupVersionKind{
Group: "policy.example.com",
Version: "v1",
Kind: "FakeParam",
},
},
))
require.Len(t, testContext.Source.Hooks(), 3, "should have 3 policies")
}
func TestPolicySourceBindsToPolicies(t *testing.T) {
// Create a list of fake policies
initialObjects := []runtime.Object{
&FakePolicy{
ObjectMeta: metav1.ObjectMeta{
Name: "policy1",
},
},
&FakeBinding{
ObjectMeta: metav1.ObjectMeta{
Name: "binding1",
},
PolicyName: "policy1",
},
}
testContext, testCancel, err := generic.NewPolicyTestContext(
func(fp *FakePolicy) generic.PolicyAccessor { return fp },
func(fb *FakeBinding) generic.BindingAccessor { return fb },
func(fp *FakePolicy) generic.Evaluator { return nil },
makeTestDispatcher,
initialObjects,
nil,
)
require.NoError(t, err)
defer testCancel()
require.NoError(t, testContext.Start())
require.Len(t, testContext.Source.Hooks(), 1, "should have one policy")
require.Len(t, testContext.Source.Hooks()[0].Bindings, 1, "should have one binding")
require.Equal(t, "binding1", testContext.Source.Hooks()[0].Bindings[0].GetName(), "should have one binding")
// Change the binding to another policy (policies without bindings should
// be ignored, so it should remove the first
require.NoError(t, testContext.UpdateAndWait(
&FakePolicy{
ObjectMeta: metav1.ObjectMeta{
Name: "policy2",
},
},
&FakeBinding{
ObjectMeta: metav1.ObjectMeta{
Name: "binding1",
},
PolicyName: "policy2",
}))
require.Len(t, testContext.Source.Hooks(), 1, "should have one policy")
require.Equal(t, "policy2", testContext.Source.Hooks()[0].Policy.GetName(), "policy name should be policy2")
require.Len(t, testContext.Source.Hooks()[0].Bindings, 1, "should have one binding")
require.Equal(t, "binding1", testContext.Source.Hooks()[0].Bindings[0].GetName(), "binding name should be binding1")
}
type FakePolicy struct {
metav1.TypeMeta
metav1.ObjectMeta
ParamKind *schema.GroupVersionKind
}
var _ generic.PolicyAccessor = &FakePolicy{}
type FakeBinding struct {
metav1.TypeMeta
metav1.ObjectMeta
PolicyName string
}
var _ generic.BindingAccessor = &FakeBinding{}
func (fp *FakePolicy) GetName() string {
return fp.Name
}
func (fp *FakePolicy) GetNamespace() string {
return fp.Namespace
}
func (fp *FakePolicy) GetParamKind() *schema.GroupVersionKind {
return fp.ParamKind
}
func (fb *FakeBinding) GetName() string {
return fb.Name
}
func (fb *FakeBinding) GetNamespace() string {
return fb.Namespace
}
func (fb *FakeBinding) GetPolicyName() types.NamespacedName {
return types.NamespacedName{
Name: fb.PolicyName,
}
}
func (fp *FakePolicy) DeepCopyObject() runtime.Object {
// totally fudged deepcopy
newFP := &FakePolicy{}
*newFP = *fp
return newFP
}
func (fb *FakeBinding) DeepCopyObject() runtime.Object {
// totally fudged deepcopy
newFB := &FakeBinding{}
*newFB = *fb
return newFB
}

View File

@@ -0,0 +1,608 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package generic
import (
"context"
"fmt"
"time"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/dynamic"
dynamicfake "k8s.io/client-go/dynamic/fake"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
clienttesting "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"
"k8s.io/component-base/featuregate"
"k8s.io/apiserver/pkg/admission"
"k8s.io/apiserver/pkg/admission/initializer"
"k8s.io/apiserver/pkg/authorization/authorizer"
"k8s.io/apiserver/pkg/features"
)
// PolicyTestContext is everything you need to unit test a policy plugin
type PolicyTestContext[P runtime.Object, B runtime.Object, E Evaluator] struct {
context.Context
Plugin *Plugin[PolicyHook[P, B, E]]
Source Source[PolicyHook[P, B, E]]
Start func() error
scheme *runtime.Scheme
restMapper *meta.DefaultRESTMapper
policyGVR schema.GroupVersionResource
bindingGVR schema.GroupVersionResource
policyGVK schema.GroupVersionKind
bindingGVK schema.GroupVersionKind
nativeTracker clienttesting.ObjectTracker
policyAndBindingTracker clienttesting.ObjectTracker
unstructuredTracker clienttesting.ObjectTracker
}
func NewPolicyTestContext[P, B runtime.Object, E Evaluator](
newPolicyAccessor func(P) PolicyAccessor,
newBindingAccessor func(B) BindingAccessor,
compileFunc func(P) E,
dispatcher dispatcherFactory[PolicyHook[P, B, E]],
initialObjects []runtime.Object,
paramMappings []meta.RESTMapping,
) (*PolicyTestContext[P, B, E], func(), error) {
var Pexample P
var Bexample B
// Create a fake resource and kind for the provided policy and binding types
fakePolicyGVR := schema.GroupVersionResource{
Group: "policy.example.com",
Version: "v1",
Resource: "fakepolicies",
}
fakeBindingGVR := schema.GroupVersionResource{
Group: "policy.example.com",
Version: "v1",
Resource: "fakebindings",
}
fakePolicyGVK := fakePolicyGVR.GroupVersion().WithKind("FakePolicy")
fakeBindingGVK := fakeBindingGVR.GroupVersion().WithKind("FakeBinding")
policySourceTestScheme, err := func() (*runtime.Scheme, error) {
scheme := runtime.NewScheme()
if err := fake.AddToScheme(scheme); err != nil {
return nil, err
}
scheme.AddKnownTypeWithName(fakePolicyGVK, Pexample)
scheme.AddKnownTypeWithName(fakeBindingGVK, Bexample)
scheme.AddKnownTypeWithName(fakePolicyGVK.GroupVersion().WithKind(fakePolicyGVK.Kind+"List"), &FakeList[P]{})
scheme.AddKnownTypeWithName(fakeBindingGVK.GroupVersion().WithKind(fakeBindingGVK.Kind+"List"), &FakeList[B]{})
for _, mapping := range paramMappings {
// Skip if it is in the scheme already
if scheme.Recognizes(mapping.GroupVersionKind) {
continue
}
scheme.AddKnownTypeWithName(mapping.GroupVersionKind, &unstructured.Unstructured{})
scheme.AddKnownTypeWithName(mapping.GroupVersionKind.GroupVersion().WithKind(mapping.GroupVersionKind.Kind+"List"), &unstructured.UnstructuredList{})
}
return scheme, nil
}()
if err != nil {
return nil, nil, err
}
fakeRestMapper := func() *meta.DefaultRESTMapper {
res := meta.NewDefaultRESTMapper([]schema.GroupVersion{
{
Group: "",
Version: "v1",
},
})
res.Add(fakePolicyGVK, meta.RESTScopeRoot)
res.Add(fakeBindingGVK, meta.RESTScopeRoot)
res.Add(corev1.SchemeGroupVersion.WithKind("ConfigMap"), meta.RESTScopeNamespace)
for _, mapping := range paramMappings {
res.AddSpecific(mapping.GroupVersionKind, mapping.Resource, mapping.Resource, mapping.Scope)
}
return res
}()
nativeClient := fake.NewSimpleClientset()
dynamicClient := dynamicfake.NewSimpleDynamicClient(policySourceTestScheme)
fakeInformerFactory := informers.NewSharedInformerFactory(nativeClient, 30*time.Second)
// Make an object tracker specifically for our policies and bindings
policiesAndBindingsTracker := clienttesting.NewObjectTracker(
policySourceTestScheme,
serializer.NewCodecFactory(policySourceTestScheme).UniversalDecoder())
// Make an informer for our policies and bindings
policyInformer := cache.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return policiesAndBindingsTracker.List(fakePolicyGVR, fakePolicyGVK, "")
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return policiesAndBindingsTracker.Watch(fakePolicyGVR, "")
},
},
Pexample,
30*time.Second,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
bindingInformer := cache.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return policiesAndBindingsTracker.List(fakeBindingGVR, fakeBindingGVK, "")
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return policiesAndBindingsTracker.Watch(fakeBindingGVR, "")
},
},
Bexample,
30*time.Second,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
var source Source[PolicyHook[P, B, E]]
plugin := NewPlugin[PolicyHook[P, B, E]](
admission.NewHandler(admission.Connect, admission.Create, admission.Delete, admission.Update),
func(sif informers.SharedInformerFactory, i1 kubernetes.Interface, i2 dynamic.Interface, r meta.RESTMapper) Source[PolicyHook[P, B, E]] {
source = NewPolicySource[P, B, E](
policyInformer,
bindingInformer,
newPolicyAccessor,
newBindingAccessor,
compileFunc,
sif,
i2,
r,
)
return source
}, dispatcher)
plugin.SetEnabled(true)
featureGate := featuregate.NewFeatureGate()
err = featureGate.Add(map[featuregate.Feature]featuregate.FeatureSpec{
//!TODO: move this to validating specific tests
features.ValidatingAdmissionPolicy: {
Default: true, PreRelease: featuregate.Beta}})
if err != nil {
return nil, nil, err
}
err = featureGate.SetFromMap(map[string]bool{string(features.ValidatingAdmissionPolicy): true})
if err != nil {
return nil, nil, err
}
testContext, testCancel := context.WithCancel(context.Background())
genericInitializer := initializer.New(
nativeClient,
dynamicClient,
fakeInformerFactory,
fakeAuthorizer{},
featureGate,
testContext.Done(),
)
genericInitializer.Initialize(plugin)
plugin.SetRESTMapper(fakeRestMapper)
if err := plugin.ValidateInitialization(); err != nil {
testCancel()
return nil, nil, err
}
res := &PolicyTestContext[P, B, E]{
Context: testContext,
Plugin: plugin,
Source: source,
restMapper: fakeRestMapper,
scheme: policySourceTestScheme,
policyGVK: fakePolicyGVK,
bindingGVK: fakeBindingGVK,
policyGVR: fakePolicyGVR,
bindingGVR: fakeBindingGVR,
nativeTracker: nativeClient.Tracker(),
policyAndBindingTracker: policiesAndBindingsTracker,
unstructuredTracker: dynamicClient.Tracker(),
}
for _, obj := range initialObjects {
err := res.updateOne(obj)
if err != nil {
testCancel()
return nil, nil, err
}
}
res.Start = func() error {
fakeInformerFactory.Start(res.Done())
go policyInformer.Run(res.Done())
go bindingInformer.Run(res.Done())
if !cache.WaitForCacheSync(res.Done(), res.Source.HasSynced) {
return fmt.Errorf("timed out waiting for initial cache sync")
}
return nil
}
return res, testCancel, nil
}
// UpdateAndWait updates the given object in the test, or creates it if it doesn't exist
// Depending upon object type, waits afterward until the object is synced
// by the policy source
//
// Be aware the UpdateAndWait will modify the ResourceVersion of the
// provided objects.
func (p *PolicyTestContext[P, B, E]) UpdateAndWait(objects ...runtime.Object) error {
return p.update(true, objects...)
}
// Update updates the given object in the test, or creates it if it doesn't exist
//
// Be aware the Update will modify the ResourceVersion of the
// provided objects.
func (p *PolicyTestContext[P, B, E]) Update(objects ...runtime.Object) error {
return p.update(false, objects...)
}
// Objects the given object in the test, or creates it if it doesn't exist
// Depending upon object type, waits afterward until the object is synced
// by the policy source
func (p *PolicyTestContext[P, B, E]) update(wait bool, objects ...runtime.Object) error {
for _, object := range objects {
if err := p.updateOne(object); err != nil {
return err
}
}
if wait {
timeoutCtx, timeoutCancel := context.WithTimeout(p, 3*time.Second)
defer timeoutCancel()
for _, object := range objects {
if err := p.WaitForReconcile(timeoutCtx, object); err != nil {
return fmt.Errorf("error waiting for reconcile of %v: %v", object, err)
}
}
}
return nil
}
// Depending upon object type, waits afterward until the object is synced
// by the policy source. Note that policies that are not bound are skipped,
// so you should not try to wait for an unbound policy. Create both the binding
// and policy, then wait.
func (p *PolicyTestContext[P, B, E]) WaitForReconcile(timeoutCtx context.Context, object runtime.Object) error {
if !p.Source.HasSynced() {
return nil
}
objectMeta, err := meta.Accessor(object)
if err != nil {
return err
}
objectGVK := object.GetObjectKind().GroupVersionKind()
switch objectGVK {
case p.policyGVK:
return wait.PollUntilContextCancel(timeoutCtx, 100*time.Millisecond, true, func(ctx context.Context) (done bool, err error) {
policies := p.Source.Hooks()
for _, policy := range policies {
policyMeta, err := meta.Accessor(policy.Policy)
if err != nil {
return true, err
} else if policyMeta.GetName() == objectMeta.GetName() && policyMeta.GetResourceVersion() == objectMeta.GetResourceVersion() {
return true, nil
}
}
return false, nil
})
case p.bindingGVK:
return wait.PollUntilContextCancel(timeoutCtx, 100*time.Millisecond, true, func(ctx context.Context) (done bool, err error) {
policies := p.Source.Hooks()
for _, policy := range policies {
for _, binding := range policy.Bindings {
bindingMeta, err := meta.Accessor(binding)
if err != nil {
return true, err
} else if bindingMeta.GetName() == objectMeta.GetName() && bindingMeta.GetResourceVersion() == objectMeta.GetResourceVersion() {
return true, nil
}
}
}
return false, nil
})
default:
// Do nothing, params are visible immediately
// Loop until one of the params is visible via get of the param informer
return wait.PollUntilContextCancel(timeoutCtx, 100*time.Millisecond, true, func(ctx context.Context) (done bool, err error) {
informer, scope := p.Source.(*policySource[P, B, E]).getParamInformer(objectGVK)
if informer == nil {
// Informer does not exist yet, keep waiting for sync
return false, nil
}
if !cache.WaitForCacheSync(timeoutCtx.Done(), informer.Informer().HasSynced) {
return false, fmt.Errorf("timed out waiting for cache sync of param informer")
}
var lister cache.GenericNamespaceLister = informer.Lister()
if scope == meta.RESTScopeNamespace {
lister = informer.Lister().ByNamespace(objectMeta.GetNamespace())
}
fetched, err := lister.Get(objectMeta.GetName())
if err != nil {
if errors.IsNotFound(err) {
return false, nil
}
return true, err
}
// Ensure RV matches
fetchedMeta, err := meta.Accessor(fetched)
if err != nil {
return true, err
} else if fetchedMeta.GetResourceVersion() != objectMeta.GetResourceVersion() {
return false, nil
}
return true, nil
})
}
}
func (p *PolicyTestContext[P, B, E]) waitForDelete(ctx context.Context, objectGVK schema.GroupVersionKind, name types.NamespacedName) error {
srce := p.Source.(*policySource[P, B, E])
return wait.PollUntilContextCancel(ctx, 100*time.Millisecond, true, func(ctx context.Context) (done bool, err error) {
switch objectGVK {
case p.policyGVK:
for _, hook := range p.Source.Hooks() {
accessor := srce.newPolicyAccessor(hook.Policy)
if accessor.GetName() == name.Name && accessor.GetNamespace() == name.Namespace {
return false, nil
}
}
return true, nil
case p.bindingGVK:
for _, hook := range p.Source.Hooks() {
for _, binding := range hook.Bindings {
accessor := srce.newBindingAccessor(binding)
if accessor.GetName() == name.Name && accessor.GetNamespace() == name.Namespace {
return false, nil
}
}
}
return true, nil
default:
// Do nothing, params are visible immediately
// Loop until one of the params is visible via get of the param informer
informer, scope := p.Source.(*policySource[P, B, E]).getParamInformer(objectGVK)
if informer == nil {
return true, nil
}
var lister cache.GenericNamespaceLister = informer.Lister()
if scope == meta.RESTScopeNamespace {
lister = informer.Lister().ByNamespace(name.Namespace)
}
_, err = lister.Get(name.Name)
if err != nil {
if errors.IsNotFound(err) {
return true, nil
}
return false, err
}
return false, nil
}
})
}
func (p *PolicyTestContext[P, B, E]) updateOne(object runtime.Object) error {
objectMeta, err := meta.Accessor(object)
if err != nil {
return err
}
objectMeta.SetResourceVersion(string(uuid.NewUUID()))
objectGVK := object.GetObjectKind().GroupVersionKind()
if objectGVK.Empty() {
// If the object doesn't have a GVK, ask the schema for preferred GVK
knownKinds, _, err := p.scheme.ObjectKinds(object)
if err != nil {
return err
} else if len(knownKinds) == 0 {
return fmt.Errorf("no known GVKs for object in schema: %T", object)
}
toTake := 0
// Prefer GVK if it is our fake policy or binding
for i, knownKind := range knownKinds {
if knownKind == p.policyGVK || knownKind == p.bindingGVK {
toTake = i
break
}
}
objectGVK = knownKinds[toTake]
object.GetObjectKind().SetGroupVersionKind(objectGVK)
}
// Make sure GVK is known to the fake rest mapper. To prevent cryptic error
mapping, err := p.restMapper.RESTMapping(objectGVK.GroupKind(), objectGVK.Version)
if err != nil {
return err
}
switch objectGVK {
case p.policyGVK:
err := p.policyAndBindingTracker.Update(p.policyGVR, object, objectMeta.GetNamespace())
if errors.IsNotFound(err) {
err = p.policyAndBindingTracker.Create(p.policyGVR, object, objectMeta.GetNamespace())
}
return err
case p.bindingGVK:
err := p.policyAndBindingTracker.Update(p.bindingGVR, object, objectMeta.GetNamespace())
if errors.IsNotFound(err) {
err = p.policyAndBindingTracker.Create(p.bindingGVR, object, objectMeta.GetNamespace())
}
return err
default:
if _, ok := object.(*unstructured.Unstructured); ok {
if err := p.unstructuredTracker.Create(mapping.Resource, object, objectMeta.GetNamespace()); err != nil {
if errors.IsAlreadyExists(err) {
return p.unstructuredTracker.Update(mapping.Resource, object, objectMeta.GetNamespace())
}
return err
}
return nil
} else if err := p.nativeTracker.Create(mapping.Resource, object, objectMeta.GetNamespace()); err != nil {
if errors.IsAlreadyExists(err) {
return p.nativeTracker.Update(mapping.Resource, object, objectMeta.GetNamespace())
}
}
return nil
}
}
// Depending upon object type, waits afterward until the object is synced
// by the policy source
func (p *PolicyTestContext[P, B, E]) DeleteAndWait(object ...runtime.Object) error {
for _, object := range object {
if err := p.deleteOne(object); err != nil && !errors.IsNotFound(err) {
return err
}
}
timeoutCtx, timeoutCancel := context.WithTimeout(p, 3*time.Second)
defer timeoutCancel()
for _, object := range object {
accessor, err := meta.Accessor(object)
if err != nil {
return err
}
if err := p.waitForDelete(
timeoutCtx,
object.GetObjectKind().GroupVersionKind(),
types.NamespacedName{Name: accessor.GetName(), Namespace: accessor.GetNamespace()}); err != nil {
return err
}
}
return nil
}
func (p *PolicyTestContext[P, B, E]) deleteOne(object runtime.Object) error {
objectMeta, err := meta.Accessor(object)
if err != nil {
return err
}
objectMeta.SetResourceVersion(string(uuid.NewUUID()))
objectGVK := object.GetObjectKind().GroupVersionKind()
if objectGVK.Empty() {
// If the object doesn't have a GVK, ask the schema for preferred GVK
knownKinds, _, err := p.scheme.ObjectKinds(object)
if err != nil {
return err
} else if len(knownKinds) == 0 {
return fmt.Errorf("no known GVKs for object in schema: %T", object)
}
toTake := 0
// Prefer GVK if it is our fake policy or binding
for i, knownKind := range knownKinds {
if knownKind == p.policyGVK || knownKind == p.bindingGVK {
toTake = i
break
}
}
objectGVK = knownKinds[toTake]
object.GetObjectKind().SetGroupVersionKind(objectGVK)
}
// Make sure GVK is known to the fake rest mapper. To prevent cryptic error
mapping, err := p.restMapper.RESTMapping(objectGVK.GroupKind(), objectGVK.Version)
if err != nil {
return err
}
switch objectGVK {
case p.policyGVK:
return p.policyAndBindingTracker.Delete(p.policyGVR, objectMeta.GetNamespace(), objectMeta.GetName())
case p.bindingGVK:
return p.policyAndBindingTracker.Delete(p.bindingGVR, objectMeta.GetNamespace(), objectMeta.GetName())
default:
if _, ok := object.(*unstructured.Unstructured); ok {
return p.unstructuredTracker.Delete(mapping.Resource, objectMeta.GetNamespace(), objectMeta.GetName())
}
return p.nativeTracker.Delete(mapping.Resource, objectMeta.GetNamespace(), objectMeta.GetName())
}
}
type FakeList[T runtime.Object] struct {
metav1.TypeMeta
metav1.ListMeta
Items []T
}
func (fl *FakeList[P]) DeepCopyObject() runtime.Object {
copiedItems := make([]P, len(fl.Items))
for i, item := range fl.Items {
copiedItems[i] = item.DeepCopyObject().(P)
}
return &FakeList[P]{
TypeMeta: fl.TypeMeta,
ListMeta: fl.ListMeta,
Items: copiedItems,
}
}
type fakeAuthorizer struct{}
func (f fakeAuthorizer) Authorize(ctx context.Context, a authorizer.Attributes) (authorizer.Decision, string, error) {
return authorizer.DecisionAllow, "", nil
}

View File

@@ -39,7 +39,7 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/admission/plugin/validatingadmissionpolicy/internal/generic"
"k8s.io/apiserver/pkg/admission/plugin/policy/internal/generic"
clienttesting "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"

View File

@@ -28,6 +28,10 @@ type informer[T runtime.Object] struct {
lister[T]
}
// Creates a generic informer around a type-erased cache.SharedIndexInformer
// It is incumbent on the caller to ensure that the generic type argument is
// consistent with the type of the objects stored inside the SharedIndexInformer
// as they will be casted.
func NewInformer[T runtime.Object](informe cache.SharedIndexInformer) Informer[T] {
return informer[T]{
SharedIndexInformer: informe,

View File

@@ -0,0 +1,95 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package validating
import (
"k8s.io/api/admissionregistration/v1beta1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apiserver/pkg/admission/plugin/policy/generic"
)
func NewValidatingAdmissionPolicyAccessor(obj *v1beta1.ValidatingAdmissionPolicy) generic.PolicyAccessor {
return &validatingAdmissionPolicyAccessor{
ValidatingAdmissionPolicy: obj,
}
}
func NewValidatingAdmissionPolicyBindingAccessor(obj *v1beta1.ValidatingAdmissionPolicyBinding) generic.BindingAccessor {
return &validatingAdmissionPolicyBindingAccessor{
ValidatingAdmissionPolicyBinding: obj,
}
}
type validatingAdmissionPolicyAccessor struct {
*v1beta1.ValidatingAdmissionPolicy
}
func (v *validatingAdmissionPolicyAccessor) GetNamespace() string {
return v.Namespace
}
func (v *validatingAdmissionPolicyAccessor) GetName() string {
return v.Name
}
func (v *validatingAdmissionPolicyAccessor) GetParamKind() *schema.GroupVersionKind {
paramKind := v.Spec.ParamKind
if paramKind == nil {
return nil
}
groupVersion, err := schema.ParseGroupVersion(paramKind.APIVersion)
if err != nil {
// A validatingadmissionpolicy which passes validation should have
// a parseable APIVersion for its ParamKind, so this should never happen
// if the policy is valid.
//
// Return a bogus but non-nil GVK that will throw an error about the
// invalid APIVersion when the param is looked up.
return &schema.GroupVersionKind{
Group: paramKind.APIVersion,
Version: "",
Kind: paramKind.Kind,
}
}
return &schema.GroupVersionKind{
Group: groupVersion.Group,
Version: groupVersion.Version,
Kind: paramKind.Kind,
}
}
type validatingAdmissionPolicyBindingAccessor struct {
*v1beta1.ValidatingAdmissionPolicyBinding
}
func (v *validatingAdmissionPolicyBindingAccessor) GetNamespace() string {
return v.Namespace
}
func (v *validatingAdmissionPolicyBindingAccessor) GetName() string {
return v.Name
}
func (v *validatingAdmissionPolicyBindingAccessor) GetPolicyName() types.NamespacedName {
return types.NamespacedName{
Namespace: "",
Name: v.Spec.PolicyName,
}
}

View File

@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package validatingadmissionpolicy
package validating
import (
"context"

View File

@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package validatingadmissionpolicy
package validating
import (
"context"

View File

@@ -14,15 +14,13 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package validatingadmissionpolicy
package validating
import (
"context"
"errors"
"fmt"
"strings"
"sync"
"sync/atomic"
"time"
"k8s.io/api/admissionregistration/v1beta1"
@@ -34,47 +32,32 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
utiljson "k8s.io/apimachinery/pkg/util/json"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/admission"
celmetrics "k8s.io/apiserver/pkg/admission/cel"
"k8s.io/apiserver/pkg/admission/plugin/validatingadmissionpolicy/internal/generic"
"k8s.io/apiserver/pkg/admission/plugin/validatingadmissionpolicy/matching"
"k8s.io/apiserver/pkg/admission/plugin/policy/generic"
celmetrics "k8s.io/apiserver/pkg/admission/plugin/policy/validating/metrics"
celconfig "k8s.io/apiserver/pkg/apis/cel"
"k8s.io/apiserver/pkg/authorization/authorizer"
"k8s.io/apiserver/pkg/warning"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
)
var _ CELPolicyEvaluator = &celAdmissionController{}
// celAdmissionController is the top-level controller for admission control using CEL
// it is responsible for watching policy definitions, bindings, and config param CRDs
type celAdmissionController struct {
// Controller which manages book-keeping for the cluster's dynamic policy
// information.
policyController *policyController
// atomic []policyData
// list of every known policy definition, and all informatoin required to
// validate its bindings against an object.
// A snapshot of the current policy configuration is synced with this field
// asynchronously
definitions atomic.Value
authz authorizer.Authorizer
type dispatcher struct {
matcher Matcher
authz authorizer.Authorizer
}
// Everything someone might need to validate a single ValidatingPolicyDefinition
// against all of its registered bindings.
type policyData struct {
definitionInfo
paramInfo
bindings []bindingInfo
var _ generic.Dispatcher[PolicyHook] = &dispatcher{}
func NewDispatcher(
authorizer authorizer.Authorizer,
matcher Matcher,
) generic.Dispatcher[PolicyHook] {
return &dispatcher{
matcher: matcher,
authz: authorizer,
}
}
// contains the cel PolicyDecisions along with the ValidatingAdmissionPolicy and ValidatingAdmissionPolicyBinding
@@ -85,110 +68,8 @@ type policyDecisionWithMetadata struct {
Binding *v1beta1.ValidatingAdmissionPolicyBinding
}
// namespaceName is used as a key in definitionInfo and bindingInfos
type namespacedName struct {
namespace, name string
}
type definitionInfo struct {
// Error about the state of the definition's configuration and the cluster
// preventing its enforcement or compilation.
// Reset every reconciliation
configurationError error
// Last value seen by this controller to be used in policy enforcement
// May not be nil
lastReconciledValue *v1beta1.ValidatingAdmissionPolicy
}
type bindingInfo struct {
// Compiled CEL expression turned into an validator
validator Validator
// Last value seen by this controller to be used in policy enforcement
// May not be nil
lastReconciledValue *v1beta1.ValidatingAdmissionPolicyBinding
}
type paramInfo struct {
// Controller which is watching this param CRD
controller generic.Controller[runtime.Object]
// Function to call to stop the informer and clean up the controller
stop func()
// Whether this param is cluster or namespace scoped
scope meta.RESTScope
// Policy Definitions which refer to this param CRD
dependentDefinitions sets.Set[namespacedName]
}
func NewAdmissionController(
// Injected Dependencies
informerFactory informers.SharedInformerFactory,
client kubernetes.Interface,
restMapper meta.RESTMapper,
dynamicClient dynamic.Interface,
authz authorizer.Authorizer,
) CELPolicyEvaluator {
return &celAdmissionController{
definitions: atomic.Value{},
policyController: newPolicyController(
restMapper,
client,
dynamicClient,
informerFactory,
nil,
NewMatcher(matching.NewMatcher(informerFactory.Core().V1().Namespaces().Lister(), client)),
generic.NewInformer[*v1beta1.ValidatingAdmissionPolicy](
informerFactory.Admissionregistration().V1beta1().ValidatingAdmissionPolicies().Informer()),
generic.NewInformer[*v1beta1.ValidatingAdmissionPolicyBinding](
informerFactory.Admissionregistration().V1beta1().ValidatingAdmissionPolicyBindings().Informer()),
),
authz: authz,
}
}
func (c *celAdmissionController) Run(stopCh <-chan struct{}) {
ctx, cancel := context.WithCancel(context.Background())
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
c.policyController.Run(ctx)
}()
wg.Add(1)
go func() {
defer wg.Done()
// Wait indefinitely until policies/bindings are listed & handled before
// allowing policies to be refreshed
if !cache.WaitForNamedCacheSync("cel-admission-controller", ctx.Done(), c.policyController.HasSynced) {
return
}
// Loop every 1 second until context is cancelled, refreshing policies
wait.Until(c.refreshPolicies, 1*time.Second, ctx.Done())
}()
<-stopCh
cancel()
wg.Wait()
}
const maxAuditAnnotationValueLength = 10 * 1024
func (c *celAdmissionController) Validate(
ctx context.Context,
a admission.Attributes,
o admission.ObjectInterfaces,
) (err error) {
if !c.HasSynced() {
return admission.NewForbidden(a, fmt.Errorf("not yet ready to handle request"))
}
// Dispatch implements generic.Dispatcher.
func (c *dispatcher) Dispatch(ctx context.Context, a admission.Attributes, o admission.ObjectInterfaces, hooks []PolicyHook) error {
var deniedDecisions []policyDecisionWithMetadata
@@ -232,19 +113,18 @@ func (c *celAdmissionController) Validate(
})
}
}
policyDatas := c.definitions.Load().([]policyData)
authz := newCachingAuthorizer(c.authz)
for _, definitionInfo := range policyDatas {
for _, hook := range hooks {
// versionedAttributes will be set to non-nil inside of the loop, but
// is scoped outside of the param loop so we only convert once. We defer
// conversion so that it is only performed when we know a policy matches,
// saving the cost of converting non-matching requests.
var versionedAttr *admission.VersionedAttributes
definition := definitionInfo.lastReconciledValue
matches, matchResource, matchKind, err := c.policyController.matcher.DefinitionMatches(a, o, definition)
definition := hook.Policy
matches, matchResource, matchKind, err := c.matcher.DefinitionMatches(a, o, definition)
if err != nil {
// Configuration error.
addConfigError(err, definition, nil)
@@ -253,18 +133,17 @@ func (c *celAdmissionController) Validate(
if !matches {
// Policy definition does not match request
continue
} else if definitionInfo.configurationError != nil {
} else if hook.ConfigurationError != nil {
// Configuration error.
addConfigError(definitionInfo.configurationError, definition, nil)
addConfigError(hook.ConfigurationError, definition, nil)
continue
}
auditAnnotationCollector := newAuditAnnotationCollector()
for _, bindingInfo := range definitionInfo.bindings {
for _, binding := range hook.Bindings {
// If the key is inside dependentBindings, there is guaranteed to
// be a bindingInfo for it
binding := bindingInfo.lastReconciledValue
matches, err := c.policyController.matcher.BindingMatches(a, o, binding)
matches, err := c.matcher.BindingMatches(a, o, binding)
if err != nil {
// Configuration error.
addConfigError(err, definition, binding)
@@ -274,7 +153,14 @@ func (c *celAdmissionController) Validate(
continue
}
params, err := c.collectParams(definition.Spec.ParamKind, definitionInfo.paramInfo, binding.Spec.ParamRef, a.GetNamespace())
params, err := c.collectParams(
definition.Spec.ParamKind,
hook.ParamInformer,
hook.ParamScope,
binding.Spec.ParamRef,
a.GetNamespace(),
)
if err != nil {
addConfigError(err, definition, binding)
continue
@@ -303,7 +189,7 @@ func (c *celAdmissionController) Validate(
// if it is cluster scoped, namespaceName will be empty
// Otherwise, get the Namespace resource.
if namespaceName != "" {
namespace, err = c.policyController.matcher.GetNamespace(namespaceName)
namespace, err = c.matcher.GetNamespace(namespaceName)
if err != nil {
return err
}
@@ -323,7 +209,18 @@ func (c *celAdmissionController) Validate(
nested: param,
}
}
validationResults = append(validationResults, bindingInfo.validator.Validate(ctx, matchResource, versionedAttr, p, namespace, celconfig.RuntimeCELCostBudget, authz))
validationResults = append(validationResults,
hook.Evaluator.Validate(
ctx,
matchResource,
versionedAttr,
p,
namespace,
celconfig.RuntimeCELCostBudget,
authz,
),
)
}
for _, validationResult := range validationResults {
@@ -344,7 +241,7 @@ func (c *celAdmissionController) Validate(
})
celmetrics.Metrics.ObserveRejection(ctx, decision.Elapsed, definition.Name, binding.Name, "active")
case v1beta1.Audit:
c.publishValidationFailureAnnotation(binding, i, decision, versionedAttr)
publishValidationFailureAnnotation(binding, i, decision, versionedAttr)
celmetrics.Metrics.ObserveAudit(ctx, decision.Elapsed, definition.Name, binding.Name, "active")
case v1beta1.Warn:
warning.AddWarning(ctx, "", fmt.Sprintf("Validation failed for ValidatingAdmissionPolicy '%s' with binding '%s': %s", definition.Name, binding.Name, decision.Message))
@@ -411,28 +308,30 @@ func (c *celAdmissionController) Validate(
}
// Returns objects to use to evaluate the policy
func (c *celAdmissionController) collectParams(
// Copied with minor modification to account for slightly different arguments
func (c *dispatcher) collectParams(
paramKind *v1beta1.ParamKind,
info paramInfo,
paramInformer informers.GenericInformer,
paramScope meta.RESTScope,
paramRef *v1beta1.ParamRef,
namespace string,
) ([]runtime.Object, error) {
// If definition has paramKind, paramRef is required in binding.
// If definition has no paramKind, paramRef set in binding will be ignored.
var params []runtime.Object
var paramStore generic.NamespacedLister[runtime.Object]
var paramStore cache.GenericNamespaceLister
// Make sure the param kind is ready to use
if paramKind != nil && paramRef != nil {
if info.controller == nil {
if paramInformer == nil {
return nil, fmt.Errorf("paramKind kind `%v` not known",
paramKind.String())
}
// Set up cluster-scoped, or namespaced access to the params
// "default" if not provided, and paramKind is namespaced
paramStore = info.controller.Informer()
if info.scope.Name() == meta.RESTScopeNameNamespace {
paramStore = paramInformer.Lister()
if paramScope.Name() == meta.RESTScopeNameNamespace {
paramsNamespace := namespace
if len(paramRef.Namespace) > 0 {
paramsNamespace = paramRef.Namespace
@@ -442,16 +341,16 @@ func (c *celAdmissionController) collectParams(
return nil, fmt.Errorf("cannot use namespaced paramRef in policy binding that matches cluster-scoped resources")
}
paramStore = info.controller.Informer().Namespaced(paramsNamespace)
paramStore = paramInformer.Lister().ByNamespace(paramsNamespace)
}
// If the param informer for this admission policy has not yet
// had time to perform an initial listing, don't attempt to use
// it.
timeoutCtx, cancel := context.WithTimeout(c.policyController.context, 1*time.Second)
timeoutCtx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
if !cache.WaitForCacheSync(timeoutCtx.Done(), info.controller.HasSynced) {
if !cache.WaitForCacheSync(timeoutCtx.Done(), paramInformer.Informer().HasSynced) {
return nil, fmt.Errorf("paramKind kind `%v` not yet synced to use for admission",
paramKind.String())
}
@@ -467,7 +366,7 @@ func (c *celAdmissionController) collectParams(
// Policy ParamKind is set, but binding does not use it.
// Validate with nil params
return []runtime.Object{nil}, nil
case len(paramRef.Namespace) > 0 && info.scope.Name() == meta.RESTScopeRoot.Name():
case len(paramRef.Namespace) > 0 && paramScope.Name() == meta.RESTScopeRoot.Name():
// Not allowed to set namespace for cluster-scoped param
return nil, fmt.Errorf("paramRef.namespace must not be provided for a cluster-scoped `paramKind`")
@@ -527,10 +426,10 @@ func (c *celAdmissionController) collectParams(
return params, nil
}
func (c *celAdmissionController) publishValidationFailureAnnotation(binding *v1beta1.ValidatingAdmissionPolicyBinding, expressionIndex int, decision PolicyDecision, attributes admission.Attributes) {
func publishValidationFailureAnnotation(binding *v1beta1.ValidatingAdmissionPolicyBinding, expressionIndex int, decision PolicyDecision, attributes admission.Attributes) {
key := "validation.policy.admission.k8s.io/validation_failure"
// Marshal to a list of failures since, in the future, we may need to support multiple failures
valueJson, err := utiljson.Marshal([]validationFailureValue{{
valueJSON, err := utiljson.Marshal([]ValidationFailureValue{{
ExpressionIndex: expressionIndex,
Message: decision.Message,
ValidationActions: binding.Spec.ValidationActions,
@@ -540,27 +439,17 @@ func (c *celAdmissionController) publishValidationFailureAnnotation(binding *v1b
if err != nil {
klog.Warningf("Failed to set admission audit annotation %s for ValidatingAdmissionPolicy %s and ValidatingAdmissionPolicyBinding %s: %v", key, binding.Spec.PolicyName, binding.Name, err)
}
value := string(valueJson)
value := string(valueJSON)
if err := attributes.AddAnnotation(key, value); err != nil {
klog.Warningf("Failed to set admission audit annotation %s to %s for ValidatingAdmissionPolicy %s and ValidatingAdmissionPolicyBinding %s: %v", key, value, binding.Spec.PolicyName, binding.Name, err)
}
}
func (c *celAdmissionController) HasSynced() bool {
return c.policyController.HasSynced() && c.definitions.Load() != nil
}
func (c *celAdmissionController) ValidateInitialization() error {
return c.policyController.matcher.ValidateInitialization()
}
func (c *celAdmissionController) refreshPolicies() {
c.definitions.Store(c.policyController.latestPolicyData())
}
const maxAuditAnnotationValueLength = 10 * 1024
// validationFailureValue defines the JSON format of a "validation.policy.admission.k8s.io/validation_failure" audit
// annotation value.
type validationFailureValue struct {
type ValidationFailureValue struct {
Message string `json:"message"`
Policy string `json:"policy"`
Binding string `json:"binding"`

View File

@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package validatingadmissionpolicy
package validating
import (
"context"

View File

@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package validatingadmissionpolicy
package validating
import (
"context"

View File

@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package validatingadmissionpolicy
package validating
import (
"k8s.io/api/admissionregistration/v1beta1"
@@ -23,7 +23,7 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/admission"
"k8s.io/apiserver/pkg/admission/plugin/validatingadmissionpolicy/matching"
"k8s.io/apiserver/pkg/admission/plugin/policy/matching"
)
var _ matching.MatchCriteria = &matchCriteria{}

View File

@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package validatingadmissionpolicy
package validating
import (
celgo "github.com/google/cel-go/cel"

View File

@@ -0,0 +1,202 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package validating
import (
"context"
"io"
v1 "k8s.io/api/admissionregistration/v1"
"k8s.io/api/admissionregistration/v1beta1"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apiserver/pkg/admission"
"k8s.io/apiserver/pkg/admission/initializer"
"k8s.io/apiserver/pkg/admission/plugin/cel"
"k8s.io/apiserver/pkg/admission/plugin/policy/generic"
"k8s.io/apiserver/pkg/admission/plugin/policy/matching"
"k8s.io/apiserver/pkg/admission/plugin/webhook/matchconditions"
"k8s.io/apiserver/pkg/authorization/authorizer"
"k8s.io/apiserver/pkg/cel/environment"
"k8s.io/apiserver/pkg/features"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/component-base/featuregate"
)
const (
// PluginName indicates the name of admission plug-in
PluginName = "ValidatingAdmissionPolicy"
)
var (
compositionEnvTemplate *cel.CompositionEnv = func() *cel.CompositionEnv {
compositionEnvTemplate, err := cel.NewCompositionEnv(cel.VariablesTypeName, environment.MustBaseEnvSet(environment.DefaultCompatibilityVersion()))
if err != nil {
panic(err)
}
return compositionEnvTemplate
}()
)
// Register registers a plugin
func Register(plugins *admission.Plugins) {
plugins.Register(PluginName, func(configFile io.Reader) (admission.Interface, error) {
return NewPlugin(configFile), nil
})
}
// Plugin is an implementation of admission.Interface.
type Policy = v1beta1.ValidatingAdmissionPolicy
type PolicyBinding = v1beta1.ValidatingAdmissionPolicyBinding
type PolicyEvaluator = Validator
type PolicyHook = generic.PolicyHook[*Policy, *PolicyBinding, PolicyEvaluator]
type Plugin struct {
*generic.Plugin[PolicyHook]
}
var _ admission.Interface = &Plugin{}
var _ admission.ValidationInterface = &Plugin{}
var _ initializer.WantsFeatures = &Plugin{}
func NewPlugin(_ io.Reader) *Plugin {
handler := admission.NewHandler(admission.Connect, admission.Create, admission.Delete, admission.Update)
return &Plugin{
Plugin: generic.NewPlugin(
handler,
func(f informers.SharedInformerFactory, client kubernetes.Interface, dynamicClient dynamic.Interface, restMapper meta.RESTMapper) generic.Source[PolicyHook] {
return generic.NewPolicySource(
f.Admissionregistration().V1beta1().ValidatingAdmissionPolicies().Informer(),
f.Admissionregistration().V1beta1().ValidatingAdmissionPolicyBindings().Informer(),
NewValidatingAdmissionPolicyAccessor,
NewValidatingAdmissionPolicyBindingAccessor,
compilePolicy,
f,
dynamicClient,
restMapper,
)
},
func(a authorizer.Authorizer, m *matching.Matcher) generic.Dispatcher[PolicyHook] {
return NewDispatcher(a, NewMatcher(m))
},
),
}
}
// Validate makes an admission decision based on the request attributes.
func (a *Plugin) Validate(ctx context.Context, attr admission.Attributes, o admission.ObjectInterfaces) error {
return a.Plugin.Dispatch(ctx, attr, o)
}
func (a *Plugin) InspectFeatureGates(featureGates featuregate.FeatureGate) {
a.Plugin.SetEnabled(featureGates.Enabled(features.ValidatingAdmissionPolicy))
}
func compilePolicy(policy *Policy) Validator {
hasParam := false
if policy.Spec.ParamKind != nil {
hasParam = true
}
optionalVars := cel.OptionalVariableDeclarations{HasParams: hasParam, HasAuthorizer: true}
expressionOptionalVars := cel.OptionalVariableDeclarations{HasParams: hasParam, HasAuthorizer: false}
failurePolicy := convertv1beta1FailurePolicyTypeTov1FailurePolicyType(policy.Spec.FailurePolicy)
var matcher matchconditions.Matcher = nil
matchConditions := policy.Spec.MatchConditions
filterCompiler := cel.NewCompositedCompilerFromTemplate(compositionEnvTemplate)
filterCompiler.CompileAndStoreVariables(convertv1beta1Variables(policy.Spec.Variables), optionalVars, environment.StoredExpressions)
if len(matchConditions) > 0 {
matchExpressionAccessors := make([]cel.ExpressionAccessor, len(matchConditions))
for i := range matchConditions {
matchExpressionAccessors[i] = (*matchconditions.MatchCondition)(&matchConditions[i])
}
matcher = matchconditions.NewMatcher(filterCompiler.Compile(matchExpressionAccessors, optionalVars, environment.StoredExpressions), failurePolicy, "policy", "validate", policy.Name)
}
res := NewValidator(
filterCompiler.Compile(convertv1beta1Validations(policy.Spec.Validations), optionalVars, environment.StoredExpressions),
matcher,
filterCompiler.Compile(convertv1beta1AuditAnnotations(policy.Spec.AuditAnnotations), optionalVars, environment.StoredExpressions),
filterCompiler.Compile(convertv1beta1MessageExpressions(policy.Spec.Validations), expressionOptionalVars, environment.StoredExpressions),
failurePolicy,
)
return res
}
func convertv1beta1FailurePolicyTypeTov1FailurePolicyType(policyType *v1beta1.FailurePolicyType) *v1.FailurePolicyType {
if policyType == nil {
return nil
}
var v1FailPolicy v1.FailurePolicyType
if *policyType == v1beta1.Fail {
v1FailPolicy = v1.Fail
} else if *policyType == v1beta1.Ignore {
v1FailPolicy = v1.Ignore
}
return &v1FailPolicy
}
func convertv1beta1Validations(inputValidations []v1beta1.Validation) []cel.ExpressionAccessor {
celExpressionAccessor := make([]cel.ExpressionAccessor, len(inputValidations))
for i, validation := range inputValidations {
validation := ValidationCondition{
Expression: validation.Expression,
Message: validation.Message,
Reason: validation.Reason,
}
celExpressionAccessor[i] = &validation
}
return celExpressionAccessor
}
func convertv1beta1MessageExpressions(inputValidations []v1beta1.Validation) []cel.ExpressionAccessor {
celExpressionAccessor := make([]cel.ExpressionAccessor, len(inputValidations))
for i, validation := range inputValidations {
if validation.MessageExpression != "" {
condition := MessageExpressionCondition{
MessageExpression: validation.MessageExpression,
}
celExpressionAccessor[i] = &condition
}
}
return celExpressionAccessor
}
func convertv1beta1AuditAnnotations(inputValidations []v1beta1.AuditAnnotation) []cel.ExpressionAccessor {
celExpressionAccessor := make([]cel.ExpressionAccessor, len(inputValidations))
for i, validation := range inputValidations {
validation := AuditAnnotationCondition{
Key: validation.Key,
ValueExpression: validation.ValueExpression,
}
celExpressionAccessor[i] = &validation
}
return celExpressionAccessor
}
func convertv1beta1Variables(variables []v1beta1.Variable) []cel.NamedExpressionAccessor {
namedExpressions := make([]cel.NamedExpressionAccessor, len(variables))
for i, variable := range variables {
namedExpressions[i] = &Variable{Name: variable.Name, Expression: variable.Expression}
}
return namedExpressions
}

View File

@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package validatingadmissionpolicy
package validating
import (
"net/http"

View File

@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package validatingadmissionpolicy
package validating
import (
"errors"

View File

@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package validatingadmissionpolicy
package validating
import (
"fmt"
@@ -26,11 +26,35 @@ import (
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/cel/openapi/resolver"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/kube-openapi/pkg/validation/spec"
)
var (
scheme *runtime.Scheme = func() *runtime.Scheme {
res := runtime.NewScheme()
if err := v1beta1.AddToScheme(res); err != nil {
panic(err)
}
if err := fake.AddToScheme(res); err != nil {
panic(err)
}
return res
}()
)
func must3[T any, I any](val T, _ I, err error) T {
if err != nil {
panic(err)
}
return val
}
func TestExtractTypeNames(t *testing.T) {
for _, tc := range []struct {
name string

View File

@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package validatingadmissionpolicy
package validating
import (
"context"

View File

@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package validatingadmissionpolicy
package validating
import (
"context"

View File

@@ -1,197 +0,0 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package validatingadmissionpolicy
import (
"context"
"errors"
"fmt"
"io"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apiserver/pkg/authorization/authorizer"
"k8s.io/apiserver/pkg/features"
"k8s.io/client-go/dynamic"
"k8s.io/component-base/featuregate"
"k8s.io/apiserver/pkg/admission"
"k8s.io/apiserver/pkg/admission/initializer"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
)
////////////////////////////////////////////////////////////////////////////////
// Plugin Definition
////////////////////////////////////////////////////////////////////////////////
// Definition for CEL admission plugin. This is the entry point into the
// CEL admission control system.
//
// Each plugin is asked to validate every object update.
const (
// PluginName indicates the name of admission plug-in
PluginName = "ValidatingAdmissionPolicy"
)
// Register registers a plugin
func Register(plugins *admission.Plugins) {
plugins.Register(PluginName, func(config io.Reader) (admission.Interface, error) {
return NewPlugin()
})
}
////////////////////////////////////////////////////////////////////////////////
// Plugin Initialization & Dependency Injection
////////////////////////////////////////////////////////////////////////////////
type celAdmissionPlugin struct {
*admission.Handler
evaluator CELPolicyEvaluator
inspectedFeatureGates bool
enabled bool
// Injected Dependencies
informerFactory informers.SharedInformerFactory
client kubernetes.Interface
restMapper meta.RESTMapper
dynamicClient dynamic.Interface
stopCh <-chan struct{}
authorizer authorizer.Authorizer
}
var _ initializer.WantsExternalKubeInformerFactory = &celAdmissionPlugin{}
var _ initializer.WantsExternalKubeClientSet = &celAdmissionPlugin{}
var _ initializer.WantsRESTMapper = &celAdmissionPlugin{}
var _ initializer.WantsDynamicClient = &celAdmissionPlugin{}
var _ initializer.WantsDrainedNotification = &celAdmissionPlugin{}
var _ initializer.WantsAuthorizer = &celAdmissionPlugin{}
var _ admission.InitializationValidator = &celAdmissionPlugin{}
var _ admission.ValidationInterface = &celAdmissionPlugin{}
func NewPlugin() (admission.Interface, error) {
return &celAdmissionPlugin{
Handler: admission.NewHandler(admission.Connect, admission.Create, admission.Delete, admission.Update),
}, nil
}
func (c *celAdmissionPlugin) SetExternalKubeInformerFactory(f informers.SharedInformerFactory) {
c.informerFactory = f
}
func (c *celAdmissionPlugin) SetExternalKubeClientSet(client kubernetes.Interface) {
c.client = client
}
func (c *celAdmissionPlugin) SetRESTMapper(mapper meta.RESTMapper) {
c.restMapper = mapper
}
func (c *celAdmissionPlugin) SetDynamicClient(client dynamic.Interface) {
c.dynamicClient = client
}
func (c *celAdmissionPlugin) SetDrainedNotification(stopCh <-chan struct{}) {
c.stopCh = stopCh
}
func (c *celAdmissionPlugin) SetAuthorizer(authorizer authorizer.Authorizer) {
c.authorizer = authorizer
}
func (c *celAdmissionPlugin) InspectFeatureGates(featureGates featuregate.FeatureGate) {
if featureGates.Enabled(features.ValidatingAdmissionPolicy) {
c.enabled = true
}
c.inspectedFeatureGates = true
}
// ValidateInitialization - once clientset and informer factory are provided, creates and starts the admission controller
func (c *celAdmissionPlugin) ValidateInitialization() error {
if !c.inspectedFeatureGates {
return fmt.Errorf("%s did not see feature gates", PluginName)
}
if !c.enabled {
return nil
}
if c.informerFactory == nil {
return errors.New("missing informer factory")
}
if c.client == nil {
return errors.New("missing kubernetes client")
}
if c.restMapper == nil {
return errors.New("missing rest mapper")
}
if c.dynamicClient == nil {
return errors.New("missing dynamic client")
}
if c.stopCh == nil {
return errors.New("missing stop channel")
}
if c.authorizer == nil {
return errors.New("missing authorizer")
}
c.evaluator = NewAdmissionController(c.informerFactory, c.client, c.restMapper, c.dynamicClient, c.authorizer)
if err := c.evaluator.ValidateInitialization(); err != nil {
return err
}
c.SetReadyFunc(c.evaluator.HasSynced)
go c.evaluator.Run(c.stopCh)
return nil
}
////////////////////////////////////////////////////////////////////////////////
// admission.ValidationInterface
////////////////////////////////////////////////////////////////////////////////
func (c *celAdmissionPlugin) Handles(operation admission.Operation) bool {
return true
}
func (c *celAdmissionPlugin) Validate(
ctx context.Context,
a admission.Attributes,
o admission.ObjectInterfaces,
) (err error) {
if !c.enabled {
return nil
}
// isPolicyResource determines if an admission.Attributes object is describing
// the admission of a ValidatingAdmissionPolicy or a ValidatingAdmissionPolicyBinding
if isPolicyResource(a) {
return
}
if !c.WaitForReady() {
return admission.NewForbidden(a, fmt.Errorf("not yet ready to handle request"))
}
return c.evaluator.Validate(ctx, a, o)
}
func isPolicyResource(attr admission.Attributes) bool {
gvk := attr.GetResource()
if gvk.Group == "admissionregistration.k8s.io" {
if gvk.Resource == "validatingadmissionpolicies" || gvk.Resource == "validatingadmissionpolicybindings" {
return true
}
}
return false
}

View File

@@ -1,551 +0,0 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package validatingadmissionpolicy
import (
"context"
"fmt"
"sync"
"time"
v1 "k8s.io/api/admissionregistration/v1"
"k8s.io/api/admissionregistration/v1beta1"
corev1 "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
celmetrics "k8s.io/apiserver/pkg/admission/cel"
"k8s.io/apiserver/pkg/admission/plugin/cel"
"k8s.io/apiserver/pkg/admission/plugin/validatingadmissionpolicy/internal/generic"
"k8s.io/apiserver/pkg/admission/plugin/webhook/matchconditions"
"k8s.io/apiserver/pkg/cel/environment"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
)
type policyController struct {
once sync.Once
context context.Context
dynamicClient dynamic.Interface
informerFactory informers.SharedInformerFactory
restMapper meta.RESTMapper
policyDefinitionsController generic.Controller[*v1beta1.ValidatingAdmissionPolicy]
policyBindingController generic.Controller[*v1beta1.ValidatingAdmissionPolicyBinding]
// Provided to the policy's Compile function as an injected dependency to
// assist with compiling its expressions to CEL
// pass nil to create filter compiler in demand
filterCompiler cel.FilterCompiler
matcher Matcher
newValidator
client kubernetes.Interface
// Lock which protects
// All Below fields
// All above fields should be assumed constant
mutex sync.RWMutex
cachedPolicies []policyData
// controller and metadata
paramsCRDControllers map[v1beta1.ParamKind]*paramInfo
// Index for each definition namespace/name, contains all binding
// namespace/names known to exist for that definition
definitionInfo map[namespacedName]*definitionInfo
// Index for each bindings namespace/name. Contains compiled templates
// for the binding depending on the policy/param combination.
bindingInfos map[namespacedName]*bindingInfo
// Map from namespace/name of a definition to a set of namespace/name
// of bindings which depend on it.
// All keys must have at least one dependent binding
// All binding names MUST exist as a key bindingInfos
definitionsToBindings map[namespacedName]sets.Set[namespacedName]
}
type newValidator func(validationFilter cel.Filter, celMatcher matchconditions.Matcher, auditAnnotationFilter, messageFilter cel.Filter, failurePolicy *v1.FailurePolicyType) Validator
func newPolicyController(
restMapper meta.RESTMapper,
client kubernetes.Interface,
dynamicClient dynamic.Interface,
informerFactory informers.SharedInformerFactory,
filterCompiler cel.FilterCompiler,
matcher Matcher,
policiesInformer generic.Informer[*v1beta1.ValidatingAdmissionPolicy],
bindingsInformer generic.Informer[*v1beta1.ValidatingAdmissionPolicyBinding],
) *policyController {
res := &policyController{}
*res = policyController{
filterCompiler: filterCompiler,
definitionInfo: make(map[namespacedName]*definitionInfo),
bindingInfos: make(map[namespacedName]*bindingInfo),
paramsCRDControllers: make(map[v1beta1.ParamKind]*paramInfo),
definitionsToBindings: make(map[namespacedName]sets.Set[namespacedName]),
matcher: matcher,
newValidator: NewValidator,
policyDefinitionsController: generic.NewController(
policiesInformer,
res.reconcilePolicyDefinition,
generic.ControllerOptions{
Workers: 1,
Name: "cel-policy-definitions",
},
),
policyBindingController: generic.NewController(
bindingsInformer,
res.reconcilePolicyBinding,
generic.ControllerOptions{
Workers: 1,
Name: "cel-policy-bindings",
},
),
restMapper: restMapper,
dynamicClient: dynamicClient,
informerFactory: informerFactory,
client: client,
}
return res
}
func (c *policyController) Run(ctx context.Context) {
// Only support being run once
c.once.Do(func() {
c.context = ctx
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
c.policyDefinitionsController.Run(ctx)
}()
wg.Add(1)
go func() {
defer wg.Done()
c.policyBindingController.Run(ctx)
}()
<-ctx.Done()
wg.Wait()
})
}
func (c *policyController) HasSynced() bool {
return c.policyDefinitionsController.HasSynced() && c.policyBindingController.HasSynced()
}
func (c *policyController) reconcilePolicyDefinition(namespace, name string, definition *v1beta1.ValidatingAdmissionPolicy) error {
c.mutex.Lock()
defer c.mutex.Unlock()
err := c.reconcilePolicyDefinitionSpec(namespace, name, definition)
return err
}
func (c *policyController) reconcilePolicyDefinitionSpec(namespace, name string, definition *v1beta1.ValidatingAdmissionPolicy) error {
c.cachedPolicies = nil // invalidate cachedPolicies
// Namespace for policydefinition is empty.
nn := getNamespaceName(namespace, name)
info, ok := c.definitionInfo[nn]
if !ok {
info = &definitionInfo{}
c.definitionInfo[nn] = info
// TODO(DangerOnTheRanger): add support for "warn" being a valid enforcementAction
celmetrics.Metrics.ObserveDefinition(context.TODO(), "active", "deny")
}
// Skip reconcile if the spec of the definition is unchanged and had a
// successful previous sync
if info.configurationError == nil && info.lastReconciledValue != nil && definition != nil &&
apiequality.Semantic.DeepEqual(info.lastReconciledValue.Spec, definition.Spec) {
return nil
}
var paramSource *v1beta1.ParamKind
if definition != nil {
paramSource = definition.Spec.ParamKind
}
// If param source has changed, remove definition as dependent of old params
// If there are no more dependents of old param, stop and clean up controller
if info.lastReconciledValue != nil && info.lastReconciledValue.Spec.ParamKind != nil {
oldParamSource := *info.lastReconciledValue.Spec.ParamKind
// If we are:
// - switching from having a param to not having a param (includes deletion)
// - or from having a param to a different one
// we remove dependency on the controller.
if paramSource == nil || *paramSource != oldParamSource {
if oldParamInfo, ok := c.paramsCRDControllers[oldParamSource]; ok {
oldParamInfo.dependentDefinitions.Delete(nn)
if len(oldParamInfo.dependentDefinitions) == 0 {
oldParamInfo.stop()
delete(c.paramsCRDControllers, oldParamSource)
}
}
}
}
// Reset all previously compiled evaluators in case something relevant in
// definition has changed.
for key := range c.definitionsToBindings[nn] {
bindingInfo := c.bindingInfos[key]
bindingInfo.validator = nil
c.bindingInfos[key] = bindingInfo
}
if definition == nil {
delete(c.definitionInfo, nn)
return nil
}
// Update definition info
info.lastReconciledValue = definition
info.configurationError = nil
if paramSource == nil {
// Skip setting up controller for empty param type
return nil
}
// find GVR for params
// Parse param source into a GVK
paramSourceGV, err := schema.ParseGroupVersion(paramSource.APIVersion)
if err != nil {
// Failed to resolve. Return error so we retry again (rate limited)
// Save a record of this definition with an evaluator that unconditionally
info.configurationError = fmt.Errorf("failed to parse apiVersion of paramKind '%v' with error: %w", paramSource.String(), err)
// Return nil, since this error cannot be resolved by waiting more time
return nil
}
paramsGVR, err := c.restMapper.RESTMapping(schema.GroupKind{
Group: paramSourceGV.Group,
Kind: paramSource.Kind,
}, paramSourceGV.Version)
if err != nil {
// Failed to resolve. Return error so we retry again (rate limited)
// Save a record of this definition with an evaluator that unconditionally
//
info.configurationError = fmt.Errorf("failed to find resource referenced by paramKind: '%v'", paramSourceGV.WithKind(paramSource.Kind))
return info.configurationError
}
paramInfo := c.ensureParamInfo(paramSource, paramsGVR)
paramInfo.dependentDefinitions.Insert(nn)
return nil
}
// Ensures that there is an informer started for the given GVK to be used as a
// param
func (c *policyController) ensureParamInfo(paramSource *v1beta1.ParamKind, mapping *meta.RESTMapping) *paramInfo {
if info, ok := c.paramsCRDControllers[*paramSource]; ok {
return info
}
// We are not watching this param. Start an informer for it.
instanceContext, instanceCancel := context.WithCancel(c.context)
var informer cache.SharedIndexInformer
// Try to see if our provided informer factory has an informer for this type.
// We assume the informer is already started, and starts all types associated
// with it.
if genericInformer, err := c.informerFactory.ForResource(mapping.Resource); err == nil {
informer = genericInformer.Informer()
// Ensure the informer is started
// Use policyController's context rather than the instance context.
// PolicyController context is expected to last until app shutdown
// This is due to behavior of informerFactory which would cause the
// informer to stop running once the context is cancelled, and
// never started again.
c.informerFactory.Start(c.context.Done())
} else {
// Dynamic JSON informer fallback.
// Cannot use shared dynamic informer since it would be impossible
// to clean CRD informers properly with multiple dependents
// (cannot start ahead of time, and cannot track dependencies via stopCh)
informer = dynamicinformer.NewFilteredDynamicInformer(
c.dynamicClient,
mapping.Resource,
corev1.NamespaceAll,
// Use same interval as is used for k8s typed sharedInformerFactory
// https://github.com/kubernetes/kubernetes/blob/7e0923899fed622efbc8679cca6b000d43633e38/cmd/kube-apiserver/app/server.go#L430
10*time.Minute,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
nil,
).Informer()
go informer.Run(instanceContext.Done())
}
controller := generic.NewController(
generic.NewInformer[runtime.Object](informer),
c.reconcileParams,
generic.ControllerOptions{
Workers: 1,
Name: paramSource.String() + "-controller",
},
)
ret := &paramInfo{
controller: controller,
stop: instanceCancel,
scope: mapping.Scope,
dependentDefinitions: sets.New[namespacedName](),
}
c.paramsCRDControllers[*paramSource] = ret
go controller.Run(instanceContext)
return ret
}
func (c *policyController) reconcilePolicyBinding(namespace, name string, binding *v1beta1.ValidatingAdmissionPolicyBinding) error {
c.mutex.Lock()
defer c.mutex.Unlock()
c.cachedPolicies = nil // invalidate cachedPolicies
// Namespace for PolicyBinding is empty. In the future a namespaced binding
// may be added
// https://github.com/kubernetes/enhancements/blob/bf5c3c81ea2081d60c1dc7c832faa98479e06209/keps/sig-api-machinery/3488-cel-admission-control/README.md?plain=1#L1042
nn := getNamespaceName(namespace, name)
info, ok := c.bindingInfos[nn]
if !ok {
info = &bindingInfo{}
c.bindingInfos[nn] = info
}
// Skip if the spec of the binding is unchanged.
if info.lastReconciledValue != nil && binding != nil &&
apiequality.Semantic.DeepEqual(info.lastReconciledValue.Spec, binding.Spec) {
return nil
}
var oldNamespacedDefinitionName namespacedName
if info.lastReconciledValue != nil {
// All validating policies are cluster-scoped so have empty namespace
oldNamespacedDefinitionName = getNamespaceName("", info.lastReconciledValue.Spec.PolicyName)
}
var namespacedDefinitionName namespacedName
if binding != nil {
// All validating policies are cluster-scoped so have empty namespace
namespacedDefinitionName = getNamespaceName("", binding.Spec.PolicyName)
}
// Remove record of binding from old definition if the referred policy
// has changed
if oldNamespacedDefinitionName != namespacedDefinitionName {
if dependentBindings, ok := c.definitionsToBindings[oldNamespacedDefinitionName]; ok {
dependentBindings.Delete(nn)
// if there are no more dependent bindings, remove knowledge of the
// definition altogether
if len(dependentBindings) == 0 {
delete(c.definitionsToBindings, oldNamespacedDefinitionName)
}
}
}
if binding == nil {
delete(c.bindingInfos, nn)
return nil
}
// Add record of binding to new definition
if dependentBindings, ok := c.definitionsToBindings[namespacedDefinitionName]; ok {
dependentBindings.Insert(nn)
} else {
c.definitionsToBindings[namespacedDefinitionName] = sets.New(nn)
}
// Remove compiled template for old binding
info.validator = nil
info.lastReconciledValue = binding
return nil
}
func (c *policyController) reconcileParams(namespace, name string, params runtime.Object) error {
// Do nothing.
// When we add informational type checking we will need to compile in the
// reconcile loops instead of lazily so we can add compiler errors / type
// checker errors to the status of the resources.
return nil
}
// Fetches the latest set of policy data or recalculates it if it has changed
// since it was last fetched
func (c *policyController) latestPolicyData() []policyData {
existing := func() []policyData {
c.mutex.RLock()
defer c.mutex.RUnlock()
return c.cachedPolicies
}()
if existing != nil {
return existing
}
c.mutex.Lock()
defer c.mutex.Unlock()
var res []policyData
for definitionNN, definitionInfo := range c.definitionInfo {
var bindingInfos []bindingInfo
for bindingNN := range c.definitionsToBindings[definitionNN] {
bindingInfo := c.bindingInfos[bindingNN]
if bindingInfo.validator == nil && definitionInfo.configurationError == nil {
hasParam := false
if definitionInfo.lastReconciledValue.Spec.ParamKind != nil {
hasParam = true
}
optionalVars := cel.OptionalVariableDeclarations{HasParams: hasParam, HasAuthorizer: true}
expressionOptionalVars := cel.OptionalVariableDeclarations{HasParams: hasParam, HasAuthorizer: false}
failurePolicy := convertv1beta1FailurePolicyTypeTov1FailurePolicyType(definitionInfo.lastReconciledValue.Spec.FailurePolicy)
var matcher matchconditions.Matcher = nil
matchConditions := definitionInfo.lastReconciledValue.Spec.MatchConditions
filterCompiler := c.filterCompiler
if filterCompiler == nil {
compositedCompiler, err := cel.NewCompositedCompiler(environment.MustBaseEnvSet(environment.DefaultCompatibilityVersion()))
if err == nil {
filterCompiler = compositedCompiler
compositedCompiler.CompileAndStoreVariables(convertv1beta1Variables(definitionInfo.lastReconciledValue.Spec.Variables), optionalVars, environment.StoredExpressions)
} else {
utilruntime.HandleError(err)
}
}
if len(matchConditions) > 0 {
matchExpressionAccessors := make([]cel.ExpressionAccessor, len(matchConditions))
for i := range matchConditions {
matchExpressionAccessors[i] = (*matchconditions.MatchCondition)(&matchConditions[i])
}
matcher = matchconditions.NewMatcher(filterCompiler.Compile(matchExpressionAccessors, optionalVars, environment.StoredExpressions), failurePolicy, "policy", "validate", definitionInfo.lastReconciledValue.Name)
}
bindingInfo.validator = c.newValidator(
filterCompiler.Compile(convertv1beta1Validations(definitionInfo.lastReconciledValue.Spec.Validations), optionalVars, environment.StoredExpressions),
matcher,
filterCompiler.Compile(convertv1beta1AuditAnnotations(definitionInfo.lastReconciledValue.Spec.AuditAnnotations), optionalVars, environment.StoredExpressions),
filterCompiler.Compile(convertv1beta1MessageExpressions(definitionInfo.lastReconciledValue.Spec.Validations), expressionOptionalVars, environment.StoredExpressions),
failurePolicy,
)
}
bindingInfos = append(bindingInfos, *bindingInfo)
}
var pInfo paramInfo
if paramKind := definitionInfo.lastReconciledValue.Spec.ParamKind; paramKind != nil {
if info, ok := c.paramsCRDControllers[*paramKind]; ok {
pInfo = *info
}
}
res = append(res, policyData{
definitionInfo: *definitionInfo,
paramInfo: pInfo,
bindings: bindingInfos,
})
}
c.cachedPolicies = res
return res
}
func convertv1beta1FailurePolicyTypeTov1FailurePolicyType(policyType *v1beta1.FailurePolicyType) *v1.FailurePolicyType {
if policyType == nil {
return nil
}
var v1FailPolicy v1.FailurePolicyType
if *policyType == v1beta1.Fail {
v1FailPolicy = v1.Fail
} else if *policyType == v1beta1.Ignore {
v1FailPolicy = v1.Ignore
}
return &v1FailPolicy
}
func convertv1beta1Validations(inputValidations []v1beta1.Validation) []cel.ExpressionAccessor {
celExpressionAccessor := make([]cel.ExpressionAccessor, len(inputValidations))
for i, validation := range inputValidations {
validation := ValidationCondition{
Expression: validation.Expression,
Message: validation.Message,
Reason: validation.Reason,
}
celExpressionAccessor[i] = &validation
}
return celExpressionAccessor
}
func convertv1beta1MessageExpressions(inputValidations []v1beta1.Validation) []cel.ExpressionAccessor {
celExpressionAccessor := make([]cel.ExpressionAccessor, len(inputValidations))
for i, validation := range inputValidations {
if validation.MessageExpression != "" {
condition := MessageExpressionCondition{
MessageExpression: validation.MessageExpression,
}
celExpressionAccessor[i] = &condition
}
}
return celExpressionAccessor
}
func convertv1beta1AuditAnnotations(inputValidations []v1beta1.AuditAnnotation) []cel.ExpressionAccessor {
celExpressionAccessor := make([]cel.ExpressionAccessor, len(inputValidations))
for i, validation := range inputValidations {
validation := AuditAnnotationCondition{
Key: validation.Key,
ValueExpression: validation.ValueExpression,
}
celExpressionAccessor[i] = &validation
}
return celExpressionAccessor
}
func convertv1beta1Variables(variables []v1beta1.Variable) []cel.NamedExpressionAccessor {
namedExpressions := make([]cel.NamedExpressionAccessor, len(variables))
for i, variable := range variables {
namedExpressions[i] = &Variable{Name: variable.Name, Expression: variable.Expression}
}
return namedExpressions
}
func getNamespaceName(namespace, name string) namespacedName {
return namespacedName{
namespace: namespace,
name: name,
}
}

View File

@@ -29,7 +29,7 @@ import (
"k8s.io/apiserver/pkg/admission/initializer"
admissionmetrics "k8s.io/apiserver/pkg/admission/metrics"
"k8s.io/apiserver/pkg/admission/plugin/namespace/lifecycle"
"k8s.io/apiserver/pkg/admission/plugin/validatingadmissionpolicy"
validatingadmissionpolicy "k8s.io/apiserver/pkg/admission/plugin/policy/validating"
mutatingwebhook "k8s.io/apiserver/pkg/admission/plugin/webhook/mutating"
validatingwebhook "k8s.io/apiserver/pkg/admission/plugin/webhook/validating"
apiserverapi "k8s.io/apiserver/pkg/apis/apiserver"

View File

@@ -20,7 +20,7 @@ package server
import (
"k8s.io/apiserver/pkg/admission"
"k8s.io/apiserver/pkg/admission/plugin/namespace/lifecycle"
"k8s.io/apiserver/pkg/admission/plugin/validatingadmissionpolicy"
validatingadmissionpolicy "k8s.io/apiserver/pkg/admission/plugin/policy/validating"
mutatingwebhook "k8s.io/apiserver/pkg/admission/plugin/webhook/mutating"
validatingwebhook "k8s.io/apiserver/pkg/admission/plugin/webhook/validating"
)

9
vendor/modules.txt vendored
View File

@@ -1360,12 +1360,16 @@ k8s.io/apimachinery/third_party/forked/golang/reflect
# k8s.io/apiserver v0.0.0 => ./staging/src/k8s.io/apiserver
## explicit; go 1.21
k8s.io/apiserver/pkg/admission
k8s.io/apiserver/pkg/admission/cel
k8s.io/apiserver/pkg/admission/configuration
k8s.io/apiserver/pkg/admission/initializer
k8s.io/apiserver/pkg/admission/metrics
k8s.io/apiserver/pkg/admission/plugin/cel
k8s.io/apiserver/pkg/admission/plugin/namespace/lifecycle
k8s.io/apiserver/pkg/admission/plugin/policy/generic
k8s.io/apiserver/pkg/admission/plugin/policy/internal/generic
k8s.io/apiserver/pkg/admission/plugin/policy/matching
k8s.io/apiserver/pkg/admission/plugin/policy/validating
k8s.io/apiserver/pkg/admission/plugin/policy/validating/metrics
k8s.io/apiserver/pkg/admission/plugin/resourcequota
k8s.io/apiserver/pkg/admission/plugin/resourcequota/apis/resourcequota
k8s.io/apiserver/pkg/admission/plugin/resourcequota/apis/resourcequota/install
@@ -1373,9 +1377,6 @@ k8s.io/apiserver/pkg/admission/plugin/resourcequota/apis/resourcequota/v1
k8s.io/apiserver/pkg/admission/plugin/resourcequota/apis/resourcequota/v1alpha1
k8s.io/apiserver/pkg/admission/plugin/resourcequota/apis/resourcequota/v1beta1
k8s.io/apiserver/pkg/admission/plugin/resourcequota/apis/resourcequota/validation
k8s.io/apiserver/pkg/admission/plugin/validatingadmissionpolicy
k8s.io/apiserver/pkg/admission/plugin/validatingadmissionpolicy/internal/generic
k8s.io/apiserver/pkg/admission/plugin/validatingadmissionpolicy/matching
k8s.io/apiserver/pkg/admission/plugin/webhook
k8s.io/apiserver/pkg/admission/plugin/webhook/config
k8s.io/apiserver/pkg/admission/plugin/webhook/config/apis/webhookadmission