Extend Filter interface with Trigger() and use it for pods and nodes

This commit is contained in:
Wojciech Tyczynski 2016-06-03 15:23:26 +02:00
parent 7f7ef0879f
commit 1d9bc58328
46 changed files with 511 additions and 77 deletions

View File

@ -38,7 +38,7 @@ func NewREST(s storage.Interface, storageDecorator generic.StorageDecorator) *RE
// Usually you should reuse your RESTCreateStrategy.
strategy := &NotNamespaceScoped{}
storageInterface := storageDecorator(
s, 100, &testgroup.TestType{}, prefix, strategy, newListFunc)
s, 100, &testgroup.TestType{}, prefix, strategy, newListFunc, storage.NoTriggerPublisher)
store := &registry.Store{
NewFunc: func() runtime.Object { return &testgroup.TestType{} },
// NewListFunc returns an object capable of storing results of an etcd list.

View File

@ -24,6 +24,7 @@ import (
"k8s.io/kubernetes/pkg/registry/generic"
"k8s.io/kubernetes/pkg/registry/generic/registry"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
)
type REST struct {
@ -49,7 +50,14 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST) {
newListFunc := func() runtime.Object { return &federation.ClusterList{} }
storageInterface := opts.Decorator(
opts.Storage, 100, &federation.Cluster{}, prefix, cluster.Strategy, newListFunc)
opts.Storage,
100,
&federation.Cluster{},
prefix,
cluster.Strategy,
newListFunc,
storage.NoTriggerPublisher,
)
store := &registry.Store{
NewFunc: func() runtime.Object { return &federation.Cluster{} },

View File

@ -27,6 +27,7 @@ import (
"k8s.io/kubernetes/pkg/registry/generic"
"k8s.io/kubernetes/pkg/registry/generic/registry"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
)
// REST implements a RESTStorage for CertificateSigningRequest against etcd
@ -39,7 +40,15 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST, *ApprovalREST) {
prefix := "/certificatesigningrequests"
newListFunc := func() runtime.Object { return &certificates.CertificateSigningRequestList{} }
storageInterface := opts.Decorator(opts.Storage, cachesize.GetWatchCacheSizeByResource(cachesize.CertificateSigningRequests), &certificates.CertificateSigningRequest{}, prefix, csrregistry.Strategy, newListFunc)
storageInterface := opts.Decorator(
opts.Storage,
cachesize.GetWatchCacheSizeByResource(cachesize.CertificateSigningRequests),
&certificates.CertificateSigningRequest{},
prefix,
csrregistry.Strategy,
newListFunc,
storage.NoTriggerPublisher,
)
store := &registry.Store{
NewFunc: func() runtime.Object { return &certificates.CertificateSigningRequest{} },

View File

@ -24,6 +24,7 @@ import (
"k8s.io/kubernetes/pkg/registry/generic"
"k8s.io/kubernetes/pkg/registry/generic/registry"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
)
// REST implements a RESTStorage for ClusterRole against etcd
@ -43,6 +44,7 @@ func NewREST(opts generic.RESTOptions) *REST {
prefix,
clusterrole.Strategy,
newListFunc,
storage.NoTriggerPublisher,
)
store := &registry.Store{

View File

@ -24,6 +24,7 @@ import (
"k8s.io/kubernetes/pkg/registry/generic"
"k8s.io/kubernetes/pkg/registry/generic/registry"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
)
// REST implements a RESTStorage for ClusterRoleBinding against etcd
@ -43,6 +44,7 @@ func NewREST(opts generic.RESTOptions) *REST {
prefix,
clusterrolebinding.Strategy,
newListFunc,
storage.NoTriggerPublisher,
)
store := &registry.Store{

View File

@ -20,9 +20,9 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/registry/configmap"
"k8s.io/kubernetes/pkg/registry/generic"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/registry/generic/registry"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
)
// REST implements a RESTStorage for ConfigMap against etcd
@ -36,7 +36,7 @@ func NewREST(opts generic.RESTOptions) *REST {
newListFunc := func() runtime.Object { return &api.ConfigMapList{} }
storageInterface := opts.Decorator(
opts.Storage, 100, &api.ConfigMap{}, prefix, configmap.Strategy, newListFunc)
opts.Storage, 100, &api.ConfigMap{}, prefix, configmap.Strategy, newListFunc, storage.NoTriggerPublisher)
store := &registry.Store{
NewFunc: func() runtime.Object {

View File

@ -32,6 +32,7 @@ import (
"k8s.io/kubernetes/pkg/registry/generic"
"k8s.io/kubernetes/pkg/registry/generic/registry"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
)
// ControllerStorage includes dummy storage for Replication Controllers and for Scale subresource.
@ -62,7 +63,14 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST) {
newListFunc := func() runtime.Object { return &api.ReplicationControllerList{} }
storageInterface := opts.Decorator(
opts.Storage, cachesize.GetWatchCacheSizeByResource(cachesize.Controllers), &api.ReplicationController{}, prefix, controller.Strategy, newListFunc)
opts.Storage,
cachesize.GetWatchCacheSizeByResource(cachesize.Controllers),
&api.ReplicationController{},
prefix,
controller.Strategy,
newListFunc,
storage.NoTriggerPublisher,
)
store := &registry.Store{
NewFunc: func() runtime.Object { return &api.ReplicationController{} },

View File

@ -25,6 +25,7 @@ import (
"k8s.io/kubernetes/pkg/registry/generic"
"k8s.io/kubernetes/pkg/registry/generic/registry"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
)
// rest implements a RESTStorage for DaemonSets against etcd
@ -38,7 +39,14 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST) {
newListFunc := func() runtime.Object { return &extensions.DaemonSetList{} }
storageInterface := opts.Decorator(
opts.Storage, cachesize.GetWatchCacheSizeByResource(cachesize.Daemonsets), &extensions.DaemonSet{}, prefix, daemonset.Strategy, newListFunc)
opts.Storage,
cachesize.GetWatchCacheSizeByResource(cachesize.Daemonsets),
&extensions.DaemonSet{},
prefix,
daemonset.Strategy,
newListFunc,
storage.NoTriggerPublisher,
)
store := &registry.Store{
NewFunc: func() runtime.Object { return &extensions.DaemonSet{} },

View File

@ -63,7 +63,14 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST, *RollbackREST) {
newListFunc := func() runtime.Object { return &extensions.DeploymentList{} }
storageInterface := opts.Decorator(
opts.Storage, cachesize.GetWatchCacheSizeByResource(cachesize.Deployments), &extensions.Deployment{}, prefix, deployment.Strategy, newListFunc)
opts.Storage,
cachesize.GetWatchCacheSizeByResource(cachesize.Deployments),
&extensions.Deployment{},
prefix,
deployment.Strategy,
newListFunc,
storage.NoTriggerPublisher,
)
store := &registry.Store{
NewFunc: func() runtime.Object { return &extensions.Deployment{} },

View File

@ -23,6 +23,7 @@ import (
"k8s.io/kubernetes/pkg/registry/generic"
"k8s.io/kubernetes/pkg/registry/generic/registry"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
)
type REST struct {
@ -35,7 +36,14 @@ func NewREST(opts generic.RESTOptions) *REST {
newListFunc := func() runtime.Object { return &api.EndpointsList{} }
storageInterface := opts.Decorator(
opts.Storage, cachesize.GetWatchCacheSizeByResource(cachesize.Endpoints), &api.Endpoints{}, prefix, endpoint.Strategy, newListFunc)
opts.Storage,
cachesize.GetWatchCacheSizeByResource(cachesize.Endpoints),
&api.Endpoints{},
prefix,
endpoint.Strategy,
newListFunc,
storage.NoTriggerPublisher,
)
store := &registry.Store{
NewFunc: func() runtime.Object { return &api.Endpoints{} },

View File

@ -21,6 +21,7 @@ import (
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
)
// AttrFunc returns label and field sets for List or Watch to compare against, or an error.
@ -50,9 +51,10 @@ func MergeFieldsSets(source fields.Set, fragment fields.Set) fields.Set {
// SelectionPredicate implements a generic predicate that can be passed to
// GenericRegistry's List or Watch methods. Implements the Matcher interface.
type SelectionPredicate struct {
Label labels.Selector
Field fields.Selector
GetAttrs AttrFunc
Label labels.Selector
Field fields.Selector
GetAttrs AttrFunc
IndexFields []string
}
// Matches returns true if the given object's labels and fields (as
@ -79,6 +81,20 @@ func (s *SelectionPredicate) MatchesSingle() (string, bool) {
return "", false
}
// For any index defined by IndexFields, if a matcher can match only (a subset)
// of objects that return <value> for a given index, a pair (<index name>, <value>)
// wil be returned.
// TODO: Consider supporting also labels.
func (s *SelectionPredicate) MatcherIndex() []storage.MatchValue {
var result []storage.MatchValue
for _, field := range s.IndexFields {
if value, ok := s.Field.RequiresExactMatch(field); ok {
result = append(result, storage.MatchValue{IndexName: field, Value: value})
}
}
return result
}
// Matcher can return true if an object matches the Matcher's selection
// criteria. If it is known that the matcher will match only a single object
// then MatchesSingle should return the key of that object and true. This is an
@ -93,9 +109,10 @@ type Matcher interface {
// include the object's namespace.
MatchesSingle() (key string, matchesSingleObject bool)
// TODO: when we start indexing objects, add something like the below:
// MatchesIndices() (indexName []string, indexValue []string)
// where indexName/indexValue are the same length.
// For any known index, if a matcher can match only (a subset) of objects
// that return <value> for a given index, a pair (<index name>, <value>)
// will be returned.
MatcherIndex() []storage.MatchValue
}
// MatcherFunc makes a matcher from the provided function. For easy definition
@ -117,6 +134,11 @@ func (m matcherFunc) MatchesSingle() (string, bool) {
return "", false
}
// MatcherIndex always returns empty list.
func (m matcherFunc) MatcherIndex() []storage.MatchValue {
return nil
}
// MatchOnKey returns a matcher that will send only the object matching key
// through the matching function f. For testing!
// Note: use SelectionPredicate above for real code!

View File

@ -30,15 +30,17 @@ func StorageWithCacher(
objectType runtime.Object,
resourcePrefix string,
scopeStrategy rest.NamespaceScopedStrategy,
newListFunc func() runtime.Object) storage.Interface {
newListFunc func() runtime.Object,
triggerFunc storage.TriggerPublisherFunc) storage.Interface {
config := storage.CacherConfig{
CacheCapacity: capacity,
Storage: storageInterface,
Versioner: etcdstorage.APIObjectVersioner{},
Type: objectType,
ResourcePrefix: resourcePrefix,
NewListFunc: newListFunc,
CacheCapacity: capacity,
Storage: storageInterface,
Versioner: etcdstorage.APIObjectVersioner{},
Type: objectType,
ResourcePrefix: resourcePrefix,
NewListFunc: newListFunc,
TriggerPublisherFunc: triggerFunc,
}
if scopeStrategy.NamespaceScoped() {
config.KeyFunc = func(obj runtime.Object) (string, error) {

View File

@ -828,7 +828,7 @@ func (e *Store) createFilter(m generic.Matcher) storage.Filter {
}
return matches
}
return storage.NewSimpleFilter(filterFunc)
return storage.NewSimpleFilter(filterFunc, m.MatcherIndex)
}
// calculateTTL is a helper for retrieving the updated TTL for an object or returning an error

View File

@ -105,6 +105,10 @@ func (sm setMatcher) MatchesSingle() (string, bool) {
return "", false
}
func (sm setMatcher) MatcherIndex() []storage.MatchValue {
return nil
}
// everythingMatcher matches everything
type everythingMatcher struct{}
@ -116,6 +120,10 @@ func (everythingMatcher) MatchesSingle() (string, bool) {
return "", false
}
func (everythingMatcher) MatcherIndex() []storage.MatchValue {
return nil
}
func TestStoreList(t *testing.T) {
podA := &api.Pod{
ObjectMeta: api.ObjectMeta{Namespace: "test", Name: "bar"},

View File

@ -30,7 +30,8 @@ type StorageDecorator func(
objectType runtime.Object,
resourcePrefix string,
scopeStrategy rest.NamespaceScopedStrategy,
newListFunc func() runtime.Object) storage.Interface
newListFunc func() runtime.Object,
trigger storage.TriggerPublisherFunc) storage.Interface
// Returns given 'storageInterface' without any decoration.
func UndecoratedStorage(
@ -39,6 +40,7 @@ func UndecoratedStorage(
objectType runtime.Object,
resourcePrefix string,
scopeStrategy rest.NamespaceScopedStrategy,
newListFunc func() runtime.Object) storage.Interface {
newListFunc func() runtime.Object,
trigger storage.TriggerPublisherFunc) storage.Interface {
return storageInterface
}

View File

@ -25,6 +25,7 @@ import (
"k8s.io/kubernetes/pkg/registry/generic/registry"
"k8s.io/kubernetes/pkg/registry/horizontalpodautoscaler"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
)
type REST struct {
@ -37,7 +38,14 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST) {
newListFunc := func() runtime.Object { return &autoscaling.HorizontalPodAutoscalerList{} }
storageInterface := opts.Decorator(
opts.Storage, cachesize.GetWatchCacheSizeByResource(cachesize.HorizontalPodAutoscalers), &autoscaling.HorizontalPodAutoscaler{}, prefix, horizontalpodautoscaler.Strategy, newListFunc)
opts.Storage,
cachesize.GetWatchCacheSizeByResource(cachesize.HorizontalPodAutoscalers),
&autoscaling.HorizontalPodAutoscaler{},
prefix,
horizontalpodautoscaler.Strategy,
newListFunc,
storage.NoTriggerPublisher,
)
store := &registry.Store{
NewFunc: func() runtime.Object { return &autoscaling.HorizontalPodAutoscaler{} },

View File

@ -25,6 +25,7 @@ import (
"k8s.io/kubernetes/pkg/registry/generic/registry"
ingress "k8s.io/kubernetes/pkg/registry/ingress"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
)
// rest implements a RESTStorage for replication controllers against etcd
@ -38,7 +39,14 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST) {
newListFunc := func() runtime.Object { return &extensions.IngressList{} }
storageInterface := opts.Decorator(
opts.Storage, cachesize.GetWatchCacheSizeByResource(cachesize.Ingress), &extensions.Ingress{}, prefix, ingress.Strategy, newListFunc)
opts.Storage,
cachesize.GetWatchCacheSizeByResource(cachesize.Ingress),
&extensions.Ingress{},
prefix,
ingress.Strategy,
newListFunc,
storage.NoTriggerPublisher,
)
store := &registry.Store{
NewFunc: func() runtime.Object { return &extensions.Ingress{} },

View File

@ -25,6 +25,7 @@ import (
"k8s.io/kubernetes/pkg/registry/generic/registry"
"k8s.io/kubernetes/pkg/registry/job"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
)
// REST implements a RESTStorage for jobs against etcd
@ -38,7 +39,14 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST) {
newListFunc := func() runtime.Object { return &batch.JobList{} }
storageInterface := opts.Decorator(
opts.Storage, cachesize.GetWatchCacheSizeByResource(cachesize.Jobs), &batch.Job{}, prefix, job.Strategy, newListFunc)
opts.Storage,
cachesize.GetWatchCacheSizeByResource(cachesize.Jobs),
&batch.Job{},
prefix,
job.Strategy,
newListFunc,
storage.NoTriggerPublisher,
)
store := &registry.Store{
NewFunc: func() runtime.Object { return &batch.Job{} },

View File

@ -23,6 +23,7 @@ import (
"k8s.io/kubernetes/pkg/registry/generic/registry"
"k8s.io/kubernetes/pkg/registry/limitrange"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
)
type REST struct {
@ -35,7 +36,14 @@ func NewREST(opts generic.RESTOptions) *REST {
newListFunc := func() runtime.Object { return &api.LimitRangeList{} }
storageInterface := opts.Decorator(
opts.Storage, cachesize.GetWatchCacheSizeByResource(cachesize.LimitRanges), &api.LimitRange{}, prefix, limitrange.Strategy, newListFunc)
opts.Storage,
cachesize.GetWatchCacheSizeByResource(cachesize.LimitRanges),
&api.LimitRange{},
prefix,
limitrange.Strategy,
newListFunc,
storage.NoTriggerPublisher,
)
store := &registry.Store{
NewFunc: func() runtime.Object { return &api.LimitRange{} },

View File

@ -54,7 +54,14 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST, *FinalizeREST) {
newListFunc := func() runtime.Object { return &api.NamespaceList{} }
storageInterface := opts.Decorator(
opts.Storage, cachesize.GetWatchCacheSizeByResource(cachesize.Namespaces), &api.Namespace{}, prefix, namespace.Strategy, newListFunc)
opts.Storage,
cachesize.GetWatchCacheSizeByResource(cachesize.Namespaces),
&api.Namespace{},
prefix,
namespace.Strategy,
newListFunc,
storage.NoTriggerPublisher,
)
store := &registry.Store{
NewFunc: func() runtime.Object { return &api.Namespace{} },

View File

@ -24,6 +24,7 @@ import (
"k8s.io/kubernetes/pkg/registry/generic/registry"
"k8s.io/kubernetes/pkg/registry/networkpolicy"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
)
// rest implements a RESTStorage for network policies against etcd
@ -37,7 +38,14 @@ func NewREST(opts generic.RESTOptions) *REST {
newListFunc := func() runtime.Object { return &extensionsapi.NetworkPolicyList{} }
storageInterface := opts.Decorator(
opts.Storage, cachesize.GetWatchCacheSizeByResource(cachesize.NetworkPolicys), &extensionsapi.NetworkPolicy{}, prefix, networkpolicy.Strategy, newListFunc)
opts.Storage,
cachesize.GetWatchCacheSizeByResource(cachesize.NetworkPolicys),
&extensionsapi.NetworkPolicy{},
prefix,
networkpolicy.Strategy,
newListFunc,
storage.NoTriggerPublisher,
)
store := &registry.Store{
NewFunc: func() runtime.Object { return &extensionsapi.NetworkPolicy{} },

View File

@ -70,7 +70,13 @@ func NewStorage(opts generic.RESTOptions, connection client.ConnectionInfoGetter
newListFunc := func() runtime.Object { return &api.NodeList{} }
storageInterface := opts.Decorator(
opts.Storage, cachesize.GetWatchCacheSizeByResource(cachesize.Nodes), &api.Node{}, prefix, node.Strategy, newListFunc)
opts.Storage,
cachesize.GetWatchCacheSizeByResource(cachesize.Nodes),
&api.Node{},
prefix,
node.Strategy,
newListFunc,
node.NodeNameTriggerFunc)
store := &registry.Store{
NewFunc: func() runtime.Object { return &api.Node{} },

View File

@ -32,6 +32,7 @@ import (
"k8s.io/kubernetes/pkg/master/ports"
"k8s.io/kubernetes/pkg/registry/generic"
"k8s.io/kubernetes/pkg/runtime"
pkgstorage "k8s.io/kubernetes/pkg/storage"
utilnet "k8s.io/kubernetes/pkg/util/net"
nodeutil "k8s.io/kubernetes/pkg/util/node"
"k8s.io/kubernetes/pkg/util/validation/field"
@ -157,9 +158,16 @@ func MatchNode(label labels.Selector, field fields.Selector) generic.Matcher {
}
return labels.Set(nodeObj.ObjectMeta.Labels), NodeToSelectableFields(nodeObj), nil
},
IndexFields: []string{"metadata.name"},
}
}
func NodeNameTriggerFunc(obj runtime.Object) []pkgstorage.MatchValue {
node := obj.(*api.Node)
result := pkgstorage.MatchValue{IndexName: "metadata.name", Value: node.ObjectMeta.Name}
return []pkgstorage.MatchValue{result}
}
// ResourceLocation returns an URL and transport which one can use to send traffic for the specified node.
func ResourceLocation(getter ResourceGetter, connection client.ConnectionInfoGetter, proxyTransport http.RoundTripper, ctx api.Context, id string) (*url.URL, http.RoundTripper, error) {
schemeReq, name, portReq, valid := utilnet.SplitSchemeNamePort(id)

View File

@ -24,6 +24,7 @@ import (
"k8s.io/kubernetes/pkg/registry/generic/registry"
"k8s.io/kubernetes/pkg/registry/persistentvolume"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
)
type REST struct {
@ -36,7 +37,14 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST) {
newListFunc := func() runtime.Object { return &api.PersistentVolumeList{} }
storageInterface := opts.Decorator(
opts.Storage, cachesize.GetWatchCacheSizeByResource(cachesize.PersistentVolumes), &api.PersistentVolume{}, prefix, persistentvolume.Strategy, newListFunc)
opts.Storage,
cachesize.GetWatchCacheSizeByResource(cachesize.PersistentVolumes),
&api.PersistentVolume{},
prefix,
persistentvolume.Strategy,
newListFunc,
storage.NoTriggerPublisher,
)
store := &registry.Store{
NewFunc: func() runtime.Object { return &api.PersistentVolume{} },

View File

@ -24,6 +24,7 @@ import (
"k8s.io/kubernetes/pkg/registry/generic/registry"
"k8s.io/kubernetes/pkg/registry/persistentvolumeclaim"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
)
type REST struct {
@ -36,7 +37,14 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST) {
newListFunc := func() runtime.Object { return &api.PersistentVolumeClaimList{} }
storageInterface := opts.Decorator(
opts.Storage, cachesize.GetWatchCacheSizeByResource(cachesize.PersistentVolumeClaims), &api.PersistentVolumeClaim{}, prefix, persistentvolumeclaim.Strategy, newListFunc)
opts.Storage,
cachesize.GetWatchCacheSizeByResource(cachesize.PersistentVolumeClaims),
&api.PersistentVolumeClaim{},
prefix,
persistentvolumeclaim.Strategy,
newListFunc,
storage.NoTriggerPublisher,
)
store := &registry.Store{
NewFunc: func() runtime.Object { return &api.PersistentVolumeClaim{} },

View File

@ -25,6 +25,7 @@ import (
"k8s.io/kubernetes/pkg/registry/generic/registry"
"k8s.io/kubernetes/pkg/registry/petset"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
)
// rest implements a RESTStorage for replication controllers against etcd
@ -38,7 +39,14 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST) {
newListFunc := func() runtime.Object { return &appsapi.PetSetList{} }
storageInterface := opts.Decorator(
opts.Storage, cachesize.GetWatchCacheSizeByResource(cachesize.PetSet), &appsapi.PetSet{}, prefix, petset.Strategy, newListFunc)
opts.Storage,
cachesize.GetWatchCacheSizeByResource(cachesize.PetSet),
&appsapi.PetSet{},
prefix,
petset.Strategy,
newListFunc,
storage.NoTriggerPublisher,
)
store := &registry.Store{
NewFunc: func() runtime.Object { return &appsapi.PetSet{} },

View File

@ -61,7 +61,14 @@ func NewStorage(opts generic.RESTOptions, k client.ConnectionInfoGetter, proxyTr
newListFunc := func() runtime.Object { return &api.PodList{} }
storageInterface := opts.Decorator(
opts.Storage, cachesize.GetWatchCacheSizeByResource(cachesize.Pods), &api.Pod{}, prefix, pod.Strategy, newListFunc)
opts.Storage,
cachesize.GetWatchCacheSizeByResource(cachesize.Pods),
&api.Pod{},
prefix,
pod.Strategy,
newListFunc,
pod.NodeNameTriggerFunc,
)
store := &registry.Store{
NewFunc: func() runtime.Object { return &api.Pod{} },

View File

@ -33,6 +33,7 @@ import (
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/registry/generic"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
utilnet "k8s.io/kubernetes/pkg/util/net"
"k8s.io/kubernetes/pkg/util/validation/field"
)
@ -177,9 +178,16 @@ func MatchPod(label labels.Selector, field fields.Selector) generic.Matcher {
}
return podLabels, podFields, nil
},
IndexFields: []string{"spec.nodeName"},
}
}
func NodeNameTriggerFunc(obj runtime.Object) []storage.MatchValue {
pod := obj.(*api.Pod)
result := storage.MatchValue{IndexName: "spec.nodeName", Value: pod.Spec.NodeName}
return []storage.MatchValue{result}
}
// PodToSelectableFields returns a field set that represents the object
// TODO: fields are not labels, and the validation rules for them do not apply.
func PodToSelectableFields(pod *api.Pod) fields.Set {

View File

@ -25,6 +25,7 @@ import (
"k8s.io/kubernetes/pkg/registry/generic/registry"
"k8s.io/kubernetes/pkg/registry/poddisruptionbudget"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
)
// rest implements a RESTStorage for pod disruption budgets against etcd
@ -38,7 +39,14 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST) {
newListFunc := func() runtime.Object { return &policyapi.PodDisruptionBudgetList{} }
storageInterface := opts.Decorator(
opts.Storage, cachesize.GetWatchCacheSizeByResource(cachesize.PodDisruptionBudget), &policyapi.PodDisruptionBudget{}, prefix, poddisruptionbudget.Strategy, newListFunc)
opts.Storage,
cachesize.GetWatchCacheSizeByResource(cachesize.PodDisruptionBudget),
&policyapi.PodDisruptionBudget{},
prefix,
poddisruptionbudget.Strategy,
newListFunc,
storage.NoTriggerPublisher,
)
store := &registry.Store{
NewFunc: func() runtime.Object { return &policyapi.PodDisruptionBudget{} },

View File

@ -23,6 +23,7 @@ import (
"k8s.io/kubernetes/pkg/registry/generic/registry"
"k8s.io/kubernetes/pkg/registry/podsecuritypolicy"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
)
// REST implements a RESTStorage for PodSecurityPolicies against etcd.
@ -36,7 +37,14 @@ const Prefix = "/podsecuritypolicies"
func NewREST(opts generic.RESTOptions) *REST {
newListFunc := func() runtime.Object { return &extensions.PodSecurityPolicyList{} }
storageInterface := opts.Decorator(
opts.Storage, 100, &extensions.PodSecurityPolicy{}, Prefix, podsecuritypolicy.Strategy, newListFunc)
opts.Storage,
100,
&extensions.PodSecurityPolicy{},
Prefix,
podsecuritypolicy.Strategy,
newListFunc,
storage.NoTriggerPublisher,
)
store := &registry.Store{
NewFunc: func() runtime.Object { return &extensions.PodSecurityPolicy{} },

View File

@ -23,6 +23,7 @@ import (
"k8s.io/kubernetes/pkg/registry/generic/registry"
"k8s.io/kubernetes/pkg/registry/podtemplate"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
)
type REST struct {
@ -35,7 +36,14 @@ func NewREST(opts generic.RESTOptions) *REST {
newListFunc := func() runtime.Object { return &api.PodTemplateList{} }
storageInterface := opts.Decorator(
opts.Storage, cachesize.GetWatchCacheSizeByResource(cachesize.PodTemplates), &api.PodTemplate{}, prefix, podtemplate.Strategy, newListFunc)
opts.Storage,
cachesize.GetWatchCacheSizeByResource(cachesize.PodTemplates),
&api.PodTemplate{},
prefix,
podtemplate.Strategy,
newListFunc,
storage.NoTriggerPublisher,
)
store := &registry.Store{
NewFunc: func() runtime.Object { return &api.PodTemplate{} },

View File

@ -31,6 +31,7 @@ import (
"k8s.io/kubernetes/pkg/registry/generic/registry"
"k8s.io/kubernetes/pkg/registry/replicaset"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
)
// ReplicaSetStorage includes dummy storage for ReplicaSets and for Scale subresource.
@ -61,7 +62,14 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST) {
newListFunc := func() runtime.Object { return &extensions.ReplicaSetList{} }
storageInterface := opts.Decorator(
opts.Storage, cachesize.GetWatchCacheSizeByResource(cachesize.Replicasets), &extensions.ReplicaSet{}, prefix, replicaset.Strategy, newListFunc)
opts.Storage,
cachesize.GetWatchCacheSizeByResource(cachesize.Replicasets),
&extensions.ReplicaSet{},
prefix,
replicaset.Strategy,
newListFunc,
storage.NoTriggerPublisher,
)
store := &registry.Store{
NewFunc: func() runtime.Object { return &extensions.ReplicaSet{} },

View File

@ -24,6 +24,7 @@ import (
"k8s.io/kubernetes/pkg/registry/generic/registry"
"k8s.io/kubernetes/pkg/registry/resourcequota"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
)
type REST struct {
@ -36,7 +37,14 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST) {
newListFunc := func() runtime.Object { return &api.ResourceQuotaList{} }
storageInterface := opts.Decorator(
opts.Storage, cachesize.GetWatchCacheSizeByResource(cachesize.ResourceQuotas), &api.ResourceQuota{}, prefix, resourcequota.Strategy, newListFunc)
opts.Storage,
cachesize.GetWatchCacheSizeByResource(cachesize.ResourceQuotas),
&api.ResourceQuota{},
prefix,
resourcequota.Strategy,
newListFunc,
storage.NoTriggerPublisher,
)
store := &registry.Store{
NewFunc: func() runtime.Object { return &api.ResourceQuota{} },

View File

@ -24,6 +24,7 @@ import (
"k8s.io/kubernetes/pkg/registry/generic/registry"
"k8s.io/kubernetes/pkg/registry/role"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
)
// REST implements a RESTStorage for Role against etcd
@ -43,6 +44,7 @@ func NewREST(opts generic.RESTOptions) *REST {
prefix,
role.Strategy,
newListFunc,
storage.NoTriggerPublisher,
)
store := &registry.Store{

View File

@ -24,6 +24,7 @@ import (
"k8s.io/kubernetes/pkg/registry/generic/registry"
"k8s.io/kubernetes/pkg/registry/rolebinding"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
)
// REST implements a RESTStorage for RoleBinding against etcd
@ -43,6 +44,7 @@ func NewREST(opts generic.RESTOptions) *REST {
prefix,
rolebinding.Strategy,
newListFunc,
storage.NoTriggerPublisher,
)
store := &registry.Store{

View File

@ -23,6 +23,7 @@ import (
"k8s.io/kubernetes/pkg/registry/generic/registry"
"k8s.io/kubernetes/pkg/registry/secret"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
)
type REST struct {
@ -35,7 +36,14 @@ func NewREST(opts generic.RESTOptions) *REST {
newListFunc := func() runtime.Object { return &api.SecretList{} }
storageInterface := opts.Decorator(
opts.Storage, cachesize.GetWatchCacheSizeByResource(cachesize.Secrets), &api.Secret{}, prefix, secret.Strategy, newListFunc)
opts.Storage,
cachesize.GetWatchCacheSizeByResource(cachesize.Secrets),
&api.Secret{},
prefix,
secret.Strategy,
newListFunc,
storage.NoTriggerPublisher,
)
store := &registry.Store{
NewFunc: func() runtime.Object { return &api.Secret{} },

View File

@ -24,6 +24,7 @@ import (
"k8s.io/kubernetes/pkg/registry/generic/registry"
"k8s.io/kubernetes/pkg/registry/service"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
)
type REST struct {
@ -36,7 +37,14 @@ func NewREST(opts generic.RESTOptions) (*REST, *StatusREST) {
newListFunc := func() runtime.Object { return &api.ServiceList{} }
storageInterface := opts.Decorator(
opts.Storage, cachesize.GetWatchCacheSizeByResource(cachesize.Services), &api.Service{}, prefix, service.Strategy, newListFunc)
opts.Storage,
cachesize.GetWatchCacheSizeByResource(cachesize.Services),
&api.Service{},
prefix,
service.Strategy,
newListFunc,
storage.NoTriggerPublisher,
)
store := &registry.Store{
NewFunc: func() runtime.Object { return &api.Service{} },

View File

@ -23,6 +23,7 @@ import (
"k8s.io/kubernetes/pkg/registry/generic/registry"
"k8s.io/kubernetes/pkg/registry/serviceaccount"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
)
type REST struct {
@ -35,7 +36,14 @@ func NewREST(opts generic.RESTOptions) *REST {
newListFunc := func() runtime.Object { return &api.ServiceAccountList{} }
storageInterface := opts.Decorator(
opts.Storage, cachesize.GetWatchCacheSizeByResource(cachesize.ServiceAccounts), &api.ServiceAccount{}, prefix, serviceaccount.Strategy, newListFunc)
opts.Storage,
cachesize.GetWatchCacheSizeByResource(cachesize.ServiceAccounts),
&api.ServiceAccount{},
prefix,
serviceaccount.Strategy,
newListFunc,
storage.NoTriggerPublisher,
)
store := &registry.Store{
NewFunc: func() runtime.Object { return &api.ServiceAccount{} },

View File

@ -59,11 +59,67 @@ type CacherConfig struct {
// KeyFunc is used to get a key in the underyling storage for a given object.
KeyFunc func(runtime.Object) (string, error)
// TriggerPublisherFunc is used for optimizing amount of watchers that
// needs to process an incoming event.
TriggerPublisherFunc TriggerPublisherFunc
// NewList is a function that creates new empty object storing a list of
// objects of type Type.
NewListFunc func() runtime.Object
}
type watchersMap map[int]*cacheWatcher
func (wm watchersMap) addWatcher(w *cacheWatcher, number int) {
wm[number] = w
}
func (wm watchersMap) deleteWatcher(number int) {
delete(wm, number)
}
func (wm watchersMap) terminateAll() {
for key, watcher := range wm {
delete(wm, key)
watcher.stop()
}
}
type indexedWatchers struct {
allWatchers watchersMap
valueWatchers map[string]watchersMap
}
func (i *indexedWatchers) addWatcher(w *cacheWatcher, number int, value string, supported bool) {
if supported {
if _, ok := i.valueWatchers[value]; !ok {
i.valueWatchers[value] = watchersMap{}
}
i.valueWatchers[value].addWatcher(w, number)
} else {
i.allWatchers.addWatcher(w, number)
}
}
func (i *indexedWatchers) deleteWatcher(number int, value string, supported bool) {
if supported {
i.valueWatchers[value].deleteWatcher(number)
if len(i.valueWatchers[value]) == 0 {
delete(i.valueWatchers, value)
}
} else {
i.allWatchers.deleteWatcher(number)
}
}
func (i *indexedWatchers) terminateAll() {
i.allWatchers.terminateAll()
for index, watchers := range i.valueWatchers {
watchers.terminateAll()
delete(i.valueWatchers, index)
}
}
// Cacher is responsible for serving WATCH and LIST requests for a given
// resource from its internal cache and updating its cache in the background
// based on the underlying storage contents.
@ -87,16 +143,20 @@ type Cacher struct {
watchCache *watchCache
reflector *cache.Reflector
// Registered watchers.
watcherIdx int
watchers map[int]*cacheWatcher
// Versioner is used to handle resource versions.
versioner Versioner
// keyFunc is used to get a key in the underyling storage for a given object.
keyFunc func(runtime.Object) (string, error)
// triggerFunc is used for optimizing amount of watchers that needs to process
// an incoming event.
triggerFunc TriggerPublisherFunc
// watchers is mapping from the value of trigger function that a
// watcher is interested into the watchers
watcherIdx int
watchers indexedWatchers
// Handling graceful termination.
stopLock sync.RWMutex
stopped bool
@ -120,13 +180,18 @@ func NewCacherFromConfig(config CacherConfig) *Cacher {
}
cacher := &Cacher{
ready: newReady(),
storage: config.Storage,
watchCache: watchCache,
reflector: cache.NewReflector(listerWatcher, config.Type, watchCache, 0),
watchers: make(map[int]*cacheWatcher),
versioner: config.Versioner,
keyFunc: config.KeyFunc,
ready: newReady(),
storage: config.Storage,
watchCache: watchCache,
reflector: cache.NewReflector(listerWatcher, config.Type, watchCache, 0),
versioner: config.Versioner,
keyFunc: config.KeyFunc,
triggerFunc: config.TriggerPublisherFunc,
watcherIdx: 0,
watchers: indexedWatchers{
allWatchers: make(map[int]*cacheWatcher),
valueWatchers: make(map[string]watchersMap),
},
// We need to (potentially) stop both:
// - wait.Until go-routine
// - reflector.ListAndWatch
@ -223,10 +288,20 @@ func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string,
return newErrWatcher(err), nil
}
triggerValue, triggerSupported := "", false
// TODO: Currently we assume that in a given Cacher object, any <filter> that is
// passed here is aware of exactly the same trigger (at most one).
// Thus, either 0 or 1 values will be returned.
if matchValues := filter.Trigger(); len(matchValues) > 0 {
triggerValue, triggerSupported = matchValues[0].Value, true
}
c.Lock()
defer c.Unlock()
watcher := newCacheWatcher(watchRV, initEvents, filterFunction(key, c.keyFunc, filter), forgetWatcher(c, c.watcherIdx))
c.watchers[c.watcherIdx] = watcher
forget := forgetWatcher(c, c.watcherIdx, triggerValue, triggerSupported)
watcher := newCacheWatcher(watchRV, initEvents, filterFunction(key, c.keyFunc, filter), forget)
c.watchers.addWatcher(watcher, c.watcherIdx, triggerValue, triggerSupported)
c.watcherIdx++
return watcher, nil
}
@ -307,21 +382,68 @@ func (c *Cacher) Codec() runtime.Codec {
return c.storage.Codec()
}
func (c *Cacher) triggerValues(event *watchCacheEvent) ([]string, bool) {
// TODO: Currently we assume that in a given Cacher object, its <c.triggerFunc>
// is aware of exactly the same trigger (at most one). Thus calling:
// c.triggerFunc(<some object>)
// can return only 0 or 1 values.
// That means, that triggerValues itself may return up to 2 different values.
if c.triggerFunc == nil {
return nil, false
}
result := make([]string, 0, 2)
matchValues := c.triggerFunc(event.Object)
if len(matchValues) > 0 {
result = append(result, matchValues[0].Value)
}
if event.PrevObject == nil {
return result, len(result) > 0
}
prevMatchValues := c.triggerFunc(event.PrevObject)
if len(prevMatchValues) > 0 {
if len(result) == 0 || result[0] != prevMatchValues[0].Value {
result = append(result, prevMatchValues[0].Value)
}
}
return result, len(result) > 0
}
func (c *Cacher) processEvent(event watchCacheEvent) {
triggerValues, supported := c.triggerValues(&event)
c.Lock()
defer c.Unlock()
for _, watcher := range c.watchers {
// Iterate over "allWatchers" no matter what the trigger function is.
for _, watcher := range c.watchers.allWatchers {
watcher.add(event)
}
if supported {
// Iterate over watchers interested in the given values of the trigger.
for _, triggerValue := range triggerValues {
for _, watcher := range c.watchers.valueWatchers[triggerValue] {
watcher.add(event)
}
}
} else {
// supported equal to false generally means that trigger function
// is not defined (or not aware of any indexes). In this case,
// watchers filters should generally also don't generate any
// trigger values, but can cause problems in case of some
// misconfiguration. Thus we paranoidly leave this branch.
// Iterate over watchers interested in exact values for all values.
for _, watchers := range c.watchers.valueWatchers {
for _, watcher := range watchers {
watcher.add(event)
}
}
}
}
func (c *Cacher) terminateAllWatchers() {
c.Lock()
defer c.Unlock()
for key, watcher := range c.watchers {
delete(c.watchers, key)
watcher.stop()
}
c.watchers.terminateAll()
}
func (c *Cacher) isStopped() bool {
@ -338,15 +460,15 @@ func (c *Cacher) Stop() {
c.stopWg.Wait()
}
func forgetWatcher(c *Cacher, index int) func(bool) {
func forgetWatcher(c *Cacher, index int, triggerValue string, triggerSupported bool) func(bool) {
return func(lock bool) {
if lock {
c.Lock()
defer c.Unlock()
}
// It's possible that the watcher is already not in the map (e.g. in case of
// It's possible that the watcher is already not in the structure (e.g. in case of
// simulaneous Stop() and terminateAllWatchers(), but it doesn't break anything.
delete(c.watchers, index)
c.watchers.deleteWatcher(index, triggerValue, triggerSupported)
}
}
@ -362,7 +484,7 @@ func filterFunction(key string, keyFunc func(runtime.Object) (string, error), fi
}
return filter.Filter(obj)
}
return NewSimpleFilter(filterFunc)
return NewSimpleFilter(filterFunc, filter.Trigger)
}
// Returns resource version to which the underlying cache is synced.

View File

@ -340,7 +340,7 @@ func TestFiltering(t *testing.T) {
}
return selector.Matches(labels.Set(metadata.GetLabels()))
}
filter := storage.NewSimpleFilter(filterFunc)
filter := storage.NewSimpleFilter(filterFunc, storage.NoTriggerFunc)
watcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", fooCreated.ResourceVersion, filter)
if err != nil {
t.Fatalf("Unexpected error: %v", err)

View File

@ -158,7 +158,7 @@ func TestListFiltered(t *testing.T) {
pod := obj.(*api.Pod)
return pod.Name == "bar"
}
filter := storage.NewSimpleFilter(filterFunc)
filter := storage.NewSimpleFilter(filterFunc, storage.NoTriggerFunc)
var got api.PodList
err := helper.List(context.TODO(), key, "", filter, &got)

View File

@ -56,6 +56,10 @@ func (f *firstLetterIsB) Filter(obj runtime.Object) bool {
return obj.(*api.Pod).Name[0] == 'b'
}
func (f *firstLetterIsB) Trigger() []storage.MatchValue {
return nil
}
func TestWatchInterpretations(t *testing.T) {
codec := testapi.Default.Codec()
// Declare some pods to make the test cases compact.
@ -230,7 +234,7 @@ func TestSendResultDeleteEventHaveLatestIndex(t *testing.T) {
filterFunc := func(obj runtime.Object) bool {
return obj.(*api.Pod).Name != "bar"
}
filter := storage.NewSimpleFilter(filterFunc)
filter := storage.NewSimpleFilter(filterFunc, storage.NoTriggerFunc)
w := newEtcdWatcher(false, false, nil, filter, codec, versioner, nil, &fakeEtcdCache{})
eventChan := make(chan watch.Event, 1)

View File

@ -227,14 +227,17 @@ func TestGetToList(t *testing.T) {
tests := []struct {
key string
filter func(runtime.Object) bool
trigger func() []storage.MatchValue
expectedOut []*api.Pod
}{{ // test GetToList on existing key
key: key,
filter: storage.EverythingFunc,
trigger: storage.NoTriggerFunc,
expectedOut: []*api.Pod{storedObj},
}, { // test GetToList on non-existing key
key: "/non-existing",
filter: storage.EverythingFunc,
trigger: storage.NoTriggerFunc,
expectedOut: nil,
}, { // test GetToList with filter to reject the pod
key: "/non-existing",
@ -245,12 +248,13 @@ func TestGetToList(t *testing.T) {
}
return pod.Name != storedObj.Name
},
trigger: storage.NoTriggerFunc,
expectedOut: nil,
}}
for i, tt := range tests {
out := &api.PodList{}
filter := storage.NewSimpleFilter(tt.filter)
filter := storage.NewSimpleFilter(tt.filter, tt.trigger)
err := store.GetToList(ctx, tt.key, filter, out)
if err != nil {
t.Fatalf("GetToList failed: %v", err)
@ -489,14 +493,17 @@ func TestList(t *testing.T) {
tests := []struct {
prefix string
filter func(runtime.Object) bool
trigger func() []storage.MatchValue
expectedOut []*api.Pod
}{{ // test List on existing key
prefix: "/one-level/",
filter: storage.EverythingFunc,
trigger: storage.NoTriggerFunc,
expectedOut: []*api.Pod{preset[0].storedObj},
}, { // test List on non-existing key
prefix: "/non-existing/",
filter: storage.EverythingFunc,
trigger: storage.NoTriggerFunc,
expectedOut: nil,
}, { // test List with filter
prefix: "/one-level/",
@ -507,16 +514,18 @@ func TestList(t *testing.T) {
}
return pod.Name != preset[0].storedObj.Name
},
trigger: storage.NoTriggerFunc,
expectedOut: nil,
}, { // test List with multiple levels of directories and expect flattened result
prefix: "/two-level/",
filter: storage.EverythingFunc,
trigger: storage.NoTriggerFunc,
expectedOut: []*api.Pod{preset[1].storedObj, preset[2].storedObj},
}}
for i, tt := range tests {
out := &api.PodList{}
filter := storage.NewSimpleFilter(tt.filter)
filter := storage.NewSimpleFilter(tt.filter, tt.trigger)
err := store.List(ctx, tt.prefix, "0", filter, out)
if err != nil {
t.Fatalf("List failed: %v", err)

View File

@ -58,15 +58,18 @@ func testWatch(t *testing.T, recursive bool) {
tests := []struct {
key string
filter func(runtime.Object) bool
trigger func() []storage.MatchValue
watchTests []*testWatchStruct
}{{ // create a key
key: "/somekey-1",
watchTests: []*testWatchStruct{{podFoo, true, watch.Added}},
filter: storage.EverythingFunc,
trigger: storage.NoTriggerFunc,
}, { // create a key but obj gets filtered
key: "/somekey-2",
watchTests: []*testWatchStruct{{podFoo, false, ""}},
filter: func(runtime.Object) bool { return false },
trigger: storage.NoTriggerFunc,
}, { // create a key but obj gets filtered. Then update it with unfiltered obj
key: "/somekey-3",
watchTests: []*testWatchStruct{{podFoo, false, ""}, {podBar, true, watch.Added}},
@ -74,10 +77,12 @@ func testWatch(t *testing.T, recursive bool) {
pod := obj.(*api.Pod)
return pod.Name == "bar"
},
trigger: storage.NoTriggerFunc,
}, { // update
key: "/somekey-4",
watchTests: []*testWatchStruct{{podFoo, true, watch.Added}, {podBar, true, watch.Modified}},
filter: storage.EverythingFunc,
trigger: storage.NoTriggerFunc,
}, { // delete because of being filtered
key: "/somekey-5",
watchTests: []*testWatchStruct{{podFoo, true, watch.Added}, {podBar, true, watch.Deleted}},
@ -85,9 +90,10 @@ func testWatch(t *testing.T, recursive bool) {
pod := obj.(*api.Pod)
return pod.Name != "bar"
},
trigger: storage.NoTriggerFunc,
}}
for i, tt := range tests {
filter := storage.NewSimpleFilter(tt.filter)
filter := storage.NewSimpleFilter(tt.filter, tt.trigger)
w, err := store.watch(ctx, tt.key, "0", filter, recursive)
if err != nil {
t.Fatalf("Watch failed: %v", err)

View File

@ -51,11 +51,30 @@ type ResponseMeta struct {
ResourceVersion uint64
}
// MatchValue defines a pair (<index name>, <value for that index>).
type MatchValue struct {
IndexName string
Value string
}
// TriggerPublisherFunc is a function that takes an object, and returns a list of pairs
// (<index name>, <index value for the given object>) for all indexes known
// to that function.
type TriggerPublisherFunc func(obj runtime.Object) []MatchValue
// Filter is interface that is used to pass filtering mechanism.
type Filter interface {
// Filter is a predicate which takes an API object and returns true
// if and only if the object should remain in the set.
Filter(obj runtime.Object) bool
// For any triggers known to the Filter, if Filter() can return only
// (a subset of) objects for which indexing function returns <value>,
// (<index name>, <value> pair would be returned.
//
// This is optimization to avoid computing Filter() function (which are
// usually relatively expensive) in case we are sure they will return
// false anyway.
Trigger() []MatchValue
}
// Everything is a Filter which accepts all objects.
@ -65,10 +84,14 @@ var Everything Filter = everything{}
type everything struct {
}
func (e everything) Filter(_ runtime.Object) bool {
func (e everything) Filter(runtime.Object) bool {
return true
}
func (e everything) Trigger() []MatchValue {
return nil
}
// Pass an UpdateFunc to Interface.GuaranteedUpdate to make an update
// that is guaranteed to succeed.
// See the comment for GuaranteedUpdate for more details.

View File

@ -38,16 +38,24 @@ func SimpleUpdate(fn SimpleUpdateFunc) UpdateFunc {
// SimpleFilter implements Filter interface.
type SimpleFilter struct {
filterFunc func(runtime.Object) bool
filterFunc func(runtime.Object) bool
triggerFunc func() []MatchValue
}
func (s *SimpleFilter) Filter(obj runtime.Object) bool {
return s.filterFunc(obj)
}
func NewSimpleFilter(filterFunc func(runtime.Object) bool) Filter {
func (s *SimpleFilter) Trigger() []MatchValue {
return s.triggerFunc()
}
func NewSimpleFilter(
filterFunc func(runtime.Object) bool,
triggerFunc func() []MatchValue) Filter {
return &SimpleFilter{
filterFunc: filterFunc,
filterFunc: filterFunc,
triggerFunc: triggerFunc,
}
}
@ -55,6 +63,14 @@ func EverythingFunc(runtime.Object) bool {
return true
}
func NoTriggerFunc() []MatchValue {
return nil
}
func NoTriggerPublisher(runtime.Object) []MatchValue {
return nil
}
// ParseWatchResourceVersion takes a resource version argument and converts it to
// the etcd version we should pass to helper.Watch(). Because resourceVersion is
// an opaque value, the default watch behavior for non-zero watch is to watch