Merge pull request #75953 from wojtek-t/add_etcd_metrics
Expose etcd client latency metrics
This commit is contained in:
		| @@ -62,6 +62,7 @@ go_library( | |||||||
|         "//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library", |         "//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library", | ||||||
|         "//staging/src/k8s.io/apiserver/pkg/storage:go_default_library", |         "//staging/src/k8s.io/apiserver/pkg/storage:go_default_library", | ||||||
|         "//staging/src/k8s.io/apiserver/pkg/storage/etcd:go_default_library", |         "//staging/src/k8s.io/apiserver/pkg/storage/etcd:go_default_library", | ||||||
|  |         "//staging/src/k8s.io/apiserver/pkg/storage/etcd/metrics:go_default_library", | ||||||
|         "//staging/src/k8s.io/apiserver/pkg/storage/value:go_default_library", |         "//staging/src/k8s.io/apiserver/pkg/storage/value:go_default_library", | ||||||
|         "//vendor/github.com/coreos/etcd/clientv3:go_default_library", |         "//vendor/github.com/coreos/etcd/clientv3:go_default_library", | ||||||
|         "//vendor/github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes:go_default_library", |         "//vendor/github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes:go_default_library", | ||||||
|   | |||||||
| @@ -38,6 +38,7 @@ import ( | |||||||
| 	"k8s.io/apimachinery/pkg/watch" | 	"k8s.io/apimachinery/pkg/watch" | ||||||
| 	"k8s.io/apiserver/pkg/storage" | 	"k8s.io/apiserver/pkg/storage" | ||||||
| 	"k8s.io/apiserver/pkg/storage/etcd" | 	"k8s.io/apiserver/pkg/storage/etcd" | ||||||
|  | 	"k8s.io/apiserver/pkg/storage/etcd/metrics" | ||||||
| 	"k8s.io/apiserver/pkg/storage/value" | 	"k8s.io/apiserver/pkg/storage/value" | ||||||
| 	utiltrace "k8s.io/utils/trace" | 	utiltrace "k8s.io/utils/trace" | ||||||
| ) | ) | ||||||
| @@ -111,7 +112,9 @@ func (s *store) Versioner() storage.Versioner { | |||||||
| // Get implements storage.Interface.Get. | // Get implements storage.Interface.Get. | ||||||
| func (s *store) Get(ctx context.Context, key string, resourceVersion string, out runtime.Object, ignoreNotFound bool) error { | func (s *store) Get(ctx context.Context, key string, resourceVersion string, out runtime.Object, ignoreNotFound bool) error { | ||||||
| 	key = path.Join(s.pathPrefix, key) | 	key = path.Join(s.pathPrefix, key) | ||||||
|  | 	startTime := time.Now() | ||||||
| 	getResp, err := s.client.KV.Get(ctx, key, s.getOps...) | 	getResp, err := s.client.KV.Get(ctx, key, s.getOps...) | ||||||
|  | 	metrics.RecordEtcdRequestLatency("get", getTypeName(out), startTime) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
| @@ -156,11 +159,13 @@ func (s *store) Create(ctx context.Context, key string, obj, out runtime.Object, | |||||||
| 		return storage.NewInternalError(err.Error()) | 		return storage.NewInternalError(err.Error()) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|  | 	startTime := time.Now() | ||||||
| 	txnResp, err := s.client.KV.Txn(ctx).If( | 	txnResp, err := s.client.KV.Txn(ctx).If( | ||||||
| 		notFound(key), | 		notFound(key), | ||||||
| 	).Then( | 	).Then( | ||||||
| 		clientv3.OpPut(key, string(newData), opts...), | 		clientv3.OpPut(key, string(newData), opts...), | ||||||
| 	).Commit() | 	).Commit() | ||||||
|  | 	metrics.RecordEtcdRequestLatency("create", getTypeName(obj), startTime) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
| @@ -191,10 +196,12 @@ func (s *store) Delete(ctx context.Context, key string, out runtime.Object, prec | |||||||
| func (s *store) unconditionalDelete(ctx context.Context, key string, out runtime.Object) error { | func (s *store) unconditionalDelete(ctx context.Context, key string, out runtime.Object) error { | ||||||
| 	// We need to do get and delete in single transaction in order to | 	// We need to do get and delete in single transaction in order to | ||||||
| 	// know the value and revision before deleting it. | 	// know the value and revision before deleting it. | ||||||
|  | 	startTime := time.Now() | ||||||
| 	txnResp, err := s.client.KV.Txn(ctx).If().Then( | 	txnResp, err := s.client.KV.Txn(ctx).If().Then( | ||||||
| 		clientv3.OpGet(key), | 		clientv3.OpGet(key), | ||||||
| 		clientv3.OpDelete(key), | 		clientv3.OpDelete(key), | ||||||
| 	).Commit() | 	).Commit() | ||||||
|  | 	metrics.RecordEtcdRequestLatency("delete", getTypeName(out), startTime) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
| @@ -212,7 +219,9 @@ func (s *store) unconditionalDelete(ctx context.Context, key string, out runtime | |||||||
| } | } | ||||||
|  |  | ||||||
| func (s *store) conditionalDelete(ctx context.Context, key string, out runtime.Object, v reflect.Value, preconditions *storage.Preconditions) error { | func (s *store) conditionalDelete(ctx context.Context, key string, out runtime.Object, v reflect.Value, preconditions *storage.Preconditions) error { | ||||||
|  | 	startTime := time.Now() | ||||||
| 	getResp, err := s.client.KV.Get(ctx, key) | 	getResp, err := s.client.KV.Get(ctx, key) | ||||||
|  | 	metrics.RecordEtcdRequestLatency("get", getTypeName(out), startTime) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
| @@ -224,6 +233,7 @@ func (s *store) conditionalDelete(ctx context.Context, key string, out runtime.O | |||||||
| 		if err := preconditions.Check(key, origState.obj); err != nil { | 		if err := preconditions.Check(key, origState.obj); err != nil { | ||||||
| 			return err | 			return err | ||||||
| 		} | 		} | ||||||
|  | 		startTime := time.Now() | ||||||
| 		txnResp, err := s.client.KV.Txn(ctx).If( | 		txnResp, err := s.client.KV.Txn(ctx).If( | ||||||
| 			clientv3.Compare(clientv3.ModRevision(key), "=", origState.rev), | 			clientv3.Compare(clientv3.ModRevision(key), "=", origState.rev), | ||||||
| 		).Then( | 		).Then( | ||||||
| @@ -231,6 +241,7 @@ func (s *store) conditionalDelete(ctx context.Context, key string, out runtime.O | |||||||
| 		).Else( | 		).Else( | ||||||
| 			clientv3.OpGet(key), | 			clientv3.OpGet(key), | ||||||
| 		).Commit() | 		).Commit() | ||||||
|  | 		metrics.RecordEtcdRequestLatency("delete", getTypeName(out), startTime) | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			return err | 			return err | ||||||
| 		} | 		} | ||||||
| @@ -247,7 +258,7 @@ func (s *store) conditionalDelete(ctx context.Context, key string, out runtime.O | |||||||
| func (s *store) GuaranteedUpdate( | func (s *store) GuaranteedUpdate( | ||||||
| 	ctx context.Context, key string, out runtime.Object, ignoreNotFound bool, | 	ctx context.Context, key string, out runtime.Object, ignoreNotFound bool, | ||||||
| 	preconditions *storage.Preconditions, tryUpdate storage.UpdateFunc, suggestion ...runtime.Object) error { | 	preconditions *storage.Preconditions, tryUpdate storage.UpdateFunc, suggestion ...runtime.Object) error { | ||||||
| 	trace := utiltrace.New(fmt.Sprintf("GuaranteedUpdate etcd3: %s", reflect.TypeOf(out).String())) | 	trace := utiltrace.New(fmt.Sprintf("GuaranteedUpdate etcd3: %s", getTypeName(out))) | ||||||
| 	defer trace.LogIfLong(500 * time.Millisecond) | 	defer trace.LogIfLong(500 * time.Millisecond) | ||||||
|  |  | ||||||
| 	v, err := conversion.EnforcePtr(out) | 	v, err := conversion.EnforcePtr(out) | ||||||
| @@ -257,7 +268,9 @@ func (s *store) GuaranteedUpdate( | |||||||
| 	key = path.Join(s.pathPrefix, key) | 	key = path.Join(s.pathPrefix, key) | ||||||
|  |  | ||||||
| 	getCurrentState := func() (*objState, error) { | 	getCurrentState := func() (*objState, error) { | ||||||
|  | 		startTime := time.Now() | ||||||
| 		getResp, err := s.client.KV.Get(ctx, key, s.getOps...) | 		getResp, err := s.client.KV.Get(ctx, key, s.getOps...) | ||||||
|  | 		metrics.RecordEtcdRequestLatency("get", getTypeName(out), startTime) | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			return nil, err | 			return nil, err | ||||||
| 		} | 		} | ||||||
| @@ -339,6 +352,7 @@ func (s *store) GuaranteedUpdate( | |||||||
| 		} | 		} | ||||||
| 		trace.Step("Transaction prepared") | 		trace.Step("Transaction prepared") | ||||||
|  |  | ||||||
|  | 		startTime := time.Now() | ||||||
| 		txnResp, err := s.client.KV.Txn(ctx).If( | 		txnResp, err := s.client.KV.Txn(ctx).If( | ||||||
| 			clientv3.Compare(clientv3.ModRevision(key), "=", origState.rev), | 			clientv3.Compare(clientv3.ModRevision(key), "=", origState.rev), | ||||||
| 		).Then( | 		).Then( | ||||||
| @@ -346,6 +360,7 @@ func (s *store) GuaranteedUpdate( | |||||||
| 		).Else( | 		).Else( | ||||||
| 			clientv3.OpGet(key), | 			clientv3.OpGet(key), | ||||||
| 		).Commit() | 		).Commit() | ||||||
|  | 		metrics.RecordEtcdRequestLatency("update", getTypeName(out), startTime) | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			return err | 			return err | ||||||
| 		} | 		} | ||||||
| @@ -379,7 +394,9 @@ func (s *store) GetToList(ctx context.Context, key string, resourceVersion strin | |||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	key = path.Join(s.pathPrefix, key) | 	key = path.Join(s.pathPrefix, key) | ||||||
|  | 	startTime := time.Now() | ||||||
| 	getResp, err := s.client.KV.Get(ctx, key, s.getOps...) | 	getResp, err := s.client.KV.Get(ctx, key, s.getOps...) | ||||||
|  | 	metrics.RecordEtcdRequestLatency("get", getTypeName(listPtr), startTime) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
| @@ -399,7 +416,9 @@ func (s *store) GetToList(ctx context.Context, key string, resourceVersion strin | |||||||
|  |  | ||||||
| func (s *store) Count(key string) (int64, error) { | func (s *store) Count(key string) (int64, error) { | ||||||
| 	key = path.Join(s.pathPrefix, key) | 	key = path.Join(s.pathPrefix, key) | ||||||
|  | 	startTime := time.Now() | ||||||
| 	getResp, err := s.client.KV.Get(context.Background(), key, clientv3.WithRange(clientv3.GetPrefixRangeEnd(key)), clientv3.WithCountOnly()) | 	getResp, err := s.client.KV.Get(context.Background(), key, clientv3.WithRange(clientv3.GetPrefixRangeEnd(key)), clientv3.WithCountOnly()) | ||||||
|  | 	metrics.RecordEtcdRequestLatency("listWithCount", key, startTime) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return 0, err | 		return 0, err | ||||||
| 	} | 	} | ||||||
| @@ -554,7 +573,9 @@ func (s *store) List(ctx context.Context, key, resourceVersion string, pred stor | |||||||
| 	var lastKey []byte | 	var lastKey []byte | ||||||
| 	var hasMore bool | 	var hasMore bool | ||||||
| 	for { | 	for { | ||||||
|  | 		startTime := time.Now() | ||||||
| 		getResp, err := s.client.KV.Get(ctx, key, options...) | 		getResp, err := s.client.KV.Get(ctx, key, options...) | ||||||
|  | 		metrics.RecordEtcdRequestLatency("list", getTypeName(listPtr), startTime) | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			return interpretListError(err, len(pred.Continue) > 0, continueKey, keyPrefix) | 			return interpretListError(err, len(pred.Continue) > 0, continueKey, keyPrefix) | ||||||
| 		} | 		} | ||||||
| @@ -786,3 +807,8 @@ func appendListItem(v reflect.Value, data []byte, rev uint64, pred storage.Selec | |||||||
| func notFound(key string) clientv3.Cmp { | func notFound(key string) clientv3.Cmp { | ||||||
| 	return clientv3.Compare(clientv3.ModRevision(key), "=", 0) | 	return clientv3.Compare(clientv3.ModRevision(key), "=", 0) | ||||||
| } | } | ||||||
|  |  | ||||||
|  | // getTypeName returns type name of an object for reporting purposes. | ||||||
|  | func getTypeName(obj interface{}) string { | ||||||
|  | 	return reflect.TypeOf(obj).String() | ||||||
|  | } | ||||||
|   | |||||||
| @@ -61,6 +61,9 @@ func scrapeMetrics(s *httptest.Server) ([]*prometheuspb.MetricFamily, error) { | |||||||
| 	// Each line in the response body should contain all the data for a single metric. | 	// Each line in the response body should contain all the data for a single metric. | ||||||
| 	var metrics []*prometheuspb.MetricFamily | 	var metrics []*prometheuspb.MetricFamily | ||||||
| 	scanner := bufio.NewScanner(resp.Body) | 	scanner := bufio.NewScanner(resp.Body) | ||||||
|  | 	// Increase buffer size, since default one is too small for reading | ||||||
|  | 	// the /metrics contents. | ||||||
|  | 	scanner.Buffer(make([]byte, 10), 131072) | ||||||
| 	for scanner.Scan() { | 	for scanner.Scan() { | ||||||
| 		var metric prometheuspb.MetricFamily | 		var metric prometheuspb.MetricFamily | ||||||
| 		if err := proto.UnmarshalText(scanner.Text(), &metric); err != nil { | 		if err := proto.UnmarshalText(scanner.Text(), &metric); err != nil { | ||||||
| @@ -122,5 +125,6 @@ func TestApiserverMetrics(t *testing.T) { | |||||||
| 	checkForExpectedMetrics(t, metrics, []string{ | 	checkForExpectedMetrics(t, metrics, []string{ | ||||||
| 		"apiserver_request_total", | 		"apiserver_request_total", | ||||||
| 		"apiserver_request_duration_seconds", | 		"apiserver_request_duration_seconds", | ||||||
|  | 		"etcd_request_duration_seconds", | ||||||
| 	}) | 	}) | ||||||
| } | } | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user
	 Kubernetes Prow Robot
					Kubernetes Prow Robot