Replace go-grpc-prometheus with go-grpc-middleware/providers/prometheus

Fixes #9806

go-grpc-prometheus is deprecated. The new location it was moved to also introduced
an entirely new api, but afaict this matches what we have at the moment.

Signed-off-by: Danny Canter <danny@dcantah.dev>
This commit is contained in:
Danny Canter
2024-02-20 00:27:31 -08:00
parent b87d78f456
commit 6a21c96b55
45 changed files with 1224 additions and 2348 deletions

View File

@@ -0,0 +1,83 @@
// Copyright (c) The go-grpc-middleware Authors.
// Licensed under the Apache License 2.0.
// Go gRPC Middleware monitoring interceptors for client-side gRPC.
package interceptors
import (
"context"
"io"
"time"
"google.golang.org/grpc"
)
// UnaryClientInterceptor is a gRPC client-side interceptor that provides reporting for Unary RPCs.
func UnaryClientInterceptor(reportable ClientReportable) grpc.UnaryClientInterceptor {
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
r := newReport(Unary, method)
reporter, newCtx := reportable.ClientReporter(ctx, CallMeta{ReqProtoOrNil: req, Typ: r.rpcType, Service: r.service, Method: r.method})
reporter.PostMsgSend(req, nil, time.Since(r.startTime))
err := invoker(newCtx, method, req, reply, cc, opts...)
reporter.PostMsgReceive(reply, err, time.Since(r.startTime))
reporter.PostCall(err, time.Since(r.startTime))
return err
}
}
// StreamClientInterceptor is a gRPC client-side interceptor that provides reporting for Stream RPCs.
func StreamClientInterceptor(reportable ClientReportable) grpc.StreamClientInterceptor {
return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
r := newReport(clientStreamType(desc), method)
reporter, newCtx := reportable.ClientReporter(ctx, CallMeta{ReqProtoOrNil: nil, Typ: r.rpcType, Service: r.service, Method: r.method})
clientStream, err := streamer(newCtx, desc, cc, method, opts...)
if err != nil {
reporter.PostCall(err, time.Since(r.startTime))
return nil, err
}
return &monitoredClientStream{ClientStream: clientStream, startTime: r.startTime, reporter: reporter}, nil
}
}
func clientStreamType(desc *grpc.StreamDesc) GRPCType {
if desc.ClientStreams && !desc.ServerStreams {
return ClientStream
} else if !desc.ClientStreams && desc.ServerStreams {
return ServerStream
}
return BidiStream
}
// monitoredClientStream wraps grpc.ClientStream allowing each Sent/Recv of message to report.
type monitoredClientStream struct {
grpc.ClientStream
startTime time.Time
reporter Reporter
}
func (s *monitoredClientStream) SendMsg(m interface{}) error {
start := time.Now()
err := s.ClientStream.SendMsg(m)
s.reporter.PostMsgSend(m, err, time.Since(start))
return err
}
func (s *monitoredClientStream) RecvMsg(m interface{}) error {
start := time.Now()
err := s.ClientStream.RecvMsg(m)
s.reporter.PostMsgReceive(m, err, time.Since(start))
if err == nil {
return nil
}
var postErr error
if err != io.EOF {
postErr = err
}
s.reporter.PostCall(postErr, time.Since(s.startTime))
return err
}

View File

@@ -0,0 +1,12 @@
// Copyright (c) The go-grpc-middleware Authors.
// Licensed under the Apache License 2.0.
//
/*
interceptor is an internal package used by higher level middlewares. It allows injecting custom code in various
places of the gRPC lifecycle.
This particular package is intended for use by other middleware, metric, logging or otherwise.
This allows code to be shared between different implementations.
*/
package interceptors

View File

@@ -0,0 +1,116 @@
// Copyright (c) The go-grpc-middleware Authors.
// Licensed under the Apache License 2.0.
package interceptors
import (
"context"
"fmt"
"strings"
"time"
"google.golang.org/grpc/codes"
)
type GRPCType string
// Timer is a helper interface to time functions.
// Useful for interceptors to record the total
// time elapsed since completion of a call.
type Timer interface {
ObserveDuration() time.Duration
}
// zeroTimer.
type zeroTimer struct {
}
func (zeroTimer) ObserveDuration() time.Duration {
return 0
}
var EmptyTimer = &zeroTimer{}
const (
Unary GRPCType = "unary"
ClientStream GRPCType = "client_stream"
ServerStream GRPCType = "server_stream"
BidiStream GRPCType = "bidi_stream"
)
var (
AllCodes = []codes.Code{
codes.OK, codes.Canceled, codes.Unknown, codes.InvalidArgument, codes.DeadlineExceeded, codes.NotFound,
codes.AlreadyExists, codes.PermissionDenied, codes.Unauthenticated, codes.ResourceExhausted,
codes.FailedPrecondition, codes.Aborted, codes.OutOfRange, codes.Unimplemented, codes.Internal,
codes.Unavailable, codes.DataLoss,
}
)
func SplitMethodName(fullMethod string) (string, string) {
fullMethod = strings.TrimPrefix(fullMethod, "/") // remove leading slash
if i := strings.Index(fullMethod, "/"); i >= 0 {
return fullMethod[:i], fullMethod[i+1:]
}
return "unknown", "unknown"
}
type CallMeta struct {
ReqProtoOrNil interface{}
Typ GRPCType
Service string
Method string
}
func (c CallMeta) FullMethod() string {
return fmt.Sprintf("/%s/%s", c.Service, c.Method)
}
type ClientReportable interface {
ClientReporter(context.Context, CallMeta) (Reporter, context.Context)
}
type ServerReportable interface {
ServerReporter(context.Context, CallMeta) (Reporter, context.Context)
}
// CommonReportableFunc helper allows an easy way to implement reporter with common client and server logic.
type CommonReportableFunc func(ctx context.Context, c CallMeta, isClient bool) (Reporter, context.Context)
func (f CommonReportableFunc) ClientReporter(ctx context.Context, c CallMeta) (Reporter, context.Context) {
return f(ctx, c, true)
}
func (f CommonReportableFunc) ServerReporter(ctx context.Context, c CallMeta) (Reporter, context.Context) {
return f(ctx, c, false)
}
type Reporter interface {
PostCall(err error, rpcDuration time.Duration)
PostMsgSend(reqProto interface{}, err error, sendDuration time.Duration)
PostMsgReceive(replyProto interface{}, err error, recvDuration time.Duration)
}
var _ Reporter = NoopReporter{}
type NoopReporter struct{}
func (NoopReporter) PostCall(error, time.Duration) {}
func (NoopReporter) PostMsgSend(interface{}, error, time.Duration) {}
func (NoopReporter) PostMsgReceive(interface{}, error, time.Duration) {}
type report struct {
rpcType GRPCType
service string
method string
startTime time.Time
}
func newReport(typ GRPCType, fullMethod string) report {
r := report{
startTime: time.Now(),
rpcType: typ,
}
r.service, r.method = SplitMethodName(fullMethod)
return r
}

View File

@@ -0,0 +1,74 @@
// Copyright (c) The go-grpc-middleware Authors.
// Licensed under the Apache License 2.0.
// Go gRPC Middleware monitoring interceptors for server-side gRPC.
package interceptors
import (
"context"
"time"
"google.golang.org/grpc"
)
// UnaryServerInterceptor is a gRPC server-side interceptor that provides reporting for Unary RPCs.
func UnaryServerInterceptor(reportable ServerReportable) grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
r := newReport(Unary, info.FullMethod)
reporter, newCtx := reportable.ServerReporter(ctx, CallMeta{ReqProtoOrNil: req, Typ: r.rpcType, Service: r.service, Method: r.method})
reporter.PostMsgReceive(req, nil, time.Since(r.startTime))
resp, err := handler(newCtx, req)
reporter.PostMsgSend(resp, err, time.Since(r.startTime))
reporter.PostCall(err, time.Since(r.startTime))
return resp, err
}
}
// StreamServerInterceptor is a gRPC server-side interceptor that provides reporting for Streaming RPCs.
func StreamServerInterceptor(reportable ServerReportable) grpc.StreamServerInterceptor {
return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
r := newReport(ServerStream, info.FullMethod)
reporter, newCtx := reportable.ServerReporter(ss.Context(), CallMeta{ReqProtoOrNil: nil, Typ: StreamRPCType(info), Service: r.service, Method: r.method})
err := handler(srv, &monitoredServerStream{ServerStream: ss, newCtx: newCtx, reporter: reporter})
reporter.PostCall(err, time.Since(r.startTime))
return err
}
}
func StreamRPCType(info *grpc.StreamServerInfo) GRPCType {
if info.IsClientStream && !info.IsServerStream {
return ClientStream
} else if !info.IsClientStream && info.IsServerStream {
return ServerStream
}
return BidiStream
}
// monitoredStream wraps grpc.ServerStream allowing each Sent/Recv of message to report.
type monitoredServerStream struct {
grpc.ServerStream
newCtx context.Context
reporter Reporter
}
func (s *monitoredServerStream) Context() context.Context {
return s.newCtx
}
func (s *monitoredServerStream) SendMsg(m interface{}) error {
start := time.Now()
err := s.ServerStream.SendMsg(m)
s.reporter.PostMsgSend(m, err, time.Since(start))
return err
}
func (s *monitoredServerStream) RecvMsg(m interface{}) error {
start := time.Now()
err := s.ServerStream.RecvMsg(m)
s.reporter.PostMsgReceive(m, err, time.Since(start))
return err
}