Merge pull request #9845 from dcantah/prometheus-middleware-deprecated

Replace go-grpc-prometheus with go-grpc-middleware/providers/prometheus
This commit is contained in:
Phil Estes
2024-02-21 18:55:34 +00:00
committed by GitHub
45 changed files with 1224 additions and 2348 deletions

View File

@@ -1,204 +0,0 @@
# Created by .ignore support plugin (hsz.mobi)
### Go template
# Compiled Object files, Static and Dynamic libs (Shared Objects)
*.o
*.a
*.so
# Folders
_obj
_test
# Architecture specific extensions/prefixes
*.[568vq]
[568vq].out
*.cgo1.go
*.cgo2.c
_cgo_defun.c
_cgo_gotypes.go
_cgo_export.*
_testmain.go
*.exe
*.test
*.prof
### Windows template
# Windows image file caches
Thumbs.db
ehthumbs.db
# Folder config file
Desktop.ini
# Recycle Bin used on file shares
$RECYCLE.BIN/
# Windows Installer files
*.cab
*.msi
*.msm
*.msp
# Windows shortcuts
*.lnk
### Kate template
# Swap Files #
.*.kate-swp
.swp.*
### SublimeText template
# cache files for sublime text
*.tmlanguage.cache
*.tmPreferences.cache
*.stTheme.cache
# workspace files are user-specific
*.sublime-workspace
# project files should be checked into the repository, unless a significant
# proportion of contributors will probably not be using SublimeText
# *.sublime-project
# sftp configuration file
sftp-config.json
### Linux template
*~
# temporary files which can be created if a process still has a handle open of a deleted file
.fuse_hidden*
# KDE directory preferences
.directory
# Linux trash folder which might appear on any partition or disk
.Trash-*
### JetBrains template
# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio and Webstorm
# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839
# User-specific stuff:
.idea
.idea/tasks.xml
.idea/dictionaries
.idea/vcs.xml
.idea/jsLibraryMappings.xml
# Sensitive or high-churn files:
.idea/dataSources.ids
.idea/dataSources.xml
.idea/dataSources.local.xml
.idea/sqlDataSources.xml
.idea/dynamic.xml
.idea/uiDesigner.xml
# Gradle:
.idea/gradle.xml
.idea/libraries
# Mongo Explorer plugin:
.idea/mongoSettings.xml
## File-based project format:
*.iws
## Plugin-specific files:
# IntelliJ
/out/
# mpeltonen/sbt-idea plugin
.idea_modules/
# JIRA plugin
atlassian-ide-plugin.xml
# Crashlytics plugin (for Android Studio and IntelliJ)
com_crashlytics_export_strings.xml
crashlytics.properties
crashlytics-build.properties
fabric.properties
### Xcode template
# Xcode
#
# gitignore contributors: remember to update Global/Xcode.gitignore, Objective-C.gitignore & Swift.gitignore
## Build generated
build/
DerivedData/
## Various settings
*.pbxuser
!default.pbxuser
*.mode1v3
!default.mode1v3
*.mode2v3
!default.mode2v3
*.perspectivev3
!default.perspectivev3
xcuserdata/
## Other
*.moved-aside
*.xccheckout
*.xcscmblueprint
### Eclipse template
.metadata
bin/
tmp/
*.tmp
*.bak
*.swp
*~.nib
local.properties
.settings/
.loadpath
.recommenders
# Eclipse Core
.project
# External tool builders
.externalToolBuilders/
# Locally stored "Eclipse launch configurations"
*.launch
# PyDev specific (Python IDE for Eclipse)
*.pydevproject
# CDT-specific (C/C++ Development Tooling)
.cproject
# JDT-specific (Eclipse Java Development Tools)
.classpath
# Java annotation processor (APT)
.factorypath
# PDT-specific (PHP Development Tools)
.buildpath
# sbteclipse plugin
.target
# Tern plugin
.tern-project
# TeXlipse plugin
.texlipse
# STS (Spring Tool Suite)
.springBeans
# Code Recommenders
.recommenders/
coverage.txt
#vendor
vendor/
.envrc

View File

@@ -1,20 +0,0 @@
# Contributing
We would love to have people submit pull requests and help make `grpc-ecosystem/go-grpc-middleware` even better 👍.
Fork, then clone the repo:
```bash
git clone git@github.com:your-username/go-grpc-middleware.git
```
Before checking in please run the following:
```bash
make all
```
This will `vet`, `fmt`, regenerate documentation and run all tests.
Push to your fork and open a pull request.

View File

@@ -1,93 +0,0 @@
# Go gRPC Middleware
[![Travis Build](https://travis-ci.org/grpc-ecosystem/go-grpc-middleware.svg?branch=master)](https://travis-ci.org/grpc-ecosystem/go-grpc-middleware)
[![Go Report Card](https://goreportcard.com/badge/github.com/grpc-ecosystem/go-grpc-middleware)](https://goreportcard.com/report/github.com/grpc-ecosystem/go-grpc-middleware)
[![GoDoc](http://img.shields.io/badge/GoDoc-Reference-blue.svg)](https://godoc.org/github.com/grpc-ecosystem/go-grpc-middleware)
[![SourceGraph](https://sourcegraph.com/github.com/grpc-ecosystem/go-grpc-middleware/-/badge.svg)](https://sourcegraph.com/github.com/grpc-ecosystem/go-grpc-middleware/?badge)
[![codecov](https://codecov.io/gh/grpc-ecosystem/go-grpc-middleware/branch/master/graph/badge.svg)](https://codecov.io/gh/grpc-ecosystem/go-grpc-middleware)
[![Apache 2.0 License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](LICENSE)
[![quality: production](https://img.shields.io/badge/quality-production-orange.svg)](#status)
[![Slack](https://img.shields.io/badge/slack-%23grpc--middleware-brightgreen)](https://gophers.slack.com/archives/CNJL30P4P)
[gRPC Go](https://github.com/grpc/grpc-go) Middleware: interceptors, helpers, utilities.
## ⚠️ Status
Version [v2](https://github.com/grpc-ecosystem/go-grpc-middleware/tree/v2) is about to be released, with migration guide, which will replace v1. Try v2 and give us feedback!
Version v1 is currently in deprecation mode, which means only critical and safety bug fixes will be merged.
## Middleware
[gRPC Go](https://github.com/grpc/grpc-go) recently acquired support for
Interceptors, i.e. [middleware](https://medium.com/@matryer/writing-middleware-in-golang-and-how-go-makes-it-so-much-fun-4375c1246e81#.gv7tdlghs)
that is executed either on the gRPC Server before the request is passed onto the user's application logic, or on the gRPC client around the user call. It is a perfect way to implement
common patterns: auth, logging, message, validation, retries, or monitoring.
These are generic building blocks that make it easy to build multiple microservices easily.
The purpose of this repository is to act as a go-to point for such reusable functionality. It contains
some of them itself, but also will link to useful external repos.
`grpc_middleware` itself provides support for chaining interceptors, here's an example:
```go
import "github.com/grpc-ecosystem/go-grpc-middleware"
myServer := grpc.NewServer(
grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(
grpc_ctxtags.StreamServerInterceptor(),
grpc_opentracing.StreamServerInterceptor(),
grpc_prometheus.StreamServerInterceptor,
grpc_zap.StreamServerInterceptor(zapLogger),
grpc_auth.StreamServerInterceptor(myAuthFunction),
grpc_recovery.StreamServerInterceptor(),
)),
grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
grpc_ctxtags.UnaryServerInterceptor(),
grpc_opentracing.UnaryServerInterceptor(),
grpc_prometheus.UnaryServerInterceptor,
grpc_zap.UnaryServerInterceptor(zapLogger),
grpc_auth.UnaryServerInterceptor(myAuthFunction),
grpc_recovery.UnaryServerInterceptor(),
)),
)
```
## Interceptors
_Please send a PR to add new interceptors or middleware to this list_
#### Auth
- [`grpc_auth`](auth) - a customizable (via `AuthFunc`) piece of auth middleware
#### Logging
- [`grpc_ctxtags`](tags/) - a library that adds a `Tag` map to context, with data populated from request body
- [`grpc_zap`](logging/zap/) - integration of [zap](https://github.com/uber-go/zap) logging library into gRPC handlers.
- [`grpc_logrus`](logging/logrus/) - integration of [logrus](https://github.com/sirupsen/logrus) logging library into gRPC handlers.
- [`grpc_kit`](logging/kit/) - integration of [go-kit/log](https://github.com/go-kit/log) logging library into gRPC handlers.
- [`grpc_grpc_logsettable`](logging/settable/) - a wrapper around `grpclog.LoggerV2` that allows to replace loggers in runtime (thread-safe).
#### Monitoring
- [`grpc_prometheus`⚡](https://github.com/grpc-ecosystem/go-grpc-prometheus) - Prometheus client-side and server-side monitoring middleware
- [`otgrpc`⚡](https://github.com/grpc-ecosystem/grpc-opentracing/tree/master/go/otgrpc) - [OpenTracing](http://opentracing.io/) client-side and server-side interceptors
- [`grpc_opentracing`](tracing/opentracing) - [OpenTracing](http://opentracing.io/) client-side and server-side interceptors with support for streaming and handler-returned tags
- [`otelgrpc`](https://github.com/open-telemetry/opentelemetry-go-contrib/tree/main/instrumentation/google.golang.org/grpc/otelgrpc) - [OpenTelemetry](https://opentelemetry.io/) client-side and server-side interceptors
#### Client
- [`grpc_retry`](retry/) - a generic gRPC response code retry mechanism, client-side middleware
#### Server
- [`grpc_validator`](validator/) - codegen inbound message validation from `.proto` options
- [`grpc_recovery`](recovery/) - turn panics into gRPC errors
- [`ratelimit`](ratelimit/) - grpc rate limiting by your own limiter
## License
`go-grpc-middleware` is released under the Apache 2.0 license. See the [LICENSE](LICENSE) file for details.

View File

@@ -1,166 +0,0 @@
// Copyright 2016 Michal Witkowski. All Rights Reserved.
// See LICENSE for licensing terms.
// gRPC Server Interceptor chaining middleware.
package grpc_middleware
import (
"context"
"google.golang.org/grpc"
)
// ChainUnaryServer creates a single interceptor out of a chain of many interceptors.
//
// Execution is done in left-to-right order, including passing of context.
// For example ChainUnaryServer(one, two, three) will execute one before two before three, and three
// will see context changes of one and two.
//
// While this can be useful in some scenarios, it is generally advisable to use google.golang.org/grpc.ChainUnaryInterceptor directly.
func ChainUnaryServer(interceptors ...grpc.UnaryServerInterceptor) grpc.UnaryServerInterceptor {
n := len(interceptors)
// Dummy interceptor maintained for backward compatibility to avoid returning nil.
if n == 0 {
return func(ctx context.Context, req interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
return handler(ctx, req)
}
}
// The degenerate case, just return the single wrapped interceptor directly.
if n == 1 {
return interceptors[0]
}
// Return a function which satisfies the interceptor interface, and which is
// a closure over the given list of interceptors to be chained.
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
currHandler := handler
// Iterate backwards through all interceptors except the first (outermost).
// Wrap each one in a function which satisfies the handler interface, but
// is also a closure over the `info` and `handler` parameters. Then pass
// each pseudo-handler to the next outer interceptor as the handler to be called.
for i := n - 1; i > 0; i-- {
// Rebind to loop-local vars so they can be closed over.
innerHandler, i := currHandler, i
currHandler = func(currentCtx context.Context, currentReq interface{}) (interface{}, error) {
return interceptors[i](currentCtx, currentReq, info, innerHandler)
}
}
// Finally return the result of calling the outermost interceptor with the
// outermost pseudo-handler created above as its handler.
return interceptors[0](ctx, req, info, currHandler)
}
}
// ChainStreamServer creates a single interceptor out of a chain of many interceptors.
//
// Execution is done in left-to-right order, including passing of context.
// For example ChainUnaryServer(one, two, three) will execute one before two before three.
// If you want to pass context between interceptors, use WrapServerStream.
//
// While this can be useful in some scenarios, it is generally advisable to use google.golang.org/grpc.ChainStreamInterceptor directly.
func ChainStreamServer(interceptors ...grpc.StreamServerInterceptor) grpc.StreamServerInterceptor {
n := len(interceptors)
// Dummy interceptor maintained for backward compatibility to avoid returning nil.
if n == 0 {
return func(srv interface{}, stream grpc.ServerStream, _ *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
return handler(srv, stream)
}
}
if n == 1 {
return interceptors[0]
}
return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
currHandler := handler
for i := n - 1; i > 0; i-- {
innerHandler, i := currHandler, i
currHandler = func(currentSrv interface{}, currentStream grpc.ServerStream) error {
return interceptors[i](currentSrv, currentStream, info, innerHandler)
}
}
return interceptors[0](srv, stream, info, currHandler)
}
}
// ChainUnaryClient creates a single interceptor out of a chain of many interceptors.
//
// Execution is done in left-to-right order, including passing of context.
// For example ChainUnaryClient(one, two, three) will execute one before two before three.
func ChainUnaryClient(interceptors ...grpc.UnaryClientInterceptor) grpc.UnaryClientInterceptor {
n := len(interceptors)
// Dummy interceptor maintained for backward compatibility to avoid returning nil.
if n == 0 {
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
return invoker(ctx, method, req, reply, cc, opts...)
}
}
if n == 1 {
return interceptors[0]
}
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
currInvoker := invoker
for i := n - 1; i > 0; i-- {
innerInvoker, i := currInvoker, i
currInvoker = func(currentCtx context.Context, currentMethod string, currentReq, currentRepl interface{}, currentConn *grpc.ClientConn, currentOpts ...grpc.CallOption) error {
return interceptors[i](currentCtx, currentMethod, currentReq, currentRepl, currentConn, innerInvoker, currentOpts...)
}
}
return interceptors[0](ctx, method, req, reply, cc, currInvoker, opts...)
}
}
// ChainStreamClient creates a single interceptor out of a chain of many interceptors.
//
// Execution is done in left-to-right order, including passing of context.
// For example ChainStreamClient(one, two, three) will execute one before two before three.
func ChainStreamClient(interceptors ...grpc.StreamClientInterceptor) grpc.StreamClientInterceptor {
n := len(interceptors)
// Dummy interceptor maintained for backward compatibility to avoid returning nil.
if n == 0 {
return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
return streamer(ctx, desc, cc, method, opts...)
}
}
if n == 1 {
return interceptors[0]
}
return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
currStreamer := streamer
for i := n - 1; i > 0; i-- {
innerStreamer, i := currStreamer, i
currStreamer = func(currentCtx context.Context, currentDesc *grpc.StreamDesc, currentConn *grpc.ClientConn, currentMethod string, currentOpts ...grpc.CallOption) (grpc.ClientStream, error) {
return interceptors[i](currentCtx, currentDesc, currentConn, currentMethod, innerStreamer, currentOpts...)
}
}
return interceptors[0](ctx, desc, cc, method, currStreamer, opts...)
}
}
// Chain creates a single interceptor out of a chain of many interceptors.
//
// WithUnaryServerChain is a grpc.Server config option that accepts multiple unary interceptors.
// Basically syntactic sugar.
//
// Deprecated: use google.golang.org/grpc.ChainUnaryInterceptor instead.
func WithUnaryServerChain(interceptors ...grpc.UnaryServerInterceptor) grpc.ServerOption {
return grpc.ChainUnaryInterceptor(interceptors...)
}
// WithStreamServerChain is a grpc.Server config option that accepts multiple stream interceptors.
// Basically syntactic sugar.
//
// Deprecated: use google.golang.org/grpc.ChainStreamInterceptor instead.
func WithStreamServerChain(interceptors ...grpc.StreamServerInterceptor) grpc.ServerOption {
return grpc.ChainStreamInterceptor(interceptors...)
}

View File

@@ -1,69 +0,0 @@
// Copyright 2016 Michal Witkowski. All Rights Reserved.
// See LICENSE for licensing terms.
/*
`grpc_middleware` is a collection of gRPC middleware packages: interceptors, helpers and tools.
Middleware
gRPC is a fantastic RPC middleware, which sees a lot of adoption in the Golang world. However, the
upstream gRPC codebase is relatively bare bones.
This package, and most of its child packages provides commonly needed middleware for gRPC:
client-side interceptors for retires, server-side interceptors for input validation and auth,
functions for chaining said interceptors, metadata convenience methods and more.
Chaining
By default, gRPC doesn't allow one to have more than one interceptor either on the client nor on
the server side. `grpc_middleware` provides convenient chaining methods
Simple way of turning a multiple interceptors into a single interceptor. Here's an example for
server chaining:
myServer := grpc.NewServer(
grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(loggingStream, monitoringStream, authStream)),
grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(loggingUnary, monitoringUnary, authUnary)),
)
These interceptors will be executed from left to right: logging, monitoring and auth.
Here's an example for client side chaining:
clientConn, err = grpc.Dial(
address,
grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(monitoringClientUnary, retryUnary)),
grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient(monitoringClientStream, retryStream)),
)
client = pb_testproto.NewTestServiceClient(clientConn)
resp, err := client.PingEmpty(s.ctx, &myservice.Request{Msg: "hello"})
These interceptors will be executed from left to right: monitoring and then retry logic.
The retry interceptor will call every interceptor that follows it whenever when a retry happens.
Writing Your Own
Implementing your own interceptor is pretty trivial: there are interfaces for that. But the interesting
bit exposing common data to handlers (and other middleware), similarly to HTTP Middleware design.
For example, you may want to pass the identity of the caller from the auth interceptor all the way
to the handling function.
For example, a client side interceptor example for auth looks like:
func FakeAuthUnaryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
newCtx := context.WithValue(ctx, "user_id", "john@example.com")
return handler(newCtx, req)
}
Unfortunately, it's not as easy for streaming RPCs. These have the `context.Context` embedded within
the `grpc.ServerStream` object. To pass values through context, a wrapper (`WrappedServerStream`) is
needed. For example:
func FakeAuthStreamingInterceptor(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
newStream := grpc_middleware.WrapServerStream(stream)
newStream.WrappedContext = context.WithValue(ctx, "user_id", "john@example.com")
return handler(srv, newStream)
}
*/
package grpc_middleware

View File

@@ -1,17 +0,0 @@
SHELL=/bin/bash
GOFILES_NOVENDOR = $(shell go list ./... | grep -v /vendor/)
all: vet fmt test
fmt:
go fmt $(GOFILES_NOVENDOR)
vet:
# do not check lostcancel, they are intentional.
go vet -lostcancel=false $(GOFILES_NOVENDOR)
test: vet
./scripts/test_all.sh
.PHONY: all test

View File

@@ -0,0 +1,117 @@
// Copyright (c) The go-grpc-middleware Authors.
// Licensed under the Apache License 2.0.
package prometheus
import (
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors"
"github.com/prometheus/client_golang/prometheus"
"google.golang.org/grpc"
)
// ClientMetrics represents a collection of metrics to be registered on a
// Prometheus metrics registry for a gRPC client.
type ClientMetrics struct {
clientStartedCounter *prometheus.CounterVec
clientHandledCounter *prometheus.CounterVec
clientStreamMsgReceived *prometheus.CounterVec
clientStreamMsgSent *prometheus.CounterVec
// clientHandledHistogram can be nil
clientHandledHistogram *prometheus.HistogramVec
// clientStreamRecvHistogram can be nil
clientStreamRecvHistogram *prometheus.HistogramVec
// clientStreamSendHistogram can be nil
clientStreamSendHistogram *prometheus.HistogramVec
}
// NewClientMetrics returns a new ClientMetrics object.
// NOTE: Remember to register ClientMetrics object using prometheus registry
// e.g. prometheus.MustRegister(myClientMetrics).
func NewClientMetrics(opts ...ClientMetricsOption) *ClientMetrics {
var config clientMetricsConfig
config.apply(opts)
return &ClientMetrics{
clientStartedCounter: prometheus.NewCounterVec(
config.counterOpts.apply(prometheus.CounterOpts{
Name: "grpc_client_started_total",
Help: "Total number of RPCs started on the client.",
}), []string{"grpc_type", "grpc_service", "grpc_method"}),
clientHandledCounter: prometheus.NewCounterVec(
config.counterOpts.apply(prometheus.CounterOpts{
Name: "grpc_client_handled_total",
Help: "Total number of RPCs completed by the client, regardless of success or failure.",
}), []string{"grpc_type", "grpc_service", "grpc_method", "grpc_code"}),
clientStreamMsgReceived: prometheus.NewCounterVec(
config.counterOpts.apply(prometheus.CounterOpts{
Name: "grpc_client_msg_received_total",
Help: "Total number of RPC stream messages received by the client.",
}), []string{"grpc_type", "grpc_service", "grpc_method"}),
clientStreamMsgSent: prometheus.NewCounterVec(
config.counterOpts.apply(prometheus.CounterOpts{
Name: "grpc_client_msg_sent_total",
Help: "Total number of gRPC stream messages sent by the client.",
}), []string{"grpc_type", "grpc_service", "grpc_method"}),
clientHandledHistogram: config.clientHandledHistogram,
clientStreamRecvHistogram: config.clientStreamRecvHistogram,
clientStreamSendHistogram: config.clientStreamSendHistogram,
}
}
// Describe sends the super-set of all possible descriptors of metrics
// collected by this Collector to the provided channel and returns once
// the last descriptor has been sent.
func (m *ClientMetrics) Describe(ch chan<- *prometheus.Desc) {
m.clientStartedCounter.Describe(ch)
m.clientHandledCounter.Describe(ch)
m.clientStreamMsgReceived.Describe(ch)
m.clientStreamMsgSent.Describe(ch)
if m.clientHandledHistogram != nil {
m.clientHandledHistogram.Describe(ch)
}
if m.clientStreamRecvHistogram != nil {
m.clientStreamRecvHistogram.Describe(ch)
}
if m.clientStreamSendHistogram != nil {
m.clientStreamSendHistogram.Describe(ch)
}
}
// Collect is called by the Prometheus registry when collecting
// metrics. The implementation sends each collected metric via the
// provided channel and returns once the last metric has been sent.
func (m *ClientMetrics) Collect(ch chan<- prometheus.Metric) {
m.clientStartedCounter.Collect(ch)
m.clientHandledCounter.Collect(ch)
m.clientStreamMsgReceived.Collect(ch)
m.clientStreamMsgSent.Collect(ch)
if m.clientHandledHistogram != nil {
m.clientHandledHistogram.Collect(ch)
}
if m.clientStreamRecvHistogram != nil {
m.clientStreamRecvHistogram.Collect(ch)
}
if m.clientStreamSendHistogram != nil {
m.clientStreamSendHistogram.Collect(ch)
}
}
// UnaryClientInterceptor is a gRPC client-side interceptor that provides Prometheus monitoring for Unary RPCs.
func (m *ClientMetrics) UnaryClientInterceptor(opts ...Option) grpc.UnaryClientInterceptor {
return interceptors.UnaryClientInterceptor(&reportable{
opts: opts,
clientMetrics: m,
})
}
// StreamClientInterceptor is a gRPC client-side interceptor that provides Prometheus monitoring for Streaming RPCs.
func (m *ClientMetrics) StreamClientInterceptor(opts ...Option) grpc.StreamClientInterceptor {
return interceptors.StreamClientInterceptor(&reportable{
opts: opts,
clientMetrics: m,
})
}

View File

@@ -0,0 +1,77 @@
// Copyright (c) The go-grpc-middleware Authors.
// Licensed under the Apache License 2.0.
package prometheus
import (
"github.com/prometheus/client_golang/prometheus"
)
type clientMetricsConfig struct {
counterOpts counterOptions
// clientHandledHistogram can be nil.
clientHandledHistogram *prometheus.HistogramVec
// clientStreamRecvHistogram can be nil.
clientStreamRecvHistogram *prometheus.HistogramVec
// clientStreamSendHistogram can be nil.
clientStreamSendHistogram *prometheus.HistogramVec
}
type ClientMetricsOption func(*clientMetricsConfig)
func (c *clientMetricsConfig) apply(opts []ClientMetricsOption) {
for _, o := range opts {
o(c)
}
}
func WithClientCounterOptions(opts ...CounterOption) ClientMetricsOption {
return func(o *clientMetricsConfig) {
o.counterOpts = opts
}
}
// WithClientHandlingTimeHistogram turns on recording of handling time of RPCs.
// Histogram metrics can be very expensive for Prometheus to retain and query.
func WithClientHandlingTimeHistogram(opts ...HistogramOption) ClientMetricsOption {
return func(o *clientMetricsConfig) {
o.clientHandledHistogram = prometheus.NewHistogramVec(
histogramOptions(opts).apply(prometheus.HistogramOpts{
Name: "grpc_client_handling_seconds",
Help: "Histogram of response latency (seconds) of the gRPC until it is finished by the application.",
Buckets: prometheus.DefBuckets,
}),
[]string{"grpc_type", "grpc_service", "grpc_method"},
)
}
}
// WithClientStreamRecvHistogram turns on recording of single message receive time of streaming RPCs.
// Histogram metrics can be very expensive for Prometheus to retain and query.
func WithClientStreamRecvHistogram(opts ...HistogramOption) ClientMetricsOption {
return func(o *clientMetricsConfig) {
o.clientStreamRecvHistogram = prometheus.NewHistogramVec(
histogramOptions(opts).apply(prometheus.HistogramOpts{
Name: "grpc_client_msg_recv_handling_seconds",
Help: "Histogram of response latency (seconds) of the gRPC single message receive.",
Buckets: prometheus.DefBuckets,
}),
[]string{"grpc_type", "grpc_service", "grpc_method"},
)
}
}
// WithClientStreamSendHistogram turns on recording of single message send time of streaming RPCs.
// Histogram metrics can be very expensive for Prometheus to retain and query.
func WithClientStreamSendHistogram(opts ...HistogramOption) ClientMetricsOption {
return func(o *clientMetricsConfig) {
o.clientStreamSendHistogram = prometheus.NewHistogramVec(
histogramOptions(opts).apply(prometheus.HistogramOpts{
Name: "grpc_client_msg_send_handling_seconds",
Help: "Histogram of response latency (seconds) of the gRPC single message send.",
Buckets: prometheus.DefBuckets,
}),
[]string{"grpc_type", "grpc_service", "grpc_method"},
)
}
}

View File

@@ -0,0 +1,23 @@
// Copyright (c) The go-grpc-middleware Authors.
// Licensed under the Apache License 2.0.
package prometheus
type grpcType string
// grpcType describes all types of grpc connection.
const (
Unary grpcType = "unary"
ClientStream grpcType = "client_stream"
ServerStream grpcType = "server_stream"
BidiStream grpcType = "bidi_stream"
)
// Kind describes whether interceptor is a client or server type.
type Kind string
// Enum for Client and Server Kind.
const (
KindClient Kind = "client"
KindServer Kind = "server"
)

View File

@@ -0,0 +1,8 @@
// Copyright (c) The go-grpc-middleware Authors.
// Licensed under the Apache License 2.0.
/*
Package prometheus provides a standalone interceptor for metrics. It's next iteration of deprecated https://github.com/grpc-ecosystem/go-grpc-prometheus.
See https://github.com/grpc-ecosystem/go-grpc-middleware/tree/main/examples for example.
*/
package prometheus

View File

@@ -0,0 +1,129 @@
// Copyright (c) The go-grpc-middleware Authors.
// Licensed under the Apache License 2.0.
package prometheus
import (
"github.com/prometheus/client_golang/prometheus"
"google.golang.org/grpc"
"google.golang.org/grpc/status"
)
// FromError returns a grpc status. If the error code is neither a valid grpc status nor a context error, codes.Unknown
// will be set.
func FromError(err error) *status.Status {
s, ok := status.FromError(err)
// Mirror what the grpc server itself does, i.e. also convert context errors to status
if !ok {
s = status.FromContextError(err)
}
return s
}
// A CounterOption lets you add options to Counter metrics using With* funcs.
type CounterOption func(*prometheus.CounterOpts)
type counterOptions []CounterOption
func (co counterOptions) apply(o prometheus.CounterOpts) prometheus.CounterOpts {
for _, f := range co {
f(&o)
}
return o
}
// WithConstLabels allows you to add ConstLabels to Counter metrics.
func WithConstLabels(labels prometheus.Labels) CounterOption {
return func(o *prometheus.CounterOpts) {
o.ConstLabels = labels
}
}
// WithSubsystem allows you to add a Subsystem to Counter metrics.
func WithSubsystem(subsystem string) CounterOption {
return func(o *prometheus.CounterOpts) {
o.Subsystem = subsystem
}
}
// A HistogramOption lets you add options to Histogram metrics using With*
// funcs.
type HistogramOption func(*prometheus.HistogramOpts)
type histogramOptions []HistogramOption
func (ho histogramOptions) apply(o prometheus.HistogramOpts) prometheus.HistogramOpts {
for _, f := range ho {
f(&o)
}
return o
}
// WithHistogramBuckets allows you to specify custom bucket ranges for histograms if EnableHandlingTimeHistogram is on.
func WithHistogramBuckets(buckets []float64) HistogramOption {
return func(o *prometheus.HistogramOpts) { o.Buckets = buckets }
}
// WithHistogramOpts allows you to specify HistogramOpts but makes sure the correct name and label is used.
// This function is helpful when specifying more than just the buckets, like using NativeHistograms.
func WithHistogramOpts(opts *prometheus.HistogramOpts) HistogramOption {
// TODO: This isn't ideal either if new fields are added to prometheus.HistogramOpts.
// Maybe we can change the interface to accept abitrary HistogramOpts and
// only make sure to overwrite the necessary fields (name, labels).
return func(o *prometheus.HistogramOpts) {
o.Buckets = opts.Buckets
o.NativeHistogramBucketFactor = opts.NativeHistogramBucketFactor
o.NativeHistogramZeroThreshold = opts.NativeHistogramZeroThreshold
o.NativeHistogramMaxBucketNumber = opts.NativeHistogramMaxBucketNumber
o.NativeHistogramMinResetDuration = opts.NativeHistogramMinResetDuration
o.NativeHistogramMaxZeroThreshold = opts.NativeHistogramMaxZeroThreshold
}
}
// WithHistogramConstLabels allows you to add custom ConstLabels to
// histograms metrics.
func WithHistogramConstLabels(labels prometheus.Labels) HistogramOption {
return func(o *prometheus.HistogramOpts) {
o.ConstLabels = labels
}
}
// WithHistogramSubsystem allows you to add a Subsystem to histograms metrics.
func WithHistogramSubsystem(subsystem string) HistogramOption {
return func(o *prometheus.HistogramOpts) {
o.Subsystem = subsystem
}
}
func typeFromMethodInfo(mInfo *grpc.MethodInfo) grpcType {
if !mInfo.IsClientStream && !mInfo.IsServerStream {
return Unary
}
if mInfo.IsClientStream && !mInfo.IsServerStream {
return ClientStream
}
if !mInfo.IsClientStream && mInfo.IsServerStream {
return ServerStream
}
return BidiStream
}
// An Option lets you add options to prometheus interceptors using With* funcs.
type Option func(*config)
type config struct {
exemplarFn exemplarFromCtxFn
}
func (c *config) apply(opts []Option) {
for _, o := range opts {
o(c)
}
}
// WithExemplarFromContext sets function that will be used to deduce exemplar for all counter and histogram metrics.
func WithExemplarFromContext(exemplarFn exemplarFromCtxFn) Option {
return func(o *config) {
o.exemplarFn = exemplarFn
}
}

View File

@@ -0,0 +1,113 @@
// Copyright (c) The go-grpc-middleware Authors.
// Licensed under the Apache License 2.0.
package prometheus
import (
"context"
"time"
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors"
"github.com/prometheus/client_golang/prometheus"
)
type reporter struct {
clientMetrics *ClientMetrics
serverMetrics *ServerMetrics
typ interceptors.GRPCType
service, method string
kind Kind
exemplar prometheus.Labels
}
func (r *reporter) PostCall(err error, rpcDuration time.Duration) {
// get status code from error
status := FromError(err)
code := status.Code()
// perform handling of metrics from code
switch r.kind {
case KindServer:
r.incrementWithExemplar(r.serverMetrics.serverHandledCounter, string(r.typ), r.service, r.method, code.String())
if r.serverMetrics.serverHandledHistogram != nil {
r.observeWithExemplar(r.serverMetrics.serverHandledHistogram, rpcDuration.Seconds(), string(r.typ), r.service, r.method)
}
case KindClient:
r.incrementWithExemplar(r.clientMetrics.clientHandledCounter, string(r.typ), r.service, r.method, code.String())
if r.clientMetrics.clientHandledHistogram != nil {
r.observeWithExemplar(r.clientMetrics.clientHandledHistogram, rpcDuration.Seconds(), string(r.typ), r.service, r.method)
}
}
}
func (r *reporter) PostMsgSend(_ any, _ error, sendDuration time.Duration) {
switch r.kind {
case KindServer:
r.incrementWithExemplar(r.serverMetrics.serverStreamMsgSent, string(r.typ), r.service, r.method)
case KindClient:
r.incrementWithExemplar(r.clientMetrics.clientStreamMsgSent, string(r.typ), r.service, r.method)
if r.clientMetrics.clientStreamSendHistogram != nil {
r.observeWithExemplar(r.clientMetrics.clientStreamSendHistogram, sendDuration.Seconds(), string(r.typ), r.service, r.method)
}
}
}
func (r *reporter) PostMsgReceive(_ any, _ error, recvDuration time.Duration) {
switch r.kind {
case KindServer:
r.incrementWithExemplar(r.serverMetrics.serverStreamMsgReceived, string(r.typ), r.service, r.method)
case KindClient:
r.incrementWithExemplar(r.clientMetrics.clientStreamMsgReceived, string(r.typ), r.service, r.method)
if r.clientMetrics.clientStreamRecvHistogram != nil {
r.observeWithExemplar(r.clientMetrics.clientStreamRecvHistogram, recvDuration.Seconds(), string(r.typ), r.service, r.method)
}
}
}
type reportable struct {
clientMetrics *ClientMetrics
serverMetrics *ServerMetrics
opts []Option
}
func (rep *reportable) ServerReporter(ctx context.Context, meta interceptors.CallMeta) (interceptors.Reporter, context.Context) {
return rep.reporter(ctx, rep.serverMetrics, nil, meta, KindServer)
}
func (rep *reportable) ClientReporter(ctx context.Context, meta interceptors.CallMeta) (interceptors.Reporter, context.Context) {
return rep.reporter(ctx, nil, rep.clientMetrics, meta, KindClient)
}
func (rep *reportable) reporter(ctx context.Context, sm *ServerMetrics, cm *ClientMetrics, meta interceptors.CallMeta, kind Kind) (interceptors.Reporter, context.Context) {
var c config
c.apply(rep.opts)
r := &reporter{
clientMetrics: cm,
serverMetrics: sm,
typ: meta.Typ,
service: meta.Service,
method: meta.Method,
kind: kind,
}
if c.exemplarFn != nil {
r.exemplar = c.exemplarFn(ctx)
}
switch kind {
case KindClient:
r.incrementWithExemplar(r.clientMetrics.clientStartedCounter, string(r.typ), r.service, r.method)
case KindServer:
r.incrementWithExemplar(r.serverMetrics.serverStartedCounter, string(r.typ), r.service, r.method)
}
return r, ctx
}
func (r *reporter) incrementWithExemplar(c *prometheus.CounterVec, lvals ...string) {
c.WithLabelValues(lvals...).(prometheus.ExemplarAdder).AddWithExemplar(1, r.exemplar)
}
func (r *reporter) observeWithExemplar(h *prometheus.HistogramVec, value float64, lvals ...string) {
h.WithLabelValues(lvals...).(prometheus.ExemplarObserver).ObserveWithExemplar(value, r.exemplar)
}

View File

@@ -0,0 +1,123 @@
// Copyright (c) The go-grpc-middleware Authors.
// Licensed under the Apache License 2.0.
package prometheus
import (
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors"
"github.com/prometheus/client_golang/prometheus"
"google.golang.org/grpc"
)
// ServerMetrics represents a collection of metrics to be registered on a
// Prometheus metrics registry for a gRPC server.
type ServerMetrics struct {
serverStartedCounter *prometheus.CounterVec
serverHandledCounter *prometheus.CounterVec
serverStreamMsgReceived *prometheus.CounterVec
serverStreamMsgSent *prometheus.CounterVec
// serverHandledHistogram can be nil.
serverHandledHistogram *prometheus.HistogramVec
}
// NewServerMetrics returns a new ServerMetrics object that has server interceptor methods.
// NOTE: Remember to register ServerMetrics object by using prometheus registry
// e.g. prometheus.MustRegister(myServerMetrics).
func NewServerMetrics(opts ...ServerMetricsOption) *ServerMetrics {
var config serverMetricsConfig
config.apply(opts)
return &ServerMetrics{
serverStartedCounter: prometheus.NewCounterVec(
config.counterOpts.apply(prometheus.CounterOpts{
Name: "grpc_server_started_total",
Help: "Total number of RPCs started on the server.",
}), []string{"grpc_type", "grpc_service", "grpc_method"}),
serverHandledCounter: prometheus.NewCounterVec(
config.counterOpts.apply(prometheus.CounterOpts{
Name: "grpc_server_handled_total",
Help: "Total number of RPCs completed on the server, regardless of success or failure.",
}), []string{"grpc_type", "grpc_service", "grpc_method", "grpc_code"}),
serverStreamMsgReceived: prometheus.NewCounterVec(
config.counterOpts.apply(prometheus.CounterOpts{
Name: "grpc_server_msg_received_total",
Help: "Total number of RPC stream messages received on the server.",
}), []string{"grpc_type", "grpc_service", "grpc_method"}),
serverStreamMsgSent: prometheus.NewCounterVec(
config.counterOpts.apply(prometheus.CounterOpts{
Name: "grpc_server_msg_sent_total",
Help: "Total number of gRPC stream messages sent by the server.",
}), []string{"grpc_type", "grpc_service", "grpc_method"}),
serverHandledHistogram: config.serverHandledHistogram,
}
}
// Describe sends the super-set of all possible descriptors of metrics
// collected by this Collector to the provided channel and returns once
// the last descriptor has been sent.
func (m *ServerMetrics) Describe(ch chan<- *prometheus.Desc) {
m.serverStartedCounter.Describe(ch)
m.serverHandledCounter.Describe(ch)
m.serverStreamMsgReceived.Describe(ch)
m.serverStreamMsgSent.Describe(ch)
if m.serverHandledHistogram != nil {
m.serverHandledHistogram.Describe(ch)
}
}
// Collect is called by the Prometheus registry when collecting
// metrics. The implementation sends each collected metric via the
// provided channel and returns once the last metric has been sent.
func (m *ServerMetrics) Collect(ch chan<- prometheus.Metric) {
m.serverStartedCounter.Collect(ch)
m.serverHandledCounter.Collect(ch)
m.serverStreamMsgReceived.Collect(ch)
m.serverStreamMsgSent.Collect(ch)
if m.serverHandledHistogram != nil {
m.serverHandledHistogram.Collect(ch)
}
}
// InitializeMetrics initializes all metrics, with their appropriate null
// value, for all gRPC methods registered on a gRPC server. This is useful, to
// ensure that all metrics exist when collecting and querying.
// NOTE: This might add significant cardinality and might not be needed in future version of Prometheus (created timestamp).
func (m *ServerMetrics) InitializeMetrics(server *grpc.Server) {
serviceInfo := server.GetServiceInfo()
for serviceName, info := range serviceInfo {
for _, mInfo := range info.Methods {
m.preRegisterMethod(serviceName, &mInfo)
}
}
}
// preRegisterMethod is invoked on Register of a Server, allowing all gRPC services labels to be pre-populated.
func (m *ServerMetrics) preRegisterMethod(serviceName string, mInfo *grpc.MethodInfo) {
methodName := mInfo.Name
methodType := string(typeFromMethodInfo(mInfo))
// These are just references (no increments), as just referencing will create the labels but not set values.
_, _ = m.serverStartedCounter.GetMetricWithLabelValues(methodType, serviceName, methodName)
_, _ = m.serverStreamMsgReceived.GetMetricWithLabelValues(methodType, serviceName, methodName)
_, _ = m.serverStreamMsgSent.GetMetricWithLabelValues(methodType, serviceName, methodName)
if m.serverHandledHistogram != nil {
_, _ = m.serverHandledHistogram.GetMetricWithLabelValues(methodType, serviceName, methodName)
}
for _, code := range interceptors.AllCodes {
_, _ = m.serverHandledCounter.GetMetricWithLabelValues(methodType, serviceName, methodName, code.String())
}
}
// UnaryServerInterceptor is a gRPC server-side interceptor that provides Prometheus monitoring for Unary RPCs.
func (m *ServerMetrics) UnaryServerInterceptor(opts ...Option) grpc.UnaryServerInterceptor {
return interceptors.UnaryServerInterceptor(&reportable{
opts: opts,
serverMetrics: m,
})
}
// StreamServerInterceptor is a gRPC server-side interceptor that provides Prometheus monitoring for Streaming RPCs.
func (m *ServerMetrics) StreamServerInterceptor(opts ...Option) grpc.StreamServerInterceptor {
return interceptors.StreamServerInterceptor(&reportable{
opts: opts,
serverMetrics: m,
})
}

View File

@@ -0,0 +1,48 @@
// Copyright (c) The go-grpc-middleware Authors.
// Licensed under the Apache License 2.0.
package prometheus
import (
"context"
"github.com/prometheus/client_golang/prometheus"
)
type exemplarFromCtxFn func(ctx context.Context) prometheus.Labels
type serverMetricsConfig struct {
counterOpts counterOptions
// serverHandledHistogram can be nil.
serverHandledHistogram *prometheus.HistogramVec
}
type ServerMetricsOption func(*serverMetricsConfig)
func (c *serverMetricsConfig) apply(opts []ServerMetricsOption) {
for _, o := range opts {
o(c)
}
}
// WithServerCounterOptions sets counter options.
func WithServerCounterOptions(opts ...CounterOption) ServerMetricsOption {
return func(o *serverMetricsConfig) {
o.counterOpts = opts
}
}
// WithServerHandlingTimeHistogram turns on recording of handling time of RPCs.
// Histogram metrics can be very expensive for Prometheus to retain and query.
func WithServerHandlingTimeHistogram(opts ...HistogramOption) ServerMetricsOption {
return func(o *serverMetricsConfig) {
o.serverHandledHistogram = prometheus.NewHistogramVec(
histogramOptions(opts).apply(prometheus.HistogramOpts{
Name: "grpc_server_handling_seconds",
Help: "Histogram of response latency (seconds) of gRPC that had been application-level handled by the server.",
Buckets: prometheus.DefBuckets,
}),
[]string{"grpc_type", "grpc_service", "grpc_method"},
)
}
}

Binary file not shown.

Before

Width:  |  Height:  |  Size: 5.0 KiB

View File

@@ -0,0 +1,2 @@
Copyright (c) The go-grpc-middleware Authors.
Licensed under the Apache License 2.0.

View File

@@ -0,0 +1,83 @@
// Copyright (c) The go-grpc-middleware Authors.
// Licensed under the Apache License 2.0.
// Go gRPC Middleware monitoring interceptors for client-side gRPC.
package interceptors
import (
"context"
"io"
"time"
"google.golang.org/grpc"
)
// UnaryClientInterceptor is a gRPC client-side interceptor that provides reporting for Unary RPCs.
func UnaryClientInterceptor(reportable ClientReportable) grpc.UnaryClientInterceptor {
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
r := newReport(Unary, method)
reporter, newCtx := reportable.ClientReporter(ctx, CallMeta{ReqProtoOrNil: req, Typ: r.rpcType, Service: r.service, Method: r.method})
reporter.PostMsgSend(req, nil, time.Since(r.startTime))
err := invoker(newCtx, method, req, reply, cc, opts...)
reporter.PostMsgReceive(reply, err, time.Since(r.startTime))
reporter.PostCall(err, time.Since(r.startTime))
return err
}
}
// StreamClientInterceptor is a gRPC client-side interceptor that provides reporting for Stream RPCs.
func StreamClientInterceptor(reportable ClientReportable) grpc.StreamClientInterceptor {
return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
r := newReport(clientStreamType(desc), method)
reporter, newCtx := reportable.ClientReporter(ctx, CallMeta{ReqProtoOrNil: nil, Typ: r.rpcType, Service: r.service, Method: r.method})
clientStream, err := streamer(newCtx, desc, cc, method, opts...)
if err != nil {
reporter.PostCall(err, time.Since(r.startTime))
return nil, err
}
return &monitoredClientStream{ClientStream: clientStream, startTime: r.startTime, reporter: reporter}, nil
}
}
func clientStreamType(desc *grpc.StreamDesc) GRPCType {
if desc.ClientStreams && !desc.ServerStreams {
return ClientStream
} else if !desc.ClientStreams && desc.ServerStreams {
return ServerStream
}
return BidiStream
}
// monitoredClientStream wraps grpc.ClientStream allowing each Sent/Recv of message to report.
type monitoredClientStream struct {
grpc.ClientStream
startTime time.Time
reporter Reporter
}
func (s *monitoredClientStream) SendMsg(m interface{}) error {
start := time.Now()
err := s.ClientStream.SendMsg(m)
s.reporter.PostMsgSend(m, err, time.Since(start))
return err
}
func (s *monitoredClientStream) RecvMsg(m interface{}) error {
start := time.Now()
err := s.ClientStream.RecvMsg(m)
s.reporter.PostMsgReceive(m, err, time.Since(start))
if err == nil {
return nil
}
var postErr error
if err != io.EOF {
postErr = err
}
s.reporter.PostCall(postErr, time.Since(s.startTime))
return err
}

View File

@@ -0,0 +1,12 @@
// Copyright (c) The go-grpc-middleware Authors.
// Licensed under the Apache License 2.0.
//
/*
interceptor is an internal package used by higher level middlewares. It allows injecting custom code in various
places of the gRPC lifecycle.
This particular package is intended for use by other middleware, metric, logging or otherwise.
This allows code to be shared between different implementations.
*/
package interceptors

View File

@@ -0,0 +1,116 @@
// Copyright (c) The go-grpc-middleware Authors.
// Licensed under the Apache License 2.0.
package interceptors
import (
"context"
"fmt"
"strings"
"time"
"google.golang.org/grpc/codes"
)
type GRPCType string
// Timer is a helper interface to time functions.
// Useful for interceptors to record the total
// time elapsed since completion of a call.
type Timer interface {
ObserveDuration() time.Duration
}
// zeroTimer.
type zeroTimer struct {
}
func (zeroTimer) ObserveDuration() time.Duration {
return 0
}
var EmptyTimer = &zeroTimer{}
const (
Unary GRPCType = "unary"
ClientStream GRPCType = "client_stream"
ServerStream GRPCType = "server_stream"
BidiStream GRPCType = "bidi_stream"
)
var (
AllCodes = []codes.Code{
codes.OK, codes.Canceled, codes.Unknown, codes.InvalidArgument, codes.DeadlineExceeded, codes.NotFound,
codes.AlreadyExists, codes.PermissionDenied, codes.Unauthenticated, codes.ResourceExhausted,
codes.FailedPrecondition, codes.Aborted, codes.OutOfRange, codes.Unimplemented, codes.Internal,
codes.Unavailable, codes.DataLoss,
}
)
func SplitMethodName(fullMethod string) (string, string) {
fullMethod = strings.TrimPrefix(fullMethod, "/") // remove leading slash
if i := strings.Index(fullMethod, "/"); i >= 0 {
return fullMethod[:i], fullMethod[i+1:]
}
return "unknown", "unknown"
}
type CallMeta struct {
ReqProtoOrNil interface{}
Typ GRPCType
Service string
Method string
}
func (c CallMeta) FullMethod() string {
return fmt.Sprintf("/%s/%s", c.Service, c.Method)
}
type ClientReportable interface {
ClientReporter(context.Context, CallMeta) (Reporter, context.Context)
}
type ServerReportable interface {
ServerReporter(context.Context, CallMeta) (Reporter, context.Context)
}
// CommonReportableFunc helper allows an easy way to implement reporter with common client and server logic.
type CommonReportableFunc func(ctx context.Context, c CallMeta, isClient bool) (Reporter, context.Context)
func (f CommonReportableFunc) ClientReporter(ctx context.Context, c CallMeta) (Reporter, context.Context) {
return f(ctx, c, true)
}
func (f CommonReportableFunc) ServerReporter(ctx context.Context, c CallMeta) (Reporter, context.Context) {
return f(ctx, c, false)
}
type Reporter interface {
PostCall(err error, rpcDuration time.Duration)
PostMsgSend(reqProto interface{}, err error, sendDuration time.Duration)
PostMsgReceive(replyProto interface{}, err error, recvDuration time.Duration)
}
var _ Reporter = NoopReporter{}
type NoopReporter struct{}
func (NoopReporter) PostCall(error, time.Duration) {}
func (NoopReporter) PostMsgSend(interface{}, error, time.Duration) {}
func (NoopReporter) PostMsgReceive(interface{}, error, time.Duration) {}
type report struct {
rpcType GRPCType
service string
method string
startTime time.Time
}
func newReport(typ GRPCType, fullMethod string) report {
r := report{
startTime: time.Now(),
rpcType: typ,
}
r.service, r.method = SplitMethodName(fullMethod)
return r
}

View File

@@ -0,0 +1,74 @@
// Copyright (c) The go-grpc-middleware Authors.
// Licensed under the Apache License 2.0.
// Go gRPC Middleware monitoring interceptors for server-side gRPC.
package interceptors
import (
"context"
"time"
"google.golang.org/grpc"
)
// UnaryServerInterceptor is a gRPC server-side interceptor that provides reporting for Unary RPCs.
func UnaryServerInterceptor(reportable ServerReportable) grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
r := newReport(Unary, info.FullMethod)
reporter, newCtx := reportable.ServerReporter(ctx, CallMeta{ReqProtoOrNil: req, Typ: r.rpcType, Service: r.service, Method: r.method})
reporter.PostMsgReceive(req, nil, time.Since(r.startTime))
resp, err := handler(newCtx, req)
reporter.PostMsgSend(resp, err, time.Since(r.startTime))
reporter.PostCall(err, time.Since(r.startTime))
return resp, err
}
}
// StreamServerInterceptor is a gRPC server-side interceptor that provides reporting for Streaming RPCs.
func StreamServerInterceptor(reportable ServerReportable) grpc.StreamServerInterceptor {
return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
r := newReport(ServerStream, info.FullMethod)
reporter, newCtx := reportable.ServerReporter(ss.Context(), CallMeta{ReqProtoOrNil: nil, Typ: StreamRPCType(info), Service: r.service, Method: r.method})
err := handler(srv, &monitoredServerStream{ServerStream: ss, newCtx: newCtx, reporter: reporter})
reporter.PostCall(err, time.Since(r.startTime))
return err
}
}
func StreamRPCType(info *grpc.StreamServerInfo) GRPCType {
if info.IsClientStream && !info.IsServerStream {
return ClientStream
} else if !info.IsClientStream && info.IsServerStream {
return ServerStream
}
return BidiStream
}
// monitoredStream wraps grpc.ServerStream allowing each Sent/Recv of message to report.
type monitoredServerStream struct {
grpc.ServerStream
newCtx context.Context
reporter Reporter
}
func (s *monitoredServerStream) Context() context.Context {
return s.newCtx
}
func (s *monitoredServerStream) SendMsg(m interface{}) error {
start := time.Now()
err := s.ServerStream.SendMsg(m)
s.reporter.PostMsgSend(m, err, time.Since(start))
return err
}
func (s *monitoredServerStream) RecvMsg(m interface{}) error {
start := time.Now()
err := s.ServerStream.RecvMsg(m)
s.reporter.PostMsgReceive(m, err, time.Since(start))
return err
}

View File

@@ -1,30 +0,0 @@
// Copyright 2016 Michal Witkowski. All Rights Reserved.
// See LICENSE for licensing terms.
package grpc_middleware
import (
"context"
"google.golang.org/grpc"
)
// WrappedServerStream is a thin wrapper around grpc.ServerStream that allows modifying context.
type WrappedServerStream struct {
grpc.ServerStream
// WrappedContext is the wrapper's own Context. You can assign it.
WrappedContext context.Context
}
// Context returns the wrapper's WrappedContext, overwriting the nested grpc.ServerStream.Context()
func (w *WrappedServerStream) Context() context.Context {
return w.WrappedContext
}
// WrapServerStream returns a ServerStream that has the ability to overwrite context.
func WrapServerStream(stream grpc.ServerStream) *WrappedServerStream {
if existing, ok := stream.(*WrappedServerStream); ok {
return existing
}
return &WrappedServerStream{ServerStream: stream, WrappedContext: stream.Context()}
}

View File

@@ -1,201 +0,0 @@
#vendor
vendor/
# Created by .ignore support plugin (hsz.mobi)
coverage.txt
### Go template
# Compiled Object files, Static and Dynamic libs (Shared Objects)
*.o
*.a
*.so
# Folders
_obj
_test
# Architecture specific extensions/prefixes
*.[568vq]
[568vq].out
*.cgo1.go
*.cgo2.c
_cgo_defun.c
_cgo_gotypes.go
_cgo_export.*
_testmain.go
*.exe
*.test
*.prof
### Windows template
# Windows image file caches
Thumbs.db
ehthumbs.db
# Folder config file
Desktop.ini
# Recycle Bin used on file shares
$RECYCLE.BIN/
# Windows Installer files
*.cab
*.msi
*.msm
*.msp
# Windows shortcuts
*.lnk
### Kate template
# Swap Files #
.*.kate-swp
.swp.*
### SublimeText template
# cache files for sublime text
*.tmlanguage.cache
*.tmPreferences.cache
*.stTheme.cache
# workspace files are user-specific
*.sublime-workspace
# project files should be checked into the repository, unless a significant
# proportion of contributors will probably not be using SublimeText
# *.sublime-project
# sftp configuration file
sftp-config.json
### Linux template
*~
# temporary files which can be created if a process still has a handle open of a deleted file
.fuse_hidden*
# KDE directory preferences
.directory
# Linux trash folder which might appear on any partition or disk
.Trash-*
### JetBrains template
# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio and Webstorm
# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839
# User-specific stuff:
.idea
.idea/tasks.xml
.idea/dictionaries
.idea/vcs.xml
.idea/jsLibraryMappings.xml
# Sensitive or high-churn files:
.idea/dataSources.ids
.idea/dataSources.xml
.idea/dataSources.local.xml
.idea/sqlDataSources.xml
.idea/dynamic.xml
.idea/uiDesigner.xml
# Gradle:
.idea/gradle.xml
.idea/libraries
# Mongo Explorer plugin:
.idea/mongoSettings.xml
## File-based project format:
*.iws
## Plugin-specific files:
# IntelliJ
/out/
# mpeltonen/sbt-idea plugin
.idea_modules/
# JIRA plugin
atlassian-ide-plugin.xml
# Crashlytics plugin (for Android Studio and IntelliJ)
com_crashlytics_export_strings.xml
crashlytics.properties
crashlytics-build.properties
fabric.properties
### Xcode template
# Xcode
#
# gitignore contributors: remember to update Global/Xcode.gitignore, Objective-C.gitignore & Swift.gitignore
## Build generated
build/
DerivedData/
## Various settings
*.pbxuser
!default.pbxuser
*.mode1v3
!default.mode1v3
*.mode2v3
!default.mode2v3
*.perspectivev3
!default.perspectivev3
xcuserdata/
## Other
*.moved-aside
*.xccheckout
*.xcscmblueprint
### Eclipse template
.metadata
bin/
tmp/
*.tmp
*.bak
*.swp
*~.nib
local.properties
.settings/
.loadpath
.recommenders
# Eclipse Core
.project
# External tool builders
.externalToolBuilders/
# Locally stored "Eclipse launch configurations"
*.launch
# PyDev specific (Python IDE for Eclipse)
*.pydevproject
# CDT-specific (C/C++ Development Tooling)
.cproject
# JDT-specific (Eclipse Java Development Tools)
.classpath
# Java annotation processor (APT)
.factorypath
# PDT-specific (PHP Development Tools)
.buildpath
# sbteclipse plugin
.target
# Tern plugin
.tern-project
# TeXlipse plugin
.texlipse
# STS (Spring Tool Suite)
.springBeans
# Code Recommenders
.recommenders/

View File

@@ -1,25 +0,0 @@
sudo: false
language: go
# * github.com/grpc/grpc-go still supports go1.6
# - When we drop support for go1.6 we can remove golang.org/x/net/context
# below as it is part of the Go std library since go1.7
# * github.com/prometheus/client_golang already requires at least go1.7 since
# September 2017
go:
- 1.6.x
- 1.7.x
- 1.8.x
- 1.9.x
- 1.10.x
- master
install:
- go get github.com/prometheus/client_golang/prometheus
- go get google.golang.org/grpc
- go get golang.org/x/net/context
- go get github.com/stretchr/testify
script:
- make test
after_success:
- bash <(curl -s https://codecov.io/bash)

View File

@@ -1,24 +0,0 @@
# Changelog
All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).
## [Unreleased]
## [1.2.0](https://github.com/grpc-ecosystem/go-grpc-prometheus/releases/tag/v1.2.0) - 2018-06-04
### Added
* Provide metrics object as `prometheus.Collector`, for conventional metric registration.
* Support non-default/global Prometheus registry.
* Allow configuring counters with `prometheus.CounterOpts`.
### Changed
* Remove usage of deprecated `grpc.Code()`.
* Remove usage of deprecated `grpc.Errorf` and replace with `status.Errorf`.
---
This changelog was started with version `v1.2.0`, for earlier versions refer to the respective [GitHub releases](https://github.com/grpc-ecosystem/go-grpc-prometheus/releases).

View File

@@ -1,247 +0,0 @@
# Go gRPC Interceptors for Prometheus monitoring
[![Travis Build](https://travis-ci.org/grpc-ecosystem/go-grpc-prometheus.svg)](https://travis-ci.org/grpc-ecosystem/go-grpc-prometheus)
[![Go Report Card](https://goreportcard.com/badge/github.com/grpc-ecosystem/go-grpc-prometheus)](http://goreportcard.com/report/grpc-ecosystem/go-grpc-prometheus)
[![GoDoc](http://img.shields.io/badge/GoDoc-Reference-blue.svg)](https://godoc.org/github.com/grpc-ecosystem/go-grpc-prometheus)
[![SourceGraph](https://sourcegraph.com/github.com/grpc-ecosystem/go-grpc-prometheus/-/badge.svg)](https://sourcegraph.com/github.com/grpc-ecosystem/go-grpc-prometheus/?badge)
[![codecov](https://codecov.io/gh/grpc-ecosystem/go-grpc-prometheus/branch/master/graph/badge.svg)](https://codecov.io/gh/grpc-ecosystem/go-grpc-prometheus)
[![Apache 2.0 License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](LICENSE)
[Prometheus](https://prometheus.io/) monitoring for your [gRPC Go](https://github.com/grpc/grpc-go) servers and clients.
A sister implementation for [gRPC Java](https://github.com/grpc/grpc-java) (same metrics, same semantics) is in [grpc-ecosystem/java-grpc-prometheus](https://github.com/grpc-ecosystem/java-grpc-prometheus).
## Interceptors
[gRPC Go](https://github.com/grpc/grpc-go) recently acquired support for Interceptors, i.e. middleware that is executed
by a gRPC Server before the request is passed onto the user's application logic. It is a perfect way to implement
common patterns: auth, logging and... monitoring.
To use Interceptors in chains, please see [`go-grpc-middleware`](https://github.com/mwitkow/go-grpc-middleware).
## Usage
There are two types of interceptors: client-side and server-side. This package provides monitoring Interceptors for both.
### Server-side
```go
import "github.com/grpc-ecosystem/go-grpc-prometheus"
...
// Initialize your gRPC server's interceptor.
myServer := grpc.NewServer(
grpc.StreamInterceptor(grpc_prometheus.StreamServerInterceptor),
grpc.UnaryInterceptor(grpc_prometheus.UnaryServerInterceptor),
)
// Register your gRPC service implementations.
myservice.RegisterMyServiceServer(s.server, &myServiceImpl{})
// After all your registrations, make sure all of the Prometheus metrics are initialized.
grpc_prometheus.Register(myServer)
// Register Prometheus metrics handler.
http.Handle("/metrics", promhttp.Handler())
...
```
### Client-side
```go
import "github.com/grpc-ecosystem/go-grpc-prometheus"
...
clientConn, err = grpc.Dial(
address,
grpc.WithUnaryInterceptor(grpc_prometheus.UnaryClientInterceptor),
grpc.WithStreamInterceptor(grpc_prometheus.StreamClientInterceptor)
)
client = pb_testproto.NewTestServiceClient(clientConn)
resp, err := client.PingEmpty(s.ctx, &myservice.Request{Msg: "hello"})
...
```
# Metrics
## Labels
All server-side metrics start with `grpc_server` as Prometheus subsystem name. All client-side metrics start with `grpc_client`. Both of them have mirror-concepts. Similarly all methods
contain the same rich labels:
* `grpc_service` - the [gRPC service](http://www.grpc.io/docs/#defining-a-service) name, which is the combination of protobuf `package` and
the `grpc_service` section name. E.g. for `package = mwitkow.testproto` and
`service TestService` the label will be `grpc_service="mwitkow.testproto.TestService"`
* `grpc_method` - the name of the method called on the gRPC service. E.g.
`grpc_method="Ping"`
* `grpc_type` - the gRPC [type of request](http://www.grpc.io/docs/guides/concepts.html#rpc-life-cycle).
Differentiating between the two is important especially for latency measurements.
- `unary` is single request, single response RPC
- `client_stream` is a multi-request, single response RPC
- `server_stream` is a single request, multi-response RPC
- `bidi_stream` is a multi-request, multi-response RPC
Additionally for completed RPCs, the following labels are used:
* `grpc_code` - the human-readable [gRPC status code](https://github.com/grpc/grpc-go/blob/master/codes/codes.go).
The list of all statuses is to long, but here are some common ones:
- `OK` - means the RPC was successful
- `IllegalArgument` - RPC contained bad values
- `Internal` - server-side error not disclosed to the clients
## Counters
The counters and their up to date documentation is in [server_reporter.go](server_reporter.go) and [client_reporter.go](client_reporter.go)
the respective Prometheus handler (usually `/metrics`).
For the purpose of this documentation we will only discuss `grpc_server` metrics. The `grpc_client` ones contain mirror concepts.
For simplicity, let's assume we're tracking a single server-side RPC call of [`mwitkow.testproto.TestService`](examples/testproto/test.proto),
calling the method `PingList`. The call succeeds and returns 20 messages in the stream.
First, immediately after the server receives the call it will increment the
`grpc_server_started_total` and start the handling time clock (if histograms are enabled).
```jsoniq
grpc_server_started_total{grpc_method="PingList",grpc_service="mwitkow.testproto.TestService",grpc_type="server_stream"} 1
```
Then the user logic gets invoked. It receives one message from the client containing the request
(it's a `server_stream`):
```jsoniq
grpc_server_msg_received_total{grpc_method="PingList",grpc_service="mwitkow.testproto.TestService",grpc_type="server_stream"} 1
```
The user logic may return an error, or send multiple messages back to the client. In this case, on
each of the 20 messages sent back, a counter will be incremented:
```jsoniq
grpc_server_msg_sent_total{grpc_method="PingList",grpc_service="mwitkow.testproto.TestService",grpc_type="server_stream"} 20
```
After the call completes, its status (`OK` or other [gRPC status code](https://github.com/grpc/grpc-go/blob/master/codes/codes.go))
and the relevant call labels increment the `grpc_server_handled_total` counter.
```jsoniq
grpc_server_handled_total{grpc_code="OK",grpc_method="PingList",grpc_service="mwitkow.testproto.TestService",grpc_type="server_stream"} 1
```
## Histograms
[Prometheus histograms](https://prometheus.io/docs/concepts/metric_types/#histogram) are a great way
to measure latency distributions of your RPCs. However, since it is bad practice to have metrics
of [high cardinality](https://prometheus.io/docs/practices/instrumentation/#do-not-overuse-labels)
the latency monitoring metrics are disabled by default. To enable them please call the following
in your server initialization code:
```jsoniq
grpc_prometheus.EnableHandlingTimeHistogram()
```
After the call completes, its handling time will be recorded in a [Prometheus histogram](https://prometheus.io/docs/concepts/metric_types/#histogram)
variable `grpc_server_handling_seconds`. The histogram variable contains three sub-metrics:
* `grpc_server_handling_seconds_count` - the count of all completed RPCs by status and method
* `grpc_server_handling_seconds_sum` - cumulative time of RPCs by status and method, useful for
calculating average handling times
* `grpc_server_handling_seconds_bucket` - contains the counts of RPCs by status and method in respective
handling-time buckets. These buckets can be used by Prometheus to estimate SLAs (see [here](https://prometheus.io/docs/practices/histograms/))
The counter values will look as follows:
```jsoniq
grpc_server_handling_seconds_bucket{grpc_code="OK",grpc_method="PingList",grpc_service="mwitkow.testproto.TestService",grpc_type="server_stream",le="0.005"} 1
grpc_server_handling_seconds_bucket{grpc_code="OK",grpc_method="PingList",grpc_service="mwitkow.testproto.TestService",grpc_type="server_stream",le="0.01"} 1
grpc_server_handling_seconds_bucket{grpc_code="OK",grpc_method="PingList",grpc_service="mwitkow.testproto.TestService",grpc_type="server_stream",le="0.025"} 1
grpc_server_handling_seconds_bucket{grpc_code="OK",grpc_method="PingList",grpc_service="mwitkow.testproto.TestService",grpc_type="server_stream",le="0.05"} 1
grpc_server_handling_seconds_bucket{grpc_code="OK",grpc_method="PingList",grpc_service="mwitkow.testproto.TestService",grpc_type="server_stream",le="0.1"} 1
grpc_server_handling_seconds_bucket{grpc_code="OK",grpc_method="PingList",grpc_service="mwitkow.testproto.TestService",grpc_type="server_stream",le="0.25"} 1
grpc_server_handling_seconds_bucket{grpc_code="OK",grpc_method="PingList",grpc_service="mwitkow.testproto.TestService",grpc_type="server_stream",le="0.5"} 1
grpc_server_handling_seconds_bucket{grpc_code="OK",grpc_method="PingList",grpc_service="mwitkow.testproto.TestService",grpc_type="server_stream",le="1"} 1
grpc_server_handling_seconds_bucket{grpc_code="OK",grpc_method="PingList",grpc_service="mwitkow.testproto.TestService",grpc_type="server_stream",le="2.5"} 1
grpc_server_handling_seconds_bucket{grpc_code="OK",grpc_method="PingList",grpc_service="mwitkow.testproto.TestService",grpc_type="server_stream",le="5"} 1
grpc_server_handling_seconds_bucket{grpc_code="OK",grpc_method="PingList",grpc_service="mwitkow.testproto.TestService",grpc_type="server_stream",le="10"} 1
grpc_server_handling_seconds_bucket{grpc_code="OK",grpc_method="PingList",grpc_service="mwitkow.testproto.TestService",grpc_type="server_stream",le="+Inf"} 1
grpc_server_handling_seconds_sum{grpc_code="OK",grpc_method="PingList",grpc_service="mwitkow.testproto.TestService",grpc_type="server_stream"} 0.0003866430000000001
grpc_server_handling_seconds_count{grpc_code="OK",grpc_method="PingList",grpc_service="mwitkow.testproto.TestService",grpc_type="server_stream"} 1
```
## Useful query examples
Prometheus philosophy is to provide raw metrics to the monitoring system, and
let the aggregations be handled there. The verbosity of above metrics make it possible to have that
flexibility. Here's a couple of useful monitoring queries:
### request inbound rate
```jsoniq
sum(rate(grpc_server_started_total{job="foo"}[1m])) by (grpc_service)
```
For `job="foo"` (common label to differentiate between Prometheus monitoring targets), calculate the
rate of requests per second (1 minute window) for each gRPC `grpc_service` that the job has. Please note
how the `grpc_method` is being omitted here: all methods of a given gRPC service will be summed together.
### unary request error rate
```jsoniq
sum(rate(grpc_server_handled_total{job="foo",grpc_type="unary",grpc_code!="OK"}[1m])) by (grpc_service)
```
For `job="foo"`, calculate the per-`grpc_service` rate of `unary` (1:1) RPCs that failed, i.e. the
ones that didn't finish with `OK` code.
### unary request error percentage
```jsoniq
sum(rate(grpc_server_handled_total{job="foo",grpc_type="unary",grpc_code!="OK"}[1m])) by (grpc_service)
/
sum(rate(grpc_server_started_total{job="foo",grpc_type="unary"}[1m])) by (grpc_service)
* 100.0
```
For `job="foo"`, calculate the percentage of failed requests by service. It's easy to notice that
this is a combination of the two above examples. This is an example of a query you would like to
[alert on](https://prometheus.io/docs/alerting/rules/) in your system for SLA violations, e.g.
"no more than 1% requests should fail".
### average response stream size
```jsoniq
sum(rate(grpc_server_msg_sent_total{job="foo",grpc_type="server_stream"}[10m])) by (grpc_service)
/
sum(rate(grpc_server_started_total{job="foo",grpc_type="server_stream"}[10m])) by (grpc_service)
```
For `job="foo"` what is the `grpc_service`-wide `10m` average of messages returned for all `
server_stream` RPCs. This allows you to track the stream sizes returned by your system, e.g. allows
you to track when clients started to send "wide" queries that ret
Note the divisor is the number of started RPCs, in order to account for in-flight requests.
### 99%-tile latency of unary requests
```jsoniq
histogram_quantile(0.99,
sum(rate(grpc_server_handling_seconds_bucket{job="foo",grpc_type="unary"}[5m])) by (grpc_service,le)
)
```
For `job="foo"`, returns an 99%-tile [quantile estimation](https://prometheus.io/docs/practices/histograms/#quantiles)
of the handling time of RPCs per service. Please note the `5m` rate, this means that the quantile
estimation will take samples in a rolling `5m` window. When combined with other quantiles
(e.g. 50%, 90%), this query gives you tremendous insight into the responsiveness of your system
(e.g. impact of caching).
### percentage of slow unary queries (>250ms)
```jsoniq
100.0 - (
sum(rate(grpc_server_handling_seconds_bucket{job="foo",grpc_type="unary",le="0.25"}[5m])) by (grpc_service)
/
sum(rate(grpc_server_handling_seconds_count{job="foo",grpc_type="unary"}[5m])) by (grpc_service)
) * 100.0
```
For `job="foo"` calculate the by-`grpc_service` fraction of slow requests that took longer than `0.25`
seconds. This query is relatively complex, since the Prometheus aggregations use `le` (less or equal)
buckets, meaning that counting "fast" requests fractions is easier. However, simple maths helps.
This is an example of a query you would like to alert on in your system for SLA violations,
e.g. "less than 1% of requests are slower than 250ms".
## Status
This code has been used since August 2015 as the basis for monitoring of *production* gRPC micro services at [Improbable](https://improbable.io).
## License
`go-grpc-prometheus` is released under the Apache 2.0 license. See the [LICENSE](LICENSE) file for details.

View File

@@ -1,39 +0,0 @@
// Copyright 2016 Michal Witkowski. All Rights Reserved.
// See LICENSE for licensing terms.
// gRPC Prometheus monitoring interceptors for client-side gRPC.
package grpc_prometheus
import (
prom "github.com/prometheus/client_golang/prometheus"
)
var (
// DefaultClientMetrics is the default instance of ClientMetrics. It is
// intended to be used in conjunction the default Prometheus metrics
// registry.
DefaultClientMetrics = NewClientMetrics()
// UnaryClientInterceptor is a gRPC client-side interceptor that provides Prometheus monitoring for Unary RPCs.
UnaryClientInterceptor = DefaultClientMetrics.UnaryClientInterceptor()
// StreamClientInterceptor is a gRPC client-side interceptor that provides Prometheus monitoring for Streaming RPCs.
StreamClientInterceptor = DefaultClientMetrics.StreamClientInterceptor()
)
func init() {
prom.MustRegister(DefaultClientMetrics.clientStartedCounter)
prom.MustRegister(DefaultClientMetrics.clientHandledCounter)
prom.MustRegister(DefaultClientMetrics.clientStreamMsgReceived)
prom.MustRegister(DefaultClientMetrics.clientStreamMsgSent)
}
// EnableClientHandlingTimeHistogram turns on recording of handling time of
// RPCs. Histogram metrics can be very expensive for Prometheus to retain and
// query. This function acts on the DefaultClientMetrics variable and the
// default Prometheus metrics registry.
func EnableClientHandlingTimeHistogram(opts ...HistogramOption) {
DefaultClientMetrics.EnableClientHandlingTimeHistogram(opts...)
prom.Register(DefaultClientMetrics.clientHandledHistogram)
}

View File

@@ -1,170 +0,0 @@
package grpc_prometheus
import (
"io"
prom "github.com/prometheus/client_golang/prometheus"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// ClientMetrics represents a collection of metrics to be registered on a
// Prometheus metrics registry for a gRPC client.
type ClientMetrics struct {
clientStartedCounter *prom.CounterVec
clientHandledCounter *prom.CounterVec
clientStreamMsgReceived *prom.CounterVec
clientStreamMsgSent *prom.CounterVec
clientHandledHistogramEnabled bool
clientHandledHistogramOpts prom.HistogramOpts
clientHandledHistogram *prom.HistogramVec
}
// NewClientMetrics returns a ClientMetrics object. Use a new instance of
// ClientMetrics when not using the default Prometheus metrics registry, for
// example when wanting to control which metrics are added to a registry as
// opposed to automatically adding metrics via init functions.
func NewClientMetrics(counterOpts ...CounterOption) *ClientMetrics {
opts := counterOptions(counterOpts)
return &ClientMetrics{
clientStartedCounter: prom.NewCounterVec(
opts.apply(prom.CounterOpts{
Name: "grpc_client_started_total",
Help: "Total number of RPCs started on the client.",
}), []string{"grpc_type", "grpc_service", "grpc_method"}),
clientHandledCounter: prom.NewCounterVec(
opts.apply(prom.CounterOpts{
Name: "grpc_client_handled_total",
Help: "Total number of RPCs completed by the client, regardless of success or failure.",
}), []string{"grpc_type", "grpc_service", "grpc_method", "grpc_code"}),
clientStreamMsgReceived: prom.NewCounterVec(
opts.apply(prom.CounterOpts{
Name: "grpc_client_msg_received_total",
Help: "Total number of RPC stream messages received by the client.",
}), []string{"grpc_type", "grpc_service", "grpc_method"}),
clientStreamMsgSent: prom.NewCounterVec(
opts.apply(prom.CounterOpts{
Name: "grpc_client_msg_sent_total",
Help: "Total number of gRPC stream messages sent by the client.",
}), []string{"grpc_type", "grpc_service", "grpc_method"}),
clientHandledHistogramEnabled: false,
clientHandledHistogramOpts: prom.HistogramOpts{
Name: "grpc_client_handling_seconds",
Help: "Histogram of response latency (seconds) of the gRPC until it is finished by the application.",
Buckets: prom.DefBuckets,
},
clientHandledHistogram: nil,
}
}
// Describe sends the super-set of all possible descriptors of metrics
// collected by this Collector to the provided channel and returns once
// the last descriptor has been sent.
func (m *ClientMetrics) Describe(ch chan<- *prom.Desc) {
m.clientStartedCounter.Describe(ch)
m.clientHandledCounter.Describe(ch)
m.clientStreamMsgReceived.Describe(ch)
m.clientStreamMsgSent.Describe(ch)
if m.clientHandledHistogramEnabled {
m.clientHandledHistogram.Describe(ch)
}
}
// Collect is called by the Prometheus registry when collecting
// metrics. The implementation sends each collected metric via the
// provided channel and returns once the last metric has been sent.
func (m *ClientMetrics) Collect(ch chan<- prom.Metric) {
m.clientStartedCounter.Collect(ch)
m.clientHandledCounter.Collect(ch)
m.clientStreamMsgReceived.Collect(ch)
m.clientStreamMsgSent.Collect(ch)
if m.clientHandledHistogramEnabled {
m.clientHandledHistogram.Collect(ch)
}
}
// EnableClientHandlingTimeHistogram turns on recording of handling time of RPCs.
// Histogram metrics can be very expensive for Prometheus to retain and query.
func (m *ClientMetrics) EnableClientHandlingTimeHistogram(opts ...HistogramOption) {
for _, o := range opts {
o(&m.clientHandledHistogramOpts)
}
if !m.clientHandledHistogramEnabled {
m.clientHandledHistogram = prom.NewHistogramVec(
m.clientHandledHistogramOpts,
[]string{"grpc_type", "grpc_service", "grpc_method"},
)
}
m.clientHandledHistogramEnabled = true
}
// UnaryClientInterceptor is a gRPC client-side interceptor that provides Prometheus monitoring for Unary RPCs.
func (m *ClientMetrics) UnaryClientInterceptor() func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
monitor := newClientReporter(m, Unary, method)
monitor.SentMessage()
err := invoker(ctx, method, req, reply, cc, opts...)
if err != nil {
monitor.ReceivedMessage()
}
st, _ := status.FromError(err)
monitor.Handled(st.Code())
return err
}
}
// StreamClientInterceptor is a gRPC client-side interceptor that provides Prometheus monitoring for Streaming RPCs.
func (m *ClientMetrics) StreamClientInterceptor() func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
monitor := newClientReporter(m, clientStreamType(desc), method)
clientStream, err := streamer(ctx, desc, cc, method, opts...)
if err != nil {
st, _ := status.FromError(err)
monitor.Handled(st.Code())
return nil, err
}
return &monitoredClientStream{clientStream, monitor}, nil
}
}
func clientStreamType(desc *grpc.StreamDesc) grpcType {
if desc.ClientStreams && !desc.ServerStreams {
return ClientStream
} else if !desc.ClientStreams && desc.ServerStreams {
return ServerStream
}
return BidiStream
}
// monitoredClientStream wraps grpc.ClientStream allowing each Sent/Recv of message to increment counters.
type monitoredClientStream struct {
grpc.ClientStream
monitor *clientReporter
}
func (s *monitoredClientStream) SendMsg(m interface{}) error {
err := s.ClientStream.SendMsg(m)
if err == nil {
s.monitor.SentMessage()
}
return err
}
func (s *monitoredClientStream) RecvMsg(m interface{}) error {
err := s.ClientStream.RecvMsg(m)
if err == nil {
s.monitor.ReceivedMessage()
} else if err == io.EOF {
s.monitor.Handled(codes.OK)
} else {
st, _ := status.FromError(err)
s.monitor.Handled(st.Code())
}
return err
}

View File

@@ -1,46 +0,0 @@
// Copyright 2016 Michal Witkowski. All Rights Reserved.
// See LICENSE for licensing terms.
package grpc_prometheus
import (
"time"
"google.golang.org/grpc/codes"
)
type clientReporter struct {
metrics *ClientMetrics
rpcType grpcType
serviceName string
methodName string
startTime time.Time
}
func newClientReporter(m *ClientMetrics, rpcType grpcType, fullMethod string) *clientReporter {
r := &clientReporter{
metrics: m,
rpcType: rpcType,
}
if r.metrics.clientHandledHistogramEnabled {
r.startTime = time.Now()
}
r.serviceName, r.methodName = splitMethodName(fullMethod)
r.metrics.clientStartedCounter.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName).Inc()
return r
}
func (r *clientReporter) ReceivedMessage() {
r.metrics.clientStreamMsgReceived.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName).Inc()
}
func (r *clientReporter) SentMessage() {
r.metrics.clientStreamMsgSent.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName).Inc()
}
func (r *clientReporter) Handled(code codes.Code) {
r.metrics.clientHandledCounter.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName, code.String()).Inc()
if r.metrics.clientHandledHistogramEnabled {
r.metrics.clientHandledHistogram.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName).Observe(time.Since(r.startTime).Seconds())
}
}

View File

@@ -1,16 +0,0 @@
SHELL="/bin/bash"
GOFILES_NOVENDOR = $(shell go list ./... | grep -v /vendor/)
all: vet fmt test
fmt:
go fmt $(GOFILES_NOVENDOR)
vet:
go vet $(GOFILES_NOVENDOR)
test: vet
./scripts/test_all.sh
.PHONY: all vet test

View File

@@ -1,41 +0,0 @@
package grpc_prometheus
import (
prom "github.com/prometheus/client_golang/prometheus"
)
// A CounterOption lets you add options to Counter metrics using With* funcs.
type CounterOption func(*prom.CounterOpts)
type counterOptions []CounterOption
func (co counterOptions) apply(o prom.CounterOpts) prom.CounterOpts {
for _, f := range co {
f(&o)
}
return o
}
// WithConstLabels allows you to add ConstLabels to Counter metrics.
func WithConstLabels(labels prom.Labels) CounterOption {
return func(o *prom.CounterOpts) {
o.ConstLabels = labels
}
}
// A HistogramOption lets you add options to Histogram metrics using With*
// funcs.
type HistogramOption func(*prom.HistogramOpts)
// WithHistogramBuckets allows you to specify custom bucket ranges for histograms if EnableHandlingTimeHistogram is on.
func WithHistogramBuckets(buckets []float64) HistogramOption {
return func(o *prom.HistogramOpts) { o.Buckets = buckets }
}
// WithHistogramConstLabels allows you to add custom ConstLabels to
// histograms metrics.
func WithHistogramConstLabels(labels prom.Labels) HistogramOption {
return func(o *prom.HistogramOpts) {
o.ConstLabels = labels
}
}

View File

@@ -1,48 +0,0 @@
// Copyright 2016 Michal Witkowski. All Rights Reserved.
// See LICENSE for licensing terms.
// gRPC Prometheus monitoring interceptors for server-side gRPC.
package grpc_prometheus
import (
prom "github.com/prometheus/client_golang/prometheus"
"google.golang.org/grpc"
)
var (
// DefaultServerMetrics is the default instance of ServerMetrics. It is
// intended to be used in conjunction the default Prometheus metrics
// registry.
DefaultServerMetrics = NewServerMetrics()
// UnaryServerInterceptor is a gRPC server-side interceptor that provides Prometheus monitoring for Unary RPCs.
UnaryServerInterceptor = DefaultServerMetrics.UnaryServerInterceptor()
// StreamServerInterceptor is a gRPC server-side interceptor that provides Prometheus monitoring for Streaming RPCs.
StreamServerInterceptor = DefaultServerMetrics.StreamServerInterceptor()
)
func init() {
prom.MustRegister(DefaultServerMetrics.serverStartedCounter)
prom.MustRegister(DefaultServerMetrics.serverHandledCounter)
prom.MustRegister(DefaultServerMetrics.serverStreamMsgReceived)
prom.MustRegister(DefaultServerMetrics.serverStreamMsgSent)
}
// Register takes a gRPC server and pre-initializes all counters to 0. This
// allows for easier monitoring in Prometheus (no missing metrics), and should
// be called *after* all services have been registered with the server. This
// function acts on the DefaultServerMetrics variable.
func Register(server *grpc.Server) {
DefaultServerMetrics.InitializeMetrics(server)
}
// EnableHandlingTimeHistogram turns on recording of handling time
// of RPCs. Histogram metrics can be very expensive for Prometheus
// to retain and query. This function acts on the DefaultServerMetrics
// variable and the default Prometheus metrics registry.
func EnableHandlingTimeHistogram(opts ...HistogramOption) {
DefaultServerMetrics.EnableHandlingTimeHistogram(opts...)
prom.Register(DefaultServerMetrics.serverHandledHistogram)
}

View File

@@ -1,185 +0,0 @@
package grpc_prometheus
import (
prom "github.com/prometheus/client_golang/prometheus"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/status"
)
// ServerMetrics represents a collection of metrics to be registered on a
// Prometheus metrics registry for a gRPC server.
type ServerMetrics struct {
serverStartedCounter *prom.CounterVec
serverHandledCounter *prom.CounterVec
serverStreamMsgReceived *prom.CounterVec
serverStreamMsgSent *prom.CounterVec
serverHandledHistogramEnabled bool
serverHandledHistogramOpts prom.HistogramOpts
serverHandledHistogram *prom.HistogramVec
}
// NewServerMetrics returns a ServerMetrics object. Use a new instance of
// ServerMetrics when not using the default Prometheus metrics registry, for
// example when wanting to control which metrics are added to a registry as
// opposed to automatically adding metrics via init functions.
func NewServerMetrics(counterOpts ...CounterOption) *ServerMetrics {
opts := counterOptions(counterOpts)
return &ServerMetrics{
serverStartedCounter: prom.NewCounterVec(
opts.apply(prom.CounterOpts{
Name: "grpc_server_started_total",
Help: "Total number of RPCs started on the server.",
}), []string{"grpc_type", "grpc_service", "grpc_method"}),
serverHandledCounter: prom.NewCounterVec(
opts.apply(prom.CounterOpts{
Name: "grpc_server_handled_total",
Help: "Total number of RPCs completed on the server, regardless of success or failure.",
}), []string{"grpc_type", "grpc_service", "grpc_method", "grpc_code"}),
serverStreamMsgReceived: prom.NewCounterVec(
opts.apply(prom.CounterOpts{
Name: "grpc_server_msg_received_total",
Help: "Total number of RPC stream messages received on the server.",
}), []string{"grpc_type", "grpc_service", "grpc_method"}),
serverStreamMsgSent: prom.NewCounterVec(
opts.apply(prom.CounterOpts{
Name: "grpc_server_msg_sent_total",
Help: "Total number of gRPC stream messages sent by the server.",
}), []string{"grpc_type", "grpc_service", "grpc_method"}),
serverHandledHistogramEnabled: false,
serverHandledHistogramOpts: prom.HistogramOpts{
Name: "grpc_server_handling_seconds",
Help: "Histogram of response latency (seconds) of gRPC that had been application-level handled by the server.",
Buckets: prom.DefBuckets,
},
serverHandledHistogram: nil,
}
}
// EnableHandlingTimeHistogram enables histograms being registered when
// registering the ServerMetrics on a Prometheus registry. Histograms can be
// expensive on Prometheus servers. It takes options to configure histogram
// options such as the defined buckets.
func (m *ServerMetrics) EnableHandlingTimeHistogram(opts ...HistogramOption) {
for _, o := range opts {
o(&m.serverHandledHistogramOpts)
}
if !m.serverHandledHistogramEnabled {
m.serverHandledHistogram = prom.NewHistogramVec(
m.serverHandledHistogramOpts,
[]string{"grpc_type", "grpc_service", "grpc_method"},
)
}
m.serverHandledHistogramEnabled = true
}
// Describe sends the super-set of all possible descriptors of metrics
// collected by this Collector to the provided channel and returns once
// the last descriptor has been sent.
func (m *ServerMetrics) Describe(ch chan<- *prom.Desc) {
m.serverStartedCounter.Describe(ch)
m.serverHandledCounter.Describe(ch)
m.serverStreamMsgReceived.Describe(ch)
m.serverStreamMsgSent.Describe(ch)
if m.serverHandledHistogramEnabled {
m.serverHandledHistogram.Describe(ch)
}
}
// Collect is called by the Prometheus registry when collecting
// metrics. The implementation sends each collected metric via the
// provided channel and returns once the last metric has been sent.
func (m *ServerMetrics) Collect(ch chan<- prom.Metric) {
m.serverStartedCounter.Collect(ch)
m.serverHandledCounter.Collect(ch)
m.serverStreamMsgReceived.Collect(ch)
m.serverStreamMsgSent.Collect(ch)
if m.serverHandledHistogramEnabled {
m.serverHandledHistogram.Collect(ch)
}
}
// UnaryServerInterceptor is a gRPC server-side interceptor that provides Prometheus monitoring for Unary RPCs.
func (m *ServerMetrics) UnaryServerInterceptor() func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
monitor := newServerReporter(m, Unary, info.FullMethod)
monitor.ReceivedMessage()
resp, err := handler(ctx, req)
st, _ := status.FromError(err)
monitor.Handled(st.Code())
if err == nil {
monitor.SentMessage()
}
return resp, err
}
}
// StreamServerInterceptor is a gRPC server-side interceptor that provides Prometheus monitoring for Streaming RPCs.
func (m *ServerMetrics) StreamServerInterceptor() func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
monitor := newServerReporter(m, streamRPCType(info), info.FullMethod)
err := handler(srv, &monitoredServerStream{ss, monitor})
st, _ := status.FromError(err)
monitor.Handled(st.Code())
return err
}
}
// InitializeMetrics initializes all metrics, with their appropriate null
// value, for all gRPC methods registered on a gRPC server. This is useful, to
// ensure that all metrics exist when collecting and querying.
func (m *ServerMetrics) InitializeMetrics(server *grpc.Server) {
serviceInfo := server.GetServiceInfo()
for serviceName, info := range serviceInfo {
for _, mInfo := range info.Methods {
preRegisterMethod(m, serviceName, &mInfo)
}
}
}
func streamRPCType(info *grpc.StreamServerInfo) grpcType {
if info.IsClientStream && !info.IsServerStream {
return ClientStream
} else if !info.IsClientStream && info.IsServerStream {
return ServerStream
}
return BidiStream
}
// monitoredStream wraps grpc.ServerStream allowing each Sent/Recv of message to increment counters.
type monitoredServerStream struct {
grpc.ServerStream
monitor *serverReporter
}
func (s *monitoredServerStream) SendMsg(m interface{}) error {
err := s.ServerStream.SendMsg(m)
if err == nil {
s.monitor.SentMessage()
}
return err
}
func (s *monitoredServerStream) RecvMsg(m interface{}) error {
err := s.ServerStream.RecvMsg(m)
if err == nil {
s.monitor.ReceivedMessage()
}
return err
}
// preRegisterMethod is invoked on Register of a Server, allowing all gRPC services labels to be pre-populated.
func preRegisterMethod(metrics *ServerMetrics, serviceName string, mInfo *grpc.MethodInfo) {
methodName := mInfo.Name
methodType := string(typeFromMethodInfo(mInfo))
// These are just references (no increments), as just referencing will create the labels but not set values.
metrics.serverStartedCounter.GetMetricWithLabelValues(methodType, serviceName, methodName)
metrics.serverStreamMsgReceived.GetMetricWithLabelValues(methodType, serviceName, methodName)
metrics.serverStreamMsgSent.GetMetricWithLabelValues(methodType, serviceName, methodName)
if metrics.serverHandledHistogramEnabled {
metrics.serverHandledHistogram.GetMetricWithLabelValues(methodType, serviceName, methodName)
}
for _, code := range allCodes {
metrics.serverHandledCounter.GetMetricWithLabelValues(methodType, serviceName, methodName, code.String())
}
}

View File

@@ -1,46 +0,0 @@
// Copyright 2016 Michal Witkowski. All Rights Reserved.
// See LICENSE for licensing terms.
package grpc_prometheus
import (
"time"
"google.golang.org/grpc/codes"
)
type serverReporter struct {
metrics *ServerMetrics
rpcType grpcType
serviceName string
methodName string
startTime time.Time
}
func newServerReporter(m *ServerMetrics, rpcType grpcType, fullMethod string) *serverReporter {
r := &serverReporter{
metrics: m,
rpcType: rpcType,
}
if r.metrics.serverHandledHistogramEnabled {
r.startTime = time.Now()
}
r.serviceName, r.methodName = splitMethodName(fullMethod)
r.metrics.serverStartedCounter.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName).Inc()
return r
}
func (r *serverReporter) ReceivedMessage() {
r.metrics.serverStreamMsgReceived.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName).Inc()
}
func (r *serverReporter) SentMessage() {
r.metrics.serverStreamMsgSent.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName).Inc()
}
func (r *serverReporter) Handled(code codes.Code) {
r.metrics.serverHandledCounter.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName, code.String()).Inc()
if r.metrics.serverHandledHistogramEnabled {
r.metrics.serverHandledHistogram.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName).Observe(time.Since(r.startTime).Seconds())
}
}

View File

@@ -1,50 +0,0 @@
// Copyright 2016 Michal Witkowski. All Rights Reserved.
// See LICENSE for licensing terms.
package grpc_prometheus
import (
"strings"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
)
type grpcType string
const (
Unary grpcType = "unary"
ClientStream grpcType = "client_stream"
ServerStream grpcType = "server_stream"
BidiStream grpcType = "bidi_stream"
)
var (
allCodes = []codes.Code{
codes.OK, codes.Canceled, codes.Unknown, codes.InvalidArgument, codes.DeadlineExceeded, codes.NotFound,
codes.AlreadyExists, codes.PermissionDenied, codes.Unauthenticated, codes.ResourceExhausted,
codes.FailedPrecondition, codes.Aborted, codes.OutOfRange, codes.Unimplemented, codes.Internal,
codes.Unavailable, codes.DataLoss,
}
)
func splitMethodName(fullMethodName string) (string, string) {
fullMethodName = strings.TrimPrefix(fullMethodName, "/") // remove leading slash
if i := strings.Index(fullMethodName, "/"); i >= 0 {
return fullMethodName[:i], fullMethodName[i+1:]
}
return "unknown", "unknown"
}
func typeFromMethodInfo(mInfo *grpc.MethodInfo) grpcType {
if !mInfo.IsClientStream && !mInfo.IsServerStream {
return Unary
}
if mInfo.IsClientStream && !mInfo.IsServerStream {
return ClientStream
}
if !mInfo.IsClientStream && mInfo.IsServerStream {
return ServerStream
}
return BidiStream
}