Add ResourceVersionMatch parameter to make Resource Version semantics consistent for list
This commit is contained in:
parent
cb37c08846
commit
e214f2408b
@ -64,8 +64,9 @@ var _ Leases = &storageLeases{}
|
||||
func (s *storageLeases) ListLeases() ([]string, error) {
|
||||
ipInfoList := &corev1.EndpointsList{}
|
||||
storageOpts := storage.ListOptions{
|
||||
ResourceVersion: "0",
|
||||
Predicate: storage.Everything,
|
||||
ResourceVersion: "0",
|
||||
ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan,
|
||||
Predicate: storage.Everything,
|
||||
}
|
||||
if err := s.storage.List(apirequest.NewDefaultContext(), s.baseKey, storageOpts, ipInfoList); err != nil {
|
||||
return nil, err
|
||||
|
BIN
staging/src/k8s.io/api/testdata/v1.17.0/core.v1.ListOptions.after_roundtrip.pb
vendored
Normal file
BIN
staging/src/k8s.io/api/testdata/v1.17.0/core.v1.ListOptions.after_roundtrip.pb
vendored
Normal file
Binary file not shown.
BIN
staging/src/k8s.io/api/testdata/v1.18.0/core.v1.ListOptions.after_roundtrip.pb
vendored
Normal file
BIN
staging/src/k8s.io/api/testdata/v1.18.0/core.v1.ListOptions.after_roundtrip.pb
vendored
Normal file
Binary file not shown.
@ -215,6 +215,10 @@ func v1FuzzerFuncs(codecs runtimeserializer.CodecFactory) []interface{} {
|
||||
j.Finalizers = nil
|
||||
}
|
||||
},
|
||||
func(j *metav1.ResourceVersionMatch, c fuzz.Continue) {
|
||||
matches := []metav1.ResourceVersionMatch{"", metav1.ResourceVersionMatchExact, metav1.ResourceVersionMatchNotOlderThan}
|
||||
*j = matches[c.Rand.Intn(len(matches))]
|
||||
},
|
||||
func(j *metav1.ListMeta, c fuzz.Continue) {
|
||||
j.ResourceVersion = strconv.FormatUint(c.RandUint64(), 10)
|
||||
j.SelfLink = c.RandString()
|
||||
|
@ -44,13 +44,17 @@ type ListOptions struct {
|
||||
// If the feature gate WatchBookmarks is not enabled in apiserver,
|
||||
// this field is ignored.
|
||||
AllowWatchBookmarks bool
|
||||
// When specified with a watch call, shows changes that occur after that particular version of a resource.
|
||||
// Defaults to changes from the beginning of history.
|
||||
// When specified for list:
|
||||
// - if unset, then the result is returned from remote storage based on quorum-read flag;
|
||||
// - if it's 0, then we simply return what we currently have in cache, no guarantee;
|
||||
// - if set to non zero, then the result is at least as fresh as given rv.
|
||||
// resourceVersion sets a constraint on what resource versions a request may be served from.
|
||||
// See https://kubernetes.io/docs/reference/using-api/api-concepts/#resource-versions for
|
||||
// details.
|
||||
ResourceVersion string
|
||||
// resourceVersionMatch determines how resourceVersion is applied to list calls.
|
||||
// It is highly recommended that resourceVersionMatch be set for list calls where
|
||||
// resourceVersion is set.
|
||||
// See https://kubernetes.io/docs/reference/using-api/api-concepts/#resource-versions for
|
||||
// details.
|
||||
ResourceVersionMatch metav1.ResourceVersionMatch
|
||||
|
||||
// Timeout for the list/watch call.
|
||||
TimeoutSeconds *int64
|
||||
// Limit specifies the maximum number of results to return from the server. The server may
|
||||
|
@ -0,0 +1,38 @@
|
||||
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
|
||||
|
||||
go_library(
|
||||
name = "go_default_library",
|
||||
srcs = ["validation.go"],
|
||||
importmap = "k8s.io/kubernetes/vendor/k8s.io/apimachinery/pkg/apis/meta/internalversion/validation",
|
||||
importpath = "k8s.io/apimachinery/pkg/apis/meta/internalversion/validation",
|
||||
visibility = ["//visibility:public"],
|
||||
deps = [
|
||||
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/internalversion:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/util/validation/field:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
go_test(
|
||||
name = "go_default_test",
|
||||
srcs = ["validation_test.go"],
|
||||
embed = [":go_default_library"],
|
||||
deps = [
|
||||
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/internalversion:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
filegroup(
|
||||
name = "package-srcs",
|
||||
srcs = glob(["**"]),
|
||||
tags = ["automanaged"],
|
||||
visibility = ["//visibility:private"],
|
||||
)
|
||||
|
||||
filegroup(
|
||||
name = "all-srcs",
|
||||
srcs = [":package-srcs"],
|
||||
tags = ["automanaged"],
|
||||
visibility = ["//visibility:public"],
|
||||
)
|
@ -0,0 +1,46 @@
|
||||
/*
|
||||
Copyright 2020 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package validation
|
||||
|
||||
import (
|
||||
"k8s.io/apimachinery/pkg/apis/meta/internalversion"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/util/validation/field"
|
||||
)
|
||||
|
||||
// ValidateListOptions returns all validation errors found while validating the ListOptions.
|
||||
func ValidateListOptions(options *internalversion.ListOptions) field.ErrorList {
|
||||
allErrs := field.ErrorList{}
|
||||
if match := options.ResourceVersionMatch; len(match) > 0 {
|
||||
if options.Watch {
|
||||
allErrs = append(allErrs, field.Forbidden(field.NewPath("resourceVersionMatch"), "resourceVersionMatch is forbidden for watch"))
|
||||
}
|
||||
if len(options.ResourceVersion) == 0 {
|
||||
allErrs = append(allErrs, field.Forbidden(field.NewPath("resourceVersionMatch"), "resourceVersionMatch is forbidden unless resourceVersion is provided"))
|
||||
}
|
||||
if len(options.Continue) > 0 {
|
||||
allErrs = append(allErrs, field.Forbidden(field.NewPath("resourceVersionMatch"), "resourceVersionMatch is forbidden when continue is provided"))
|
||||
}
|
||||
if match != metav1.ResourceVersionMatchExact && match != metav1.ResourceVersionMatchNotOlderThan {
|
||||
allErrs = append(allErrs, field.NotSupported(field.NewPath("resourceVersionMatch"), match, []string{string(metav1.ResourceVersionMatchExact), string(metav1.ResourceVersionMatchNotOlderThan), ""}))
|
||||
}
|
||||
if match == metav1.ResourceVersionMatchExact && options.ResourceVersion == "0" {
|
||||
allErrs = append(allErrs, field.Forbidden(field.NewPath("resourceVersionMatch"), "resourceVersionMatch \"exact\" is forbidden for resourceVersion \"0\""))
|
||||
}
|
||||
}
|
||||
return allErrs
|
||||
}
|
@ -0,0 +1,83 @@
|
||||
/*
|
||||
Copyright 2020 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package validation
|
||||
|
||||
import (
|
||||
"k8s.io/apimachinery/pkg/apis/meta/internalversion"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestValidateListOptions(t *testing.T) {
|
||||
cases := []struct {
|
||||
name string
|
||||
opts internalversion.ListOptions
|
||||
expectError string
|
||||
}{
|
||||
{
|
||||
name: "valid-default",
|
||||
opts: internalversion.ListOptions{},
|
||||
},
|
||||
{
|
||||
name: "valid-resourceversionmatch-exact",
|
||||
opts: internalversion.ListOptions{
|
||||
ResourceVersion: "1",
|
||||
ResourceVersionMatch: metav1.ResourceVersionMatchExact,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "invalid-resourceversionmatch-exact",
|
||||
opts: internalversion.ListOptions{
|
||||
ResourceVersion: "0",
|
||||
ResourceVersionMatch: metav1.ResourceVersionMatchExact,
|
||||
},
|
||||
expectError: "resourceVersionMatch: Forbidden: resourceVersionMatch \"exact\" is forbidden for resourceVersion \"0\"",
|
||||
},
|
||||
{
|
||||
name: "valid-resourceversionmatch-notolderthan",
|
||||
opts: internalversion.ListOptions{
|
||||
ResourceVersion: "0",
|
||||
ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "invalid-resourceversionmatch",
|
||||
opts: internalversion.ListOptions{
|
||||
ResourceVersion: "0",
|
||||
ResourceVersionMatch: "foo",
|
||||
},
|
||||
expectError: "resourceVersionMatch: Unsupported value: \"foo\": supported values: \"Exact\", \"NotOlderThan\", \"\"",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range cases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
errs := ValidateListOptions(&tc.opts)
|
||||
if tc.expectError != "" {
|
||||
if len(errs) != 1 {
|
||||
t.Errorf("expected an error but got %d errors", len(errs))
|
||||
} else if errs[0].Error() != tc.expectError {
|
||||
t.Errorf("expected error '%s' but got '%s'", tc.expectError, errs[0].Error())
|
||||
}
|
||||
return
|
||||
}
|
||||
if len(errs) != 0 {
|
||||
t.Errorf("expected no errors, but got: %v", errs)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
@ -345,3 +345,11 @@ func Convert_url_Values_To_v1_DeleteOptions(in *url.Values, out *DeleteOptions,
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Convert_Slice_string_To_v1_ResourceVersionMatch allows converting a URL query parameter to ResourceVersionMatch
|
||||
func Convert_Slice_string_To_v1_ResourceVersionMatch(in *[]string, out *ResourceVersionMatch, s conversion.Scope) error {
|
||||
if len(*in) > 0 {
|
||||
*out = ResourceVersionMatch((*in)[0])
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -355,14 +355,23 @@ type ListOptions struct {
|
||||
// +optional
|
||||
AllowWatchBookmarks bool `json:"allowWatchBookmarks,omitempty" protobuf:"varint,9,opt,name=allowWatchBookmarks"`
|
||||
|
||||
// When specified with a watch call, shows changes that occur after that particular version of a resource.
|
||||
// Defaults to changes from the beginning of history.
|
||||
// When specified for list:
|
||||
// - if unset, then the result is returned from remote storage based on quorum-read flag;
|
||||
// - if it's 0, then we simply return what we currently have in cache, no guarantee;
|
||||
// - if set to non zero, then the result is at least as fresh as given rv.
|
||||
// resourceVersion sets a constraint on what resource versions a request may be served from.
|
||||
// See https://kubernetes.io/docs/reference/using-api/api-concepts/#resource-versions for
|
||||
// details.
|
||||
//
|
||||
// Defaults to unset
|
||||
// +optional
|
||||
ResourceVersion string `json:"resourceVersion,omitempty" protobuf:"bytes,4,opt,name=resourceVersion"`
|
||||
|
||||
// resourceVersionMatch determines how resourceVersion is applied to list calls.
|
||||
// It is highly recommended that resourceVersionMatch be set for list calls where
|
||||
// resourceVersion is set
|
||||
// See https://kubernetes.io/docs/reference/using-api/api-concepts/#resource-versions for
|
||||
// details.
|
||||
//
|
||||
// Defaults to unset
|
||||
// +optional
|
||||
ResourceVersionMatch ResourceVersionMatch `json:"resourceVersionMatch,omitempty" protobuf:"bytes,10,opt,name=resourceVersionMatch,casttype=ResourceVersionMatch"`
|
||||
// Timeout for the list/watch call.
|
||||
// This limits the duration of the call, regardless of any activity or inactivity.
|
||||
// +optional
|
||||
@ -402,6 +411,25 @@ type ListOptions struct {
|
||||
Continue string `json:"continue,omitempty" protobuf:"bytes,8,opt,name=continue"`
|
||||
}
|
||||
|
||||
// resourceVersionMatch specifies how the resourceVersion parameter is applied. resourceVersionMatch
|
||||
// may only be set if resourceVersion is also set.
|
||||
//
|
||||
// "NotOlderThan" matches data at least as new as the provided resourceVersion.
|
||||
// "Exact" matches data at the exact resourceVersion provided.
|
||||
//
|
||||
// See https://kubernetes.io/docs/reference/using-api/api-concepts/#resource-versions for
|
||||
// details.
|
||||
type ResourceVersionMatch string
|
||||
|
||||
const (
|
||||
// ResourceVersionMatchNotOlderThan matches data at least as new as the provided
|
||||
// resourceVersion.
|
||||
ResourceVersionMatchNotOlderThan ResourceVersionMatch = "NotOlderThan"
|
||||
// ResourceVersionMatchExact matches data at the exact resourceVersion
|
||||
// provided.
|
||||
ResourceVersionMatchExact ResourceVersionMatch = "Exact"
|
||||
)
|
||||
|
||||
// +k8s:conversion-gen:explicit-from=net/url.Values
|
||||
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
|
||||
|
||||
@ -423,10 +451,12 @@ type ExportOptions struct {
|
||||
// GetOptions is the standard query options to the standard REST get call.
|
||||
type GetOptions struct {
|
||||
TypeMeta `json:",inline"`
|
||||
// When specified:
|
||||
// - if unset, then the result is returned from remote storage based on quorum-read flag;
|
||||
// - if it's 0, then we simply return what we currently have in cache, no guarantee;
|
||||
// - if set to non zero, then the result is at least as fresh as given rv.
|
||||
// resourceVersion sets a constraint on what resource versions a request may be served from.
|
||||
// See https://kubernetes.io/docs/reference/using-api/api-concepts/#resource-versions for
|
||||
// details.
|
||||
//
|
||||
// Defaults to unset
|
||||
// +optional
|
||||
ResourceVersion string `json:"resourceVersion,omitempty" protobuf:"bytes,1,opt,name=resourceVersion"`
|
||||
// +k8s:deprecated=includeUninitialized,protobuf=2
|
||||
}
|
||||
|
@ -69,6 +69,7 @@ go_library(
|
||||
"//staging/src/k8s.io/apimachinery/pkg/api/meta:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/internalversion:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/internalversion/scheme:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/internalversion/validation:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/validation:go_default_library",
|
||||
|
@ -25,6 +25,7 @@ import (
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
|
||||
metainternalversionscheme "k8s.io/apimachinery/pkg/apis/meta/internalversion/scheme"
|
||||
metainternalversionvalidation "k8s.io/apimachinery/pkg/apis/meta/internalversion/validation"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/apis/meta/v1/validation"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
@ -198,6 +199,12 @@ func DeleteCollection(r rest.CollectionDeleter, checkBody bool, scope *RequestSc
|
||||
return
|
||||
}
|
||||
|
||||
if errs := metainternalversionvalidation.ValidateListOptions(&listOptions); len(errs) > 0 {
|
||||
err := errors.NewInvalid(schema.GroupKind{Group: metav1.GroupName, Kind: "ListOptions"}, "", errs)
|
||||
scope.err(err, w, req)
|
||||
return
|
||||
}
|
||||
|
||||
// transform fields
|
||||
// TODO: DecodeParametersInto should do this.
|
||||
if listOptions.FieldSelector != nil {
|
||||
|
@ -19,6 +19,8 @@ package handlers
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
metainternalversionvalidation "k8s.io/apimachinery/pkg/apis/meta/internalversion/validation"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"math/rand"
|
||||
"net/http"
|
||||
"net/url"
|
||||
@ -198,6 +200,12 @@ func ListResource(r rest.Lister, rw rest.Watcher, scope *RequestScope, forceWatc
|
||||
return
|
||||
}
|
||||
|
||||
if errs := metainternalversionvalidation.ValidateListOptions(&opts); len(errs) > 0 {
|
||||
err := errors.NewInvalid(schema.GroupKind{Group: metav1.GroupName, Kind: "ListOptions"}, "", errs)
|
||||
scope.err(err, w, req)
|
||||
return
|
||||
}
|
||||
|
||||
// transform fields
|
||||
// TODO: DecodeParametersInto should do this.
|
||||
if opts.FieldSelector != nil {
|
||||
|
@ -1078,6 +1078,10 @@ func typeToJSON(typeName string) string {
|
||||
return "string"
|
||||
case "v1.DeletionPropagation", "*v1.DeletionPropagation":
|
||||
return "string"
|
||||
case "v1.ResourceVersionMatch", "*v1.ResourceVersionMatch":
|
||||
return "string"
|
||||
case "v1.IncludeObjectPolicy", "*v1.IncludeObjectPolicy":
|
||||
return "string"
|
||||
|
||||
// TODO: Fix these when go-restful supports a way to specify an array query param:
|
||||
// https://github.com/emicklei/go-restful/issues/225
|
||||
|
@ -322,7 +322,7 @@ func (e *Store) ListPredicate(ctx context.Context, p storage.SelectionPredicate,
|
||||
p.Continue = options.Continue
|
||||
list := e.NewListFunc()
|
||||
qualifiedResource := e.qualifiedResourceFromContext(ctx)
|
||||
storageOpts := storage.ListOptions{ResourceVersion: options.ResourceVersion, Predicate: p}
|
||||
storageOpts := storage.ListOptions{ResourceVersion: options.ResourceVersion, ResourceVersionMatch: options.ResourceVersionMatch, Predicate: p}
|
||||
if name, ok := p.MatchesSingle(); ok {
|
||||
if key, err := e.KeyFunc(ctx, name); err == nil {
|
||||
err := e.Storage.GetToList(ctx, key, storageOpts, list)
|
||||
|
@ -580,7 +580,7 @@ func (c *Cacher) GetToList(ctx context.Context, key string, opts storage.ListOpt
|
||||
pagingEnabled := utilfeature.DefaultFeatureGate.Enabled(features.APIListChunking)
|
||||
hasContinuation := pagingEnabled && len(pred.Continue) > 0
|
||||
hasLimit := pagingEnabled && pred.Limit > 0 && resourceVersion != "0"
|
||||
if resourceVersion == "" || hasContinuation || hasLimit {
|
||||
if resourceVersion == "" || hasContinuation || hasLimit || opts.ResourceVersionMatch == metav1.ResourceVersionMatchExact {
|
||||
// If resourceVersion is not specified, serve it from underlying
|
||||
// storage (for backward compatibility). If a continuation is
|
||||
// requested, serve it from the underlying storage as well.
|
||||
@ -654,7 +654,7 @@ func (c *Cacher) List(ctx context.Context, key string, opts storage.ListOptions,
|
||||
pagingEnabled := utilfeature.DefaultFeatureGate.Enabled(features.APIListChunking)
|
||||
hasContinuation := pagingEnabled && len(pred.Continue) > 0
|
||||
hasLimit := pagingEnabled && pred.Limit > 0 && resourceVersion != "0"
|
||||
if resourceVersion == "" || hasContinuation || hasLimit {
|
||||
if resourceVersion == "" || hasContinuation || hasLimit || opts.ResourceVersionMatch == metav1.ResourceVersionMatchExact {
|
||||
// If resourceVersion is not specified, serve it from underlying
|
||||
// storage (for backward compatibility). If a continuation is
|
||||
// requested, serve it from the underlying storage as well.
|
||||
@ -1090,7 +1090,7 @@ func (lw *cacherListerWatcher) List(options metav1.ListOptions) (runtime.Object,
|
||||
Continue: options.Continue,
|
||||
}
|
||||
|
||||
if err := lw.storage.List(context.TODO(), lw.resourcePrefix, storage.ListOptions{Predicate: pred}, list); err != nil {
|
||||
if err := lw.storage.List(context.TODO(), lw.resourcePrefix, storage.ListOptions{ResourceVersionMatch: options.ResourceVersionMatch, Predicate: pred}, list); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return list, nil
|
||||
|
@ -68,6 +68,7 @@ go_library(
|
||||
deps = [
|
||||
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/api/meta:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/conversion:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
||||
|
@ -32,6 +32,8 @@ import (
|
||||
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
"k8s.io/apimachinery/pkg/api/meta"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
|
||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||
"k8s.io/apimachinery/pkg/conversion"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
@ -62,10 +64,7 @@ func (d authenticatedDataString) AuthenticatedData() []byte {
|
||||
var _ value.Context = authenticatedDataString("")
|
||||
|
||||
type store struct {
|
||||
client *clientv3.Client
|
||||
// getOps contains additional options that should be passed
|
||||
// to all Get() calls.
|
||||
getOps []clientv3.OpOption
|
||||
client *clientv3.Client
|
||||
codec runtime.Codec
|
||||
versioner storage.Versioner
|
||||
transformer value.Transformer
|
||||
@ -115,13 +114,12 @@ func (s *store) Versioner() storage.Versioner {
|
||||
func (s *store) Get(ctx context.Context, key string, opts storage.GetOptions, out runtime.Object) error {
|
||||
key = path.Join(s.pathPrefix, key)
|
||||
startTime := time.Now()
|
||||
callOpts := s.getOps
|
||||
getResp, err := s.client.KV.Get(ctx, key, callOpts...)
|
||||
getResp, err := s.client.KV.Get(ctx, key)
|
||||
metrics.RecordEtcdRequestLatency("get", getTypeName(out), startTime)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err = s.ensureMinimumResourceVersion(opts.ResourceVersion, uint64(getResp.Header.Revision)); err != nil {
|
||||
if err = s.validateMinimumResourceVersion(opts.ResourceVersion, uint64(getResp.Header.Revision)); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@ -252,7 +250,7 @@ func (s *store) GuaranteedUpdate(
|
||||
|
||||
getCurrentState := func() (*objState, error) {
|
||||
startTime := time.Now()
|
||||
getResp, err := s.client.KV.Get(ctx, key, s.getOps...)
|
||||
getResp, err := s.client.KV.Get(ctx, key)
|
||||
metrics.RecordEtcdRequestLatency("get", getTypeName(out), startTime)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -382,10 +380,12 @@ func (s *store) GuaranteedUpdate(
|
||||
// GetToList implements storage.Interface.GetToList.
|
||||
func (s *store) GetToList(ctx context.Context, key string, listOpts storage.ListOptions, listObj runtime.Object) error {
|
||||
resourceVersion := listOpts.ResourceVersion
|
||||
match := listOpts.ResourceVersionMatch
|
||||
pred := listOpts.Predicate
|
||||
trace := utiltrace.New("GetToList etcd3",
|
||||
utiltrace.Field{"key", key},
|
||||
utiltrace.Field{"resourceVersion", resourceVersion},
|
||||
utiltrace.Field{"resourceVersionMatch", match},
|
||||
utiltrace.Field{"limit", pred.Limit},
|
||||
utiltrace.Field{"continue", pred.Continue})
|
||||
defer trace.LogIfLong(500 * time.Millisecond)
|
||||
@ -402,12 +402,21 @@ func (s *store) GetToList(ctx context.Context, key string, listOpts storage.List
|
||||
|
||||
key = path.Join(s.pathPrefix, key)
|
||||
startTime := time.Now()
|
||||
getResp, err := s.client.KV.Get(ctx, key, s.getOps...)
|
||||
var opts []clientv3.OpOption
|
||||
if len(resourceVersion) > 0 && match == metav1.ResourceVersionMatchExact {
|
||||
rv, err := s.versioner.ParseResourceVersion(resourceVersion)
|
||||
if err != nil {
|
||||
return apierrors.NewBadRequest(fmt.Sprintf("invalid resource version: %v", err))
|
||||
}
|
||||
opts = append(opts, clientv3.WithRev(int64(rv)))
|
||||
}
|
||||
|
||||
getResp, err := s.client.KV.Get(ctx, key, opts...)
|
||||
metrics.RecordEtcdRequestLatency("get", getTypeName(listPtr), startTime)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err = s.ensureMinimumResourceVersion(resourceVersion, uint64(getResp.Header.Revision)); err != nil {
|
||||
if err = s.validateMinimumResourceVersion(resourceVersion, uint64(getResp.Header.Revision)); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@ -515,10 +524,12 @@ func encodeContinue(key, keyPrefix string, resourceVersion int64) (string, error
|
||||
// List implements storage.Interface.List.
|
||||
func (s *store) List(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
|
||||
resourceVersion := opts.ResourceVersion
|
||||
match := opts.ResourceVersionMatch
|
||||
pred := opts.Predicate
|
||||
trace := utiltrace.New("List etcd3",
|
||||
utiltrace.Field{"key", key},
|
||||
utiltrace.Field{"resourceVersion", resourceVersion},
|
||||
utiltrace.Field{"resourceVersionMatch", match},
|
||||
utiltrace.Field{"limit", pred.Limit},
|
||||
utiltrace.Field{"continue", pred.Continue})
|
||||
defer trace.LogIfLong(500 * time.Millisecond)
|
||||
@ -552,6 +563,15 @@ func (s *store) List(ctx context.Context, key string, opts storage.ListOptions,
|
||||
|
||||
newItemFunc := getNewItemFunc(listObj, v)
|
||||
|
||||
var fromRV *uint64
|
||||
if len(resourceVersion) > 0 {
|
||||
parsedRV, err := s.versioner.ParseResourceVersion(resourceVersion)
|
||||
if err != nil {
|
||||
return apierrors.NewBadRequest(fmt.Sprintf("invalid resource version: %v", err))
|
||||
}
|
||||
fromRV = &parsedRV
|
||||
}
|
||||
|
||||
var returnedRV, continueRV int64
|
||||
var continueKey string
|
||||
switch {
|
||||
@ -577,20 +597,41 @@ func (s *store) List(ctx context.Context, key string, opts storage.ListOptions,
|
||||
returnedRV = continueRV
|
||||
}
|
||||
case s.pagingEnabled && pred.Limit > 0:
|
||||
if len(resourceVersion) > 0 {
|
||||
fromRV, err := s.versioner.ParseResourceVersion(resourceVersion)
|
||||
if err != nil {
|
||||
return apierrors.NewBadRequest(fmt.Sprintf("invalid resource version: %v", err))
|
||||
if fromRV != nil {
|
||||
switch match {
|
||||
case metav1.ResourceVersionMatchNotOlderThan:
|
||||
// The not older than constraint is checked after we get a response from etcd,
|
||||
// and returnedRV is then set to the revision we get from the etcd response.
|
||||
case metav1.ResourceVersionMatchExact:
|
||||
returnedRV = int64(*fromRV)
|
||||
options = append(options, clientv3.WithRev(returnedRV))
|
||||
case "": // legacy case
|
||||
if *fromRV > 0 {
|
||||
returnedRV = int64(*fromRV)
|
||||
options = append(options, clientv3.WithRev(returnedRV))
|
||||
}
|
||||
default:
|
||||
return fmt.Errorf("unknown ResourceVersionMatch value: %v", match)
|
||||
}
|
||||
if fromRV > 0 {
|
||||
options = append(options, clientv3.WithRev(int64(fromRV)))
|
||||
}
|
||||
returnedRV = int64(fromRV)
|
||||
}
|
||||
|
||||
rangeEnd := clientv3.GetPrefixRangeEnd(keyPrefix)
|
||||
options = append(options, clientv3.WithRange(rangeEnd))
|
||||
default:
|
||||
if fromRV != nil {
|
||||
switch match {
|
||||
case metav1.ResourceVersionMatchNotOlderThan:
|
||||
// The not older than constraint is checked after we get a response from etcd,
|
||||
// and returnedRV is then set to the revision we get from the etcd response.
|
||||
case metav1.ResourceVersionMatchExact:
|
||||
returnedRV = int64(*fromRV)
|
||||
options = append(options, clientv3.WithRev(returnedRV))
|
||||
case "": // legacy case
|
||||
default:
|
||||
return fmt.Errorf("unknown ResourceVersionMatch value: %v", match)
|
||||
}
|
||||
}
|
||||
|
||||
options = append(options, clientv3.WithPrefix())
|
||||
}
|
||||
|
||||
@ -605,7 +646,7 @@ func (s *store) List(ctx context.Context, key string, opts storage.ListOptions,
|
||||
if err != nil {
|
||||
return interpretListError(err, len(pred.Continue) > 0, continueKey, keyPrefix)
|
||||
}
|
||||
if err = s.ensureMinimumResourceVersion(resourceVersion, uint64(getResp.Header.Revision)); err != nil {
|
||||
if err = s.validateMinimumResourceVersion(resourceVersion, uint64(getResp.Header.Revision)); err != nil {
|
||||
return err
|
||||
}
|
||||
hasMore = getResp.More
|
||||
@ -822,9 +863,9 @@ func (s *store) ttlOpts(ctx context.Context, ttl int64) ([]clientv3.OpOption, er
|
||||
return []clientv3.OpOption{clientv3.WithLease(id)}, nil
|
||||
}
|
||||
|
||||
// ensureMinimumResourceVersion returns a 'too large resource' version error when the provided minimumResourceVersion is
|
||||
// validateMinimumResourceVersion returns a 'too large resource' version error when the provided minimumResourceVersion is
|
||||
// greater than the most recent actualRevision available from storage.
|
||||
func (s *store) ensureMinimumResourceVersion(minimumResourceVersion string, actualRevision uint64) error {
|
||||
func (s *store) validateMinimumResourceVersion(minimumResourceVersion string, actualRevision uint64) error {
|
||||
if minimumResourceVersion == "" {
|
||||
return nil
|
||||
}
|
||||
|
@ -373,6 +373,10 @@ func TestConditionalDelete(t *testing.T) {
|
||||
func TestGetToList(t *testing.T) {
|
||||
ctx, store, cluster := testSetup(t)
|
||||
defer cluster.Terminate(t)
|
||||
prevKey, prevStoredObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "prev"}})
|
||||
|
||||
prevRV, _ := strconv.Atoi(prevStoredObj.ResourceVersion)
|
||||
|
||||
key, storedObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
|
||||
|
||||
currentRV, _ := strconv.Atoi(storedObj.ResourceVersion)
|
||||
@ -382,6 +386,7 @@ func TestGetToList(t *testing.T) {
|
||||
pred storage.SelectionPredicate
|
||||
expectedOut []*example.Pod
|
||||
rv string
|
||||
rvMatch metav1.ResourceVersionMatch
|
||||
expectRVTooLarge bool
|
||||
}{{ // test GetToList on existing key
|
||||
key: key,
|
||||
@ -392,11 +397,41 @@ func TestGetToList(t *testing.T) {
|
||||
pred: storage.Everything,
|
||||
expectedOut: []*example.Pod{storedObj},
|
||||
rv: "0",
|
||||
}, { // test GetToList on existing key with minimum resource version set to 0, match=minimum
|
||||
key: key,
|
||||
pred: storage.Everything,
|
||||
expectedOut: []*example.Pod{storedObj},
|
||||
rv: "0",
|
||||
rvMatch: metav1.ResourceVersionMatchNotOlderThan,
|
||||
}, { // test GetToList on existing key with minimum resource version set to current resource version
|
||||
key: key,
|
||||
pred: storage.Everything,
|
||||
expectedOut: []*example.Pod{storedObj},
|
||||
rv: fmt.Sprintf("%d", currentRV),
|
||||
}, { // test GetToList on existing key with minimum resource version set to current resource version, match=minimum
|
||||
key: key,
|
||||
pred: storage.Everything,
|
||||
expectedOut: []*example.Pod{storedObj},
|
||||
rv: fmt.Sprintf("%d", currentRV),
|
||||
rvMatch: metav1.ResourceVersionMatchNotOlderThan,
|
||||
}, { // test GetToList on existing key with minimum resource version set to previous resource version, match=minimum
|
||||
key: key,
|
||||
pred: storage.Everything,
|
||||
expectedOut: []*example.Pod{storedObj},
|
||||
rv: fmt.Sprintf("%d", prevRV),
|
||||
rvMatch: metav1.ResourceVersionMatchNotOlderThan,
|
||||
}, { // test GetToList on existing key with resource version set to current resource version, match=exact
|
||||
key: key,
|
||||
pred: storage.Everything,
|
||||
expectedOut: []*example.Pod{storedObj},
|
||||
rv: fmt.Sprintf("%d", currentRV),
|
||||
rvMatch: metav1.ResourceVersionMatchExact,
|
||||
}, { // test GetToList on existing key with resource version set to previous resource version, match=exact
|
||||
key: prevKey,
|
||||
pred: storage.Everything,
|
||||
expectedOut: []*example.Pod{prevStoredObj},
|
||||
rv: fmt.Sprintf("%d", prevRV),
|
||||
rvMatch: metav1.ResourceVersionMatchExact,
|
||||
}, { // test GetToList on existing key with minimum resource version set too high
|
||||
key: key,
|
||||
pred: storage.Everything,
|
||||
@ -422,7 +457,7 @@ func TestGetToList(t *testing.T) {
|
||||
|
||||
for i, tt := range tests {
|
||||
out := &example.PodList{}
|
||||
err := store.GetToList(ctx, tt.key, storage.ListOptions{ResourceVersion: tt.rv, Predicate: tt.pred}, out)
|
||||
err := store.GetToList(ctx, tt.key, storage.ListOptions{ResourceVersion: tt.rv, ResourceVersionMatch: tt.rvMatch, Predicate: tt.pred}, out)
|
||||
|
||||
if tt.expectRVTooLarge {
|
||||
if err == nil || !storage.IsTooLargeResourceVersion(err) {
|
||||
@ -934,6 +969,7 @@ func TestList(t *testing.T) {
|
||||
name string
|
||||
disablePaging bool
|
||||
rv string
|
||||
rvMatch metav1.ResourceVersionMatch
|
||||
prefix string
|
||||
pred storage.SelectionPredicate
|
||||
expectedOut []*example.Pod
|
||||
@ -981,6 +1017,31 @@ func TestList(t *testing.T) {
|
||||
expectedOut: []*example.Pod{preset[0].storedObj},
|
||||
rv: "0",
|
||||
},
|
||||
{
|
||||
name: "test List on existing key with resource version set to 1, match=Exact",
|
||||
prefix: "/one-level/",
|
||||
pred: storage.Everything,
|
||||
expectedOut: []*example.Pod{},
|
||||
rv: "1",
|
||||
rvMatch: metav1.ResourceVersionMatchExact,
|
||||
expectRV: "1",
|
||||
},
|
||||
{
|
||||
name: "test List on existing key with resource version set to 1, match=NotOlderThan",
|
||||
prefix: "/one-level/",
|
||||
pred: storage.Everything,
|
||||
expectedOut: []*example.Pod{preset[0].storedObj},
|
||||
rv: "0",
|
||||
rvMatch: metav1.ResourceVersionMatchNotOlderThan,
|
||||
},
|
||||
{
|
||||
name: "test List on existing key with resource version set to 1, match=Invalid",
|
||||
prefix: "/one-level/",
|
||||
pred: storage.Everything,
|
||||
rv: "0",
|
||||
rvMatch: "Invalid",
|
||||
expectError: true,
|
||||
},
|
||||
{
|
||||
name: "test List on existing key with resource version set to current resource version",
|
||||
prefix: "/one-level/",
|
||||
@ -988,6 +1049,23 @@ func TestList(t *testing.T) {
|
||||
expectedOut: []*example.Pod{preset[0].storedObj},
|
||||
rv: list.ResourceVersion,
|
||||
},
|
||||
{
|
||||
name: "test List on existing key with resource version set to current resource version, match=Exact",
|
||||
prefix: "/one-level/",
|
||||
pred: storage.Everything,
|
||||
expectedOut: []*example.Pod{preset[0].storedObj},
|
||||
rv: list.ResourceVersion,
|
||||
rvMatch: metav1.ResourceVersionMatchExact,
|
||||
expectRV: list.ResourceVersion,
|
||||
},
|
||||
{
|
||||
name: "test List on existing key with resource version set to current resource version, match=NotOlderThan",
|
||||
prefix: "/one-level/",
|
||||
pred: storage.Everything,
|
||||
expectedOut: []*example.Pod{preset[0].storedObj},
|
||||
rv: list.ResourceVersion,
|
||||
rvMatch: metav1.ResourceVersionMatchNotOlderThan,
|
||||
},
|
||||
{
|
||||
name: "test List on non-existing key",
|
||||
prefix: "/non-existing/",
|
||||
@ -1029,6 +1107,21 @@ func TestList(t *testing.T) {
|
||||
rv: list.ResourceVersion,
|
||||
expectRV: list.ResourceVersion,
|
||||
},
|
||||
{
|
||||
name: "test List with limit at current resource version and match=Exact",
|
||||
prefix: "/two-level/",
|
||||
pred: storage.SelectionPredicate{
|
||||
Label: labels.Everything(),
|
||||
Field: fields.Everything(),
|
||||
Limit: 1,
|
||||
},
|
||||
expectedOut: []*example.Pod{preset[1].storedObj},
|
||||
expectContinue: true,
|
||||
expectedRemainingItemCount: utilpointer.Int64Ptr(1),
|
||||
rv: list.ResourceVersion,
|
||||
rvMatch: metav1.ResourceVersionMatchExact,
|
||||
expectRV: list.ResourceVersion,
|
||||
},
|
||||
{
|
||||
name: "test List with limit at resource version 0",
|
||||
prefix: "/two-level/",
|
||||
@ -1043,6 +1136,49 @@ func TestList(t *testing.T) {
|
||||
rv: "0",
|
||||
expectRV: list.ResourceVersion,
|
||||
},
|
||||
{
|
||||
name: "test List with limit at resource version 0 match=NotOlderThan",
|
||||
prefix: "/two-level/",
|
||||
pred: storage.SelectionPredicate{
|
||||
Label: labels.Everything(),
|
||||
Field: fields.Everything(),
|
||||
Limit: 1,
|
||||
},
|
||||
expectedOut: []*example.Pod{preset[1].storedObj},
|
||||
expectContinue: true,
|
||||
expectedRemainingItemCount: utilpointer.Int64Ptr(1),
|
||||
rv: "0",
|
||||
rvMatch: metav1.ResourceVersionMatchNotOlderThan,
|
||||
expectRV: list.ResourceVersion,
|
||||
},
|
||||
{
|
||||
name: "test List with limit at resource version 1 and match=Exact",
|
||||
prefix: "/two-level/",
|
||||
pred: storage.SelectionPredicate{
|
||||
Label: labels.Everything(),
|
||||
Field: fields.Everything(),
|
||||
Limit: 1,
|
||||
},
|
||||
expectedOut: []*example.Pod{},
|
||||
expectContinue: false,
|
||||
rv: "1",
|
||||
rvMatch: metav1.ResourceVersionMatchExact,
|
||||
expectRV: "1",
|
||||
},
|
||||
{
|
||||
name: "test List with limit at old resource version and match=Exact",
|
||||
prefix: "/two-level/",
|
||||
pred: storage.SelectionPredicate{
|
||||
Label: labels.Everything(),
|
||||
Field: fields.Everything(),
|
||||
Limit: 1,
|
||||
},
|
||||
expectedOut: []*example.Pod{},
|
||||
expectContinue: false,
|
||||
rv: "1",
|
||||
rvMatch: metav1.ResourceVersionMatchExact,
|
||||
expectRV: "1",
|
||||
},
|
||||
{
|
||||
name: "test List with limit when paging disabled",
|
||||
disablePaging: true,
|
||||
@ -1201,7 +1337,7 @@ func TestList(t *testing.T) {
|
||||
}
|
||||
|
||||
out := &example.PodList{}
|
||||
storageOpts := storage.ListOptions{ResourceVersion: tt.rv, Predicate: tt.pred}
|
||||
storageOpts := storage.ListOptions{ResourceVersion: tt.rv, ResourceVersionMatch: tt.rvMatch, Predicate: tt.pred}
|
||||
var err error
|
||||
if tt.disablePaging {
|
||||
err = disablePagingStore.List(ctx, tt.prefix, storageOpts, out)
|
||||
|
@ -21,6 +21,7 @@ import (
|
||||
"fmt"
|
||||
|
||||
"k8s.io/apimachinery/pkg/api/meta"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/fields"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
@ -189,17 +190,20 @@ type Interface interface {
|
||||
// Get unmarshals json found at key into objPtr. On a not found error, will either
|
||||
// return a zero object of the requested type, or an error, depending on 'opts.ignoreNotFound'.
|
||||
// Treats empty responses and nil response nodes exactly like a not found error.
|
||||
// The returned contents may be delayed according to the semantics of GetOptions.ResourceVersion.
|
||||
// The returned contents may be delayed, but it is guaranteed that they will
|
||||
// match 'opts.ResourceVersion' according 'opts.ResourceVersionMatch'.
|
||||
Get(ctx context.Context, key string, opts GetOptions, objPtr runtime.Object) error
|
||||
|
||||
// GetToList unmarshals json found at key and opaque it into *List api object
|
||||
// (an object that satisfies the runtime.IsList definition).
|
||||
// The returned contents may be delayed according to the semantics of ListOptions.ResourceVersion.
|
||||
// The returned contents may be delayed, but it is guaranteed that they will
|
||||
// match 'opts.ResourceVersion' according 'opts.ResourceVersionMatch'.
|
||||
GetToList(ctx context.Context, key string, opts ListOptions, listObj runtime.Object) error
|
||||
|
||||
// List unmarshalls jsons found at directory defined by key and opaque them
|
||||
// into *List api object (an object that satisfies runtime.IsList definition).
|
||||
// The returned contents may be delayed according to the semantics of ListOptions.ResourceVersion.
|
||||
// The returned contents may be delayed, but it is guaranteed that they will
|
||||
// match 'opts.ResourceVersion' according 'opts.ResourceVersionMatch'.
|
||||
List(ctx context.Context, key string, opts ListOptions, listObj runtime.Object) error
|
||||
|
||||
// GuaranteedUpdate keeps calling 'tryUpdate()' to update key 'key' (of type 'ptrToType')
|
||||
@ -260,6 +264,9 @@ type ListOptions struct {
|
||||
// ResourceVersion. The newest available data is preferred, but any data not older than this
|
||||
// ResourceVersion may be served.
|
||||
ResourceVersion string
|
||||
// ResourceVersionMatch provides the rule for how the resource version constraint applies. If set
|
||||
// to the default value "" the legacy resource version semantic apply.
|
||||
ResourceVersionMatch metav1.ResourceVersionMatch
|
||||
// Predicate provides the selection rules for the list operation.
|
||||
Predicate SelectionPredicate
|
||||
}
|
||||
|
@ -626,7 +626,7 @@ func schema_pkg_apis_meta_v1_GetOptions(ref common.ReferenceCallback) common.Ope
|
||||
},
|
||||
"resourceVersion": {
|
||||
SchemaProps: spec.SchemaProps{
|
||||
Description: "When specified: - if unset, then the result is returned from remote storage based on quorum-read flag; - if it's 0, then we simply return what we currently have in cache, no guarantee; - if set to non zero, then the result is at least as fresh as given rv.",
|
||||
Description: "resourceVersion sets a constraint on what resource versions a request may be served from. See https://kubernetes.io/docs/reference/using-api/api-concepts/#resource-versions for details.\n\nDefaults to unset",
|
||||
Type: []string{"string"},
|
||||
Format: "",
|
||||
},
|
||||
@ -1067,7 +1067,14 @@ func schema_pkg_apis_meta_v1_ListOptions(ref common.ReferenceCallback) common.Op
|
||||
},
|
||||
"resourceVersion": {
|
||||
SchemaProps: spec.SchemaProps{
|
||||
Description: "When specified with a watch call, shows changes that occur after that particular version of a resource. Defaults to changes from the beginning of history. When specified for list: - if unset, then the result is returned from remote storage based on quorum-read flag; - if it's 0, then we simply return what we currently have in cache, no guarantee; - if set to non zero, then the result is at least as fresh as given rv.",
|
||||
Description: "resourceVersion sets a constraint on what resource versions a request may be served from. See https://kubernetes.io/docs/reference/using-api/api-concepts/#resource-versions for details.\n\nDefaults to unset",
|
||||
Type: []string{"string"},
|
||||
Format: "",
|
||||
},
|
||||
},
|
||||
"resourceVersionMatch": {
|
||||
SchemaProps: spec.SchemaProps{
|
||||
Description: "resourceVersionMatch determines how resourceVersion is applied to list calls. It is highly recommended that resourceVersionMatch be set for list calls where resourceVersion is set See https://kubernetes.io/docs/reference/using-api/api-concepts/#resource-versions for details.\n\nDefaults to unset",
|
||||
Type: []string{"string"},
|
||||
Format: "",
|
||||
},
|
||||
|
@ -61,6 +61,7 @@ go_test(
|
||||
"//staging/src/k8s.io/client-go/discovery/cached/disk:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/dynamic:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/kubernetes/typed/apps/v1:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/metadata:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/rest:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/tools/clientcmd:go_default_library",
|
||||
@ -68,6 +69,7 @@ go_test(
|
||||
"//staging/src/k8s.io/client-go/tools/pager:go_default_library",
|
||||
"//staging/src/k8s.io/component-base/featuregate/testing:go_default_library",
|
||||
"//staging/src/k8s.io/kubectl/pkg/cmd/util:go_default_library",
|
||||
"//test/integration:go_default_library",
|
||||
"//test/integration/framework:go_default_library",
|
||||
"//vendor/github.com/google/uuid:go_default_library",
|
||||
"//vendor/k8s.io/gengo/examples/set-gen/sets:go_default_library",
|
||||
|
@ -54,12 +54,15 @@ import (
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
"k8s.io/client-go/dynamic"
|
||||
clientset "k8s.io/client-go/kubernetes"
|
||||
appsv1 "k8s.io/client-go/kubernetes/typed/apps/v1"
|
||||
"k8s.io/client-go/metadata"
|
||||
restclient "k8s.io/client-go/rest"
|
||||
"k8s.io/client-go/tools/pager"
|
||||
featuregatetesting "k8s.io/component-base/featuregate/testing"
|
||||
"k8s.io/klog/v2"
|
||||
|
||||
"k8s.io/kubernetes/pkg/master"
|
||||
"k8s.io/kubernetes/test/integration"
|
||||
"k8s.io/kubernetes/test/integration/framework"
|
||||
)
|
||||
|
||||
@ -86,7 +89,7 @@ func setupWithResourcesWithOptions(t *testing.T, opts *framework.MasterConfigOpt
|
||||
masterConfig.GenericConfig.OpenAPIConfig = framework.DefaultOpenAPIConfig()
|
||||
_, s, closeFn := framework.RunAMaster(masterConfig)
|
||||
|
||||
clientSet, err := clientset.NewForConfig(&restclient.Config{Host: s.URL})
|
||||
clientSet, err := clientset.NewForConfig(&restclient.Config{Host: s.URL, QPS: -1})
|
||||
if err != nil {
|
||||
t.Fatalf("Error in create clientset: %v", err)
|
||||
}
|
||||
@ -309,6 +312,207 @@ func Test202StatusCode(t *testing.T) {
|
||||
verifyStatusCode(t, "DELETE", s.URL+path.Join("/apis/apps/v1/namespaces", ns.Name, "replicasets", rs.Name), cascDel, 202)
|
||||
}
|
||||
|
||||
var (
|
||||
invalidContinueToken = "invalidContinueToken"
|
||||
invalidResourceVersion = "invalid"
|
||||
invalidResourceVersionMatch = metav1.ResourceVersionMatch("InvalidMatch")
|
||||
)
|
||||
|
||||
// TestListOptions ensures that list works as expected for valid and invalid combinations of limit, continue,
|
||||
// resourceVersion and resourceVersionMatch.
|
||||
func TestListOptions(t *testing.T) {
|
||||
for _, watchCacheEnabled := range []bool{true, false} {
|
||||
t.Run(fmt.Sprintf("watchCacheEnabled=%t", watchCacheEnabled), func(t *testing.T) {
|
||||
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.APIListChunking, true)()
|
||||
etcdOptions := framework.DefaultEtcdOptions()
|
||||
etcdOptions.EnableWatchCache = watchCacheEnabled
|
||||
s, clientSet, closeFn := setupWithOptions(t, &framework.MasterConfigOptions{EtcdOptions: etcdOptions})
|
||||
defer closeFn()
|
||||
|
||||
ns := framework.CreateTestingNamespace("list-options", s, t)
|
||||
defer framework.DeleteTestingNamespace(ns, s, t)
|
||||
|
||||
rsClient := clientSet.AppsV1().ReplicaSets(ns.Name)
|
||||
|
||||
var compactedRv, oldestUncompactedRv string
|
||||
for i := 0; i < 15; i++ {
|
||||
rs := newRS(ns.Name)
|
||||
rs.Name = fmt.Sprintf("test-%d", i)
|
||||
created, err := rsClient.Create(context.Background(), rs, metav1.CreateOptions{})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if i == 0 {
|
||||
compactedRv = created.ResourceVersion // We compact this first resource version below
|
||||
}
|
||||
// delete the first 5, and then compact them
|
||||
if i < 5 {
|
||||
var zero int64
|
||||
if err := rsClient.Delete(context.Background(), rs.Name, metav1.DeleteOptions{GracePeriodSeconds: &zero}); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
oldestUncompactedRv = created.ResourceVersion
|
||||
}
|
||||
}
|
||||
|
||||
// compact some of the revision history in etcd so we can test "too old" resource versions
|
||||
_, kvClient, err := integration.GetEtcdClients(etcdOptions.StorageConfig.Transport)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
revision, err := strconv.Atoi(oldestUncompactedRv)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
_, err = kvClient.Compact(context.Background(), int64(revision))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
listObj, err := rsClient.List(context.Background(), metav1.ListOptions{
|
||||
Limit: 6,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
validContinueToken := listObj.Continue
|
||||
|
||||
// test all combinations of these, for both watch cache enabled and disabled:
|
||||
limits := []int64{0, 6}
|
||||
continueTokens := []string{"", validContinueToken, invalidContinueToken}
|
||||
rvs := []string{"", "0", compactedRv, invalidResourceVersion}
|
||||
rvMatches := []metav1.ResourceVersionMatch{
|
||||
"",
|
||||
metav1.ResourceVersionMatchNotOlderThan,
|
||||
metav1.ResourceVersionMatchExact,
|
||||
invalidResourceVersionMatch,
|
||||
}
|
||||
|
||||
for _, limit := range limits {
|
||||
for _, continueToken := range continueTokens {
|
||||
for _, rv := range rvs {
|
||||
for _, rvMatch := range rvMatches {
|
||||
name := fmt.Sprintf("limit=%d continue=%s rv=%s rvMatch=%s", limit, continueToken, rv, rvMatch)
|
||||
t.Run(name, func(t *testing.T) {
|
||||
opts := metav1.ListOptions{
|
||||
ResourceVersion: rv,
|
||||
ResourceVersionMatch: rvMatch,
|
||||
Continue: continueToken,
|
||||
Limit: limit,
|
||||
}
|
||||
testListOptionsCase(t, rsClient, watchCacheEnabled, opts, compactedRv)
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func testListOptionsCase(t *testing.T, rsClient appsv1.ReplicaSetInterface, watchCacheEnabled bool, opts metav1.ListOptions, compactedRv string) {
|
||||
listObj, err := rsClient.List(context.Background(), opts)
|
||||
|
||||
// check for expected validation errors
|
||||
if opts.ResourceVersion == "" && opts.ResourceVersionMatch != "" {
|
||||
if err == nil || !strings.Contains(err.Error(), "resourceVersionMatch is forbidden unless resourceVersion is provided") {
|
||||
t.Fatalf("expected forbidden error, but got: %v", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
if opts.Continue != "" && opts.ResourceVersionMatch != "" {
|
||||
if err == nil || !strings.Contains(err.Error(), "resourceVersionMatch is forbidden when continue is provided") {
|
||||
t.Fatalf("expected forbidden error, but got: %v", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
if opts.ResourceVersionMatch == invalidResourceVersionMatch {
|
||||
if err == nil || !strings.Contains(err.Error(), "supported values") {
|
||||
t.Fatalf("expected not supported error, but got: %v", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
if opts.ResourceVersionMatch == metav1.ResourceVersionMatchExact && opts.ResourceVersion == "0" {
|
||||
if err == nil || !strings.Contains(err.Error(), "resourceVersionMatch \"exact\" is forbidden for resourceVersion \"0\"") {
|
||||
t.Fatalf("expected forbidden error, but got: %v", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
if opts.ResourceVersion == invalidResourceVersion {
|
||||
if err == nil || !strings.Contains(err.Error(), "Invalid value") {
|
||||
t.Fatalf("expecting invalid value error, but got: %v", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
if opts.Continue == invalidContinueToken {
|
||||
if err == nil || !strings.Contains(err.Error(), "continue key is not valid") {
|
||||
t.Fatalf("expected continue key not valid error, but got: %v", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
// Should not be allowed for any resource version, but tightening the validation would be a breaking change
|
||||
if opts.Continue != "" && !(opts.ResourceVersion == "" || opts.ResourceVersion == "0") {
|
||||
if err == nil || !strings.Contains(err.Error(), "specifying resource version is not allowed when using continue") {
|
||||
t.Fatalf("expected not allowed error, but got: %v", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Check for too old errors
|
||||
isExact := opts.ResourceVersionMatch == metav1.ResourceVersionMatchExact
|
||||
// Legacy corner cases that can be avoided by using an explicit resourceVersionMatch value
|
||||
// see https://kubernetes.io/docs/reference/using-api/api-concepts/#the-resourceversion-parameter
|
||||
isLegacyExact := opts.Limit > 0 && opts.ResourceVersionMatch == ""
|
||||
|
||||
if opts.ResourceVersion == compactedRv && (isExact || isLegacyExact) {
|
||||
if err == nil || !strings.Contains(err.Error(), "The resourceVersion for the provided list is too old") {
|
||||
t.Fatalf("expected too old error, but got: %v", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// test successful responses
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
items, err := meta.ExtractList(listObj)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to extract list from %v", listObj)
|
||||
}
|
||||
count := int64(len(items))
|
||||
|
||||
// Cacher.GetToList defines this for logic to decide if the watch cache is skipped. We need to know it to know if
|
||||
// the limit is respected when testing here.
|
||||
pagingEnabled := utilfeature.DefaultFeatureGate.Enabled(features.APIListChunking)
|
||||
hasContinuation := pagingEnabled && len(opts.Continue) > 0
|
||||
hasLimit := pagingEnabled && opts.Limit > 0 && opts.ResourceVersion != "0"
|
||||
skipWatchCache := opts.ResourceVersion == "" || hasContinuation || hasLimit || isExact
|
||||
usingWatchCache := watchCacheEnabled && !skipWatchCache
|
||||
|
||||
if usingWatchCache { // watch cache does not respect limit and is not used for continue
|
||||
if count != 10 {
|
||||
t.Errorf("Expected list size to be 10 but got %d", count) // limit is ignored if watch cache is hit
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
if opts.Continue != "" {
|
||||
if count != 4 {
|
||||
t.Errorf("Expected list size of 4 but got %d", count)
|
||||
}
|
||||
return
|
||||
}
|
||||
if opts.Limit > 0 {
|
||||
if count != opts.Limit {
|
||||
t.Errorf("Expected list size to be limited to %d but got %d", opts.Limit, count)
|
||||
}
|
||||
return
|
||||
}
|
||||
if count != 10 {
|
||||
t.Errorf("Expected list size to be 10 but got %d", count)
|
||||
}
|
||||
}
|
||||
|
||||
func TestListResourceVersion0(t *testing.T) {
|
||||
var testcases = []struct {
|
||||
name string
|
||||
|
1
vendor/modules.txt
vendored
1
vendor/modules.txt
vendored
@ -1632,6 +1632,7 @@ k8s.io/apimachinery/pkg/api/validation/path
|
||||
k8s.io/apimachinery/pkg/apis/meta/fuzzer
|
||||
k8s.io/apimachinery/pkg/apis/meta/internalversion
|
||||
k8s.io/apimachinery/pkg/apis/meta/internalversion/scheme
|
||||
k8s.io/apimachinery/pkg/apis/meta/internalversion/validation
|
||||
k8s.io/apimachinery/pkg/apis/meta/v1
|
||||
k8s.io/apimachinery/pkg/apis/meta/v1/unstructured
|
||||
k8s.io/apimachinery/pkg/apis/meta/v1/unstructured/unstructuredscheme
|
||||
|
Loading…
Reference in New Issue
Block a user